Built and signed on GitHub Actions

Fast and small web worker pool

This package works with Deno, Bun, and browsers.
It is unknown whether this package works with Cloudflare Workers.
JSR Score: 100%
Published: 5 months ago (0.4.31)
class AbstractPool
implements IPool<Worker, Data, Response>

Base class that implements some shared logic for all poolifier pools.

Constructors

new
AbstractPool(
minimumNumberOfWorkers: number,
fileURL: URL,
maximumNumberOfWorkers?: number,
)

Constructs a new poolifier pool.

Type Parameters

Worker extends IWorker
  • Type of worker which manages this pool.
Data = unknown
  • Type of data sent to the worker. This can only be structured-cloneable data.
Response = unknown
  • Type of execution response. This can only be structured-cloneable data.
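
Because both Data and Response must be structured-cloneable, it can help to check a payload before submitting it. The sketch below is a minimal illustration using the standard `structuredClone` global; `isStructuredCloneable` and the two sample objects are hypothetical names and are not part of this package. Note that the check makes a full copy, so it is better suited to tests than to hot paths.

```typescript
// Hypothetical helper (not part of poolifier-web-worker): returns true when
// the value survives the structured clone algorithm, false otherwise.
function isStructuredCloneable(value: unknown): boolean {
  try {
    structuredClone(value)
    return true
  } catch {
    // structuredClone throws a DataCloneError for functions, symbols, etc.
    return false
  }
}

// Plain data, typed arrays and dates are cloneable.
const cloneableTask = {
  id: 1,
  payload: new Uint8Array([1, 2, 3]),
  when: new Date(0),
}

// A function property makes the whole object non-cloneable.
const nonCloneableTask = {
  id: 2,
  callback: () => 'functions cannot be cloned',
}

console.log(isStructuredCloneable(cloneableTask)) // true
console.log(isStructuredCloneable(nonCloneableTask)) // false
```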

Properties

readonly
info: PoolInfo
protected
abstract
readonly
backPressure: boolean

Whether the pool is back pressured or not.

protected
abstract
readonly
busy: boolean

Whether the pool is busy or not.

The task execution response promise map:

  • key: The message id of each submitted task.
  • value: An object that contains the task's worker node key and the execution response promise's resolve and reject callbacks.

When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
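
The map described above can be sketched as follows. This is a simplified illustration of the pattern, not the library's implementation: `ResponseRegistry`, `PendingResponse`, `WorkerMessage` and the numeric task ids are all assumptions made for the example.

```typescript
// Illustrative types only; these names are assumptions, not library internals.
interface PendingResponse<R> {
  workerNodeKey: number
  resolve: (value: R) => void
  reject: (reason: Error) => void
}

interface WorkerMessage<R> {
  taskId: number
  data?: R
  error?: string
}

class ResponseRegistry<R> {
  private readonly pending = new Map<number, PendingResponse<R>>()
  private nextTaskId = 0

  // Stores the promise callbacks under a fresh task id and returns both.
  submit(workerNodeKey: number): { taskId: number; response: Promise<R> } {
    const taskId = this.nextTaskId++
    const response = new Promise<R>((resolve, reject) => {
      // The executor runs synchronously, so the entry exists before we return.
      this.pending.set(taskId, { workerNodeKey, resolve, reject })
    })
    return { taskId, response }
  }

  // Finds the entry bound to the message id, settles it and removes it.
  // Returns false when no task with that id is pending.
  handleMessage(message: WorkerMessage<R>): boolean {
    const entry = this.pending.get(message.taskId)
    if (entry === undefined) return false
    this.pending.delete(message.taskId)
    if (message.error !== undefined) {
      entry.reject(new Error(message.error))
    } else {
      entry.resolve(message.data as R)
    }
    return true
  }

  get size(): number {
    return this.pending.size
  }
}

const registry = new ResponseRegistry<number>()
const { taskId, response } = registry.submit(0)
response.then((value) => console.log(`task ${taskId} resolved with ${value}`))
// Simulates the worker replying to the task submitted above.
registry.handleMessage({ taskId, data: 42 })
```

Keeping the worker node key next to the callbacks means that pending tasks can be traced back to the worker node they were sent to.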

protected
abstract
readonly
type: PoolType

The pool type.

If it is 'dynamic', it provides the max property.

protected
abstract
readonly
worker: WorkerType

The worker type.

Worker choice strategies context referencing worker choice algorithms implementation.

protected
readonly
workerMessageListener: (message: MessageValue<Response>) => void

This method is the message listener registered on each worker.

Whether the pool back pressure event has been emitted or not.

Whether the pool busy event has been emitted or not.

Whether the pool is destroying or not.

private
readonly
getTaskFunctionWorkerChoiceStrategy: (name?: string) => WorkerChoiceStrategy | undefined

Gets task function worker choice strategy, if any.

Gets the worker choice strategies registered in this pool.

private
readonly
getWorkerNodeTaskFunctionPriority: (
workerNodeKey: number,
name?: string,
) => number | undefined

Gets worker node task function priority, if any.

private
readonly
getWorkerNodeTaskFunctionWorkerChoiceStrategy: (
workerNodeKey: number,
name?: string,
) => WorkerChoiceStrategy | undefined

Gets worker node task function worker choice strategy, if any.

private
readonly
handleWorkerNodeIdleEvent: (
previousStolenTask?: Task<Data>,
) => void
private
readonly
isStealingRatioReached: () => boolean
private
readonly
ready: boolean

Whether the pool is ready or not.

Whether the pool ready event has been emitted or not.

private
abstract
startTimestamp: number

The start timestamp of the pool.

Whether the pool is started or not.

Whether the pool is starting or not.

Whether the minimum number of workers is starting or not.

private
readonly
stealTask: (
sourceWorkerNode: IWorkerNode<Worker, Data>,
destinationWorkerNodeKey: number,
) => Task<Data> | undefined

The task functions added at runtime map:

  • key: The task function name.
  • value: The task function object.
private
readonly
utilization: number

The approximate pool utilization.

private
readonly
workerNodeStealTask: (workerNodeKey: number) => Task<Data> | undefined

Methods

private
addWorkerNode(workerNode: IWorkerNode<Worker, Data>): number

Adds the given worker node to the pool's worker nodes.

protected
afterTaskExecutionHook(
workerNodeKey: number,
): void

Hook executed after the worker task execution. Can be overridden.

protected
afterWorkerNodeSetup(workerNodeKey: number): void

Method hooked up after a worker node has been newly created. Can be overridden.

protected
beforeTaskExecutionHook(
workerNodeKey: number,
task: Task<Data>,
): void

Hook executed before the worker task execution. Can be overridden.
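
The overridable hooks follow the template method pattern. The sketch below shows the general shape under stated assumptions: `MiniPool`, `CountingPool`, `run` and `executeOn` are hypothetical stand-ins written for this example, and only the two hook names and their parameters mirror the signatures documented here.

```typescript
// Hypothetical stand-in for a pool base class (not the real AbstractPool).
abstract class MiniPool<Data> {
  // Default hooks do nothing; subclasses may override them.
  protected beforeTaskExecutionHook(_workerNodeKey: number, _task: Data): void {}
  protected afterTaskExecutionHook(_workerNodeKey: number): void {}

  // Hypothetical primitive that actually runs the task.
  protected abstract run(workerNodeKey: number, task: Data): void

  // Template method: the hooks always wrap the task execution.
  executeOn(workerNodeKey: number, task: Data): void {
    this.beforeTaskExecutionHook(workerNodeKey, task)
    try {
      this.run(workerNodeKey, task)
    } finally {
      this.afterTaskExecutionHook(workerNodeKey)
    }
  }
}

// Subclass that uses the hooks to track running tasks per worker node.
class CountingPool extends MiniPool<string> {
  readonly runningTasks = new Map<number, number>()
  readonly log: string[] = []

  protected override beforeTaskExecutionHook(workerNodeKey: number, task: string): void {
    const running = this.runningTasks.get(workerNodeKey) ?? 0
    this.runningTasks.set(workerNodeKey, running + 1)
    this.log.push(`before:${workerNodeKey}:${task}`)
  }

  protected override afterTaskExecutionHook(workerNodeKey: number): void {
    const running = this.runningTasks.get(workerNodeKey) ?? 1
    this.runningTasks.set(workerNodeKey, running - 1)
    this.log.push(`after:${workerNodeKey}`)
  }

  protected run(workerNodeKey: number, task: string): void {
    this.log.push(`run:${workerNodeKey}:${task}`)
  }
}

const pool = new CountingPool()
pool.executeOn(3, 'resize')
console.log(pool.log) // [ 'before:3:resize', 'run:3:resize', 'after:3' ]
```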

private
buildTasksQueueOptions(tasksQueueOptions: TasksQueueOptions | undefined): TasksQueueOptions

Emits dynamic worker creation events.

Emits dynamic worker destruction events.

private
checkAndEmitReadyEvent(): void

Checks if the worker id sent in the received message from a worker is valid.

private
checkMinimumNumberOfWorkers(minimumNumberOfWorkers: number | undefined): void
private
checkPoolOptions(opts: PoolOptions): void
private
checkPoolType(): void
private
checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined): void

Chooses a worker node for the next task.

Creates a new, completely set up dynamic worker node.

Creates a new, completely set up worker node.

Creates a worker node.

private
dequeueTask(workerNodeKey: number): Task<Data> | undefined
protected
abstract
deregisterWorkerMessageListener<Message extends Data | Response>(
workerNodeKey: number,
listener: (message: MessageValue<Message>) => void,
): void

Deregisters a listener callback on the worker given its worker node key.

protected
destroyWorkerNode(workerNodeKey: number): Promise<void>

Terminates the worker node given its worker node key.

enableTasksQueue(
enable: boolean,
tasksQueueOptions?: TasksQueueOptions,
): void
private
enqueueTask(
workerNodeKey: number,
task: Task<Data>,
): number
execute(
data?: Data,
name?: string,
transferList?: readonly Transferable[],
): Promise<Response>
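
The `transferList` parameter is typed as a list of `Transferable` objects. As a rough illustration of what transferring means, the sketch below uses the standard `structuredClone` global, which accepts a transfer list in the same way `postMessage` does. It does not call this package; that the pool forwards `transferList` to the worker unchanged is an assumption.

```typescript
// An 8-byte buffer with some sample content.
const buffer = new ArrayBuffer(8)
new Uint8Array(buffer).set([1, 2, 3])

// structuredClone stands in for postMessage here: listing the buffer in
// `transfer` moves its memory to the receiver instead of copying it.
const received = structuredClone({ buffer }, { transfer: [buffer] })

// The sender's buffer is detached after the transfer.
console.log(buffer.byteLength) // 0
// The receiver owns the original bytes.
console.log(received.buffer.byteLength) // 8
console.log(new Uint8Array(received.buffer)[1]) // 2
```

Transferring avoids copying large binary payloads, at the cost of making them unusable on the sending side afterwards.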
private
executeTask(
workerNodeKey: number,
task: Task<Data>,
): void

Executes the given task on the worker given its worker node key.

protected
flagWorkerNodeAsNotReady(workerNodeKey: number): void
protected
flushTasksQueue(workerNodeKey: number): number
private
flushTasksQueues(): void
protected
getWorkerInfo(workerNodeKey: number): WorkerInfo | undefined

Gets the worker information given its worker node key.

private
getWorkerNodeKeyByWorkerId(workerId: string | undefined): number

Gets the worker node key given its worker id.

private
handleTask(
workerNodeKey: number,
task: Task<Data>,
): void
private
initEventTarget(): void
private
initWorkerNodeUsage(workerNode: IWorkerNode<Worker, Data>): void

Initializes the worker node usage with sensible default values gathered during runtime.

Whether the worker nodes are back pressured or not.

protected
internalBusy(): boolean

Whether the worker nodes are concurrently executing their task quota or not.

internalExecute(
data?: Data,
name?: string,
transferList?: Transferable[],
): Promise<Response>
protected
abstract
isMain(): boolean

Returns whether the worker is the main worker or not.

private
isWorkerNodeBackPressured(workerNodeKey: number): boolean
private
isWorkerNodeBusy(workerNodeKey: number): boolean
private
isWorkerNodeIdle(workerNodeKey: number): boolean
private
isWorkerNodeStealing(workerNodeKey: number): boolean
mapExecute(
data: Iterable<Data>,
name?: string,
transferList?: readonly Transferable[],
): Promise<Response[]>
private
redistributeQueuedTasks(sourceWorkerNodeKey: number): void
protected
abstract
registerOnceWorkerMessageListener<Message extends Data | Response>(
workerNodeKey: number,
listener: (message: MessageValue<Message>) => void,
): void

Registers a one-time listener callback on the worker given its worker node key.

protected
abstract
registerWorkerMessageListener<Message extends Data | Response>(
workerNodeKey: number,
listener: (message: MessageValue<Message>) => void,
): void

Registers a listener callback on the worker given its worker node key.

protected
removeWorkerNode(workerNode: IWorkerNode<Worker, Data>): void

Removes the worker node from the pool worker nodes.

private
resetTaskSequentiallyStolenStatisticsWorkerUsage(
workerNodeKey: number,
taskName?: string,
): void
private
sendKillMessageToWorker(workerNodeKey: number): Promise<void>
protected
abstract
sendStartupMessageToWorker(workerNodeKey: number): void

Sends the startup message to the worker given its worker node key.

private
sendStatisticsMessageToWorker(workerNodeKey: number): void

Sends the statistics message to the worker given its worker node key.

protected
abstract
sendToWorker(
workerNodeKey: number,
message: MessageValue<Data>,
transferList?: readonly Transferable[],
): void

Sends a message to the worker given its worker node key.

private
setTaskStealing(): void
setTasksQueueOptions(tasksQueueOptions: TasksQueueOptions | undefined): void
private
setTasksQueuePriority(workerNodeKey: number): void
private
setTasksQueueSize(size: number): void
setWorkerChoiceStrategy(
workerChoiceStrategy: WorkerChoiceStrategy,
workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions,
): void
setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined): boolean
protected
setupHook(): void

Setup hook to execute code before worker nodes are created in the abstract constructor. Can be overridden.

protected
abstract
shallCreateDynamicWorker(): boolean

Conditions for dynamic worker creation.

private
shallExecuteTask(workerNodeKey: number): boolean

Whether the worker node shall update its task function worker usage or not.

start(): void
private
startMinimumNumberOfWorkers(initWorkerNodeUsage?: boolean): void

Starts the minimum number of workers.

private
tasksQueueSize(workerNodeKey: number): number
private
unsetTaskStealing(): void
private
updateTaskSequentiallyStolenStatisticsWorkerUsage(
workerNodeKey: number,
taskName?: string,
previousTaskName?: string,
): void
private
updateTaskStolenStatisticsWorkerUsage(
workerNodeKey: number,
taskName: string,
): void

Add Package

deno add jsr:@poolifier/poolifier-web-worker

Import symbol

import { AbstractPool } from "@poolifier/poolifier-web-worker";

---- OR ----

Import directly with a jsr specifier

import { AbstractPool } from "jsr:@poolifier/poolifier-web-worker";

Add Package

bunx jsr add @poolifier/poolifier-web-worker

Import symbol

import { AbstractPool } from "@poolifier/poolifier-web-worker";