Async queue with concurrency limit
A TypeScript implementation of an async queue that processes tasks with a configurable concurrency limit, ensuring controlled parallel execution of asynchronous operations.
Code
interface QueueTask<T> {
  task: () => Promise<T>;
  resolve: (value: T) => void;
  reject: (error: unknown) => void;
}

class AsyncQueue<T = unknown> {
  private readonly concurrency: number;
  private running: number = 0;
  private readonly queue: QueueTask<T>[] = [];

  constructor(concurrency: number) {
    if (concurrency < 1) {
      throw new Error('Concurrency must be at least 1');
    }
    this.concurrency = concurrency;
  }

  // Enqueue a task and return a Promise that settles with the task's outcome.
  async add(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      this.queue.push({ task, resolve, reject });
      this.processNext();
    });
  }

  // Scheduler: start the next queued task if a concurrency slot is free.
  private async processNext(): Promise<void> {
    if (this.running >= this.concurrency || this.queue.length === 0) {
      return;
    }
    const item = this.queue.shift();
    if (!item) return;

    this.running++;
    try {
      const result = await item.task();
      item.resolve(result);
    } catch (error) {
      item.reject(error);
    } finally {
      // Free the slot and keep draining the queue, even after a failure.
      this.running--;
      this.processNext();
    }
  }

  // Tasks waiting to start.
  get pending(): number {
    return this.queue.length;
  }

  // Tasks currently running.
  get active(): number {
    return this.running;
  }

  // Total tasks either waiting or running.
  get size(): number {
    return this.queue.length + this.running;
  }
}
// Example: Processing 10 tasks with concurrency 3
const simulateApiCall = (id: number, duration: number): Promise<string> => {
  return new Promise((resolve) => {
    console.log(`[${new Date().toISOString()}] Task ${id} started (will take ${duration}ms)`);
    setTimeout(() => {
      console.log(`[${new Date().toISOString()}] Task ${id} completed`);
      resolve(`Result from task ${id}`);
    }, duration);
  });
};

const runExample = async (): Promise<void> => {
  const queue = new AsyncQueue<string>(3);
  console.log('Starting 10 tasks with concurrency limit of 3\n');

  // Enqueue all 10 tasks immediately; the queue lets only 3 run at a time.
  const tasks = Array.from({ length: 10 }, (_, i) => {
    const taskId = i + 1;
    const duration = Math.floor(Math.random() * 1000) + 500;
    return queue.add(() => simulateApiCall(taskId, duration));
  });

  const results = await Promise.all(tasks);
  console.log('\nAll tasks completed!');
  console.log('Results:', results);
};

runExample();

How It Works
This async queue implementation solves a common problem in JavaScript applications: controlling the number of concurrent asynchronous operations. Without such a limit, firing off hundreds of API calls simultaneously could overwhelm servers, hit rate limits, or exhaust system resources. The queue ensures that at most N operations run at any given time.
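For a sense of the difference, here is a rough sketch contrasting an unbounded Promise.all with the queued version; fetchUser is a hypothetical helper standing in for any Promise-returning call.

// fetchUser is a placeholder for any function that returns a Promise.
declare function fetchUser(id: number): Promise<unknown>;

const ids = Array.from({ length: 500 }, (_, i) => i + 1);

// Unbounded: all 500 requests start at once.
const unbounded = Promise.all(ids.map((id) => fetchUser(id)));

// Queued: at most 5 requests are in flight at any moment.
const limited = new AsyncQueue<unknown>(5);
const bounded = Promise.all(ids.map((id) => limited.add(() => fetchUser(id))));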
The core mechanism uses a combination of an array-based queue and a Promise wrapper. When you call add(), the task function is wrapped in a new Promise whose resolve and reject functions are stored alongside the task. This allows the queue to control when each task starts while still returning a Promise to the caller that resolves with the task's result. The processNext() method is the scheduler—it checks if there's room to run another task (running < concurrency) and if there are pending tasks, then starts the next one.
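Stripped of the queue bookkeeping, the wrapping trick can be sketched on its own; defer and runLater are illustrative names, not methods of the class above.

// Minimal sketch of the deferred pattern behind add(): the Promise is handed
// back immediately, but it only settles once the scheduler calls runLater().
function defer<T>(task: () => Promise<T>): { promise: Promise<T>; runLater: () => void } {
  let resolve!: (value: T) => void;
  let reject!: (error: unknown) => void;
  const promise = new Promise<T>((res, rej) => {
    resolve = res; // captured for later, just as add() stores them in QueueTask
    reject = rej;
  });
  const runLater = () => {
    task().then(resolve, reject); // start the task and forward its outcome
  };
  return { promise, runLater };
}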
The finally block in processNext() is crucial for reliability. Regardless of whether a task succeeds or fails, we decrement the running counter and attempt to process the next task. This ensures the queue doesn't get stuck if a task throws an error. The error is properly propagated to the caller via the stored reject function, allowing individual error handling per task.
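A quick way to see this behavior: enqueue one task that rejects and one that succeeds. The failAfter and okAfter helpers below are invented for illustration.

// Illustrative only: one task fails, yet the queue keeps draining.
const errorQueue = new AsyncQueue<string>(2);

const failAfter = (ms: number): Promise<string> =>
  new Promise((_, reject) => setTimeout(() => reject(new Error('boom')), ms));

const okAfter = (ms: number): Promise<string> =>
  new Promise((resolve) => setTimeout(() => resolve('ok'), ms));

errorQueue.add(() => failAfter(100)).catch((error) => {
  // The stored reject forwards the failure to this caller only.
  console.error('Task failed:', (error as Error).message);
});

errorQueue.add(() => okAfter(200)).then((value) => {
  // The finally block released the slot, so this task still runs.
  console.log('Next task result:', value);
});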
The generic type parameter <T> provides type safety throughout. When you create new AsyncQueue<string>(3), TypeScript knows that add() must receive a function returning Promise<string>, and the returned Promise resolves to string. This catches type mismatches at compile time rather than runtime.
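For instance, with the queue typed to string, a task resolving to a number is flagged before the code ever runs:

const typedQueue = new AsyncQueue<string>(3);

// OK: the task resolves to a string, matching the queue's type parameter.
const greeting = typedQueue.add(async () => 'hello');

// Compile-time error: Type 'number' is not assignable to type 'string'.
// const oops = typedQueue.add(async () => 42);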
Use this pattern when making bulk API requests, processing large batches of files, or any scenario where you need controlled parallelism. Avoid it for CPU-bound tasks (use Worker threads instead) or when tasks must run strictly in sequence (use a simple async loop). For production use, consider adding features like timeout handling, priority queuing, or pause/resume capabilities depending on your needs.
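As one sketch of such an extension, a per-task timeout can be layered on top without modifying the queue itself; withTimeout is a name made up here, not part of the class.

// Sketch only: wrap a task so it rejects if it exceeds a time budget.
const withTimeout = <T>(task: () => Promise<T>, ms: number): (() => Promise<T>) => {
  return () =>
    new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms);
      task().then(
        (value) => { clearTimeout(timer); resolve(value); },
        (error) => { clearTimeout(timer); reject(error); }
      );
    });
};

// Usage: the queue sees an ordinary task; the timeout lives inside the wrapper.
// queue.add(withTimeout(() => simulateApiCall(11, 5000), 2000));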