Base class that implements some shared logic for all poolifier pools.
Worker extends IWorker
- Type of worker which manages this pool.
eventTarget: EventTarget
workerNodes: IWorkerNode<Worker, Data>[]
backPressure: boolean
Whether the pool is back pressured or not.
promiseResponseMap: Map<`${string}-${string}-${string}-${string}-${string}`, PromiseResponseWrapper<Response>>
The task execution response promise map:
key
: The message id of each submitted task.value
: An object that contains task's worker node key, execution response promise resolve and reject callbacks.
When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
The pool type.
If it is 'dynamic'
, it provides the max
property.
worker: WorkerType
The worker type.
workerChoiceStrategiesContext: WorkerChoiceStrategiesContext<Worker, Data, Response>
Worker choice strategies context referencing worker choice algorithms implementation.
workerMessageListener: (message: MessageValue<Response>) => void
This method is the message listener registered on each worker.
backPressureEventEmitted: boolean
Whether the pool back pressure event has been emitted or not.
busyEventEmitted: boolean
Whether the pool busy event has been emitted or not.
destroying: boolean
Whether the pool is destroying or not.
getTaskFunctionWorkerChoiceStrategy: (name?: string) => WorkerChoiceStrategy | undefined
Gets task function worker choice strategy, if any.
getWorkerChoiceStrategies: () => Set<WorkerChoiceStrategy>
Gets the worker choice strategies registered in this pool.
getWorkerNodeTaskFunctionPriority: () => number | undefined
Gets worker node task function priority, if any.
getWorkerNodeTaskFunctionWorkerChoiceStrategy: () => WorkerChoiceStrategy | undefined
Gets worker node task function worker choice strategy, if any.
handleWorkerNodeBackPressureEvent: (event: CustomEvent<WorkerNodeEventDetail>) => void
handleWorkerNodeIdleEvent: (event: CustomEvent<WorkerNodeEventDetail>,) => void
isStealingRatioReached: () => boolean
readyEventEmitted: boolean
Whether the pool ready event has been emitted or not.
startTimestamp: number
The start timestamp of the pool.
startingMinimumNumberOfWorkers: boolean
Whether the minimum number of workers is starting or not.
taskFunctions: Map<string, TaskFunctionObject<Data, Response>>
The task functions added at runtime map:
key
: The task function name.value
: The task function object.
utilization: number
The approximate pool utilization.
workerNodeStealTask: (workerNodeKey: number) => Task<Data> | undefined
addTaskFunction(name: string,fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>,): Promise<boolean>
addWorkerNode(workerNode: IWorkerNode<Worker, Data>): number
Adds the given worker node in the pool worker nodes.
afterTaskExecutionHook(workerNodeKey: number,message: MessageValue<Response>,): void
Hook executed after the worker task execution. Can be overridden.
afterWorkerNodeSetup(workerNodeKey: number): void
Method hooked up after a worker node has been newly created. Can be overridden.
beforeTaskExecutionHook(): void
Hook executed before the worker task execution. Can be overridden.
buildTasksQueueOptions(tasksQueueOptions: TasksQueueOptions | undefined): TasksQueueOptions
cannotStealTask(): boolean
checkAndEmitDynamicWorkerCreationEvents(): void
Emits dynamic worker creation events.
checkAndEmitDynamicWorkerDestructionEvents(): void
Emits dynamic worker destruction events.
checkAndEmitReadyEvent(): void
checkAndEmitTaskDequeuingEvents(): void
checkAndEmitTaskExecutionEvents(): void
checkAndEmitTaskExecutionFinishedEvents(): void
checkAndEmitTaskQueuingEvents(): void
checkMessageWorkerId(message: MessageValue<Data | Response>): void
Checks if the worker id sent in the received message from a worker is valid.
checkMinimumNumberOfWorkers(minimumNumberOfWorkers: number | undefined): void
checkPoolOptions(opts: PoolOptions): void
checkPoolType(): void
checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined): void
chooseWorkerNode(name?: string): number
Chooses a worker node for the next task.
createAndSetupDynamicWorkerNode(): number
Creates a new, completely set up dynamic worker node.
createAndSetupWorkerNode(): number
Creates a new, completely set up worker node.
createWorkerNode(): IWorkerNode<Worker, Data>
Creates a worker node.
dequeueTask(workerNodeKey: number): Task<Data> | undefined
deregisterWorkerMessageListener<Message extends Data | Response>(workerNodeKey: number,listener: (message: MessageValue<Message>) => void,): void
Deregisters a listener callback on the worker given its worker node key.
destroyWorkerNode(workerNodeKey: number): Promise<void>
Terminates the worker node given its worker node key.
enableTasksQueue(enable: boolean,tasksQueueOptions?: TasksQueueOptions,): void
enqueueTask(): number
executeTask(): void
Executes the given task on the worker given its worker node key.
flagWorkerNodeAsNotReady(workerNodeKey: number): void
flushTasksQueue(workerNodeKey: number): number
flushTasksQueues(): void
getTasksQueuePriority(): boolean
getWorkerInfo(workerNodeKey: number): WorkerInfo | undefined
Gets the worker information given its worker node key.
getWorkerNodeKeyByWorkerId(workerId: string | undefined): number
Gets the worker node key given its worker id.
handleTask(): void
handleTaskExecutionResponse(message: MessageValue<Response>): void
handleWorkerReadyResponse(message: MessageValue<Response>): void
hasTaskFunction(name: string): boolean
initEventTarget(): void
initWorkerNodeUsage(workerNode: IWorkerNode<Worker, Data>): void
Initializes the worker node usage with sensible default values gathered during runtime.
internalBackPressure(): boolean
Whether the worker nodes are back pressured or not.
internalBusy(): boolean
Whether worker nodes are executing concurrently their tasks quota or not.
isWorkerNodeBackPressured(workerNodeKey: number): boolean
isWorkerNodeBusy(workerNodeKey: number): boolean
isWorkerNodeIdle(workerNodeKey: number): boolean
isWorkerNodeStealing(workerNodeKey: number): boolean
mapExecute(): Promise<Response[]>
redistributeQueuedTasks(sourceWorkerNodeKey: number): void
registerOnceWorkerMessageListener<Message extends Data | Response>(workerNodeKey: number,listener: (message: MessageValue<Message>) => void,): void
Registers once a listener callback on the worker given its worker node key.
registerWorkerMessageListener<Message extends Data | Response>(workerNodeKey: number,listener: (message: MessageValue<Message>) => void,): void
Registers a listener callback on the worker given its worker node key.
removeTaskFunction(name: string): Promise<boolean>
removeWorkerNode(workerNode: IWorkerNode<Worker, Data>): void
Removes the worker node from the pool worker nodes.
resetTaskSequentiallyStolenStatisticsWorkerUsage(): void
sendKillMessageToWorker(workerNodeKey: number): Promise<void>
sendStartupMessageToWorker(workerNodeKey: number): void
Sends the startup message to worker given its worker node key.
sendStatisticsMessageToWorker(workerNodeKey: number): void
Sends the statistics message to worker given its worker node key.
sendTaskFunctionOperationToWorker(workerNodeKey: number,message: MessageValue<Data>,): Promise<boolean>
sendTaskFunctionOperationToWorkers(message: MessageValue<Data>): Promise<boolean>
sendToWorker(): void
Sends a message to worker given its worker node key.
setDefaultTaskFunction(name: string): Promise<boolean>
setTaskStealing(): void
setTasksQueueOptions(tasksQueueOptions: TasksQueueOptions | undefined): void
setTasksQueuePriority(workerNodeKey: number): void
setTasksQueueSize(size: number): void
setTasksStealingOnBackPressure(): void
setWorkerChoiceStrategy(workerChoiceStrategy: WorkerChoiceStrategy,workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions,): void
setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined): boolean
setupHook(): void
Setup hook to execute code before worker nodes are created in the abstract constructor. Can be overridden.
shallCreateDynamicWorker(): boolean
Conditions for dynamic worker creation.
shallExecuteTask(workerNodeKey: number): boolean
shallUpdateTaskFunctionWorkerUsage(workerNodeKey: number): boolean
Whether the worker node shall update its task function worker usage or not.
start(): void
startMinimumNumberOfWorkers(initWorkerNodeUsage?: boolean): void
Starts the minimum number of workers.
tasksQueueSize(workerNodeKey: number): number
unsetTaskStealing(): void
unsetTasksStealingOnBackPressure(): void
updateTaskSequentiallyStolenStatisticsWorkerUsage(): void
updateTaskStolenStatisticsWorkerUsage(): void