Fix: Node.js Stream Error — Pipe Not Working, Backpressure, or Premature Close

FixDevs

Quick Answer

How to fix Node.js stream issues — pipe and pipeline errors, backpressure handling, Transform streams, async iteration, error propagation, and common stream anti-patterns.

The Problem

A Node.js stream pipe produces an error or stops unexpectedly:

const fs = require('fs');

const readable = fs.createReadStream('large-file.csv');
const writable = fs.createWriteStream('output.csv');

readable.pipe(writable);
// Error: write after end
// Or: the output file is empty
// Or: the process hangs without completing

Or a Transform stream silently drops data:

const { Transform } = require('stream');

const upper = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    // Missing: callback()  ← stream halts here
  },
});

Or an error in one piped stream crashes the process instead of being caught:

readable.pipe(transform).pipe(writable);
// If transform emits 'error' and nothing listens for it: uncaught exception, the process exits
// The error is not forwarded to writable, which is left open

Why This Happens

Node.js streams have behaviors that aren’t obvious until they break:

  • .pipe() doesn’t propagate errors: if a stream in a .pipe() chain emits an error, the destination stream is NOT automatically closed or destroyed. You must handle each stream’s error event individually.
  • Missing callback() in Transform: every call to transform() must call its callback when done, even if no data is pushed. Without it, the stream stalls permanently.
  • Backpressure ignored: when writable.write() returns false, the readable should pause until drain is emitted. Ignoring this lets the writable’s internal buffer grow without bound, which drives up memory usage.
  • Async operations in streams: mixing async/await with the callback-based stream API requires care. An unhandled rejection in a transform doesn’t emit an error event.
  • Stream consumed twice: a readable stream’s data can only be consumed once. A consumer attached after the data has already flowed (for example, a second .pipe() added after the first has finished) gets nothing.
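The first point in the list above is easy to check in isolation. The following is a minimal sketch, assuming an in-memory source that fails on its first read; the helper name pipeLeavesDestinationOpen is hypothetical and exists only for this illustration.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical helper: pipes a failing source into a sink and reports
// the state the sink is left in after the source errors.
function pipeLeavesDestinationOpen() {
  return new Promise((resolve) => {
    // Source that fails as soon as data is requested
    const source = new Readable({
      read() {
        this.destroy(new Error('boom'));
      },
    });

    // Sink that accepts and discards everything
    const dest = new Writable({
      write(chunk, encoding, callback) {
        callback();
      },
    });

    source.on('error', (err) => {
      // Wait one turn so any automatic cleanup would already have run
      setImmediate(() => {
        resolve({
          message: err.message,
          destDestroyed: dest.destroyed,
          destEnded: dest.writableEnded,
        });
      });
    });

    source.pipe(dest);
  });
}
```

With .pipe(), the resolved object should report that the sink was neither destroyed nor ended, so closing it is left to the caller.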

Fix 1: Use pipeline() Instead of pipe()

stream.pipeline() is the modern replacement for .pipe(). It handles error propagation and stream cleanup automatically:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// WRONG — pipe() doesn't propagate errors or clean up
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));
// If zlib fails: the write stream stays open and its file descriptor leaks

// CORRECT — pipeline() cleans up all streams on error
pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('output.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

// With Promise (Node.js 15+)
// Aliased only because pipeline is already declared above in this snippet
const { pipeline: pipelinePromise } = require('stream/promises');

async function compress() {
  await pipelinePromise(
    fs.createReadStream('input.txt'),
    zlib.createGzip(),
    fs.createWriteStream('output.txt.gz'),
  );
  console.log('Done');
}

pipeline() vs pipe() comparison:

Feature                   .pipe()                 pipeline()
Error propagation         Manual (each stream)    Automatic
Stream cleanup on error   Manual                  Automatic
Callback/Promise          No                      Yes
Multiple transforms       Chained calls           Single call
Recommended               No (legacy)             Yes
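The "Stream cleanup on error" row can be observed directly. This is a small sketch, assuming an in-memory source that fails on its first read; pipelineDestroysDestination is a hypothetical name used only here.

```javascript
const { Readable, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical helper: runs a failing source through pipeline() and
// reports whether the destination was destroyed as part of cleanup.
async function pipelineDestroysDestination() {
  const source = new Readable({
    read() {
      this.destroy(new Error('boom'));
    },
  });
  const dest = new Writable({
    write(chunk, encoding, callback) {
      callback();
    },
  });

  try {
    await pipeline(source, dest);
    return { rejected: false, destDestroyed: dest.destroyed };
  } catch (err) {
    // pipeline() rejects with the source error after tearing down every stream
    return { rejected: true, message: err.message, destDestroyed: dest.destroyed };
  }
}
```

The promise rejects with the source error and the destination is reported as destroyed, which is the cleanup that .pipe() leaves to the caller.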

Fix 2: Always Call callback() in Transform Streams

Every invocation of the transform() method must call its callback:

const { Transform } = require('stream');

// WRONG — missing callback() call stalls the stream
const brokenTransform = new Transform({
  transform(chunk, encoding, callback) {
    const processed = chunk.toString().toUpperCase();
    this.push(processed);
    // callback() not called → stream hangs
  },
});

// CORRECT — always call callback when done
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    const processed = chunk.toString().toUpperCase();
    this.push(processed);
    callback();  // Signal: ready for next chunk
  },
});

// CORRECT — callback takes optional error and data arguments
const parseJSONTransform = new Transform({
  objectMode: true,  // Work with objects, not Buffers
  transform(chunk, encoding, callback) {
    try {
      const obj = JSON.parse(chunk.toString());
      callback(null, obj);  // Pass transformed data via callback
    } catch (err) {
      callback(err);  // Pass error — stream emits 'error' event
    }
  },
});

Async transform function:

const asyncTransform = new Transform({
  transform(chunk, encoding, callback) {
    // Keep transform() itself synchronous and wrap the async work,
    // so that a rejection is routed to callback instead of going unhandled
    (async () => {
      const result = await processChunk(chunk);
      this.push(result);
      callback();
    })().catch(callback);  // Pass async errors to callback
  },
});
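The rule also applies when a chunk is dropped on purpose. Below is a minimal sketch of a filtering transform, assuming numeric input in objectMode; createEvenFilter and collectEvens are hypothetical names for illustration.

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical filter: forwards even numbers and drops odd ones
function createEvenFilter() {
  return new Transform({
    objectMode: true,
    transform(value, encoding, callback) {
      if (value % 2 === 0) {
        callback(null, value); // Forward the value
      } else {
        callback(); // Nothing pushed, but the chunk is still acknowledged
      }
    },
  });
}

// Runs an array through the filter and collects what comes out
async function collectEvens(values) {
  const collected = [];
  await pipeline(
    Readable.from(values),
    createEvenFilter(),
    new Writable({
      objectMode: true,
      write(value, encoding, callback) {
        collected.push(value);
        callback();
      },
    }),
  );
  return collected;
}
```

If the else branch skipped callback(), the pipeline would stall on the first odd number and the promise would never settle.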

Fix 3: Handle Backpressure

Backpressure prevents memory overflow when the writable can’t keep up with the readable:

const fs = require('fs');

const readable = fs.createReadStream('huge-file.bin');
const writable = fs.createWriteStream('destination.bin');

// WRONG — ignore backpressure signal, overwhelms writable buffer
readable.on('data', (chunk) => {
  writable.write(chunk);  // write() may return false — ignored here
});

// CORRECT — handle backpressure
readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    readable.pause();  // Stop reading until writable drains
  }
});

writable.on('drain', () => {
  readable.resume();  // Writable buffer cleared, resume reading
});

readable.on('end', () => {
  writable.end();
});

readable.on('error', (err) => console.error('Read error:', err));
writable.on('error', (err) => console.error('Write error:', err));

Note: pipeline() handles backpressure automatically. Use it instead of manually wiring pause/resume.
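When the data does not come from a readable stream (for example, a loop that generates records), there is nothing to pause, but the same rule can be followed by waiting for 'drain' before writing more. A minimal sketch, assuming the chunks are already in memory; writeAll is a hypothetical helper name.

```javascript
const { once } = require('events');

// Hypothetical helper: writes every chunk in order, waiting for 'drain'
// whenever write() reports that the internal buffer is full.
async function writeAll(writable, chunks) {
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      // events.once() also rejects if the stream emits 'error' while waiting
      await once(writable, 'drain');
    }
  }
  writable.end();
  await once(writable, 'finish');
}
```

Usage would look like await writeAll(fs.createWriteStream('out.txt'), lines), where lines is an array of strings.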

Fix 4: Propagate Errors in .pipe() Chains

When you must use .pipe(), add error handlers to every stream in the chain:

const readable = fs.createReadStream('input.csv');
const transform = new MyTransform();
const writable = fs.createWriteStream('output.csv');

function destroyAll(err) {
  console.error('Stream error:', err);
  readable.destroy();
  transform.destroy();
  writable.destroy();
}

readable.on('error', destroyAll);
transform.on('error', destroyAll);
writable.on('error', destroyAll);

readable.pipe(transform).pipe(writable);

Or switch to pipeline() entirely — it’s the proper solution:

// This replaces all the manual error handling above
pipeline(readable, transform, writable, (err) => {
  if (err) console.error('Pipeline failed:', err);
});

Fix 5: Use Async Iteration for Simpler Stream Consumption

Node.js 12+ supports for await...of on readable streams, which is cleaner than event-based reading:

const fs = require('fs');
const readline = require('readline');

// Read a file line by line
async function processLines(filePath) {
  const fileStream = fs.createReadStream(filePath);
  const rl = readline.createInterface({ input: fileStream });

  for await (const line of rl) {
    if (line.startsWith('#')) continue;  // Skip comments
    await processLine(line);
  }
}

// Process a readable stream chunk by chunk
async function processStream(readable) {
  for await (const chunk of readable) {
    console.log('Chunk:', chunk.length, 'bytes');
  }
}

// With error handling
async function safeProcessStream(readable) {
  try {
    for await (const chunk of readable) {
      await handleChunk(chunk);
    }
  } catch (err) {
    console.error('Stream error during iteration:', err);
    readable.destroy();
  }
}

Collecting all stream data into a buffer:

async function streamToBuffer(readable) {
  const chunks = [];
  for await (const chunk of readable) {
    chunks.push(chunk);
  }
  return Buffer.concat(chunks);
}

async function streamToString(readable, encoding = 'utf8') {
  const buffer = await streamToBuffer(readable);
  return buffer.toString(encoding);
}

// Usage (inside an async function or an ES module, where await is allowed)
const content = await streamToString(fs.createReadStream('file.txt'));
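Async iteration also works as a pipeline stage: recent Node.js versions let pipeline() accept an async generator function in place of a Transform stream. The sketch below assumes string chunks from an in-memory source; upperCase and upperCaseAll are hypothetical names.

```javascript
const { Readable, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical stage: upper-cases every chunk it receives
async function* upperCase(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
}

// Runs an array of strings through the generator stage and collects the output
async function upperCaseAll(values) {
  const collected = [];
  await pipeline(
    Readable.from(values),
    upperCase,
    new Writable({
      objectMode: true,
      write(chunk, encoding, callback) {
        collected.push(String(chunk));
        callback();
      },
    }),
  );
  return collected;
}
```

An error thrown inside the generator rejects the pipeline() promise, so no callback wiring is needed for the async case.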

Fix 6: Create Custom Readable and Writable Streams

Common patterns for building streams correctly:

const { Readable, Writable, Transform } = require('stream');

// Custom Readable — push data on demand
class CounterStream extends Readable {
  constructor(max) {
    super({ objectMode: true });
    this.current = 0;
    this.max = max;
  }

  _read() {
    if (this.current <= this.max) {
      this.push(this.current++);
    } else {
      this.push(null);  // Signal end of stream
    }
  }
}

// Custom Writable — consume data
class CollectorStream extends Writable {
  constructor() {
    super({ objectMode: true });
    this.collected = [];
  }

  _write(chunk, encoding, callback) {
    this.collected.push(chunk);
    callback();  // Must call — signals ready for next chunk
  }

  _final(callback) {
    // Called when writable is ending — flush any remaining data
    console.log('Collected:', this.collected);
    callback();
  }
}

// Custom Transform
class DoubleTransform extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(chunk, encoding, callback) {
    this.push(chunk * 2);
    callback();
  }
}

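// Use them together
const { pipeline } = require('stream/promises');

// Wrapped in a function because top-level await is not available in CommonJS
async function main() {
  await pipeline(
    new CounterStream(10),
    new DoubleTransform(),
    new CollectorStream(),
  );
}

main().catch(console.error);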

Fix 7: Read Streams Correctly from HTTP Responses

Streaming HTTP responses has specific patterns:

const https = require('https');
const fs = require('fs');
const { pipeline } = require('stream/promises');

// Download a file using streams — memory efficient for large files
async function downloadFile(url, destPath) {
  const response = await new Promise((resolve, reject) => {
    https.get(url, resolve).on('error', reject);
  });

  if (response.statusCode !== 200) {
    response.resume();  // Drain the response to free memory
    throw new Error(`Download failed: ${response.statusCode}`);
  }

  await pipeline(
    response,
    fs.createWriteStream(destPath),
  );
}

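// With a library such as 'got' or 'undici' (simpler API); this example uses got.
// Recent got releases are ESM-only, so this snippet uses import syntax throughout.
import fs from 'fs';
import { pipeline } from 'stream/promises';
import got from 'got';

await pipeline(
  got.stream('https://example.com/large-file.zip'),
  fs.createWriteStream('file.zip'),
);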

Still Not Working?

'finish' vs 'end' event: 'end' is emitted on Readable streams when all data is consumed. 'finish' is emitted on Writable streams when writable.end() is called and all data is flushed. Using the wrong event for the wrong stream type is a common source of timing bugs.

Stream is paused and never resumes: a 'readable' listener takes precedence over 'data'. While one is attached, the stream does not switch into flowing mode, and 'data' fires only when readable.read() is called. Use one API consistently: either event-based ('data'/'end') or 'readable' with readable.read().

Streams in objectMode: by default, streams work with Buffer or string chunks. Pass { objectMode: true } in the constructor options when working with JavaScript objects. Writing a plain object into a stream that is not in objectMode fails with a TypeError (ERR_INVALID_ARG_TYPE), so make sure adjacent stages in a pipeline agree on the mode.

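write() after end(): calling writable.write() after writable.end() fails with Error: write after end (ERR_STREAM_WRITE_AFTER_END), reported through the stream’s 'error' event. This often happens in concurrent scenarios where multiple async operations try to write to the same stream. Use a queue or ensure writes are serialized, and call end() only after the last write has been issued.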
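To see the 'end' and 'finish' distinction from the notes above, the order of the two events can be recorded on a simple in-memory pipe. This is a minimal sketch; traceEndAndFinish is a hypothetical helper name.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical helper: records which stream emits which completion event
function traceEndAndFinish() {
  return new Promise((resolve, reject) => {
    const events = [];
    const source = Readable.from(['a', 'b']);
    const dest = new Writable({
      write(chunk, encoding, callback) {
        callback();
      },
    });

    // 'end' belongs to the readable side: all data has been consumed
    source.on('end', () => events.push('readable end'));

    // 'finish' belongs to the writable side: end() was called and data is flushed
    dest.on('finish', () => {
      events.push('writable finish');
      resolve(events);
    });

    source.on('error', reject);
    dest.on('error', reject);

    source.pipe(dest);
  });
}
```

The readable's 'end' is recorded first, because .pipe() only calls dest.end() once the source has ended. To know that the output is complete, wait for 'finish' on the writable.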

For related Node.js issues, see Fix: Node.js Heap Out of Memory and Fix: Node.js Uncaught Exception.

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.