Fix: Node.js Stream Error — Pipe Not Working, Backpressure, or Premature Close
Quick Answer
How to fix Node.js stream issues — pipe and pipeline errors, backpressure handling, Transform streams, async iteration, error propagation, and common stream anti-patterns.
The Problem
A Node.js stream pipe produces an error or stops unexpectedly:

```javascript
const fs = require('fs');

const readable = fs.createReadStream('large-file.csv');
const writable = fs.createWriteStream('output.csv');

readable.pipe(writable);
// Error: write after end
// Or: the output file is empty
// Or: the process hangs without completing
```

Or a Transform stream silently drops data:
```javascript
const { Transform } = require('stream');

const upper = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    // Missing: callback() ← stream halts here
  },
});
```

Or an error in one piped stream crashes the process instead of being caught:
```javascript
readable.pipe(transform).pipe(writable);
// If transform throws: UnhandledPromiseRejection or uncaught exception
// The error doesn't propagate to writable
```

Why This Happens
Node.js streams have behaviors that aren't obvious until they break:

- `.pipe()` doesn't propagate errors — if a stream in a `.pipe()` chain emits an error, the destination stream is NOT automatically closed or destroyed. You must handle each stream's `error` event individually.
- Missing `callback()` in Transform — every call to `transform()` must call its `callback` when done, even if no data is pushed. Without it, the stream stalls permanently.
- Backpressure ignored — when `writable.write()` returns `false`, the readable should pause until `'drain'` is emitted. Ignoring this overflows the writable's buffer.
- Async operations in streams — mixing `async/await` with the callback-based stream API requires care. An unhandled rejection in a transform doesn't emit an `'error'` event.
- Stream consumed twice — a readable stream's data can only be consumed once. Attaching a second consumer after data has started flowing, or reading a stream after it's already been piped, leaves that consumer with partial or no data.
Fix 1: Use pipeline() Instead of pipe()
stream.pipeline() is the modern replacement for .pipe(). It handles error propagation and stream cleanup automatically:
```javascript
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// WRONG — pipe() doesn't propagate errors or clean up
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));
// If zlib fails: writable stays open, disk space leaks

// CORRECT — pipeline() cleans up all streams on error
pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('output.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);
```

With Promises (Node.js 15+), use the `stream/promises` variant instead:

```javascript
const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises');

async function compress() {
  await pipeline(
    fs.createReadStream('input.txt'),
    zlib.createGzip(),
    fs.createWriteStream('output.txt.gz'),
  );
  console.log('Done');
}
```

pipeline() vs pipe() comparison:
| | `.pipe()` | `pipeline()` |
|---|---|---|
| Error propagation | Manual (each stream) | Automatic |
| Stream cleanup on error | Manual | Automatic |
| Callback/Promise support | No | Yes |
| Multiple transforms | Chained calls | Single call |
| Recommended | No (legacy) | Yes |
Fix 2: Always Call callback() in Transform Streams
Every invocation of the transform() method must call its callback:
```javascript
const { Transform } = require('stream');

// WRONG — missing callback() call stalls the stream
const brokenTransform = new Transform({
  transform(chunk, encoding, callback) {
    const processed = chunk.toString().toUpperCase();
    this.push(processed);
    // callback() not called → stream hangs
  },
});

// CORRECT — always call callback when done
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    const processed = chunk.toString().toUpperCase();
    this.push(processed);
    callback(); // Signal: ready for next chunk
  },
});

// CORRECT — callback takes optional error and data arguments
const parseJSONTransform = new Transform({
  objectMode: true, // Work with objects, not Buffers
  transform(chunk, encoding, callback) {
    try {
      const obj = JSON.parse(chunk.toString());
      callback(null, obj); // Pass transformed data via callback
    } catch (err) {
      callback(err); // Pass error — stream emits 'error' event
    }
  },
});
```

Async transform function:
```javascript
const asyncTransform = new Transform({
  transform(chunk, encoding, callback) {
    // Can't use an async method directly here — wrap the await in a
    // self-invoking async function instead
    (async () => {
      const result = await processChunk(chunk);
      this.push(result);
      callback();
    })().catch(callback); // Pass async errors to callback
  },
});
```

Fix 3: Handle Backpressure
Backpressure prevents memory overflow when the writable can’t keep up with the readable:
```javascript
const fs = require('fs');

const readable = fs.createReadStream('huge-file.bin');
const writable = fs.createWriteStream('destination.bin');

// WRONG — ignoring the backpressure signal overwhelms the writable's buffer
readable.on('data', (chunk) => {
  writable.write(chunk); // write() may return false — ignored here
});

// CORRECT — handle backpressure
readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    readable.pause(); // Stop reading until the writable drains
  }
});

writable.on('drain', () => {
  readable.resume(); // Writable buffer cleared, resume reading
});

readable.on('end', () => {
  writable.end();
});

readable.on('error', (err) => console.error('Read error:', err));
writable.on('error', (err) => console.error('Write error:', err));
```

Note: `pipeline()` handles backpressure automatically. Use it instead of manually wiring `pause`/`resume`.
Fix 4: Propagate Errors in .pipe() Chains
When you must use .pipe(), add error handlers to every stream in the chain:
```javascript
const readable = fs.createReadStream('input.csv');
const transform = new MyTransform();
const writable = fs.createWriteStream('output.csv');

function destroyAll(err) {
  console.error('Stream error:', err);
  readable.destroy();
  transform.destroy();
  writable.destroy();
}

readable.on('error', destroyAll);
transform.on('error', destroyAll);
writable.on('error', destroyAll);

readable.pipe(transform).pipe(writable);
```

Or switch to pipeline() entirely — it's the proper solution:
```javascript
// This replaces all the manual error handling above
pipeline(readable, transform, writable, (err) => {
  if (err) console.error('Pipeline failed:', err);
});
```

Fix 5: Use Async Iteration for Simpler Stream Consumption
Node.js 12+ supports for await...of on readable streams, which is cleaner than event-based reading:
```javascript
const fs = require('fs');
const readline = require('readline');

// Read a file line by line
async function processLines(filePath) {
  const fileStream = fs.createReadStream(filePath);
  const rl = readline.createInterface({ input: fileStream });

  for await (const line of rl) {
    if (line.startsWith('#')) continue; // Skip comments
    await processLine(line);
  }
}

// Process a readable stream chunk by chunk
async function processStream(readable) {
  for await (const chunk of readable) {
    console.log('Chunk:', chunk.length, 'bytes');
  }
}

// With error handling
async function safeProcessStream(readable) {
  try {
    for await (const chunk of readable) {
      await handleChunk(chunk);
    }
  } catch (err) {
    console.error('Stream error during iteration:', err);
    readable.destroy();
  }
}
```

Collecting all stream data into a buffer:
```javascript
async function streamToBuffer(readable) {
  const chunks = [];
  for await (const chunk of readable) {
    chunks.push(chunk);
  }
  return Buffer.concat(chunks);
}

async function streamToString(readable, encoding = 'utf8') {
  const buffer = await streamToBuffer(readable);
  return buffer.toString(encoding);
}

// Usage (inside an async function or an ES module)
const content = await streamToString(fs.createReadStream('file.txt'));
```

Fix 6: Create Custom Readable and Writable Streams
Common patterns for building streams correctly:
```javascript
const { Readable, Writable, Transform } = require('stream');
const { pipeline } = require('stream/promises');

// Custom Readable — push data on demand
class CounterStream extends Readable {
  constructor(max) {
    super({ objectMode: true });
    this.current = 0;
    this.max = max;
  }
  _read() {
    if (this.current <= this.max) {
      this.push(this.current++);
    } else {
      this.push(null); // Signal end of stream
    }
  }
}

// Custom Writable — consume data
class CollectorStream extends Writable {
  constructor() {
    super({ objectMode: true });
    this.collected = [];
  }
  _write(chunk, encoding, callback) {
    this.collected.push(chunk);
    callback(); // Must call — signals ready for next chunk
  }
  _final(callback) {
    // Called when the writable is ending — flush any remaining data
    console.log('Collected:', this.collected);
    callback();
  }
}

// Custom Transform
class DoubleTransform extends Transform {
  constructor() {
    super({ objectMode: true });
  }
  _transform(chunk, encoding, callback) {
    this.push(chunk * 2);
    callback();
  }
}

// Use them together (inside an async function or an ES module)
await pipeline(
  new CounterStream(10),
  new DoubleTransform(),
  new CollectorStream(),
);
```

Fix 7: Read Streams Correctly from HTTP Responses
Streaming HTTP responses has specific patterns:
```javascript
const https = require('https');
const fs = require('fs');
const { pipeline } = require('stream/promises');

// Download a file using streams — memory efficient for large files
async function downloadFile(url, destPath) {
  const response = await new Promise((resolve, reject) => {
    https.get(url, resolve).on('error', reject);
  });

  if (response.statusCode !== 200) {
    response.resume(); // Drain the response to free memory
    throw new Error(`Download failed: ${response.statusCode}`);
  }

  await pipeline(
    response,
    fs.createWriteStream(destPath),
  );
}
```

With the 'got' or 'undici' library (simpler API, shown here in ESM syntax):

```javascript
import fs from 'node:fs';
import { pipeline } from 'node:stream/promises';
import got from 'got';

await pipeline(
  got.stream('https://example.com/large-file.zip'),
  fs.createWriteStream('file.zip'),
);
```

Still Not Working?
'finish' vs 'end' event — 'end' is emitted on Readable streams when all data is consumed. 'finish' is emitted on Writable streams when writable.end() is called and all data is flushed. Using the wrong event for the wrong stream type is a common source of timing bugs.
Stream is paused and never resumes — if you attach a 'data' event listener after attaching a 'readable' listener, the stream state can get confused. Use one API consistently: either event-based ('data'/'end') or the readable.read() API.
Streams in objectMode — by default, streams work with Buffer or string. Pass { objectMode: true } in the constructor options when working with JavaScript objects. Mixing objectMode and non-objectMode streams in a pipeline causes type errors.
write() after end() — calling writable.write() after writable.end() throws Error: write after end. This often happens in concurrent scenarios where multiple async operations try to write to the same stream. Use a queue or ensure writes are serialized.
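One way to serialize writes is to chain them onto a single promise, so no write (or the final `end()`) can start before the previous one has flushed. `createSerialWriter` below is a hypothetical helper sketch, not a Node.js API:

```javascript
// Funnel all writes through one promise chain so that write() and
// end() from concurrent async tasks never interleave.
function createSerialWriter(stream) {
  let queue = Promise.resolve();
  return {
    write(chunk) {
      queue = queue.then(() => new Promise((resolve, reject) => {
        stream.write(chunk, (err) => (err ? reject(err) : resolve()));
      }));
      return queue;
    },
    close() {
      // end() runs only after every queued write has completed
      queue = queue.then(() => new Promise((resolve) => stream.end(resolve)));
      return queue;
    },
  };
}
```

Each concurrent task calls `writer.write(...)` and awaits the returned promise; only one task (typically the coordinator) calls `writer.close()`.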
For related Node.js issues, see Fix: Node.js Heap Out of Memory and Fix: Node.js Uncaught Exception.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.