Node.js Streams: Practical Guide for Data Processing

Practical guide to Node.js streams for efficient data processing. Covers readable, writable, transform streams, backpressure, piping, object mode, and memory optimization.

Node.js streams are one of the most powerful yet underutilized features of the platform. They enable processing data piece by piece as it arrives, rather than loading entire datasets into memory. This makes them essential for working with large files, network communication, and real-time data transformation. This guide covers stream fundamentals and practical patterns for building robust data pipelines.

Understanding Stream Types

Node.js provides four fundamental stream types, each serving a distinct role in data processing pipelines.

| Type | Direction | Purpose | Example |
| --- | --- | --- | --- |
| Readable | Source | Produce data | fs.createReadStream, HTTP request (server side) |
| Writable | Sink | Consume data | fs.createWriteStream, HTTP response (server side) |
| Transform | Both | Modify data | zlib.createGzip, crypto.createCipheriv |
| Duplex | Both | Independent read/write | TCP socket |

Each stream extends EventEmitter and implements specific internal methods. Readable streams use _read() to fetch data, writable streams use _write() to accept data, and transform streams use _transform() to process each chunk.
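
As an illustration of those three internal methods, here is a minimal sketch with one custom stream of each kind. The class names CounterSource, UpperCase, and Collector are invented for this example and are not part of Node.js.

```javascript
const { Readable, Writable, Transform } = require("stream");
const { pipeline } = require("stream/promises");

// Readable: _read() is called whenever the consumer wants more data.
class CounterSource extends Readable {
  constructor(limit) {
    super();
    this.limit = limit;
    this.current = 0;
  }
  _read() {
    if (this.current >= this.limit) {
      this.push(null); // null signals the end of the data
      return;
    }
    this.current += 1;
    this.push(`line ${this.current}\n`);
  }
}

// Transform: _transform() receives each chunk and emits a modified one.
class UpperCase extends Transform {
  _transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
}

// Writable: _write() accepts each chunk and calls back when it is handled.
class Collector extends Writable {
  constructor() {
    super();
    this.text = "";
  }
  _write(chunk, encoding, callback) {
    this.text += chunk.toString();
    callback();
  }
}

async function runDemo() {
  const sink = new Collector();
  await pipeline(new CounterSource(3), new UpperCase(), sink);
  return sink.text; // "LINE 1\nLINE 2\nLINE 3\n"
}
```

In application code you will usually reach for built-in streams first; writing the internal methods by hand is mainly useful for custom sources, sinks, and transformations.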


The Pipeline Pattern

The pipeline() function is the recommended way to connect streams in Node.js. Unlike the older .pipe() method, pipeline() properly propagates errors and cleans up resources when a stream in the chain fails.

const { pipeline } = require("stream/promises");
const { createReadStream, createWriteStream } = require("fs");
const { createGzip } = require("zlib");

async function compressFile(source, destination) {
  try {
    await pipeline(
      createReadStream(source),
      createGzip(),
      createWriteStream(destination)
    );
    console.log("Compression complete");
  } catch (err) {
    console.error("Pipeline failed:", err);
  }
}

With pipeline(), if any stream in the chain fails, the remaining streams are destroyed automatically and the returned promise rejects with the error. The .pipe() method lacks this behavior: it does not forward errors between streams, so a failure in one stage can go unhandled and leave the other streams open, leaking file descriptors or sockets.
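
The cleanup behavior can be observed directly. The following sketch uses a deliberately failing source (the error message and the demoCleanup name are made up for illustration) and checks the destroyed flag of the other streams after pipeline() rejects.

```javascript
const { Readable, PassThrough, Writable } = require("stream");
const { pipeline } = require("stream/promises");

async function demoCleanup() {
  // A source that fails as soon as it is asked for data.
  const source = new Readable({
    read() {
      this.destroy(new Error("disk read failed"));
    },
  });
  const middle = new PassThrough();
  const sink = new Writable({
    write(chunk, encoding, callback) {
      callback();
    },
  });

  let message = null;
  try {
    await pipeline(source, middle, sink);
  } catch (err) {
    message = err.message; // the source's error reaches the caller
  }

  return {
    message,
    middleDestroyed: middle.destroyed, // true: pipeline() cleaned up
    sinkDestroyed: sink.destroyed, // true: pipeline() cleaned up
  };
}
```

With source.pipe(middle).pipe(sink) instead, each stream would need its own error listener and manual destroy() calls to get the same result.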


Backpressure: Flow Control

Backpressure occurs when a readable stream produces data faster than a writable stream can consume it. Node.js manages this through internal buffering and the highWaterMark threshold: once the writable's buffered data reaches highWaterMark, write() returns false, and readable.pipe() pauses the source until the writable emits drain. When you call write() yourself, you are responsible for the same check:

const { createReadStream, createWriteStream } = require("fs");

const readable = createReadStream("large-file.txt", { highWaterMark: 16 * 1024 });
const writable = createWriteStream("output.txt", { highWaterMark: 16 * 1024 });

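writable.on("drain", () => {
  console.log("Buffer drained, resuming reading");
  readable.resume(); // restart the paused source
});

readable.on("data", (chunk) => {
  // write() returns false once the buffer reaches highWaterMark
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    console.log("Backpressure detected, pausing");
    readable.pause();
  }
});

// Close the destination once the source is exhausted
readable.on("end", () => writable.end());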

The drain event signals that the writable stream is ready for more data. Ignoring backpressure leads to unbounded memory growth and eventual process crashes under high load. The pipeline() function handles this coordination automatically.
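
When the data comes from a loop rather than from another stream, the same rule can be expressed with async/await. This is a sketch, not a library API: writeWithBackpressure and createSlowSink are hypothetical helpers, and the 4-byte highWaterMark is chosen only to make backpressure easy to trigger.

```javascript
const { Writable } = require("stream");
const { once } = require("events");
const { finished } = require("stream/promises");

// Writes every chunk, waiting for "drain" whenever write() returns false.
// Returns how many times the producer had to wait.
async function writeWithBackpressure(writable, chunks) {
  let waits = 0;
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      waits += 1;
      await once(writable, "drain"); // rejects if the stream errors
    }
  }
  writable.end();
  await finished(writable); // resolves once all data has been flushed
  return waits;
}

// A deliberately slow sink with a tiny buffer, for demonstration only.
function createSlowSink() {
  return new Writable({
    highWaterMark: 4,
    write(chunk, encoding, callback) {
      setImmediate(callback); // simulate slow I/O
    },
  });
}
```

Calling writeWithBackpressure(createSlowSink(), ["aaaa", "bbbb", "cccc"]) waits after every chunk, because each 4-byte write fills the buffer. Memory use stays bounded because the loop never queues more than the sink accepts.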


Object Mode Streams

By default, streams operate on Buffers and strings. Object mode streams work with JavaScript objects, which is useful for JSON transformation and CSV parsing pipelines.

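const { Transform } = require("stream");
const { pipeline } = require("stream/promises");
const { createReadStream, createWriteStream } = require("fs");

// Despite its name, this stage expects already-parsed objects
// and stamps each one with a processing time.
const jsonParser = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    // chunk is a JavaScript object
    const transformed = {
      ...chunk,
      processedAt: new Date().toISOString(),
    };
    callback(null, transformed);
  },
});

// Usage: read NDJSON, transform objects, write NDJSON.
// LineSplitter and ObjectStringifier are custom streams you supply.
// The async wrapper is needed because CommonJS has no top-level await.
async function transformNdjson() {
  await pipeline(
    createReadStream("input.ndjson"),
    new LineSplitter(),          // custom: NDJSON lines → objects
    jsonParser,                   // transform objects
    new ObjectStringifier(),      // custom: objects → NDJSON lines
    createWriteStream("output.ndjson")
  );
}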

Object mode has performance overhead compared to binary mode because of object allocation and garbage collection. Use it only when your processing logic requires structured data access.
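
The LineSplitter and ObjectStringifier stages used in the pipeline above are named but not defined in this guide. One possible sketch follows, assuming UTF-8 input with one JSON object per line; the implementation details are this example's assumptions, not a standard API.

```javascript
const { Transform } = require("stream");
const { StringDecoder } = require("string_decoder");

// Bytes in, one parsed object per NDJSON line out.
class LineSplitter extends Transform {
  constructor() {
    super({ readableObjectMode: true });
    // StringDecoder keeps multi-byte characters intact across chunks.
    this.decoder = new StringDecoder("utf8");
    this.remainder = "";
  }

  _transform(chunk, encoding, callback) {
    const lines = (this.remainder + this.decoder.write(chunk)).split("\n");
    this.remainder = lines.pop(); // last piece may be an incomplete line
    this.parseLines(lines, callback);
  }

  _flush(callback) {
    // Emit whatever is left when the input ends without a newline.
    this.parseLines([this.remainder + this.decoder.end()], callback);
  }

  parseLines(lines, callback) {
    try {
      for (const line of lines) {
        if (line.trim() !== "") this.push(JSON.parse(line));
      }
      callback();
    } catch (err) {
      callback(err); // malformed JSON fails the stream
    }
  }
}

// Objects in, one NDJSON line per object out.
class ObjectStringifier extends Transform {
  constructor() {
    super({ writableObjectMode: true });
  }

  _transform(record, encoding, callback) {
    callback(null, JSON.stringify(record) + "\n");
  }
}
```

Only the side of each stream that carries objects is switched to object mode, so the file-facing sides keep working with ordinary Buffers.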


Async Iteration with Streams

Modern Node.js supports for await...of on readable streams, offering a cleaner alternative to event-based consumption:

const { createReadStream } = require("fs");
const { createInterface } = require("readline");

async function processCSV(filePath) {
  const rl = createInterface({
    input: createReadStream(filePath),
    crlfDelay: Infinity,
  });

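  for await (const line of rl) {
    // Naive split: does not handle quoted fields that contain commas
    const row = line.split(",");
    // Process each row without loading the entire file;
    // processRow is an application-defined async function
    await processRow(row);
  }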
}

This pattern is particularly useful when you need to integrate stream processing with async/await control flow, such as database inserts or HTTP requests within the loop.
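
Async iteration also combines with pipeline(), which accepts an async generator function as a stage. In the sketch below, enrich is a hypothetical stand-in for an asynchronous operation such as a database lookup, and doubleAll is an invented name.

```javascript
const { Readable, Writable } = require("stream");
const { pipeline } = require("stream/promises");

// Hypothetical async operation; replace with a real lookup or request.
async function enrich(value) {
  return value * 2;
}

async function doubleAll(numbers) {
  const results = [];
  await pipeline(
    Readable.from(numbers),
    // The generator consumes the previous stage with for await...of
    // and yields values to the next stage.
    async function* (source) {
      for await (const n of source) {
        yield await enrich(n);
      }
    },
    new Writable({
      objectMode: true,
      write(value, encoding, callback) {
        results.push(value);
        callback();
      },
    })
  );
  return results; // doubleAll([1, 2, 3]) resolves to [2, 4, 6]
}
```

The generator stage avoids writing a Transform class when the per-item logic is already an async function.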


File Processing Pipelines

Streams excel at processing large files that would exceed available memory. A gzip decompression pipeline demonstrates the pattern:

const { createReadStream, createWriteStream } = require("fs");
const { createGunzip } = require("zlib");
const { pipeline } = require("stream/promises");

async function decompressLogs() {
  await pipeline(
    createReadStream("server.log.gz"),
    createGunzip(),
    createWriteStream("server.log")
  );
}

The same pattern applies to encryption and decryption (using crypto.createCipheriv and crypto.createDecipheriv), image processing (chaining sharp transforms), and log parsing (using readline with createInterface). Stream-based processing keeps memory usage roughly constant regardless of file size, bounded by the streams' internal buffers, whereas buffered approaches need memory proportional to the input, O(n).
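
For the encryption case, a minimal sketch might look like the following. The aes-256-cbc algorithm and the encryptFile and decryptFile names are assumptions made for this example; key handling and authenticated encryption are out of scope here.

```javascript
const { createReadStream, createWriteStream } = require("fs");
const { createCipheriv, createDecipheriv } = require("crypto");
const { pipeline } = require("stream/promises");

// Assumed cipher for this sketch: needs a 32-byte key and a 16-byte IV.
const ALGORITHM = "aes-256-cbc";

async function encryptFile(source, destination, key, iv) {
  await pipeline(
    createReadStream(source),
    createCipheriv(ALGORITHM, key, iv), // cipher objects are Transform streams
    createWriteStream(destination)
  );
}

async function decryptFile(source, destination, key, iv) {
  await pipeline(
    createReadStream(source),
    createDecipheriv(ALGORITHM, key, iv),
    createWriteStream(destination)
  );
}
```

A key and IV can be generated with crypto.randomBytes(32) and crypto.randomBytes(16). The same key and IV must be supplied to decryptFile to recover the original file.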


Error Handling and Custom Streams

Error handling differs significantly between .pipe() and pipeline(). Always use pipeline() for production code. For custom streams, override _transform() and signal errors by passing an Error to the callback:

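const { Transform } = require("stream");

class ValidateTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // _isValid and _process are application-specific methods you implement
    if (!this._isValid(chunk)) {
      // Passing an Error fails this stream and any pipeline() it belongs to
      callback(new Error("Invalid data received"));
      return;
    }
    callback(null, this._process(chunk));
  }
}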
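
To make the pattern concrete, here is a sketch with an actual validation rule. The rule (every record needs a numeric id) and the names RequireId and validateAll are hypothetical choices for this example.

```javascript
const { Readable, Transform, Writable } = require("stream");
const { pipeline } = require("stream/promises");

// Hypothetical rule: every record must carry a numeric `id`.
class RequireId extends Transform {
  constructor() {
    super({ objectMode: true });
  }
  _transform(record, encoding, callback) {
    if (typeof record.id !== "number") {
      callback(new Error(`Invalid record: ${JSON.stringify(record)}`));
      return;
    }
    callback(null, record);
  }
}

// Resolves with the accepted records, or rejects on the first invalid one.
async function validateAll(records) {
  const accepted = [];
  await pipeline(
    Readable.from(records),
    new RequireId(),
    new Writable({
      objectMode: true,
      write(record, encoding, callback) {
        accepted.push(record);
        callback();
      },
    })
  );
  return accepted;
}
```

Because the error travels through pipeline(), the caller handles it with an ordinary try/catch around await validateAll(...).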

The finished() utility detects when a stream has ended, whether successfully or due to an error:

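const { finished } = require("stream/promises");

// finished() only observes the stream; something else must consume it.
// The async wrapper is needed because CommonJS has no top-level await.
async function waitForStream(readableStream) {
  try {
    await finished(readableStream);
    console.log("Stream completed");
  } catch (err) {
    console.error("Stream failed:", err);
  }
}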

Performance Optimization

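Tuning highWaterMark can significantly affect throughput. A larger value reduces the number of read operations but increases memory usage per stream. For high-throughput file transfers, values between 64 KB and 1 MB are common; benchmark with your own workload before settling on one. Use cork() and uncork() to batch many small writes to a writable stream, which reduces system call overhead when the stream implements _writev(). Deferring uncork() with process.nextTick() flushes all writes queued in the current tick together: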

writable.cork();
for (const chunk of chunks) {
  writable.write(chunk);
}
process.nextTick(() => writable.uncork());
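
The batching effect can be made visible with a sink that implements writev and records how many chunks each call receives. This is an illustrative sketch; writeBatched is a hypothetical helper, not a Node.js API.

```javascript
const { Writable } = require("stream");
const { finished } = require("stream/promises");

// Writes `chunks` to a recording sink and returns the size of each batch
// the sink received.
async function writeBatched(chunks, useCork) {
  const batchSizes = [];
  const sink = new Writable({
    // writev receives all currently buffered chunks in a single call.
    writev(entries, callback) {
      batchSizes.push(entries.length);
      callback();
    },
  });

  if (useCork) sink.cork();
  for (const chunk of chunks) {
    sink.write(chunk);
  }
  if (useCork) process.nextTick(() => sink.uncork());

  // Give the deferred uncork() a chance to run before ending the stream.
  await new Promise((resolve) => setImmediate(resolve));
  sink.end();
  await finished(sink);
  return batchSizes;
}
```

With useCork set to true, three writes arrive at the sink as one batch of three. Without cork(), the sink is called more than once for the same data.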

Conclusion

Node.js streams provide a memory-efficient, composable approach to data processing. By understanding the four stream types, using pipeline() for error-safe connections, and respecting backpressure, you can build robust data pipelines that handle files of any size. Start by migrating buffer-heavy operations to stream-based alternatives and measure the memory savings in your application.