Below is a production-ready TypeScript `WorkerPool` class that maintains a pool of Web Workers, automatically distributes tasks to idle workers, queues tasks when all workers are busy, and provides type-safe, Promise-based results.
// worker-pool.ts
/**
 * A unit of work submitted to the pool. Pairs the caller's input with the
 * Promise callbacks that complete when a worker responds for this task's id.
 */
interface Task<TInput, TOutput> {
id: string;
input: TInput;
resolve: (value: TOutput) => void;
reject: (reason: Error) => void;
}
/** A pooled worker plus the bookkeeping needed to track its availability. */
interface WorkerWrapper {
worker: Worker;
busy: boolean;
// id of the task this worker is currently running, or null when idle
currentTaskId: string | null;
}
/**
 * Shape of the message a worker script posts back. Exactly one of `result`
 * or `error` is expected to be set; `taskId` correlates the response with
 * its originating task, since worker replies are asynchronous.
 */
interface WorkerMessage<TOutput> {
taskId: string;
result?: TOutput;
error?: string;
}
/**
 * A fixed-size pool of Web Workers that distributes tasks to idle workers,
 * queues tasks when all workers are busy, and returns results as typed
 * Promises. Crashed workers are replaced automatically; `terminate()`
 * rejects all outstanding tasks and destroys the pool.
 *
 * @typeParam TInput  - payload posted to the worker for each task.
 * @typeParam TOutput - result the worker posts back.
 */
export class WorkerPool<TInput, TOutput> {
private workers: WorkerWrapper[] = [];
private taskQueue: Task<TInput, TOutput>[] = [];
// Tasks dispatched to a worker and awaiting a response, keyed by task id.
private pendingTasks: Map<string, Task<TInput, TOutput>> = new Map();
private taskIdCounter = 0;
private isTerminated = false;
/**
 * @param workerScript - URL/path of the worker module to spawn.
 * @param poolSize     - number of workers; defaults to the logical core count.
 */
constructor(
private readonly workerScript: string | URL,
private readonly poolSize: number = navigator.hardwareConcurrency || 4
) {
this.initializeWorkers();
}
/** Spawn the initial set of workers. */
private initializeWorkers(): void {
for (let i = 0; i < this.poolSize; i++) {
this.workers.push(this.createWorker());
}
}
/**
 * Create one worker and wire its handlers to this pool. Shared by initial
 * construction and by crash replacement so the wiring stays consistent.
 */
private createWorker(): WorkerWrapper {
const wrapper: WorkerWrapper = {
worker: new Worker(this.workerScript, { type: 'module' }),
busy: false,
currentTaskId: null
};
wrapper.worker.onmessage = (event: MessageEvent<WorkerMessage<TOutput>>) => {
this.handleWorkerMessage(wrapper, event.data);
};
wrapper.worker.onerror = (error: ErrorEvent) => {
this.handleWorkerError(wrapper, error);
};
return wrapper;
}
/** Settle the Promise for a completed task and recycle the worker. */
private handleWorkerMessage(
wrapper: WorkerWrapper,
message: WorkerMessage<TOutput>
): void {
const { taskId, result, error } = message;
const task = this.findTaskById(taskId);
if (task) {
// Compare against undefined (not truthiness) so a worker that reports
// an empty-string error message still rejects instead of resolving
// the Promise with undefined.
if (error !== undefined) {
task.reject(new Error(error));
} else {
task.resolve(result as TOutput);
}
this.removeTask(taskId);
}
// Only free the worker when the response matches the task it is running;
// a stale or duplicate message must not mark a busy worker idle, which
// would let two tasks be dispatched to the same worker.
if (wrapper.currentTaskId === taskId) {
wrapper.busy = false;
wrapper.currentTaskId = null;
this.processNextTask();
}
}
/**
 * Handle a worker crash: reject its in-flight task, terminate the broken
 * worker, and (unless the pool is shutting down) spawn a replacement and
 * resume draining the queue.
 */
private handleWorkerError(wrapper: WorkerWrapper, error: ErrorEvent): void {
if (wrapper.currentTaskId) {
const task = this.findTaskById(wrapper.currentTaskId);
if (task) {
task.reject(new Error(`Worker error: ${error.message}`));
this.removeTask(wrapper.currentTaskId);
}
}
const index = this.workers.indexOf(wrapper);
if (index !== -1) {
wrapper.worker.terminate();
if (!this.isTerminated) {
this.workers[index] = this.createWorker();
this.processNextTask();
}
}
}
private findTaskById(taskId: string): Task<TInput, TOutput> | undefined {
return this.pendingTasks.get(taskId);
}
private removeTask(taskId: string): void {
this.pendingTasks.delete(taskId);
}
private getIdleWorker(): WorkerWrapper | undefined {
return this.workers.find(w => !w.busy);
}
/** Dispatch the next queued task if a worker is available. */
private processNextTask(): void {
if (this.taskQueue.length === 0) return;
const idleWorker = this.getIdleWorker();
if (!idleWorker) return;
const task = this.taskQueue.shift()!;
this.executeTask(idleWorker, task);
}
/** Mark the worker busy, register the task as pending, and post the input. */
private executeTask(
wrapper: WorkerWrapper,
task: Task<TInput, TOutput>
): void {
wrapper.busy = true;
wrapper.currentTaskId = task.id;
this.pendingTasks.set(task.id, task);
wrapper.worker.postMessage({
taskId: task.id,
input: task.input
});
}
/**
 * Submit one task. Runs immediately on an idle worker, otherwise queues.
 * @returns a Promise that settles when a worker responds for this task.
 */
public exec(input: TInput): Promise<TOutput> {
if (this.isTerminated) {
return Promise.reject(new Error('WorkerPool has been terminated'));
}
return new Promise<TOutput>((resolve, reject) => {
const task: Task<TInput, TOutput> = {
id: `task_${++this.taskIdCounter}_${Date.now()}`,
input,
resolve,
reject
};
const idleWorker = this.getIdleWorker();
if (idleWorker) {
this.executeTask(idleWorker, task);
} else {
this.taskQueue.push(task);
}
});
}
/** Submit many tasks at once; rejects if any individual task rejects. */
public execBatch(inputs: TInput[]): Promise<TOutput[]> {
return Promise.all(inputs.map(input => this.exec(input)));
}
/** Snapshot of pool occupancy for monitoring/debugging. */
public getStats(): {
poolSize: number;
busyWorkers: number;
idleWorkers: number;
queuedTasks: number;
pendingTasks: number;
} {
const busyWorkers = this.workers.filter(w => w.busy).length;
return {
poolSize: this.poolSize,
busyWorkers,
// Derive from the live worker list (not the configured poolSize) so a
// terminated pool reports 0 idle workers instead of poolSize.
idleWorkers: this.workers.length - busyWorkers,
queuedTasks: this.taskQueue.length,
pendingTasks: this.pendingTasks.size
};
}
/**
 * Shut the pool down: reject every queued and in-flight task, then
 * terminate all workers. Safe to call more than once.
 */
public terminate(): void {
if (this.isTerminated) return;
this.isTerminated = true;
for (const task of this.taskQueue) {
task.reject(new Error('WorkerPool terminated'));
}
this.taskQueue = [];
for (const [, task] of this.pendingTasks) {
task.reject(new Error('WorkerPool terminated'));
}
this.pendingTasks.clear();
for (const wrapper of this.workers) {
wrapper.worker.terminate();
}
this.workers = [];
}
}
// Example worker script (save as compute-worker.ts)
// self.onmessage = (event: MessageEvent<{ taskId: string; input: number }>) => {
// const { taskId, input } = event.data;
// try {
// // Simulate CPU-intensive work
// let result = 0;
// for (let i = 0; i < input * 1000000; i++) {
// result += Math.sqrt(i);
// }
// self.postMessage({ taskId, result });
// } catch (error) {
// self.postMessage({ taskId, error: (error as Error).message });
// }
// };
// Usage example:
// const pool = new WorkerPool<number, number>(
// new URL('./compute-worker.ts', import.meta.url),
// 4
// );
//
// const result = await pool.exec(100);
// console.log('Result:', result);
//
// const results = await pool.execBatch([10, 20, 30, 40]);
// console.log('Batch results:', results);
//
// console.log('Pool stats:', pool.getStats());
//
// pool.terminate();

This WorkerPool implementation follows the object pool pattern, a classic concurrency design pattern that pre-allocates expensive resources (Web Workers in this case) and reuses them for multiple tasks. The pool maintains a fixed number of workers — equal to the CPU's logical core count by default — preventing resource exhaustion while maximizing parallelism.
The core mechanism uses a task queue combined with worker availability tracking. When you call exec(), the pool first checks for an idle worker. If one exists, the task executes immediately; otherwise, it enters the queue. Each completed task triggers processNextTask(), which dequeues the next waiting task if any workers have become available. This creates a self-regulating system that maintains maximum throughput without overwhelming the browser.
Type safety is achieved through generics <TInput, TOutput> that flow through the entire system. The Task interface captures both the input data and the Promise resolution callbacks, allowing the pool to complete Promises when workers respond. Worker messages include a taskId that correlates responses with their originating tasks, essential since worker communication is inherently asynchronous and unordered.
Error handling addresses three failure modes: task-level errors (caught and serialized by the worker), worker crashes (handled by onerror), and pool termination. When a worker crashes, the implementation automatically spawns a replacement and continues processing queued tasks. The terminate() method provides graceful shutdown by rejecting all pending and queued tasks before destroying workers, preventing memory leaks and orphaned Promises.
Use this pattern when you have CPU-intensive operations that would block the main thread, such as image processing, complex calculations, or data parsing. Avoid it for I/O-bound tasks where async/await suffices, or for simple operations where worker instantiation overhead exceeds the computation time. The pool size should typically match navigator.hardwareConcurrency for CPU-bound work, but consider reducing it if tasks use significant memory or if you need to preserve UI responsiveness.