Fix: Node.js Stream pipeline() Not Working — Backpressure, Error Propagation, AbortSignal, and Web Streams Interop
Quick Answer
How to fix Node.js stream/promises pipeline errors — uncaught stream errors, backpressure ignored, AbortSignal not propagating, async iterators in pipeline, Transform stream object mode, and converting between Node and Web Streams.
The Error
You write a streaming pipeline and an error crashes your process:
import { createReadStream, createWriteStream } from "node:fs";
const src = createReadStream("input.bin");
const dst = createWriteStream("output.bin");
src.pipe(dst);
// If src errors mid-read, dst leaks an open file descriptor and the process can crash.
Or pipeline() swallows your Transform’s errors:
import { pipeline } from "node:stream/promises";
import { Transform } from "node:stream";
await pipeline(
createReadStream("input.txt"),
new Transform({ transform(chunk, enc, cb) { throw new Error("nope"); } }),
createWriteStream("output.txt"),
);
// The rejection only reaches you with the promise form, and only for errors passed to cb(err); a synchronous throw may escape as an uncaught exception.
Or AbortSignal doesn’t cancel a long-running pipeline:
const controller = new AbortController();
setTimeout(() => controller.abort(), 1000);
await pipeline(slowSource, slowSink);
// Keeps running until completion: the signal is never passed to pipeline().
Or you mix Node and Web Streams and types break:
const webReadable: ReadableStream = ...;
await pipeline(webReadable, createWriteStream("out.bin"));
// TypeError: source.on is not a function
Why This Happens
Node.js streams have three layers of API:
- Legacy .pipe() from EventEmitter days. Doesn’t handle errors well. Error in any stream leaves others dangling.
- stream.pipeline(...) callback form. Handles cleanup and error propagation.
- stream/promises pipeline(): same but returns a Promise. The modern default.
Backpressure (the consumer pausing the producer when it can’t keep up) is automatic with both pipe() and pipeline(). You only have to wire it manually when you move data yourself with data listeners and write() calls.
Web Streams (ReadableStream/WritableStream) are a separate, browser-compatible API. Node has both — interop requires conversion via Readable.fromWeb() / Readable.toWeb().
The reason these issues feel hard to debug is that the stream classes in Node carry over thirteen years of design decisions. The original Stream class was a thin wrapper over EventEmitter, where data and end events drove everything. Streams 2 introduced the Readable and Writable base classes, with the read() method, paused mode, and highWaterMark-based buffering on top of the existing write() return value and drain event. Streams 3 let a single stream switch between flowing and paused modes. Then async iterators arrived (for await...of), and the standard Web Streams API came to the runtime via node:stream/web. The result is that the same conceptual operation, “pipe a source into a sink”, has at least five legitimate implementations, each with subtly different error and cleanup semantics. pipeline() exists specifically to paper over those differences and give you one mental model: it returns a Promise that resolves on success, rejects with the first error, and always destroys every stream in the chain.
The pipeline() contract has a few sharp edges worth knowing. It treats any error as fatal: there’s no “continue on transform error” mode; if you want that, write a wrapper Transform that catches errors and emits an error sentinel downstream. It also runs streams in order: pipeline(a, b, c) connects a → b → c, not a fan-out. For fan-out you need PassThrough and Promise.all over finished(...) calls. And it does not buffer without bound: each stream holds roughly up to its highWaterMark, so if your source is faster than your sink, memory does not balloon; the sink signals backpressure and the source is paused. Forgetting to honor those signals in hand-written code (a loop that ignores write()’s return value) is a common cause of memory that grows until the process dies.
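The "wrapper Transform that emits an error sentinel" idea can be sketched as follows. This is a minimal illustration, not a built-in feature: the helper names tolerant and parseAll and the { ok, value, error } result shape are assumptions made up for this example.

```javascript
import { Readable, Transform } from "node:stream";
import { pipeline } from "node:stream/promises";

// Hypothetical helper: wraps a per-item function so that a thrown error
// becomes a sentinel object instead of failing the whole pipeline.
function tolerant(fn) {
  return new Transform({
    objectMode: true,
    transform(item, _encoding, callback) {
      let result;
      try {
        result = { ok: true, value: fn(item) };
      } catch (error) {
        result = { ok: false, item, error };
      }
      // Call the callback exactly once, outside the try block.
      callback(null, result);
    },
  });
}

// Collects every result so the caller can inspect successes and failures.
async function parseAll(lines) {
  const results = [];
  await pipeline(
    Readable.from(lines),
    tolerant((line) => JSON.parse(line)),
    async function (source) {
      for await (const result of source) results.push(result);
    },
  );
  return results;
}
```

Downstream stages then decide what to do with ok: false records (log them, count them, write them to a reject file) while the pipeline keeps running.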
Fix 1: Use pipeline() From stream/promises
The modern pattern:
import { pipeline } from "node:stream/promises";
import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";
await pipeline(
createReadStream("input.txt"),
createGzip(),
createWriteStream("output.txt.gz"),
);
pipeline():
- Wires all the streams together.
- Closes downstream when upstream finishes.
- Closes upstream when downstream fails.
- Surfaces the first error as a rejection.
- Cleans up file descriptors on error.
For a stream that fails partway:
try {
await pipeline(src, transform, dst);
console.log("done");
} catch (err) {
console.error("pipeline failed:", err);
}
Inside the try block, you don’t need stream-specific error handling: pipeline collects them all.
Pro Tip: Always prefer stream/promises pipeline() over the callback stream.pipeline() and certainly over .pipe(). The promise form composes with async/await and proper try/catch.
Fix 2: Backpressure Just Works (Mostly)
With pipeline(), backpressure is automatic:
const src = createReadStream("huge.bin"); // 10 GB file
const transform = new Transform({ ... }); // Slow
const dst = createWriteStream("output.bin");
await pipeline(src, transform, dst);
// src reads only as fast as transform can process.
// transform writes only as fast as dst accepts.
// Memory usage stays bounded.
If you move data yourself with data listeners instead of pipe() or pipeline(), you need to respect write()’s return value:
// Without backpressure (bad):
src.on("data", (chunk) => dst.write(chunk));
// src may emit faster than dst drains; memory grows.
// With backpressure (good):
src.on("data", (chunk) => {
const ok = dst.write(chunk);
if (!ok) src.pause();
});
dst.on("drain", () => src.resume());
This is exactly what pipeline() does for you.
Common Mistake: Using for await on a stream then writing without checking backpressure:
// Bad — accumulates memory:
for await (const chunk of src) {
dst.write(chunk); // Doesn't wait for dst to drain
}
// Good — explicit await:
for await (const chunk of src) {
if (!dst.write(chunk)) {
await new Promise((resolve) => dst.once("drain", resolve));
}
}
Or just use pipeline().
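If a hand-written loop is unavoidable, the drain wait can be factored into a small helper. The sketch below is one possible shape, assuming a helper name (copyWithBackpressure) and a deliberately slow in-memory sink that are invented for the example; events.once() is used because it also rejects if the destination emits an error while waiting.

```javascript
import { once } from "node:events";
import { Readable, Writable } from "node:stream";
import { finished } from "node:stream/promises";

// Hypothetical helper: copies an async iterable into a Writable while
// honoring backpressure, then ends the Writable and waits for the flush.
async function copyWithBackpressure(source, dest) {
  for await (const chunk of source) {
    // write() returns false once the internal buffer reaches highWaterMark.
    if (!dest.write(chunk)) {
      // once() also rejects if dest emits "error", so failures are not lost.
      await once(dest, "drain");
    }
  }
  dest.end();
  await finished(dest);
}

// Usage: a deliberately slow sink with a tiny buffer.
const received = [];
const slowSink = new Writable({
  objectMode: true,
  highWaterMark: 2,
  write(chunk, _encoding, callback) {
    received.push(chunk);
    setImmediate(callback); // simulate a slow consumer
  },
});
await copyWithBackpressure(Readable.from([1, 2, 3, 4, 5]), slowSink);
console.log(received);
```

Unlike pipeline(), this helper does not destroy the source when the destination fails, so it is only a fit for cases where the caller owns that cleanup.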
Fix 3: Transform Streams With Error Handling
import { Transform } from "node:stream";
const upper = new Transform({
transform(chunk, encoding, callback) {
try {
const result = chunk.toString().toUpperCase();
callback(null, result);
} catch (err) {
callback(err);
}
},
});
await pipeline(src, upper, dst);
Three rules for Transform:
- Always call callback. Forgetting to call it hangs the pipeline.
- Pass errors as the first arg. callback(err): pipeline catches it.
- Pass output as the second arg. callback(null, chunk): emits the transformed chunk.
For async transforms:
const fetchEnriched = new Transform({
objectMode: true,
async transform(item, encoding, callback) {
try {
const data = await fetchExternal(item.id);
callback(null, { ...item, data });
} catch (err) {
callback(err);
}
},
});
objectMode: true lets the stream pass JS objects instead of strings/Buffers.
For higher-order patterns, use compose():
import { compose } from "node:stream";
import { pipeline } from "node:stream/promises";
const combined = compose(filter, enrich, format);
await pipeline(src, combined, dst);
compose returns a single Duplex stream that chains the inputs.
Fix 4: AbortSignal for Cancellation
const controller = new AbortController();
setTimeout(() => controller.abort(), 5000);
try {
await pipeline(src, transform, dst, { signal: controller.signal });
} catch (err) {
if (err.name === "AbortError") {
console.log("cancelled");
} else {
throw err;
}
}
The signal option propagates to all streams; abort destroys them all and rejects the promise.
For interactive cancellation (e.g. user clicks cancel):
const controller = new AbortController();
uiCancelButton.onclick = () => controller.abort();
await pipeline(networkRead, transform, fileWrite, { signal: controller.signal });
The abort closes the network socket, stops the transform, closes the file. Cleanup is automatic.
Common Mistake: Wrapping pipeline() in a setTimeout(..., timeout) and trying to manually destroy streams on timeout. Use the signal option — it’s cleaner and handles cleanup correctly.
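A deadline built on the signal option might look like the sketch below. The slow source, the discarding sink, and the function name runWithDeadline are all hypothetical stand-ins so the example runs without files or sockets.

```javascript
import { Writable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { setTimeout as sleep } from "node:timers/promises";

// Hypothetical slow source: yields one chunk every 10 ms, forever.
async function* slowSource() {
  for (let i = 0; ; i++) {
    await sleep(10);
    yield `chunk ${i}\n`;
  }
}

// Runs the pipeline and reports whether it was cancelled by the deadline.
async function runWithDeadline(ms) {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), ms);
  const discard = new Writable({
    write(_chunk, _encoding, callback) {
      callback(); // drop the data; only the timing matters here
    },
  });
  try {
    await pipeline(slowSource, discard, { signal: controller.signal });
    return "done";
  } catch (err) {
    if (err.name === "AbortError") return "cancelled";
    throw err;
  } finally {
    clearTimeout(timer); // avoid a stray timer if the pipeline ends early
  }
}
```

The timer only calls abort(); destroying the streams and rejecting the promise is left to pipeline(), which is the point of using the signal instead of manual cleanup.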
Fix 5: Async Iterators as Pipeline Sources
Any async iterable can be the source:
async function* generateLines() {
for (let i = 0; i < 1000; i++) {
yield `line ${i}\n`;
}
}
await pipeline(generateLines, createWriteStream("output.txt"));
pipeline accepts:
- Readable streams.
- Async generators / iterables.
- Strings and Buffers, once wrapped with Readable.from() so they are emitted as a single chunk.
- Functions returning iterables.
For transforms via async generators:
async function* upperCase(source) {
for await (const chunk of source) {
yield chunk.toString().toUpperCase();
}
}
await pipeline(
createReadStream("input.txt"),
upperCase,
createWriteStream("output.txt"),
);
This is often clearer than a Transform subclass: straight async code with yield for emitting.
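Generators also make it easy to carry state between chunks, which is where Transform subclasses tend to get awkward. As an illustration, here is a sketch of a line splitter that handles lines broken across chunk boundaries. The names splitLines and collectLines are made up for the example, and it assumes string chunks (or single-byte text); multi-byte characters split across Buffers would need a decoder.

```javascript
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";

// Hypothetical transform: re-chunks arbitrary text chunks into whole lines.
async function* splitLines(source) {
  let pending = "";
  for await (const chunk of source) {
    pending += chunk.toString();
    const lines = pending.split("\n");
    pending = lines.pop(); // the last piece may be an incomplete line
    yield* lines;
  }
  if (pending !== "") yield pending; // flush the trailing partial line
}

// Runs the generator inside pipeline() and gathers the emitted lines.
async function collectLines(chunks) {
  const lines = [];
  await pipeline(
    Readable.from(chunks),
    splitLines,
    async function (source) {
      for await (const line of source) lines.push(line);
    },
  );
  return lines;
}
```

For example, collectLines(["a\nb", "c\n", "d"]) resolves to three lines, with the "b" and "c" fragments joined into one.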
Pro Tip: For data-processing pipelines with multiple steps, async generators are easier to write and debug than Transform streams. The Transform class has its place when you need fine-grained control (flush(), final(), separate readable and writable highWaterMark values, etc.).
Fix 6: Web Streams Interop
Convert between Node and Web Streams with the static helpers:
import { Readable, Writable } from "node:stream";
import { createReadStream, createWriteStream } from "node:fs";
import { pipeline } from "node:stream/promises";
// Node Readable → Web ReadableStream:
const fileReadable = createReadStream("input.txt");
const webReadable: ReadableStream<Uint8Array> = Readable.toWeb(fileReadable);
// Web ReadableStream → Node Readable:
const response = await fetch("https://api.example.com/big.json");
const bodyReadable = Readable.fromWeb(response.body!);
await pipeline(bodyReadable, createWriteStream("downloaded.json"));
For fetch responses specifically:
const response = await fetch(url);
await pipeline(
Readable.fromWeb(response.body!),
createWriteStream("downloaded.bin"),
);
For Writables:
import { WritableStream } from "node:stream/web";
const webWritable: WritableStream<Uint8Array> = ...;
const nodeWritable = Writable.fromWeb(webWritable);
await pipeline(src, nodeWritable);
Note: Web Streams in Node.js are the same standard as in browsers but the implementations differ in subtle ways (object mode, error timing). For Node-only code, prefer Node streams and only convert at API boundaries.
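To try the conversion without a network call, a Web ReadableStream can be built by hand and converted at the boundary. In this sketch makeWebBody is a hypothetical stand-in for a fetch() response body, and readBody is an illustrative name, not a Node API.

```javascript
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { ReadableStream } from "node:stream/web";

// Stand-in for a fetch() body: a Web ReadableStream of Uint8Array chunks.
function makeWebBody(parts) {
  const encoder = new TextEncoder();
  return new ReadableStream({
    start(controller) {
      for (const part of parts) controller.enqueue(encoder.encode(part));
      controller.close();
    },
  });
}

// Converts the Web stream once, then stays in Node streams.
async function readBody(webBody) {
  const chunks = [];
  await pipeline(
    Readable.fromWeb(webBody),
    async function (source) {
      for await (const chunk of source) chunks.push(chunk);
    },
  );
  return Buffer.concat(chunks).toString("utf8");
}
```

Calling readBody(makeWebBody(["hello ", "web"])) resolves to the concatenated text, which mirrors what happens when response.body is passed through Readable.fromWeb().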
Fix 7: Performance — Highwatermark and Object Mode
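The default highWaterMark for byte streams is 16 KiB on older Node versions and 64 KiB from Node 22; object-mode streams default to 16 objects, and fs.createReadStream() defaults to 64 KiB. For high-throughput files: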
const src = createReadStream("huge.bin", { highWaterMark: 1024 * 1024 }); // 1 MB chunks
const dst = createWriteStream("out.bin", { highWaterMark: 1024 * 1024 });
await pipeline(src, dst);
Larger chunks = fewer syscalls, better throughput at the cost of more memory.
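Because defaults vary between stream types and Node versions, it can help to read the limits off a live stream instead of assuming them. A small sketch, with describeLimits being a made-up helper name:

```javascript
import { PassThrough } from "node:stream";

// Reports the buffer limits a stream was actually created with.
function describeLimits(stream) {
  return {
    readable: stream.readableHighWaterMark,
    writable: stream.writableHighWaterMark,
    objectMode: stream.readableObjectMode,
  };
}

// Byte-mode limits are measured in bytes, object-mode limits in objects.
const bytes = describeLimits(new PassThrough({ highWaterMark: 1024 * 1024 }));
const objects = describeLimits(new PassThrough({ objectMode: true }));
console.log(bytes, objects);
```

The same properties exist on streams returned by createReadStream() and createWriteStream(), so the check can be dropped into a real pipeline while tuning.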
For object streams (records, not bytes):
const lineStream = new Transform({
objectMode: true,
highWaterMark: 100, // Up to 100 buffered objects
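transform(chunk, enc, cb) {
// Process per record, then hand it downstream
cb(null, chunk);
},
});
objectMode: true is essential for streams of structured data: without it, writing a plain object fails with ERR_INVALID_ARG_TYPE, because byte-mode streams accept only strings, Buffers, and Uint8Arrays.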
Common Mistake: Mixing object-mode and byte-mode streams in one pipeline. Writing a plain object into a byte-mode stream fails with ERR_INVALID_ARG_TYPE rather than being serialized for you. Either keep the pipeline all-object-mode or serialize/deserialize explicitly.
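Explicit serialization at the boundary can be done with a Transform whose writable side is in object mode and whose readable side emits bytes. The sketch below uses newline-delimited JSON as an assumed wire format; toNdjson and serialize are illustrative names.

```javascript
import { Readable, Transform } from "node:stream";
import { pipeline } from "node:stream/promises";

// Object mode on the writable side, bytes on the readable side:
// accepts records and emits newline-delimited JSON text.
function toNdjson() {
  return new Transform({
    writableObjectMode: true,
    transform(record, _encoding, callback) {
      callback(null, JSON.stringify(record) + "\n");
    },
  });
}

// Feeds records through the boundary and returns the serialized text.
async function serialize(records) {
  const parts = [];
  await pipeline(
    Readable.from(records),
    toNdjson(),
    async function (source) {
      for await (const chunk of source) parts.push(chunk);
    },
  );
  return Buffer.concat(parts).toString("utf8");
}
```

In a real pipeline the last stage would be a byte sink such as createWriteStream() or a socket; the collecting function here only keeps the example self-contained.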
Fix 8: Memory Leaks From Forgotten Cleanup
For pipeline(), cleanup is automatic. For raw .pipe() or manual streams:
import { finished } from "node:stream/promises";
import { createReadStream, createWriteStream } from "node:fs";
const src = createReadStream("input.bin");
const dst = createWriteStream("output.bin");
// pipe() does not forward errors, so fail the sink when the source fails.
src.on("error", (err) => dst.destroy(err));
src.pipe(dst);
try {
  await finished(dst); // Wait for downstream to finish
} catch (err) {
  src.destroy(err);
  dst.destroy(err);
  throw err;
}
finished() is the Promise-based version of stream’s “finish”/“end” events. Use it when pipeline() doesn’t fit (e.g. you need to keep the source open for multiple sinks).
For multiplexing (one source → many sinks):
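import { PassThrough } from "node:stream";
import { createReadStream, createWriteStream } from "node:fs";
import { finished } from "node:stream/promises";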
const source = createReadStream("data.bin");
const sink1 = createWriteStream("copy1.bin");
const sink2 = createWriteStream("copy2.bin");
const tee = new PassThrough();
source.pipe(tee);
tee.pipe(sink1);
tee.pipe(sink2);
await Promise.all([finished(sink1), finished(sink2)]);
For more sinks, write a Tee stream that emits to N destinations.
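One way to generalize this to N sinks is to give each sink its own PassThrough branch and run a pipeline() per branch. This is a sketch under assumptions: fanOut and memorySink are hypothetical helpers, and a failure in one sink rejects the returned promise without stopping the other branches.

```javascript
import { PassThrough, Readable, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

// Hypothetical helper: copies one source into every sink.
async function fanOut(source, sinks) {
  const branches = sinks.map(() => new PassThrough());
  // pipe() does not forward errors, so propagate a source failure by hand.
  source.once("error", (err) => {
    for (const branch of branches) branch.destroy(err);
  });
  for (const branch of branches) source.pipe(branch);
  // Each branch gets pipeline()'s cleanup and error handling.
  await Promise.all(branches.map((branch, i) => pipeline(branch, sinks[i])));
}

// Usage: two in-memory sinks that record what they receive.
function memorySink(store) {
  return new Writable({
    write(chunk, _encoding, callback) {
      store.push(chunk);
      callback();
    },
  });
}
const first = [];
const second = [];
await fanOut(
  Readable.from([Buffer.from("ab"), Buffer.from("cd")]),
  [memorySink(first), memorySink(second)],
);
console.log(Buffer.concat(first).toString(), Buffer.concat(second).toString());
```

Since a single source is piped to all branches, the slowest sink sets the pace for everyone, which keeps memory bounded at the cost of throughput.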
Node Streams vs RxJS vs IxJS vs Async Iterators vs Web Streams API
Streams are not a single concept in JavaScript — they’re several overlapping ones. Picking the right abstraction matters because each has its own backpressure model and error semantics.
Node streams (node:stream). Push-based with backpressure feedback. Sources emit data; sinks tell sources to slow down via write()’s return value. Best for I/O (files, sockets, HTTP bodies, zlib). Backpressure is automatic with pipeline(). Error propagation needs the modern pipeline() to work cleanly. Strong for byte streams; awkward for object streams that need merge/zip/throttle.
Web Streams API (node:stream/web, also in browsers). Standardized, browser-compatible push-based streams with a similar backpressure model (ReadableStream, WritableStream, TransformStream). Better designed than legacy Node streams (cleaner cancellation, explicit controller.error()), but smaller ecosystem in Node and slightly higher overhead. Useful when sharing code between Node and the browser, or when consuming fetch() response bodies.
Async iterators (for await...of). Pull-based. The consumer drives the producer by calling .next(). Backpressure is implicit — the producer can’t produce faster than the consumer asks. Best for sequential processing where each step is async. Composable with simple async function* generators. Weakness: no built-in parallelism, no merge/zip operators, and writing to a stream still requires honoring backpressure.
RxJS (Observables). Push-based reactive streams. Rich operator library (map, filter, debounceTime, mergeMap, switchMap, etc.) for event-driven UI and dataflow programming. No native backpressure: a producer pushes values as fast as it generates them and the consumer cannot slow it down; operators such as throttleTime or buffer only drop or batch values. Best for UI events, WebSocket message handling, and complex async coordination. Not the right tool for high-throughput byte streaming.
IxJS (Iterables, the LINQ-like sibling of RxJS). Same operator vocabulary as RxJS but applied to (sync and async) iterables instead of observables. Pull-based. Best when you want RxJS-style declarative pipelines over collections without the push-model overhead.
Highland.js / mostjs. Legacy libraries that predate widespread async iterator support. Mostly historical.
Rough rule: byte I/O → Node streams. Async data pipeline → async generators. UI/event dataflow → RxJS. Cross-runtime byte streams → Web Streams. Don’t reach for RxJS just because you want operators — for await...of plus a few helper generators usually wins for backend code.
Still Not Working?
A few less-obvious failures:
- stream emit error after end. A stream finished but an error fired afterward (cleanup error). Listen for error before destruction completes.
- Premature close errors. A downstream closed before upstream finished. Usually means a write to a closed file/socket. Use pipeline() to handle cleanup uniformly.
- Cannot read properties of null (reading 'read'). A stream was destroyed mid-pipeline. Check whether something is calling .destroy() externally.
- High CPU usage in a pipeline. Transform is sync-CPU-heavy. Offload the work to worker_threads and use a message-based pipeline pattern.
- Different behavior between Node versions. Streams API has had subtle changes across 18/20/22. Pin Node version or test on the lowest supported.
- Compression slower than expected. zlib level defaults to 6 (balanced). For speed: createGzip({ level: 1 }). For size: createGzip({ level: 9 }).
- Cannot pipe, not readable. Trying to pipe from a Writable. Direction matters: sources go on the left of pipeline, sinks on the right.
- Pipeline returns immediately without processing. Empty source. createReadStream("missing.txt") doesn’t throw; it emits an error event during the pipeline.
- Mixing pipeline() with pipe() in the same chain. Streams partway down a pipe() chain don’t see pipeline()’s abort signal, so cancellation leaks. Pick one model and stay in it.
- Readable.toWeb() produces a stream that doesn’t enqueue data. Symptom: the Web ReadableStream reader returns done: true immediately. Usually means the Node Readable was already consumed (something called .resume() or attached a data listener) or had already ended before conversion. Convert first, give the stream a single consumer, and set objectMode consistently.
- High-throughput Transform mutates upstream chunks. Treat buffers passed to transform() as shared memory: the source or another consumer may still reference them. Copy with Buffer.from(chunk) before mutating in place or retaining a reference past the callback.
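The missing-file case is easy to confirm in isolation. The sketch below assumes a throwaway path under the OS temp directory that does not exist; tryCopy is a hypothetical helper that reports the error code the pipeline rejected with.

```javascript
import { createReadStream } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

// Returns the error code the pipeline rejected with, or null on success.
async function tryCopy(path) {
  const discard = new Writable({
    write(_chunk, _encoding, callback) {
      callback();
    },
  });
  try {
    // createReadStream() itself does not throw for a missing file;
    // the failure arrives later as a rejection from pipeline().
    await pipeline(createReadStream(path), discard);
    return null;
  } catch (err) {
    return err.code;
  }
}

const missing = join(tmpdir(), `no-such-file-${process.pid}-${Date.now()}.txt`);
console.log(await tryCopy(missing));
```

With a path that does not exist, the logged code should be ENOENT, which is usually more actionable than a pipeline that appears to do nothing.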
For related Node, async, and IO issues, see Node stream error, Node heap out of memory, Node uncaught exception, and RxJS not working.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.