This release is 4 versions behind 0.4.0 — the latest version of @upyo/pool. Jump to latest
Built and signed on GitHub ActionsBuilt and signed on GitHub Actions
Built and signed on GitHub Actions
Works with
•JSR Score100%•This package works with Cloudflare Workers, Node.js, Deno, Bun, Browsers




Downloads1/wk
•Published2 months ago (0.3.1)
Pool transport for Upyo email library—provides load balancing and failover for multiple email providers
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289import type { Message, Receipt, Transport, TransportOptions } from "jsr:@upyo/core@^0.3.1"; import { createPoolConfig, type PoolConfig, type PoolStrategy, type ResolvedPoolConfig, } from "./config.ts"; import type { Strategy } from "./strategies/strategy.ts"; import { RoundRobinStrategy } from "./strategies/round-robin-strategy.ts"; import { WeightedStrategy } from "./strategies/weighted-strategy.ts"; import { PriorityStrategy } from "./strategies/priority-strategy.ts"; import { SelectorStrategy } from "./strategies/selector-strategy.ts"; /** * Pool transport that combines multiple transports with various load balancing * and failover strategies. * * This transport implements the same `Transport` interface, making it a drop-in * replacement for any single transport. It distributes messages across multiple * underlying transports based on the configured strategy. * * @example Round-robin load balancing * ```typescript * import { PoolTransport } from "@upyo/pool"; * * const transport = new PoolTransport({ * strategy: "round-robin", * transports: [ * { transport: mailgunTransport }, * { transport: sendgridTransport }, * { transport: sesTransport }, * ], * }); * ``` * * @example Priority-based failover * ```typescript * const transport = new PoolTransport({ * strategy: "priority", * transports: [ * { transport: primaryTransport, priority: 100 }, * { transport: backupTransport, priority: 50 }, * { transport: lastResortTransport, priority: 10 }, * ], * }); * ``` * * @example Custom routing with selectors * ```typescript * const transport = new PoolTransport({ * strategy: "selector-based", * transports: [ * { * transport: bulkEmailTransport, * selector: (msg) => msg.tags?.includes("newsletter"), * }, * { * transport: transactionalTransport, * selector: (msg) => msg.priority === "high", * }, * { transport: defaultTransport }, // Catches everything else * ], * }); * ``` * * @since 0.3.0 */ export class PoolTransport implements Transport, AsyncDisposable { /** * The resolved configuration used by this pool transport. */ readonly config: ResolvedPoolConfig; private readonly strategy: Strategy; /** * Creates a new PoolTransport instance. * * @param config Configuration options for the pool transport. * @throws {Error} If the configuration is invalid. */ constructor(config: PoolConfig) { this.config = createPoolConfig(config); this.strategy = this.createStrategy(this.config.strategy); } /** * Sends a single email message using the pool strategy. * * The transport is selected based on the configured strategy. If the * selected transport fails, the pool will retry with other transports * up to the configured retry limit. * * @param message The email message to send. * @param options Optional transport options including abort signal. * @returns A promise that resolves to a receipt indicating success or failure. */ async send(message: Message, options?: TransportOptions): Promise<Receipt> { const attemptedIndices = new Set<number>(); const errors: string[] = []; // Track errors from all attempts // let lastReceipt: Receipt | null = null; // Removed as it was unused for (let attempt = 0; attempt < this.config.maxRetries; attempt++) { // Check for cancellation if (options?.signal?.aborted) { throw new DOMException("The operation was aborted.", "AbortError"); } // Select a transport const selection = this.strategy.select( message, this.config.transports, attemptedIndices, ); if (!selection) { // No more transports available break; } attemptedIndices.add(selection.index); try { // Apply timeout if configured const sendOptions = this.createSendOptions(options); // Send the message const receipt = await selection.entry.transport.send( message, sendOptions, ); // Track receipt for potential error collection // lastReceipt = receipt; // Removed as lastReceipt was removed if (receipt.successful) { return receipt; } // Collect error messages for failed attempts errors.push(...receipt.errorMessages); } catch (error) { // Handle transport errors if (error instanceof DOMException && error.name === "AbortError") { throw error; } const errorMessage = error instanceof Error ? error.message : String(error); errors.push(errorMessage); } } // All attempts failed return { successful: false, errorMessages: errors.length > 0 ? errors : ["All transports failed to send the message"], }; } /** * Sends multiple email messages using the pool strategy. * * Each message is sent individually using the `send` method, respecting * the configured strategy and retry logic. * * @param messages An iterable or async iterable of messages to send. * @param options Optional transport options including abort signal. * @returns An async iterable of receipts, one for each message. */ async *sendMany( messages: Iterable<Message> | AsyncIterable<Message>, options?: TransportOptions, ): AsyncIterable<Receipt> { // Reset strategy state for batch operations this.strategy.reset(); for await (const message of messages) { if (options?.signal?.aborted) { throw new DOMException("The operation was aborted.", "AbortError"); } yield await this.send(message, options); } } /** * Disposes of all underlying transports that support disposal. * * This method is called automatically when using the `await using` syntax. * It ensures proper cleanup of resources held by the underlying transports. */ async [Symbol.asyncDispose](): Promise<void> { const disposalPromises: Promise<void>[] = []; for (const entry of this.config.transports) { const transport = entry.transport; // Check for AsyncDisposable if ( typeof (transport as unknown as AsyncDisposable)[ Symbol.asyncDispose ] === "function" ) { const asyncDispose = (transport as unknown as AsyncDisposable) [Symbol.asyncDispose](); disposalPromises.push(Promise.resolve(asyncDispose)); } // Check for Disposable else if ( typeof (transport as unknown as Disposable)[Symbol.dispose] === "function" ) { try { (transport as unknown as Disposable)[Symbol.dispose](); } catch { // Ignore disposal errors } } } // Wait for all async disposals to complete await Promise.allSettled(disposalPromises); } /** * Creates a strategy instance based on the strategy type or returns the provided strategy. */ private createStrategy(strategy: PoolStrategy | Strategy): Strategy { // If it's already a Strategy instance, return it directly if (typeof strategy === "object" && strategy !== null) { return strategy; } // Handle built-in strategy names switch (strategy) { case "round-robin": return new RoundRobinStrategy(); case "weighted": return new WeightedStrategy(); case "priority": return new PriorityStrategy(); case "selector-based": return new SelectorStrategy(); default: throw new Error(`Unknown strategy: ${strategy}`); } } /** * Creates send options with timeout if configured. */ private createSendOptions( options?: TransportOptions, ): TransportOptions | undefined { if (!this.config.timeout) { return options; } // Create AbortController for timeout const controller = new AbortController(); const timeoutId = setTimeout( () => controller.abort(), this.config.timeout, ); // Combine with existing signal if present if (options?.signal) { options.signal.addEventListener("abort", () => { clearTimeout(timeoutId); controller.abort(); }); } // Clean up timeout when done controller.signal.addEventListener("abort", () => { clearTimeout(timeoutId); }); return { ...options, signal: controller.signal, }; } }