Skip to main content
This release is 22 versions behind 1.4.1 — the latest version of @fedify/fedify. Jump to latest

Built and signed on GitHub Actions

An ActivityPub/fediverse server framework

This package works with Node.js, Deno, Bun
This package works with Node.js
This package works with Deno
This package works with Bun
JSR Score
94%
Published
3 months ago (1.2.1)
// deno-lint-ignore-file no-explicit-any /** * Additional options for enqueuing a message in a queue. * * @since 0.5.0 */ export interface MessageQueueEnqueueOptions { /** * The delay before the message is enqueued. No delay by default. * * It must not be negative. */ delay?: Temporal.Duration; } /** * Additional options for listening to a message queue. * * @since 1.0.0 */ export interface MessageQueueListenOptions { /** * The signal to abort listening to the message queue. */ signal?: AbortSignal; } /** * An abstract interface for a message queue. * * @since 0.5.0 */ export interface MessageQueue { /** * Enqueues a message in the queue. * @param message The message to enqueue. * @param options Additional options for enqueuing the message. */ enqueue(message: any, options?: MessageQueueEnqueueOptions): Promise<void>; /** * Listens for messages in the queue. * @param handler The handler for messages in the queue. * @param options Additional options for listening to the message queue. * @returns A promise that resolves when the listening is done. It never * rejects, and is resolved when the signal is aborted. If no * signal is provided, it never resolves. */ listen( handler: (message: any) => Promise<void> | void, options?: MessageQueueListenOptions, ): Promise<void>; } /** * Additional options for {@link InProcessMessageQueue}. * @since 1.0.0 */ export interface InProcessMessageQueueOptions { /** * The interval to poll for messages in the queue. 5 seconds by default. * @default `{ seconds: 5 }` */ pollInterval?: Temporal.Duration | Temporal.DurationLike; } /** * A message queue that processes messages in the same process. * Do not use this in production as it does neither persist messages nor * distribute them across multiple processes. * * @since 0.5.0 */ export class InProcessMessageQueue implements MessageQueue { #messages: any[]; #monitors: Record<ReturnType<typeof crypto.randomUUID>, () => void>; #pollIntervalMs: number; /** * Constructs a new {@link InProcessMessageQueue} with the given options. * @param options Additional options for the in-process message queue. */ constructor(options: InProcessMessageQueueOptions = {}) { this.#messages = []; this.#monitors = {}; this.#pollIntervalMs = Temporal.Duration.from( options.pollInterval ?? { seconds: 5 }, ).total("millisecond"); } enqueue(message: any, options?: MessageQueueEnqueueOptions): Promise<void> { const delay = options?.delay == null ? 0 : Math.max(options.delay.total("millisecond"), 0); if (delay > 0) { setTimeout( () => this.enqueue(message, { ...options, delay: undefined }), delay, ); return Promise.resolve(); } this.#messages.push(message); for (const monitorId in this.#monitors) { this.#monitors[monitorId as ReturnType<typeof crypto.randomUUID>](); } return Promise.resolve(); } async listen( handler: (message: any) => Promise<void> | void, options: MessageQueueListenOptions = {}, ): Promise<void> { const signal = options.signal; while (signal == null || !signal.aborted) { while (this.#messages.length > 0) { const message = this.#messages.shift(); await handler(message); } await this.#wait(this.#pollIntervalMs, signal); } } #wait(ms: number, signal?: AbortSignal): Promise<void> { let timer: ReturnType<typeof setTimeout> | null = null; return Promise.any([ new Promise<void>((resolve) => { signal?.addEventListener("abort", () => { if (timer != null) clearTimeout(timer); resolve(); }, { once: true }); const monitorId = crypto.randomUUID(); this.#monitors[monitorId] = () => { delete this.#monitors[monitorId]; if (timer != null) clearTimeout(timer); resolve(); }; }), new Promise<void>((resolve) => timer = setTimeout(resolve, ms)), ]); } } type Uuid = ReturnType<typeof crypto.randomUUID>; /** * A message queue that processes messages in parallel. It takes another * {@link MessageQueue}, and processes messages in parallel up to a certain * number of workers. * * Actually, it's rather a decorator than a queue itself. * * Note that the workers do not run in truly parallel, in the sense that they * are not running in separate threads or processes. They are running in the * same process, but are scheduled to run in parallel. Hence, this is useful * for I/O-bound tasks, but not for CPU-bound tasks, which is okay for Fedify's * workloads. * * @since 1.0.0 */ export class ParallelMessageQueue implements MessageQueue { readonly queue: MessageQueue; readonly workers: number; /** * Constructs a new {@link ParallelMessageQueue} with the given queue and * number of workers. * @param queue The message queue to use under the hood. Note that * {@link ParallelMessageQueue} cannot be nested. * @param workers The number of workers to process messages in parallel. * @throws {TypeError} If the given queue is an instance of * {@link ParallelMessageQueue}. */ constructor(queue: MessageQueue, workers: number) { if (queue instanceof ParallelMessageQueue) { throw new TypeError("Cannot nest ParallelMessageQueue."); } this.queue = queue; this.workers = workers; } enqueue(message: any, options?: MessageQueueEnqueueOptions): Promise<void> { return this.queue.enqueue(message, options); } listen( handler: (message: any) => Promise<void> | void, options: MessageQueueListenOptions = {}, ): Promise<void> { const workers = new Map<Uuid, Promise<Uuid>>(); return this.queue.listen(async (message) => { while (workers.size >= this.workers) { const consumedId = await Promise.any(workers.values()); workers.delete(consumedId); } const workerId = crypto.randomUUID(); const promise = this.#work(workerId, handler, message); workers.set(workerId, promise); }, options); } async #work( workerId: Uuid, handler: (message: any) => Promise<void> | void, message: any, ): Promise<Uuid> { await this.#sleep(0); await handler(message); return workerId; } #sleep(ms: number): Promise<void> { return new Promise((resolve) => setTimeout(resolve, ms)); } }