Node.jsは伝統的にシングルスレッドで、非同期I/Oにより並行処理を実現してきました。このモデルはI/Oバウンドのワークロードに優れていますが、CPU負荷の高い処理はイベントループをブロックし、アプリケーションの応答性を低下させます。Worker Threads(Node.js 12で安定化)は、別のV8アイソレートでJavaScriptを実行することで、単一プロセス内での真の並列実行を可能にします。本記事では、ワーカースレッドを本番環境で活用する実践的なパターンを解説します。
Workerのライフサイクルと通信
ワーカーを作成するには、独自のV8アイソレートで実行される別のJavaScriptファイルが必要です。各ワーカーは独立したヒープとイベントループを持ちます。
// main.js
const { Worker } = require("worker_threads");
const worker = new Worker("./worker.js", {
workerData: { input: largeDataset },
});
worker.on("message", (result) => {
logger.info({ result }, "Worker完了");
});
worker.on("error", (err) => {
logger.error({ err }, "Worker失敗");
});
worker.on("exit", (code) => {
if (code !== 0) logger.error({ exitCode: code }, "Workerクラッシュ");
});
// worker.js
const { parentPort, workerData } = require("worker_threads");
const result = processData(workerData.input);
parentPort.postMessage(result);
ワーカーは構造化クローンアルゴリズムを使用してメッセージを複製します。大きなバイナリデータにはTransferableオブジェクトを使用することで、コピーオーバーヘッドを回避できます。
SharedArrayBufferによる共有メモリ
メッセージコピーのコストが高すぎる高スループットシナリオでは、SharedArrayBufferがゼロコピーの共有メモリを提供します。Atomics操作でアクセスを調整し、競合状態を防止します。
// main.js
const sharedBuffer = new SharedArrayBuffer(4 * 1024 * 1024);
const sharedArray = new Int32Array(sharedBuffer);
const worker = new Worker("./worker.js");
worker.postMessage({ sharedBuffer });
Atomics.wait(sharedArray, 0, 0);
const result = sharedArray[1];
// worker.js
const { parentPort, workerData } = require("worker_threads");
const sharedArray = new Int32Array(workerData.sharedBuffer);
sharedArray[1] = computeResult();
Atomics.store(sharedArray, 0, 1);
Atomics.notify(sharedArray, 0);
| メカニズム | オーバーヘッド | ユースケース |
|---|---|---|
postMessage(構造化クローン) | 呼び出し毎に中程度 | ほとんどのタスク、複雑なオブジェクト |
| Transferableオブジェクト | 低い(ゼロコピー) | 大きなバッファ、バイナリデータ |
SharedArrayBuffer + Atomics | 最小 | 高頻度更新、ストリーミングデータ |
スレッドプールの実装
すべてのタスクに対して新しいWorkerインスタンスを作成すると起動コストがかかります。スレッドプールは再利用可能なワーカーセットを維持し、タスクを効率的に分散します。
class WorkerPool {
constructor(workerPath, numThreads = os.cpus().length) {
this.workers = [];
this.queue = [];
for (let i = 0; i < numThreads; i++) {
const worker = new Worker(workerPath);
worker.on("message", (result) => this._complete(worker, result));
worker.on("error", (err) => this._fail(worker, err));
this.workers.push({ worker, busy: false });
}
}
execute(task) {
return new Promise((resolve, reject) => {
const available = this.workers.find((w) => !w.busy);
if (available) {
available.busy = true;
available.worker.postMessage(task);
available.resolve = resolve;
available.reject = reject;
} else {
this.queue.push({ task, resolve, reject });
}
});
}
_complete(worker, result) {
worker.resolve(result);
this._next(worker);
}
_next(worker) {
if (this.queue.length > 0) {
const next = this.queue.shift();
worker.postMessage(next.task);
worker.resolve = next.resolve;
worker.reject = next.reject;
} else {
worker.busy = false;
}
}
}
プールサイズはCPUコア数に合わせて設定します。コア数より多いワーカーを作成するとコンテキストスイッチのオーバーヘッドが増加します。
ユースケース:画像処理とデータ変換
画像のリサイズ、フォーマット変換、フィルタリングなどの処理はCPUバウンドであり、メインスレッドで実行するとイベントループをブロックします。
// image-worker.js
const sharp = require("sharp");
const { parentPort, workerData } = require("worker_threads");
sharp(workerData.input)
.resize(800, 600)
.jpeg({ quality: 80 })
.toBuffer()
.then((output) => parentPort.postMessage(output));
ワーカースレッドは以下のようなCPU負荷の高いデータ処理にも適しています:
- 大規模ペイロードのJSONパースと検証
- CSV/Excelファイル処理
- zlibやbrotliによるデータ圧縮・解凍
- bcryptやargon2によるパスワードハッシュ化
- PDF生成とレンダリング
ベンチマークでは、CPU負荷の高い処理をワーカーにオフロードすることで、p99イベントループレイテンシが5〜10倍改善されることが示されています。
子プロセスおよびClusterとの比較
| 機能 | Worker Threads | 子プロセス | Cluster |
|---|---|---|---|
| メモリモデル | 共有(同一プロセス) | 別プロセス | 別プロセス |
| 起動時間 | ~5-10 ms | ~20-50 ms | ~20-50 ms |
| 通信 | 構造化クローン + 共有メモリ | シリアライズIPC | シリアライズIPC |
| 最適な用途 | CPUバウンド処理 | 分離、ネイティブアドオン | I/OバウンドHTTP |
ClusterモジュールはHTTPリクエスト処理用に複数のNode.jsプロセスをフォークします。ワーカースレッドは各クラスタワーカー内でCPUバウンド処理を担当することで、両者を補完できます。
if (cluster.isPrimary) {
for (let i = 0; i < numCPUs; i++) cluster.fork();
} else {
const pool = new WorkerPool("./cpu-worker.js");
app.get("/process", async (req, res) => {
const result = await pool.execute(req.query.data);
res.json(result);
});
}
モニタリングとデバッグ
ワーカースレッドのデバッグには特有のアプローチが必要です。ライフサイクルイベントを監視し、ワーカー内からメモリ使用量を追跡し、--inspect-brkを使用してChrome DevToolsでデバッグします。ワーカーの応答性を確認するヘルスチェックを実装し、クラッシュしたワーカーを自動的に再起動するポリシーを導入しましょう。
結論
Worker ThreadsはNode.jsに真の並列処理をもたらし、CPUバウンドのワークロードに対する重要なギャップを埋めます。適切に設計されたスレッドプール、Atomics同期を用いた共有メモリ、注意深いタスク選択を通じて、イベントループの応答性を維持しながらアプリケーションのスループットを劇的に向上させることができます。まずアプリケーション内のCPU負荷の高い処理を特定し、適切なエラーハンドリングを備えたスレッドプールを実装し、レイテンシ改善をベンチマークで検証してください。
