This is a dependency-free implementation of the Observable pattern in TypeScript. It provides the core operators (map, filter, take) and demonstrates reactive event-stream handling.
// Core types for our Observable implementation

/**
 * Consumer of a stream.
 * `next` receives each value; `error` and `complete` are optional terminal
 * notifications.
 */
type Observer<T> = {
  next: (value: T) => void;
  error?: (err: Error) => void;
  complete?: () => void;
};

/** Handle returned by `Observable.subscribe`; `unsubscribe` cancels the subscription. */
type Subscription = {
  unsubscribe: () => void;
};

/**
 * Producer logic executed once per `subscribe` call. It pushes notifications
 * into the observer it is given and may return a teardown function that
 * releases whatever it set up (timers, listeners, ...).
 */
type SubscriberFunction<T> = (observer: Observer<T>) => (() => void) | void;

// Observable class implementation

/**
 * Lazy, push-based stream of values.
 *
 * Nothing happens until `subscribe` is called; every call runs the subscriber
 * function again with a fresh, guarded observer.
 *
 * Guarantees enforced by `subscribe`:
 * - after `error`, `complete` or `unsubscribe`, no further notification reaches
 *   the consumer;
 * - the teardown returned by the subscriber function runs at most once, on
 *   whichever of `error` / `complete` / `unsubscribe` happens first.
 *
 * Limitation: a producer that emits synchronously (such as `from`) cannot be
 * interrupted part-way; it runs to the end, but notifications issued after the
 * subscription has closed are discarded.
 */
class Observable<T> {
  private readonly subscriberFn: SubscriberFunction<T>;

  /**
   * @param subscriberFn Producer logic, invoked once for every `subscribe` call.
   */
  constructor(subscriberFn: SubscriberFunction<T>) {
    this.subscriberFn = subscriberFn;
  }

  /** Converts an arbitrary thrown value into an `Error` for the error channel. */
  private static toError(err: unknown): Error {
    return err instanceof Error ? err : new Error(String(err));
  }

  /**
   * Starts the producer and delivers its notifications to `observer`.
   *
   * @param observer Either a full observer object or just a `next` callback.
   * @returns A subscription whose `unsubscribe` stops delivery and runs the
   *   producer's teardown. Calling it more than once is a no-op.
   */
  subscribe(observer: Observer<T> | ((value: T) => void)): Subscription {
    // Normalize observer - allow passing just a next function
    const normalizedObserver: Observer<T> =
      typeof observer === "function" ? { next: observer } : observer;

    let closed = false;
    let teardown: (() => void) | undefined;

    // Runs the teardown at most once. Safe to call before the producer has
    // returned its teardown: it is then a no-op and subscribe() retries below.
    const runTeardown = (): void => {
      const pending = teardown;
      teardown = undefined;
      pending?.();
    };

    // Guarded observer handed to the producer.
    const safeObserver: Observer<T> = {
      next: (value: T) => {
        if (!closed) {
          normalizedObserver.next(value);
        }
      },
      error: (err: Error) => {
        if (closed) {
          return;
        }
        // Close before notifying so re-entrant emissions are dropped.
        closed = true;
        try {
          normalizedObserver.error?.(err);
        } finally {
          runTeardown();
        }
      },
      complete: () => {
        if (closed) {
          return;
        }
        closed = true;
        try {
          normalizedObserver.complete?.();
        } finally {
          runTeardown();
        }
      },
    };

    // Execute subscriber function and capture teardown
    const result = this.subscriberFn(safeObserver);
    if (typeof result === "function") {
      teardown = result;
    }
    // The producer may have terminated synchronously, before its teardown
    // existed; release its resources now.
    if (closed) {
      runTeardown();
    }

    return {
      unsubscribe: () => {
        closed = true;
        runTeardown();
      },
    };
  }

  /**
   * Transforms each emitted value with `project`.
   *
   * An exception thrown while projecting or delivering a value is sent down
   * the error channel (non-`Error` values are wrapped), after which `project`
   * is not called again.
   */
  map<U>(project: (value: T) => U): Observable<U> {
    return new Observable<U>((observer) => {
      // Covers the window in which a synchronous source keeps emitting before
      // the source subscription can be cancelled.
      let failed = false;
      const subscription = this.subscribe({
        next: (value) => {
          if (failed) {
            return;
          }
          try {
            observer.next(project(value));
          } catch (err) {
            failed = true;
            observer.error?.(Observable.toError(err));
          }
        },
        error: observer.error,
        complete: observer.complete,
      });
      return () => subscription.unsubscribe();
    });
  }

  /**
   * Forwards only the values for which `predicate` returns true.
   *
   * An exception thrown while testing or delivering a value is sent down the
   * error channel (non-`Error` values are wrapped), after which `predicate`
   * is not called again.
   */
  filter(predicate: (value: T) => boolean): Observable<T> {
    return new Observable<T>((observer) => {
      let failed = false;
      const subscription = this.subscribe({
        next: (value) => {
          if (failed) {
            return;
          }
          try {
            if (predicate(value)) {
              observer.next(value);
            }
          } catch (err) {
            failed = true;
            observer.error?.(Observable.toError(err));
          }
        },
        error: observer.error,
        complete: observer.complete,
      });
      return () => subscription.unsubscribe();
    });
  }

  /**
   * Emits at most the first `count` values, then completes.
   *
   * If `count` is zero, negative or NaN the result completes immediately
   * without subscribing to the source.
   */
  take(count: number): Observable<T> {
    return new Observable<T>((observer) => {
      // Negated comparison so that NaN is handled like zero.
      if (!(count > 0)) {
        observer.complete?.();
        return;
      }
      let taken = 0;
      const subscription = this.subscribe({
        next: (value) => {
          if (taken >= count) {
            return;
          }
          taken++;
          const isLast = taken >= count;
          observer.next(value);
          if (isLast) {
            // Completing closes the downstream subscription, whose teardown
            // (returned below) cancels the source. `subscription` is never
            // touched here: with a synchronous source it is not yet
            // initialised at this point.
            observer.complete?.();
          }
        },
        error: observer.error,
        complete: observer.complete,
      });
      return () => subscription.unsubscribe();
    });
  }

  /**
   * Static factory: emits every element of `values` synchronously on
   * subscription, then completes.
   */
  static from<T>(values: readonly T[]): Observable<T> {
    return new Observable<T>((observer) => {
      for (const value of values) {
        observer.next(value);
      }
      observer.complete?.();
    });
  }

  /**
   * Static factory: emits 0, 1, 2, ... every `ms` milliseconds. Never
   * completes on its own; the teardown clears the timer.
   */
  static interval(ms: number): Observable<number> {
    return new Observable<number>((observer) => {
      let count = 0;
      const id = setInterval(() => {
        observer.next(count++);
      }, ms);
      return () => clearInterval(id);
    });
  }

  /**
   * Static factory: emits every `eventName` event dispatched on `element`.
   * The listener is added on subscription and removed by the teardown.
   */
  static fromEvent<K extends keyof HTMLElementEventMap>(
    element: HTMLElement,
    eventName: K
  ): Observable<HTMLElementEventMap[K]> {
    return new Observable<HTMLElementEventMap[K]>((observer) => {
      const handler = (event: HTMLElementEventMap[K]) => observer.next(event);
      element.addEventListener(eventName, handler as EventListener);
      return () => element.removeEventListener(eventName, handler as EventListener);
    });
  }
}
// ============ Usage Examples ============

// Example 1: push a fixed array through a filter -> map -> take pipeline
console.log("--- Example 1: Array with operators ---");
const numbers$ = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
const subscription1 = numbers$
  // Keep even numbers
  .filter(function isEven(candidate) {
    return candidate % 2 === 0;
  })
  // Multiply by 10
  .map(function timesTen(even) {
    return even * 10;
  })
  // Take first 3
  .take(3)
  .subscribe({
    next(value) {
      console.log("Value:", value);
    },
    complete() {
      console.log("Complete!");
    },
  });
// Example 2: timer-driven stream labelled by map() and limited by take(5)
console.log("\n--- Example 2: Interval stream ---");
const interval$ = Observable.interval(100);
const subscription2 = interval$
  .map((tick) => {
    return `Tick ${tick}`;
  })
  .take(5)
  .subscribe({
    next(label) {
      console.log(label);
    },
    complete() {
      console.log("Interval complete!");
    },
  });
// Example 3: hand-written producer that returns a teardown function
console.log("\n--- Example 3: Custom with teardown ---");
const custom$ = new Observable<string>(function produce(sink) {
  console.log("Observable started");

  // Two values are pushed synchronously during subscribe().
  for (const greeting of ["Hello", "World"]) {
    sink.next(greeting);
  }

  // A third value followed by completion is scheduled for 200 ms later.
  const pendingTimer = setTimeout(function emitDelayed() {
    sink.next("Delayed");
    sink.complete?.();
  }, 200);

  // Teardown: cancels the pending timer when the subscription is cancelled.
  return function cleanup() {
    console.log("Cleanup executed");
    clearTimeout(pendingTimer);
  };
});

const subscription3 = custom$.subscribe({
  next(received) {
    console.log("Received:", received);
  },
  complete() {
    console.log("Done!");
  },
});

// Cancel after 100 ms, i.e. before the 200 ms timer above fires.
setTimeout(function cancelEarly() {
  console.log("\nUnsubscribing early...");
  subscription3.unsubscribe();
}, 100);
// Example 4: Simulated click stream (works in browser)
// A timer replays a fixed list of coordinates, so no DOM element is involved.
console.log("\n--- Example 4: Event stream simulation ---");
const simulatedClicks$ = new Observable<{ x: number; y: number }>((observer) => {
// Simulate clicks for demo purposes
const clicks = [
{ x: 100, y: 200 },
{ x: 150, y: 250 },
{ x: 200, y: 300 },
{ x: 50, y: 400 },
];
// Index of the next entry to emit.
let i = 0;
// Emits one entry every 150 ms; on the first tick after the list is exhausted
// it signals completion and stops the timer.
const id = setInterval(() => {
if (i < clicks.length) {
observer.next(clicks[i++]);
} else {
observer.complete?.();
clearInterval(id);
}
}, 150);
// Teardown: stops the timer.
return () => clearInterval(id);
});
// The Subscription returned by subscribe() is discarded here, so this stream
// is never cancelled from outside.
simulatedClicks$
.filter((pos) => pos.x > 100) // Keep clicks with x strictly greater than 100
// Format each remaining position as a printable message.
.map((pos) => `Click at (${pos.x}, ${pos.y})`)
.subscribe({
next: (msg) => console.log(msg),
complete: () => console.log("Click stream ended"),
});

This Observable implementation follows the producer-consumer pattern, where the Observable acts as a lazy data producer that only starts emitting values when subscribed to. The core design decision is using a subscriber function that receives an observer and optionally returns a teardown function. This pattern enables clean resource management: when you unsubscribe, any timers, event listeners, or connections get properly cleaned up.
The Observer interface uses three callbacks: next for values, error for failures, and complete for signaling the stream has ended. The subscribe method wraps the provided observer in a safe observer that checks the unsubscription state before forwarding calls. This prevents emissions to observers that have already unsubscribed, which is crucial for avoiding memory leaks and unexpected behavior in long-lived applications.
The operators (map, filter, take) are implemented as methods that return new Observables, creating a pipeline without modifying the original. Each operator subscribes to the source Observable and applies its transformation logic. The take operator demonstrates an important pattern: it can complete the stream early and automatically unsubscribe from the source, preventing further processing. Error handling in operators wraps projection functions in try-catch blocks to properly propagate errors through the error channel.
The static factory methods (from, interval, fromEvent) show common Observable creation patterns. The fromEvent factory is particularly useful in browser environments for converting DOM events into streams. Note how the factories that hold a resource implement teardown logic: interval clears its timer, and fromEvent removes its event listener (from emits its values synchronously and has nothing to release). This teardown-on-unsubscribe contract is what makes Observables easier to keep leak-free than raw callback patterns.
Use this pattern when you need to handle asynchronous data streams with composition and cancellation support. It's ideal for UI events, WebSocket messages, or any scenario where you're processing multiple values over time. Avoid this pattern for simple one-shot async operations where Promises suffice, or in performance-critical hot paths where the operator chain overhead matters. For production applications with complex stream requirements, consider using RxJS, which provides battle-tested implementations with many more operators and scheduler support.