Featured image of post Node.js Worker Threads:実践的な並列処理 Featured image of post Node.js Worker Threads:実践的な並列処理

Node.js Worker Threads:実践的な並列処理

Node.jsのWorker Threadsを使った並列処理の実践ガイド。スレッド通信、共有メモリ、スレッドプール、画像処理やデータ変換のユースケースを解説。

Node.jsは伝統的にシングルスレッドで、非同期I/Oにより並行処理を実現してきました。このモデルはI/Oバウンドのワークロードに優れていますが、CPU負荷の高い処理はイベントループをブロックし、アプリケーションの応答性を低下させます。Worker Threads(Node.js 12で安定化)は、別のV8アイソレートでJavaScriptを実行することで、単一プロセス内での真の並列実行を可能にします。本記事では、ワーカースレッドを本番環境で活用する実践的なパターンを解説します。

Workerのライフサイクルと通信

ワーカーを作成するには、独自のV8アイソレートで実行される別のJavaScriptファイルが必要です。各ワーカーは独立したヒープとイベントループを持ちます。

// main.js
const { Worker } = require("worker_threads");

const worker = new Worker("./worker.js", {
  workerData: { input: largeDataset },
});

worker.on("message", (result) => {
  logger.info({ result }, "Worker完了");
});

worker.on("error", (err) => {
  logger.error({ err }, "Worker失敗");
});

worker.on("exit", (code) => {
  if (code !== 0) logger.error({ exitCode: code }, "Workerクラッシュ");
});
// worker.js
const { parentPort, workerData } = require("worker_threads");
const result = processData(workerData.input);
parentPort.postMessage(result);

ワーカーは構造化クローンアルゴリズムを使用してメッセージを複製します。大きなバイナリデータにはTransferableオブジェクトを使用することで、コピーオーバーヘッドを回避できます。


SharedArrayBufferによる共有メモリ

メッセージコピーのコストが高すぎる高スループットシナリオでは、SharedArrayBufferがゼロコピーの共有メモリを提供します。Atomics操作でアクセスを調整し、競合状態を防止します。

// main.js
const sharedBuffer = new SharedArrayBuffer(4 * 1024 * 1024);
const sharedArray = new Int32Array(sharedBuffer);
const worker = new Worker("./worker.js");
worker.postMessage({ sharedBuffer });

Atomics.wait(sharedArray, 0, 0);
const result = sharedArray[1];
// worker.js
const { parentPort, workerData } = require("worker_threads");
const sharedArray = new Int32Array(workerData.sharedBuffer);
sharedArray[1] = computeResult();
Atomics.store(sharedArray, 0, 1);
Atomics.notify(sharedArray, 0);
メカニズムオーバーヘッドユースケース
postMessage(構造化クローン)呼び出し毎に中程度ほとんどのタスク、複雑なオブジェクト
Transferableオブジェクト低い(ゼロコピー)大きなバッファ、バイナリデータ
SharedArrayBuffer + Atomics最小高頻度更新、ストリーミングデータ

スレッドプールの実装

すべてのタスクに対して新しいWorkerインスタンスを作成すると起動コストがかかります。スレッドプールは再利用可能なワーカーセットを維持し、タスクを効率的に分散します。

class WorkerPool {
  constructor(workerPath, numThreads = os.cpus().length) {
    this.workers = [];
    this.queue = [];
    for (let i = 0; i < numThreads; i++) {
      const worker = new Worker(workerPath);
      worker.on("message", (result) => this._complete(worker, result));
      worker.on("error", (err) => this._fail(worker, err));
      this.workers.push({ worker, busy: false });
    }
  }

  execute(task) {
    return new Promise((resolve, reject) => {
      const available = this.workers.find((w) => !w.busy);
      if (available) {
        available.busy = true;
        available.worker.postMessage(task);
        available.resolve = resolve;
        available.reject = reject;
      } else {
        this.queue.push({ task, resolve, reject });
      }
    });
  }

  _complete(worker, result) {
    worker.resolve(result);
    this._next(worker);
  }

  _next(worker) {
    if (this.queue.length > 0) {
      const next = this.queue.shift();
      worker.postMessage(next.task);
      worker.resolve = next.resolve;
      worker.reject = next.reject;
    } else {
      worker.busy = false;
    }
  }
}

プールサイズはCPUコア数に合わせて設定します。コア数より多いワーカーを作成するとコンテキストスイッチのオーバーヘッドが増加します。


ユースケース:画像処理とデータ変換

画像のリサイズ、フォーマット変換、フィルタリングなどの処理はCPUバウンドであり、メインスレッドで実行するとイベントループをブロックします。

// image-worker.js
const sharp = require("sharp");
const { parentPort, workerData } = require("worker_threads");

sharp(workerData.input)
  .resize(800, 600)
  .jpeg({ quality: 80 })
  .toBuffer()
  .then((output) => parentPort.postMessage(output));

ワーカースレッドは以下のようなCPU負荷の高いデータ処理にも適しています:

  • 大規模ペイロードのJSONパースと検証
  • CSV/Excelファイル処理
  • zlibやbrotliによるデータ圧縮・解凍
  • bcryptやargon2によるパスワードハッシュ化
  • PDF生成とレンダリング

ベンチマークでは、CPU負荷の高い処理をワーカーにオフロードすることで、p99イベントループレイテンシが5〜10倍改善されることが示されています。


子プロセスおよびClusterとの比較

機能Worker Threads子プロセスCluster
メモリモデル共有(同一プロセス)別プロセス別プロセス
起動時間~5-10 ms~20-50 ms~20-50 ms
通信構造化クローン + 共有メモリシリアライズIPCシリアライズIPC
最適な用途CPUバウンド処理分離、ネイティブアドオンI/OバウンドHTTP

ClusterモジュールはHTTPリクエスト処理用に複数のNode.jsプロセスをフォークします。ワーカースレッドは各クラスタワーカー内でCPUバウンド処理を担当することで、両者を補完できます。

if (cluster.isPrimary) {
  for (let i = 0; i < numCPUs; i++) cluster.fork();
} else {
  const pool = new WorkerPool("./cpu-worker.js");
  app.get("/process", async (req, res) => {
    const result = await pool.execute(req.query.data);
    res.json(result);
  });
}

モニタリングとデバッグ

ワーカースレッドのデバッグには特有のアプローチが必要です。ライフサイクルイベントを監視し、ワーカー内からメモリ使用量を追跡し、--inspect-brkを使用してChrome DevToolsでデバッグします。ワーカーの応答性を確認するヘルスチェックを実装し、クラッシュしたワーカーを自動的に再起動するポリシーを導入しましょう。


結論

Worker ThreadsはNode.jsに真の並列処理をもたらし、CPUバウンドのワークロードに対する重要なギャップを埋めます。適切に設計されたスレッドプール、Atomics同期を用いた共有メモリ、注意深いタスク選択を通じて、イベントループの応答性を維持しながらアプリケーションのスループットを劇的に向上させることができます。まずアプリケーション内のCPU負荷の高い処理を特定し、適切なエラーハンドリングを備えたスレッドプールを実装し、レイテンシ改善をベンチマークで検証してください。