Node.jsストリームは、プラットフォームの最も強力でありながら過小評価されている機能のひとつです。ストリームを使用すると、データ全体をメモリにロードするのではなく、到着したデータを断片的に処理できます。そのため、大規模ファイル、ネットワーク通信、リアルタイムデータ変換に不可欠です。本ガイドでは、ストリームの基礎と堅牢なデータパイプラインを構築するための実践的パターンを解説します。
ストリームタイプの理解
Node.jsには4つの基本的なストリームタイプがあり、データ処理パイプラインでそれぞれ異なる役割を果たします。
| タイプ | 方向 | 目的 | 例 |
|---|---|---|---|
| Readable | ソース | データ生成 | fs.createReadStream、HTTPリクエスト |
| Writable | シンク | データ消費 | fs.createWriteStream、HTTPレスポンス |
| Transform | 両方 | データ変換 | zlib.createGzip、crypto.createCipher |
| Duplex | 両方 | 独立した読み書き | TCPソケット |
各ストリームはEventEmitterを継承し、特定の内部メソッドを実装します。
パイプパターン:ストリームの接続
pipeline()関数はNode.jsでストリームを接続する推奨方法です。古い.pipe()メソッドとは異なり、pipeline()はエラーを適切に伝搬し、チェーン内のストリームが失敗したときにリソースをクリーンアップします。
const { pipeline } = require("stream/promises");
const { createReadStream, createWriteStream } = require("fs");
const { createGzip } = require("zlib");
async function compressFile(source, destination) {
try {
await pipeline(
createReadStream(source),
createGzip(),
createWriteStream(destination)
);
console.log("圧縮完了");
} catch (err) {
console.error("パイプライン失敗:", err);
}
}
pipeline()では、読み取りストリームが失敗した場合、すべての下流ストリームが自動的に破棄され、エラーが伝搬されます。.pipe()にはこの動作がなく、中間ストリームのエラーが検出されずにリソースがリークする可能性があります。
バックプレッシャー:フロー制御
バックプレッシャーは、ReadableストリームがWritableストリームの消費速度より速くデータを生成するときに発生します。Node.jsは内部バッファリングとhighWaterMarkしきい値を通じてこれを自動的に処理します。
const { createReadStream, createWriteStream } = require("fs");
const readable = createReadStream("large-file.txt", { highWaterMark: 16 * 1024 });
const writable = createWriteStream("output.txt", { highWaterMark: 16 * 1024 });
writable.on("drain", () => {
console.log("バッファ解放、読み取り再開");
readable.resume();
});
readable.on("data", (chunk) => {
const canContinue = writable.write(chunk);
if (!canContinue) {
console.log("バックプレッシャー検出、一時停止");
readable.pause();
}
});
drainイベントは、Writableストリームがより多くのデータを受け入れられる状態になったことを通知します。バックプレッシャーを無視すると、メモリが際限なく消費され、高負荷時にプロセスがクラッシュする可能性があります。pipeline()関数はこの調整を自動的に行います。
オブジェクトモードストリーム
デフォルトではストリームはBufferと文字列を操作します。オブジェクトモードストリームはJavaScriptオブジェクトを扱うため、JSON変換やCSVパースパイプラインに有用です。
const { Transform } = require("stream");
const jsonParser = new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk, encoding, callback) {
const transformed = {
...chunk,
processedAt: new Date().toISOString(),
};
callback(null, transformed);
},
});
オブジェクトモードは、オブジェクトのアロケーションとガベージコレクションによりバイナリモードよりパフォーマンスオーバーヘッドがあります。構造化データアクセスが必要な場合のみ使用してください。
Async Iterationとストリーム
モダンなNode.jsは、Readableストリームでfor await...ofをサポートしており、イベントベースの消費に代わるよりクリーンな方法を提供します。
const { createReadStream } = require("fs");
const { createInterface } = require("readline");
async function processCSV(filePath) {
const rl = createInterface({
input: createReadStream(filePath),
crlfDelay: Infinity,
});
for await (const line of rl) {
const row = line.split(",");
await processRow(row);
}
}
このパターンは、async/await制御フローとストリーム処理を統合する必要がある場合に特に有用です。
ファイル処理パイプライン
ストリームは、使用可能なメモリを超える大規模ファイルの処理に優れています。
const { createReadStream, createWriteStream } = require("fs");
const { createGunzip } = require("zlib");
const { pipeline } = require("stream/promises");
async function decompressLogs() {
await pipeline(
createReadStream("server.log.gz"),
createGunzip(),
createWriteStream("server.log")
);
}
同じパターンは暗号化(crypto.createDecipher)、画像処理(sharp)、ログ解析(readline + createInterface)にも適用できます。ストリームベースの処理はファイルサイズに関係なくメモリ使用量が一定であるのに対し、バッファ方式では入力に比例したO(n)メモリが必要です。
エラーハンドリングとカスタムストリーム
エラーハンドリングは.pipe()とpipeline()で大きく異なります。本番コードでは常にpipeline()を使用してください。カスタムストリームでは_transform()をオーバーライドし、コールバックにErrorを渡すことでエラーを通知します。
class ValidateTransform extends Transform {
_transform(chunk, encoding, callback) {
if (!this._isValid(chunk)) {
callback(new Error("不正なデータを受信"));
return;
}
callback(null, this._process(chunk));
}
}
finished()ユーティリティは、ストリームが正常終了またはエラー終了したことを検出します。
パフォーマンス最適化
highWaterMarkの調整はスループットに大きな影響を与えます。値が大きいと読み取り操作の回数が減りますが、メモリ使用量が増加します。ファイル転送では64KB〜1MBが一般的です。cork()とuncork()を使用して書き込みをバッチ化すると、システムコールのオーバーヘッドを削減できます。
writable.cork();
for (const chunk of chunks) {
writable.write(chunk);
}
process.nextTick(() => writable.uncork());
結論
Node.jsストリームは、メモリ効率が高く、構成可能なデータ処理アプローチを提供します。4つのストリームタイプを理解し、エラー安全な接続にpipeline()を使用し、バックプレッシャーを尊重することで、あらゆるサイズのファイルを処理する堅牢なデータパイプラインを構築できます。まずバッファ中心の処理をストリームベースに移行し、アプリケーションのメモリ削減効果を測定してみてください。
