A production-ready TypeScript implementation of a Node.js stream pipeline that processes large CSV files efficiently. It combines pipeline() from node:stream/promises with custom Transform streams for parsing and filtering, and includes proper error handling and backpressure support.
import { createReadStream, createWriteStream, writeFileSync } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import { Transform, TransformCallback } from 'node:stream';
// Type definitions
interface CsvRecord {
[key: string]: string;
}
interface FilteredRecord {
id: string;
name: string;
email: string;
amount: number;
processed: boolean;
}
interface PipelineStats {
totalRows: number;
filteredRows: number;
writtenRows: number;
processingTimeMs: number;
}
// CSV Parser Transform - converts raw CSV lines to objects
class CsvParserTransform extends Transform {
private headers: string[] = [];
private isFirstLine = true;
private buffer = '';
private rowCount = 0;
constructor() {
super({ objectMode: true, highWaterMark: 64 });
}
// The read stream is opened with encoding 'utf8', so chunks arrive as strings
_transform(chunk: string | Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop() || '';
try {
for (const line of lines) {
const trimmedLine = line.trim();
if (!trimmedLine) continue;
const values = this.parseCsvLine(trimmedLine);
if (this.isFirstLine) {
this.headers = values.map(h => h.toLowerCase().trim());
this.isFirstLine = false;
continue;
}
const record: CsvRecord = {};
this.headers.forEach((header, index) => {
record[header] = values[index] || '';
});
this.rowCount++;
this.push(record);
}
callback();
} catch (error) {
callback(error instanceof Error ? error : new Error(String(error)));
}
}
_flush(callback: TransformCallback): void {
if (this.buffer.trim()) {
try {
const values = this.parseCsvLine(this.buffer.trim());
const record: CsvRecord = {};
this.headers.forEach((header, index) => {
record[header] = values[index] || '';
});
this.rowCount++;
this.push(record);
} catch (error) {
return callback(error instanceof Error ? error : new Error(String(error)));
}
}
callback();
}
private parseCsvLine(line: string): string[] {
const result: string[] = [];
let current = '';
let inQuotes = false;
for (let i = 0; i < line.length; i++) {
const char = line[i];
const nextChar = line[i + 1];
if (inQuotes) {
if (char === '"' && nextChar === '"') {
current += '"';
i++;
} else if (char === '"') {
inQuotes = false;
} else {
current += char;
}
} else {
if (char === '"') {
inQuotes = true;
} else if (char === ',') {
result.push(current.trim());
current = '';
} else {
current += char;
}
}
}
result.push(current.trim());
return result;
}
getRowCount(): number {
return this.rowCount;
}
}
// Filter Transform - filters and transforms records
class FilterTransform extends Transform {
private filteredCount = 0;
private passedCount = 0;
private filterFn: (record: CsvRecord) => boolean;
constructor(filterFn: (record: CsvRecord) => boolean) {
super({ objectMode: true, highWaterMark: 64 });
this.filterFn = filterFn;
}
_transform(record: CsvRecord, encoding: BufferEncoding, callback: TransformCallback): void {
try {
if (this.filterFn(record)) {
const transformed: FilteredRecord = {
id: record.id || '',
name: record.name || '',
email: record.email || '',
amount: parseFloat(record.amount) || 0,
processed: true
};
this.passedCount++;
this.push(transformed);
} else {
this.filteredCount++;
}
callback();
} catch (error) {
callback(error instanceof Error ? error : new Error(String(error)));
}
}
getStats(): { filtered: number; passed: number } {
return { filtered: this.filteredCount, passed: this.passedCount };
}
}
// JSON Writer Transform - serializes records into a JSON array, one record per line
class JsonLineWriterTransform extends Transform {
private writtenCount = 0;
private isFirst = true;
constructor() {
super({ objectMode: true, highWaterMark: 64 });
}
_transform(record: FilteredRecord, encoding: BufferEncoding, callback: TransformCallback): void {
try {
const prefix = this.isFirst ? '[\n' : ',\n';
this.isFirst = false;
const line = prefix + ' ' + JSON.stringify(record);
this.writtenCount++;
callback(null, line);
} catch (error) {
callback(error instanceof Error ? error : new Error(String(error)));
}
}
_flush(callback: TransformCallback): void {
this.push(this.writtenCount > 0 ? '\n]\n' : '[]\n');
callback();
}
getWrittenCount(): number {
return this.writtenCount;
}
}
// Main pipeline function
async function processCsvPipeline(
inputPath: string,
outputPath: string,
filterFn: (record: CsvRecord) => boolean
): Promise<PipelineStats> {
const startTime = performance.now();
const readStream = createReadStream(inputPath, {
encoding: 'utf8',
highWaterMark: 64 * 1024
});
const csvParser = new CsvParserTransform();
const filterTransform = new FilterTransform(filterFn);
const jsonWriter = new JsonLineWriterTransform();
const writeStream = createWriteStream(outputPath, {
encoding: 'utf8',
highWaterMark: 64 * 1024
});
await pipeline(
readStream,
csvParser,
filterTransform,
jsonWriter,
writeStream
);
const endTime = performance.now();
const filterStats = filterTransform.getStats();
return {
totalRows: csvParser.getRowCount(),
filteredRows: filterStats.filtered,
writtenRows: jsonWriter.getWrittenCount(),
processingTimeMs: Math.round(endTime - startTime)
};
}
// Example usage with sample data generation
async function main(): Promise<void> {
// Generate sample CSV for demonstration
const sampleCsv = `id,name,email,amount
1,Alice Johnson,alice@example.com,150.50
2,Bob Smith,bob@example.com,75.25
3,Charlie Brown,charlie@example.com,200.00
4,Diana Ross,diana@example.com,50.00
5,Eve Wilson,eve@example.com,300.75
6,Frank Miller,frank@example.com,25.00
7,Grace Lee,grace@example.com,175.50
8,Henry Ford,henry@example.com,90.00
9,Iris Chen,iris@example.com,250.25
10,Jack Davis,jack@example.com,125.00`;
writeFileSync('input.csv', sampleCsv);
console.log('Created sample input.csv');
try {
const stats = await processCsvPipeline(
'input.csv',
'output.json',
(record) => parseFloat(record.amount) >= 100
);
console.log('Pipeline completed successfully!');
console.log(`Total rows processed: ${stats.totalRows}`);
console.log(`Rows filtered out: ${stats.filteredRows}`);
console.log(`Rows written: ${stats.writtenRows}`);
console.log(`Processing time: ${stats.processingTimeMs}ms`);
} catch (error) {
console.error('Pipeline failed:', error);
process.exit(1);
}
}
main().catch((error) => {
console.error('Unexpected error:', error);
process.exit(1);
});

This implementation demonstrates a production-ready approach to processing large CSV files using Node.js streams. The key insight is that streams process data in chunks, keeping memory usage roughly constant (bounded by the stream buffers) regardless of file size. By using pipeline() from node:stream/promises, we get automatic error propagation and proper cleanup of all streams if any stage fails.
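That cleanup guarantee also extends to cancellation: pipeline() accepts an AbortSignal in a trailing options object, and aborting it destroys every stream in the chain. A minimal sketch, assuming you want a hard deadline (the function name, timeout value, and paths are illustrative, not part of the code above):

import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

// Abort the whole pipeline if it has not finished within a deadline.
// On abort, pipeline() destroys every stream and rejects with an AbortError.
async function copyWithTimeout(inputPath: string, outputPath: string, timeoutMs: number): Promise<void> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    await pipeline(
      createReadStream(inputPath),
      createWriteStream(outputPath),
      { signal: controller.signal }
    );
  } finally {
    clearTimeout(timer);
  }
}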
The architecture consists of four connected stages: a file read stream, a CSV parser transform, a filter transform, and a JSON writer transform feeding into a file write stream. Each transform extends Node.js's Transform class and operates in objectMode, allowing us to pass JavaScript objects between stages rather than raw buffers. The highWaterMark option controls backpressure: in objectMode it is measured in objects rather than bytes, and when a downstream stage can't keep up, upstream stages automatically pause, preventing memory bloat.
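To see that pause behaviour in isolation, here is a small sketch (the buffer size, delay, and stream contents are illustrative only, not values from the pipeline above): a slow object-mode Writable with a tiny buffer is fed by a fast Readable, and pipe() stops pulling whenever the sink's buffer is full.

import { Readable, Writable } from 'node:stream';

// A deliberately slow sink: in objectMode, highWaterMark counts objects,
// so at most 4 records sit in its internal buffer at any time.
const slowSink = new Writable({
  objectMode: true,
  highWaterMark: 4,
  write(_record, _encoding, callback) {
    setTimeout(callback, 10); // simulate slow downstream work
  }
});

// A fast source of 100 small objects.
const source = Readable.from(
  Array.from({ length: 100 }, (_, i) => ({ id: i }))
);

// pipe() (like pipeline()) watches write()'s return value and pauses the
// source until the sink emits 'drain', so memory stays bounded.
source.pipe(slowSink);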
The CSV parser handles edge cases that naive implementations miss: quoted fields containing commas, escaped quotes within fields, and partial lines at chunk boundaries. The buffer accumulation pattern (this.buffer += chunk; const lines = this.buffer.split('\n'); this.buffer = lines.pop() || '') ensures we never process incomplete lines. This is critical because chunk boundaries don't respect line boundaries in the source file.
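The same carry-over pattern can be lifted out on its own; a stripped-down sketch (the LineSplitter name is just for illustration):

import { Transform, TransformCallback } from 'node:stream';

// Splits an incoming byte/string stream into complete lines, holding back
// whatever follows the last newline until the next chunk (or _flush).
class LineSplitter extends Transform {
  private carry = '';

  constructor() {
    super({ readableObjectMode: true }); // emits one string per line
  }

  _transform(chunk: string | Buffer, _encoding: BufferEncoding, callback: TransformCallback): void {
    const lines = (this.carry + chunk.toString()).split('\n');
    this.carry = lines.pop() ?? ''; // possibly incomplete last line
    for (const line of lines) this.push(line);
    callback();
  }

  _flush(callback: TransformCallback): void {
    if (this.carry) this.push(this.carry); // emit a final unterminated line
    callback();
  }
}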
Error handling follows Node.js stream conventions: errors in _transform are passed to the callback, which propagates them through the pipeline. The pipeline() function ensures all streams are properly destroyed on error, preventing resource leaks. The stats tracking adds observability with negligible overhead: counters are incremented in place, with no additional allocations.
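A compact sketch of that propagation path, using an inline Transform that rejects invalid records (the field name, error message, and sample data are illustrative):

import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Passing an Error to the callback fails this stage and, through
// pipeline(), destroys every other stream in the chain.
const requireAmount = new Transform({
  objectMode: true,
  transform(record: { amount?: string }, _encoding, callback) {
    if (record.amount === undefined) {
      callback(new Error('missing amount field'));
      return;
    }
    callback(null, record);
  }
});

// A sink that simply consumes whatever reaches it.
const discard = new Writable({
  objectMode: true,
  write(_record, _encoding, callback) { callback(); }
});

async function demo(): Promise<void> {
  try {
    await pipeline(
      Readable.from([{ amount: '10' }, {}]), // second record is invalid
      requireAmount,
      discard
    );
  } catch (error) {
    console.error('Pipeline failed:', error); // rejects with the callback error
  }
}

demo();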
Use this pattern when processing files larger than available memory, when you need backpressure management, or when building ETL pipelines. Avoid it for small files where the overhead isn't justified, or when you need random access to records (streams are sequential by nature). For CSV parsing in production, consider established libraries like csv-parse which handle more edge cases, but this implementation demonstrates the core streaming concepts clearly.
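As a sketch of that swap, assuming the csv-parse package (a v5-style API where parse() returns a Transform and columns: true yields one object per row) and reusing the FilterTransform and JsonLineWriterTransform classes defined above:

import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import { parse } from 'csv-parse'; // third-party dependency: npm install csv-parse

async function processWithCsvParse(): Promise<void> {
  await pipeline(
    createReadStream('input.csv'),
    // columns: true keys each row by the header line; trim and
    // skip_empty_lines mirror what the hand-rolled parser does.
    parse({ columns: true, trim: true, skip_empty_lines: true }),
    new FilterTransform((record) => parseFloat(record.amount) >= 100),
    new JsonLineWriterTransform(),
    createWriteStream('output.json')
  );
}

Everything downstream of the parser keeps working unchanged, because both parsers emit plain objects keyed by column name.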