Semaphore for async concurrency control
A production-ready TypeScript Semaphore implementation that controls concurrent async operations, featuring acquire/release semantics and automatic cleanup for rate-limiting scenarios.
Code

```typescript
interface QueuedResolver {
  resolve: () => void;
  reject: (reason?: unknown) => void;
}

class Semaphore {
  private permits: number;
  private readonly maxPermits: number;
  private readonly queue: QueuedResolver[] = [];

  constructor(maxPermits: number) {
    if (maxPermits < 1) {
      throw new Error('Semaphore requires at least 1 permit');
    }
    this.permits = maxPermits;
    this.maxPermits = maxPermits;
  }

  async acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return Promise.resolve();
    }
    // No permits left: park the caller until release() hands one over.
    return new Promise<void>((resolve, reject) => {
      this.queue.push({ resolve, reject });
    });
  }

  release(): void {
    if (this.queue.length > 0) {
      // Hand the permit directly to the oldest waiter (FIFO) without
      // touching the count, so a newcomer cannot jump the queue.
      const next = this.queue.shift()!;
      next.resolve();
    } else if (this.permits < this.maxPermits) {
      this.permits++;
    }
  }

  async withPermit<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      // Always return the permit, even if fn() throws.
      this.release();
    }
  }

  get availablePermits(): number {
    return this.permits;
  }

  get waitingCount(): number {
    return this.queue.length;
  }

  cancelAll(reason: string = 'Semaphore cancelled'): void {
    while (this.queue.length > 0) {
      const waiter = this.queue.shift()!;
      waiter.reject(new Error(reason));
    }
  }
}

interface ApiResponse<T> {
  data: T;
  timestamp: number;
}

interface User {
  id: number;
  name: string;
  email: string;
}

const simulateApiCall = async (userId: number): Promise<ApiResponse<User>> => {
  const delay = 500 + Math.random() * 1000;
  await new Promise(resolve => setTimeout(resolve, delay));
  if (Math.random() < 0.1) {
    throw new Error(`API error for user ${userId}`);
  }
  return {
    data: {
      id: userId,
      name: `User ${userId}`,
      email: `user${userId}@example.com`
    },
    timestamp: Date.now()
  };
};

const fetchUsersWithRateLimit = async (userIds: number[], maxConcurrent: number): Promise<Map<number, User | Error>> => {
  const semaphore = new Semaphore(maxConcurrent);
  const results = new Map<number, User | Error>();

  const fetchUser = async (userId: number): Promise<void> => {
    await semaphore.withPermit(async () => {
      console.log(`[${new Date().toISOString()}] Fetching user ${userId} (${semaphore.waitingCount} waiting)`);
      try {
        const response = await simulateApiCall(userId);
        results.set(userId, response.data);
        console.log(`[${new Date().toISOString()}] Completed user ${userId}`);
      } catch (error) {
        results.set(userId, error instanceof Error ? error : new Error(String(error)));
        console.log(`[${new Date().toISOString()}] Failed user ${userId}`);
      }
    });
  };

  await Promise.all(userIds.map(fetchUser));
  return results;
};

const runDemo = async (): Promise<void> => {
  console.log('Starting rate-limited API calls demo...\n');
  console.log('Fetching 10 users with max 3 concurrent requests:\n');
  const userIds = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
  const startTime = Date.now();

  const results = await fetchUsersWithRateLimit(userIds, 3);
  const elapsed = Date.now() - startTime;

  console.log(`\nCompleted in ${elapsed}ms`);
  console.log('\nResults:');
  results.forEach((result, userId) => {
    if (result instanceof Error) {
      console.log(`  User ${userId}: ERROR - ${result.message}`);
    } else {
      console.log(`  User ${userId}: ${result.name} (${result.email})`);
    }
  });
};

runDemo().catch(console.error);
```

How It Works

This Semaphore implementation uses a classic counting semaphore pattern adapted for JavaScript's async/await model. The core concept is simple: maintain a count of available permits and a queue of waiting promises. When acquire() is called with permits available, it immediately decrements the count and resolves. When no permits are available, it creates a new Promise and stores its resolver in a queue, effectively pausing the caller until a permit becomes available.
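As a minimal illustration of that flow, the sketch below uses only the Semaphore class above (demoAcquire is just a throwaway name for this sketch): it takes both permits of a two-permit semaphore, queues a third caller, and shows the waiter being woken by release().

```typescript
// A minimal sketch against the Semaphore class above.
const demoAcquire = async (): Promise<void> => {
  const sem = new Semaphore(2);

  await sem.acquire();               // permit 1 taken, resolves immediately
  await sem.acquire();               // permit 2 taken, resolves immediately
  console.log(sem.availablePermits); // 0

  const third = sem.acquire();       // no permits left: resolver is queued
  console.log(sem.waitingCount);     // 1

  sem.release();                     // hands the permit straight to the waiter
  await third;                       // the queued acquire() now resolves

  sem.release();                     // give both permits back
  sem.release();
  console.log(sem.availablePermits); // 2
};

demoAcquire().catch(console.error);
```
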
The release() method is the counterpart: it either wakes the next waiter in the queue or increments the permit count. A key design decision is that release() resolves the next queued promise directly instead of incrementing permits first. This preserves strict FIFO ordering and prevents a newly arriving caller from grabbing a permit ahead of a long-waiting request. The implementation also guards against over-releasing by only incrementing permits while the count is below maxPermits.
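One way to see that hand-off in action is to queue several waiters behind a single-permit semaphore and record the order in which they wake up. This is again just a sketch against the class above (fifoDemo is an illustrative name):

```typescript
// FIFO hand-off check: waiters resolve in the order they queued, because
// release() passes the permit directly to queue.shift().
const fifoDemo = async (): Promise<void> => {
  const sem = new Semaphore(1);
  const order: number[] = [];

  await sem.acquire(); // take the only permit so later callers must queue

  const waiters = [1, 2, 3].map(async id => {
    await sem.acquire();
    order.push(id);
    sem.release();
  });

  sem.release();             // wakes waiter 1, which wakes 2, then 3
  await Promise.all(waiters);
  console.log(order);        // [1, 2, 3]
};

fifoDemo().catch(console.error);
```
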
The withPermit() helper applies an RAII-style (Resource Acquisition Is Initialization) discipline using try/finally, ensuring the permit is always released even when the wrapped operation throws. This matters in production code because a forgotten release() leaks a permit, and enough leaked permits leave every caller stuck waiting forever. The finally block guarantees cleanup regardless of how the async operation completes, making the API much safer to use than manual acquire/release pairs.
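That guarantee is easy to verify with the class above; in this sketch (raiiDemo is an illustrative name) the wrapped callback throws immediately, yet the permit count is back to 1 afterwards:

```typescript
// Even when the wrapped operation throws, withPermit's finally block returns
// the permit, so the semaphore never loses capacity.
const raiiDemo = async (): Promise<void> => {
  const sem = new Semaphore(1);

  try {
    await sem.withPermit(async () => {
      throw new Error('boom');
    });
  } catch {
    // The error propagates to the caller...
  }

  console.log(sem.availablePermits); // ...but the permit was still released: 1
};

raiiDemo().catch(console.error);
```
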
The example demonstrates rate-limiting API calls by limiting concurrent requests to 3. Without the semaphore, all 10 requests would fire simultaneously, potentially overwhelming the API server or hitting rate limits. With the semaphore, requests are naturally throttled while still executing as fast as the limit allows. Notice how Promise.all() still processes everything concurrently from the caller's perspective, but the semaphore internally serializes acquisition.
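The same pattern carries over to real HTTP calls with only minor changes. The sketch below assumes a runtime with the global fetch API (modern browsers or Node 18+); fetchJsonWithLimit and the URLs are illustrative and not part of the demo above. Unlike the demo, it lets the first failure reject the whole batch; Promise.allSettled would keep per-URL results instead.

```typescript
// Hypothetical variant of the pattern for real endpoints; requires global fetch.
const fetchJsonWithLimit = async (urls: string[], maxConcurrent: number): Promise<unknown[]> => {
  const sem = new Semaphore(maxConcurrent);
  return Promise.all(
    urls.map(url =>
      sem.withPermit(async () => {
        const res = await fetch(url);
        if (!res.ok) throw new Error(`HTTP ${res.status} for ${url}`);
        return res.json();
      })
    )
  );
};
```
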
Consider using this pattern when calling external APIs with rate limits, managing database connection pools, controlling file system access, or limiting memory-intensive operations. Avoid it for CPU-bound tasks (use Worker threads instead) or when operations are already naturally serialized. For more sophisticated needs, consider adding timeout support to acquire(), priority queues, or fair scheduling algorithms.
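As one possible shape for the timeout extension suggested above, here is a sketch of a standalone helper (acquireWithTimeout is hypothetical, not part of the class): it races acquire() against a timer, and because the abandoned acquire() would still be granted a permit later, it hands that permit straight back when it eventually resolves.

```typescript
// Hypothetical helper layered on top of the existing Semaphore; not part of the
// original API. A fuller version would also handle the abandoned acquire()
// being rejected by cancelAll().
const acquireWithTimeout = async (sem: Semaphore, ms: number): Promise<void> => {
  let timedOut = false;
  let timer: ReturnType<typeof setTimeout> | undefined;

  const pending = sem.acquire().then(() => {
    if (timedOut) sem.release(); // too late: return the permit immediately
  });

  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => {
      timedOut = true;
      reject(new Error(`Timed out waiting for a permit after ${ms}ms`));
    }, ms);
  });

  try {
    await Promise.race([pending, timeout]);
  } finally {
    clearTimeout(timer);
  }
};
```

A caller could then write `await acquireWithTimeout(semaphore, 2000)` and treat a rejection as a signal to shed load rather than queue indefinitely.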