Node.jsでマルチスレッドを使う方法
筆者は、Write for Donations プログラムの一環として、Open Sourcing Mental Illness に寄付をすることを選びました。
はじめに
Node.jsはJavaScriptコードを単一のスレッドで実行するため、あなたのコードは一度に1つのタスクしか処理できません。ただし、Node.js自体はマルチスレッドであり、I/O操作(ディスクからのファイル読み込みやネットワークリクエストなど)を処理するlibuvライブラリを通じて隠れたスレッドを提供します。隠れたスレッドの利用により、Node.jsは非同期メソッドを提供し、メインスレッドをブロックすることなくI/Oリクエストを行うことができます。
Node.jsには隠れたスレッドが存在しますが、複雑な計算、画像のリサイズ、またはビデオの圧縮などのCPU負荷の高いタスクをオフロードすることはできません。JavaScriptはシングルスレッドであるため、CPU負荷の高いタスクが実行されると、メインスレッドがブロックされ、タスクが完了するまで他のコードが実行されません。他のスレッドを使わずにCPUによって制約されたタスクの処理速度を上げる唯一の方法は、プロセッサの速度を上げることです。
しかし、最近の数年間、CPUは速くなっていません。代わりに、コンピュータは余分なコアを搭載しており、8コア以上のコンピュータが一般的になっています。しかし、この傾向にも関わらず、JavaScriptはシングルスレッドであるため、CPUに負荷のかかるタスクを高速化したり、メインスレッドを壊さないようにするために、コンピュータの余分なコアを活用することはできません。
この問題を解決するために、Node.jsではworker-threadsモジュールが導入されました。これにより、スレッドを作成し、複数のJavaScriptタスクを並列で実行することができます。スレッドがタスクを終了したら、主スレッドに結果のメッセージを送信し、コードの他の部分で使用できるようにします。workerスレッドを使用する利点は、CPUに負荷がかかるタスクが主スレッドをブロックせず、タスクを複数のワーカーに分割して最適化することができる点です。
このチュートリアルでは、メインスレッドをブロックするCPU集中タスクを持つNode.jsアプリを作成します。次に、worker-threadsモジュールを使用して、メインスレッドをブロックせずにCPU集中タスクを別のスレッドにオフロードします。最後に、CPUに束縛されたタスクを分割し、4つのスレッドで並行して作業させることでタスクの処理速度を向上させます。
要件
このチュートリアルを完了するためには、以下が必要です:
- A multi-core system with four or more cores. You can still follow the tutorial from Steps 1 through 6 on a dual-core system. However, Step 7 requires four cores to see the performance improvements.
- A Node.js development environment. If you’re on Ubuntu 22.04, install the recent version of Node.js by following step 3 of How To Install Node.js on Ubuntu 22.04. If you’re on another operating system, see How to Install Node.js and Create a Local Development Environment.
- A good understanding of the event loop, callbacks, and promises in JavaScript, which you can find in our tutorial, Understanding the Event Loop, Callbacks, Promises, and Async/Await in JavaScript.
- Basic knowledge of how to use the Express web framework. Check out our guide, How To Get Started with Node.js and Express.
プロジェクトのセットアップと依存関係のインストールを行う。
このステップでは、プロジェクトディレクトリを作成し、npmを初期化し、必要な依存関係をすべてインストールします。
最初に、プロジェクトディレクトリを作成して移動してください。
- mkdir multi-threading_demo
- cd multi-threading_demo
mkdirコマンドはディレクトリを作成し、cdコマンドは新しく作成したディレクトリに作業ディレクトリを変更します。
その後、npmのnpm initコマンドを使用してプロジェクトディレクトリを初期化します。
- npm init -y
-yオプションはすべてのデフォルトオプションを受け付けます。
コマンドが実行されると、出力は次のようになります。
Wrote to /home/sammy/multi-threading_demo/package.json:
{
"name": "multi-threading_demo",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC"
}
次に、Node.jsのウェブフレームワークであるexpressをインストールしてください。
- npm install express
Expressを使用して、ブロッキングとノンブロッキングのエンドポイントを持つサーバーアプリケーションを作成します。
Node.jsには、デフォルトでworker-threadsモジュールが含まれているため、インストールする必要はありません。
あなたは今、必要なパッケージをインストールしました。次に、プロセスやスレッドについて詳しく学び、コンピュータ上での実行方法についても学びます。
プロセスとスレッドの理解
CPU バウンドタスクを書き始めてそれを別のスレッドにオフロードする前に、まず、プロセスとスレッドの定義やそれらの違いを理解する必要があります。さらに重要なことに、プロセスとスレッドが単一またはマルチコアのコンピュータシステム上でどのように実行されるのか、再確認します。
プロセス
プロセスはオペレーティングシステム内で実行されるプログラムです。それは独自のメモリを持ち、他の実行中のプログラムのメモリを見たりアクセスしたりすることはできません。また、プログラム内で現在実行されている命令を示す命令ポインタも持っています。一度に実行されるタスクは1つだけです。
これを理解するためには、Node.jsのプログラムを作成し、実行時に終了しないように無限ループを含めます。
「nano」やお好みのテキストエディタを使用して、「process.js」ファイルを作成し、開きます。
- nano process.js
「process.js」ファイルに、以下のコードを入力してください。
const process_name = process.argv.slice(2)[0];
count = 0;
while (true) {
count++;
if (count == 2000 || count == 4000) {
console.log(`${process_name}: ${count}`);
}
}
最初の行では、process.argvプロパティがプログラムのコマンドライン引数を含む配列を返します。それから、JavaScriptのslice()メソッドを引数2で結合して、インデックス2以降の配列の浅いコピーを作成します。これにより、Node.jsのパスとプログラムのファイル名という最初の2つの引数をスキップします。次に、括弧の記法を使ってスライスされた配列から最初の引数を取得し、process_name変数に格納します。
その後、whileループを定義し、ループが永遠に続くようにtrueの条件を渡します。ループ内では、各反復ごとにcount変数が1増加されます。その後、countが2000または4000と等しいかどうかを確認するif文が続きます。条件が真の場合、console.log()メソッドがターミナルにメッセージを出力します。
CTRL+Xを使ってファイルを保存して閉じ、変更を保存するためにYを押してください。
ノードコマンドを使ってプログラムを実行してください。
- node process.js A &
Aはプログラムに渡され、process_name変数に格納されるコマンドライン引数です。&は末尾にあり、Nodeプログラムをバックグラウンドで実行することができ、シェルにさらなるコマンドを入力することができます。
プログラムを実行すると、以下のような出力が表示されます。
[1] 7754 A: 2000 A: 4000
数値7754は、オペレーティングシステムがそれに割り当てたプロセスIDです。A:2000とA:4000は、そのプログラムの出力です。
nodeコマンドを使用してプログラムを実行すると、プロセスが作成されます。オペレーティングシステムはプログラムのためにメモリを割り当て、コンピュータのディスク上にプログラムの実行ファイルを見つけ、プログラムをメモリに読み込みます。その後、プロセスにプロセスIDが割り当てられ、プログラムの実行が開始されます。この時点で、プログラムはプロセスとなります。
プロセスが実行されていると、そのプロセスIDがオペレーティングシステムのプロセスリストに追加され、htop、top、またはpsなどのツールで確認することができます。これらのツールは、プロセスに関する詳細情報を提供し、停止または優先順位をつけるためのオプションも提供します。
ノードプロセスの簡単なサマリーを取得するには、端末でENTERキーを押してプロンプトを戻します。次に、psコマンドを実行してノードプロセスを表示します。
- ps |grep node
現在のユーザーに関連するすべてのプロセスを表示するためのpsコマンドを使用します。パイプ演算子|はpsの出力をgrepフィルターに渡し、Nodeプロセスのみを表示します。
コマンドを実行すると、以下のような出力が得られます。
7754 pts/0 00:21:49 node
一つのプログラムから無数のプロセスを作成することができます。たとえば、次のコマンドを使用して、異なる引数を持つさらに3つのプロセスを作成し、バックグラウンドに配置できます。
- node process.js B & node process.js C & node process.js D &
コマンドで、process.js プログラムのインスタンスをさらに3つ作成しました。& シンボルは各プロセスをバックグラウンドにします。
コマンドを実行すると、出力は以下のように似ています(順序が異なる場合もあります)。
[2] 7821 [3] 7822 [4] 7823 D: 2000 D: 4000 B: 2000 B: 4000 C: 2000 C: 4000
出力ではわかる通り、各プロセスはカウントが2000と4000に達した時にプロセス名をターミナルにログとして出力しました。各プロセスは他のプロセスが実行されていることには気付いていません。プロセスDはプロセスCのことを知らず、その逆も同様です。どちらのプロセスで何が起ころうとも、他のNode.jsプロセスには影響しません。
もしあなたが出力を注意深く調べると、出力の順序が3つのプロセスを作成した時の順序とは異なっていることがわかります。コマンドを実行するとき、プロセスの引数は B、C、D の順でした。しかし、現在の出力は D、B、C の順です。その理由は、OSがCPU上でどのプロセスを実行するかを決定するスケジューリングアルゴリズムを持っているからです。
シングルコアのマシンでは、プロセスは同時に実行されます。つまり、オペレーティングシステムは一定の間隔でプロセス間を切り替えます。例えば、プロセスDが一定時間実行され、その状態がどこかに保存された後、OSはプロセスBを一定時間実行するようスケジュールします。このような切り替えが繰り返され、全てのタスクが完了するまで行われます。出力から見ると、各プロセスは完全に実行されたように見えますが、実際にはOSスケジューラが常に切り替えを行っています。
マルチコアシステムでは、4つのコアがあると仮定して、OSはそれぞれのプロセスを同時に各コアで実行するようにスケジュールします。これが並列処理として知られています。ただし、もしプロセスをさらに4つ作成すると(合計8つになる場合)、各コアは2つのプロセスを同時に実行し、それらが終了するまで並行して処理を行います。
糸
スレッドはプロセスと同様で、独自の命令ポインタを持ち、一度に1つのJavaScriptタスクを実行することができます。プロセスと異なり、スレッドは独自のメモリを持ちません。代わりに、プロセスのメモリ内に存在します。プロセスを作成すると、worker_threadsモジュールを使用してJavaScriptコードを並列に実行する複数のスレッドを作成することができます。さらに、スレッドはメッセージのやり取りやプロセスのメモリ内でのデータ共有を通じてお互いに通信することができます。これにより、スレッドはプロセスと比較して軽量化されます。なぜなら、スレッドの生成はオペレーティングシステムからメモリを要求しないからです。
スレッドの実行に関しては、プロセスの実行と似たような挙動を示します。もし単一のコアシステム上で複数のスレッドが実行されている場合、オペレーティングシステムは定期的な間隔でスレッドを切り替えて、各スレッドにCPUで直接実行する機会を与えます。マルチコアシステムでは、オペレーティングシステムはスレッドを全てのコアにスケジュールし、JavaScriptコードを同時に実行します。もし利用可能なコア以上のスレッドを作成してしまった場合、各コアは複数のスレッドを同時に実行します。
それでは、ENTERキーを押して、killコマンドで現在実行中のすべてのNodeプロセスを停止してください。
- sudo kill -9 `pgrep node`
pgrepは、killコマンドに対して4つのノードプロセスのプロセスIDを返します。-9オプションは、killがSIGKILLシグナルを送信するように指示します。
コマンドを実行すると、次のような出力が表示されます。
[1] Killed node process.js A [2] Killed node process.js B [3] Killed node process.js C [4] Killed node process.js D
時々、出力は遅れて表示され、後で別のコマンドを実行した時に現れることがあります。
次のセクションでは、プロセスとスレッドの違いを理解したので、Node.jsの隠れたスレッドを扱います。
Node.jsにおける隠れたスレッドの理解
Node.jsは追加のスレッドを提供するため、マルチスレッドと見なされています。このセクションでは、Node.jsの隠れたスレッドについて詳しく調べ、I/O操作を非同期とするのに役立ちます。
導入部で述べた通り、JavaScriptはシングルスレッドであり、全てのJavaScriptコードは単一のスレッドで実行されます。これには、プログラムのソースコードやプログラムに含まれるサードパーティのライブラリも含まれます。プログラムがファイルの読み込みやネットワークリクエストなどのI/O操作を行う場合、これはメインスレッドをブロックします。
しかしながら、Node.jsはlibuvライブラリを実装しており、これによってNode.jsプロセスには4つの追加スレッドが提供されます。これらのスレッドを使用することで、I/O操作は別々に処理され、完了するとイベントループはI/Oタスクに関連付けられたコールバックをマイクロタスクキューに追加します。メインスレッドの呼び出しスタックがクリアされると、コールバックが呼び出しスタックに追加され、それから実行されます。これを明確にするために、指定されたI/Oタスクに関連付けられたコールバックは並列で実行されません。ただし、ファイルの読み取りやネットワークリクエストなどのタスク自体はスレッドの助けを借りて並行して行われます。I/Oタスクが終了すると、コールバックはメインスレッドで実行されます。
これらの4つのスレッドに加えて、V8エンジンは、自動ガベージコレクションなどの処理を担当するための2つのスレッドも提供します。これにより、プロセス内の総スレッド数は7つになります:1つはメインスレッド、4つはNode.jsスレッド、そして2つはV8スレッドです。
すべてのNode.jsプロセスが7つのスレッドを持っていることを確認するために、process.jsファイルを再度実行してバックグラウンドで実行してください。
- node process.js A &
ターミナルは、プロセスIDやプログラムからの出力を記録します。
[1] 9933 A: 2000 A: 4000
プロセスIDをどこかにメモして、ENTERキーを押してからもう一度プロンプトを使用できるようにしてください。
スレッドを見るには、topコマンドを実行し、出力に表示されるプロセスIDを渡してください。
- top -H -p 9933
-Hは、topにスレッドを表示するよう指示します。-pフラグは、topに指定したプロセスIDの活動のみを監視するよう指示します。
コマンドを実行すると、出力は以下のように見えるでしょう。
top – 09:21:11 up 15:00, 1 user, load average: 0.99, 0.60, 0.26 Threads: 7 total, 1 running, 6 sleeping, 0 stopped, 0 zombie %Cpu(s): 24.8 us, 0.3 sy, 0.0 ni, 75.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st MiB Mem : 7951.2 total, 6756.1 free, 248.4 used, 946.7 buff/cache MiB Swap: 0.0 total, 0.0 free, 0.0 used. 7457.4 avail Mem PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 9933 node-us+ 20 0 597936 51864 33956 R 99.9 0.6 4:19.64 node 9934 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node 9935 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.84 node 9936 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node 9937 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.93 node 9938 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node 9939 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node
出力を見ると分かる通り、Node.jsプロセスは合計7つのスレッドを持っています。JavaScriptを実行するためのメインスレッドが1つあり、Node.jsスレッドが4つ、そしてV8スレッドが2つあります。
前に議論されたように、I/O操作を非同期化するためには、4つのNode.jsスレッドが使用されます。これらのスレッドはそのタスクに対して適切に機能し、I/O操作のためにスレッドを自分で作成することは、アプリケーションのパフォーマンスをさらに低下させる可能性があります。一方、CPUに負荷のかかるタスクに関しては、同じことは言えません。CPUに負荷のかかるタスクはプロセス内で利用可能な追加スレッドを使用せずに、メインスレッドをブロックさせます。
以下のコマンドを使用して、qを押してtopを終了し、Nodeプロセスを停止してください。
- kill -9 9933
Node.jsのプロセス内のスレッドについて理解したので、次のセクションではCPUに負荷のかかるタスクを書いて、それがメインスレッドにどのように影響を与えるかを観察します。
ワーカースレッドを使わずにCPU処理を行うタスクの作成
このセクションでは、非同期のルートと、CPU バウンドタスクを実行するブロッキングルートを持つ Express アプリを作成します。
最初に、お好みのエディタでindex.jsを開いてください。
- nano index.js
index.jsファイルに、基本的なサーバーを作成するための以下のコードを追加してください。
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
次のコードブロックでは、Expressを使ってHTTPサーバーを作成します。最初の行では、expressモジュールをインポートします。次に、app変数にはExpressのインスタンスが保持されます。その後、サーバーがリッスンすべきポート番号を保持するport変数が定義されます。
これに従って、app.get(‘/non-blocking’)を使用してGETリクエストが送信されるべきルートを定義します。最後に、app.listen()メソッドを呼び出して、サーバーにポート3000での受信を開始するように指示します。
次に、CPUを多く使用するタスクを含む別のルート「/ブロッキング/」を定義します。
...
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
app.get(“/blocking”)メソッドを使用して、/blocking ルートを定義します。このメソッドは第二引数として非同期のコールバック関数を取ります。このコールバック関数は、CPUを多く使用するタスクを async キーワードと共に実行します。コールバック内で、20億回繰り返し処理を行い、各反復でカウンター変数を1ずつ増やします。このタスクはCPU上で実行され、完了するまで数秒かかります。
この時点で、index.jsファイルは次のようになります。 (Kono jiten de, index.js fainaru wa, jiyū ni wa nari masu)
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
ファイルを保存し、終了してから、次のコマンドを使ってサーバーを起動してください。
- node index.js
コマンドを実行すると、次のような出力が表示されます。
App listening on port 3000
これは、サーバーが稼働しており、サービスの提供に準備ができていることを示しています。
今、お好きなブラウザでhttp://localhost:3000/non-blockingを訪れてください。すると、「このページは非同期です」というメッセージが即座に表示されます。
Note
Expressサーバーがまだ実行されている間に、ローカルコンピューターで別のターミナルを開き、以下のコマンドを入力します:
ssh -L 3000:localhost:3000 自分の非ルートユーザー@サーバーのIP
サーバーに接続したら、ローカルマシンのウェブブラウザでhttp://localhost:3000/non-blockingに移動します。残りのチュートリアルの間、二つ目のターミナルは開いたままにしておきましょう。
次に、新しいタブを開き、http://localhost:3000/blockingにアクセスしてください。ページが読み込まれる間に、すばやくさらに2つのタブを開いて、再びhttp://localhost:3000/non-blockingにアクセスしてください。すると、即座のレスポンスは得られず、ページは読み込みを続けます。/blockingルートの読み込みが完了し、レスポンス結果が20000000000で返されてから、他のルートもレスポンスを返すようになります。
全ての非ブロッキングルートが動作しないのは、ブロッキングルートがロードされるためであり、それはCPUによるループがメインスレッドをブロックしているためです。メインスレッドがブロックされている時、Node.jsはCPUによるタスクが終了するまでリクエストを処理することができません。したがって、もしアプリケーションが数千の同時GETリクエストを非ブロッキングルートに対して行っている場合、ブロッキングルートへの一回のアクセスで全てのアプリケーションルートが応答しなくなるだけです。
ご覧の通り、メインスレッドをブロックすることは、ユーザーのアプリ体験に悪影響を及ぼす可能性があります。この問題を解決するためには、CPUに負荷のかかるタスクを別のスレッドにオフロードし、メインスレッドが他のHTTPリクエストに対応し続ける必要があります。
それで、CTRL+Cを押してサーバーを停止します。index.jsファイルにさらに変更を加えた後、次のセクションでサーバーを再起動します。サーバーを停止する理由は、Node.jsがファイルに新しい変更があった場合に自動でリフレッシュしないためです。
CPU密集なタスクがアプリケーションに与えるネガティブな影響を理解したので、約束(promises)を使ってメインスレッドのブロッキングを回避しようと思います。
CPU-BoundタスクのオフローディングをPromisesを使用して行う
開発者がCPUボンドのタスクからのブロッキング効果を学ぶと、コードを非同期化するためにプロミスを利用することがあります。この直感は、readFile()やwriteFile()などの非同期プロミスベースのI/Oメソッドを使用する知識に基づいています。しかし、I/O操作はNode.jsの非表示スレッドを使用しており、CPUボンドのタスクには使用されません。それにもかかわらず、このセクションでは、CPUボンドのタスクをプロミスでラップして非同期化しようと試みます。これは機能しませんが、次のセクションで使用するワーカースレッドの価値を見るのに役立ちます。
Note: Japanese transcription may not be 100% accurate.
エディタでindex.jsファイルを再度開きます。
- nano index.js
index.js ファイルから、CPU 負荷の高いタスクを含むハイライトされたコードを削除してください。
...
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
...
次に、以下のハイライトされたコードを追加して、プロミスを返す関数を含めてください。
...
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
res.status(200).send(`result is ${counter}`);
}
calculateCount() 関数は、/blocking ハンドラー関数にあった計算を含んでいます。この関数は、新しいPromise構文で初期化されたPromiseを返します。このPromiseには、成功または失敗を処理するためのresolveとrejectパラメータを持つコールバックが渡されます。forループが実行を終えると、Promiseはcounter変数の値で解決されます。
次に、index.jsファイルの/blocking/ハンドラ関数でcalculateCount()関数を呼び出してください。
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
ここでは、計算カウント関数をawaitキーワードを前置して呼び出し、プロミスの解決を待ちます。プロミスが解決されると、カウンタ変数は解決された値に設定されます。
あなたの完全なコードは、以下のようになります:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
ファイルを保存して終了し、サーバーを再起動してください。
- node index.js
ウェブブラウザでhttp://localhost:3000/blockingにアクセスし、ロードする間にhttp://localhost:3000/non-blockingのタブを素早くリロードしてください。すると、非ブロッキングなルートも影響を受け、全てのルートが/blockingルートの読み込みの終了を待ちます。ルートがまだ影響を受けているため、PromiseではJavaScriptコードを並列実行することができず、CPUに負荷がかかるタスクを非ブロッキングにすることはできません。
それで、CTRL+Cでアプリケーションサーバーを停止してください。
CPUバウンドタスクを非同期にするためのメカニズムを提供しないことを知った今、Node.jsのワーカースレッドモジュールを使用してCPUバウンドタスクを別のスレッドにオフロードします。
ワーカースレッドモジュールを使用して、CPUに負荷のかかるタスクをオフロードします。
この節では、メインスレッドをブロックすることなく、CPU負荷の高いタスクを別のスレッドにオフロードするために、ワーカースレッドモジュールを使用します。そのために、CPU負荷の高いタスクを含むworker.jsファイルを作成します。index.jsファイルでは、ワーカースレッドモジュールを使用してスレッドを初期化し、worker.jsファイルでのタスクをメインスレッドと並列に実行します。タスクが完了すると、ワーカースレッドは結果を含むメッセージをメインスレッドに送信します。
まず、nprocコマンドを使用して2つ以上のコアを持っていることを確認してください。
- nproc
4
もし2つ以上のコアが表示された場合は、この手順を進めることができます。
次に、テキストエディタでworker.jsファイルを作成して開いてください。
- nano worker.js
あなたのworker.jsファイルに、worker-threadsモジュールをインポートし、CPU集中タスクを実行するための以下のコードを追加してください。
const { parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
最初の行は、worker_threadsモジュールをロードし、parentPortクラスを抽出します。このクラスは、メインスレッドにメッセージを送信するために使用できるメソッドを提供します。次に、index.jsファイルのcalculateCount()関数に現在存在するCPU集中型のタスクがあります。このステップでは、index.jsからこの関数を削除します。
それに続いて、下に示したハイライトされたコードを追加してください。
const { parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
parentPort.postMessage(counter);
ここでは、parentPortクラスのpostMessage()メソッドを呼び出し、counter変数に格納されているCPUバウンドタスクの結果を含むメッセージをメインスレッドに送信します。
ファイルを保存して閉じてください。テキストエディタでindex.jsを開いてください。
- nano index.js
worker.jsにすでにCPUバウンドタスクがあるので、index.jsからハイライトされたコードを削除してください。
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
次に、app.get(“/blocking”)のコールバック内に、スレッドを初期化するための以下のコードを追加してください。
const express = require("express");
const { Worker } = require("worker_threads");
...
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error occurred: ${msg}`);
});
});
...
最初に、worker_threadsモジュールをインポートし、Workerクラスを展開します。app.get(“/blocking”)コールバック内で、newキーワードを使用してWorkerのインスタンスを作成します。その後、worker.jsファイルパスを引数としてWorkerを呼び出します。これにより、新しいスレッドが作成され、worker.jsファイルのコードが別のコアでスレッド上で実行されます。
これに続いて、”message” メソッドを使用して、worker インスタンスにイベントを付加し、メッセージイベントを受信するためにリスニングします。worker.js ファイルから結果が含まれるメッセージが受信された場合、それはメソッドのコールバックのパラメータとして渡され、CPUバウンドタスクの結果を含むユーザーへのレスポンスが返されます。
次に、on(“error”)メソッドを使用してエラーイベントをリッスンするために、別のイベントをワーカーインスタンスに追加します。エラーが発生した場合、コールバックはエラーメッセージを含んだ404レスポンスをユーザーに返します。
あなたの完全なファイルは、以下のようになります。
const express = require("express");
const { Worker } = require("worker_threads");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error occurred: ${msg}`);
});
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
ファイルを保存して終了し、その後サーバーを実行してください。
- node index.js
Webブラウザで再びhttp://localhost:3000/blockingタブを訪れてください。ロードが完了する前に、すべてのhttp://localhost:3000/non-blockingタブをリフレッシュしてください。すると、/blockingルートのロードが完了するのを待たずに、非常に即座にロードされることに気付くはずです。これは、CPUに負荷のかかるタスクが別のスレッドにオフロードされ、メインスレッドがすべての着信リクエストを処理するためです。
今、CTRL+Cを使ってサーバーを停止してください。
CPU を使用するタスクを非同期化するために、ワーカースレッドを使用できるようになりました。CPU を多く使用するタスクのパフォーマンスを向上させるために、ワーカースレッドを4つ使用します。
4つのワーカースレッドを使用してCPU負荷の高いタスクを最適化する
このセクションでは、CPUを多く使用するタスクを4つのワーカースレッドに分散させ、タスクを早く終了させて/blockingルートのロード時間を短縮します。
同じタスクをより多くのワーカースレッドで処理するためには、タスクを分割する必要があります。このタスクは200億回のループが含まれるため、200億を使用するスレッドの数で分割します。この場合は4です。200_000_000_000 / 4 を計算すると、5_000_000_000 となります。したがって、各スレッドは0から5_000_000_000までの範囲でループし、カウンターを1ずつ増やします。各スレッドが終了すると、結果を含むメッセージをメインスレッドに送信します。メインスレッドが4つのスレッドからそれぞれメッセージを受け取ったら、結果を結合し、ユーザーに応答を送信します。
もし大量の配列を繰り返し処理するタスクがある場合でも、同じ手法を使うことができます。たとえば、ディレクトリ内の800枚の画像のサイズを変更したい場合、画像ファイルのパスを含む配列を作成します。次に、800を4(スレッド数)で割り、各スレッドが処理する範囲を決めます。スレッド1は、配列のインデックス0から199までの画像をリサイズし、スレッド2はインデックス200から399までの画像をリサイズし、依次に処理を行います。
最初に、4つ以上のコアを持っているかどうかを確認してください。
- nproc
4
cpコマンドを使ってworker.jsファイルのコピーを作成してください。
- cp worker.js four_workers.js
現在のindex.jsとworker.jsのファイルはそのまま残されますので、後でこのセクションの変更とパフォーマンスを比較するために再実行することができます。
次に、テキストエディタでfour_workers.jsファイルを開いてください。
- nano four_workers.js
「four_workers.js」ファイルに、workerDataオブジェクトをインポートするためのハイライトされたコードを追加してください。
const { workerData, parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000 / workerData.thread_count; i++) {
counter++;
}
parentPort.postMessage(counter);
最初に、WorkerDataオブジェクトを抽出します。このオブジェクトには、メインスレッドから初期化時に渡されたデータが含まれます(index.jsファイルでまもなく行う予定です)。オブジェクトには、スレッドの数である4を含むthread_countプロパティがあります。次にforループで、値20_000_000_000を4で割ることで、5_000_000_000という結果が得られます。
ファイルを保存して閉じた後、index.jsファイルをコピーしてください。
- cp index.js index_four_workers.js
エディタでindex_four_workers.js ファイルを開いてください。
- nano index_four_workers.js
あなたのindex_four_workers.jsファイルに、スレッドインスタンスを作成するためのハイライトされたコードを追加してください。
...
const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;
...
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
});
}
app.get("/blocking", async (req, res) => {
...
})
...
最初に、作成するスレッド数を指定するTHREAD_COUNTという定数を定義します。後に、サーバーにより多くのコアが搭載されると、スケーリングにはTHREAD_COUNTの値を使用したいスレッド数に変更する必要があります。
次に、createWorker()関数はPromiseを作成して返します。Promiseのコールバック内で、最初の引数にfour_workers.jsファイルのファイルパスをWorkerクラスに渡すことで新しいスレッドを初期化します。次に、2番目の引数としてオブジェクトを渡します。その後、オブジェクトにworkerDataプロパティを割り当てますが、その値は別のオブジェクトです。最後に、thread_countプロパティにはTHREAD_COUNT定数で指定されたスレッド数が値として割り当てられます。workerDataオブジェクトは、以前にworkers.jsファイルで参照したオブジェクトです。
約束が解決されるかエラーが発生するか確認するために、以下のハイライト部分を追加してください。
...
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
worker.on("message", (data) => {
resolve(data);
});
worker.on("error", (msg) => {
reject(`An error ocurred: ${msg}`);
});
});
}
...
労働者スレッドがメインスレッドにメッセージを送信すると、プロミスは返されたデータで解決します。ただし、エラーが発生した場合は、プロミスはエラーメッセージを返します。
新しいスレッドを初期化し、そのスレッドからデータを返す関数を定義したので、app.get(“/blocking”)内でその関数を使用して新しいスレッドを生成します。
しかし最初に、createWorker()機能で既にこの機能を定義しているため、次に以下のハイライトコードを削除してください。
...
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error ocurred: ${msg}`);
});
});
...
削除されたコードの代わりに、以下のコードを追加して4つのワークスレッドを初期化してください。
...
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
});
...
最初に、空の配列を含むworkerPromises変数を作成します。次に、THREAD_COUNTという値の回数だけ繰り返しを行います(この値は4です)。各繰り返しの間に、createWorker()関数を呼び出して新しいスレッドを作成します。その後、関数が返すpromiseオブジェクトをJavaScriptのpushメソッドを使ってworkerPromises配列に追加します。ループが終了すると、workerPromisesはcreateWorker()関数を4回呼び出すごとに返されるpromiseオブジェクトが4つ含まれることになります。
今、以下にハイライトされたコードを追加して、プロミスが解決するのを待ち、ユーザーに応答を返します。
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
const thread_results = await Promise.all(workerPromises);
const total =
thread_results[0] +
thread_results[1] +
thread_results[2] +
thread_results[3];
res.status(200).send(`result is ${total}`);
});
workerPromisesという配列には、createWorker()を呼び出して返されたプロミスが含まれているため、Promise.all()メソッドの前にawait構文をつけ、そのall()メソッドをworkerPromisesを引数として呼び出します。Promise.all()メソッドは、配列内のすべてのプロミスが解決するのを待ちます。それが起こると、thread_results変数にはプロミスが解決した値が格納されます。計算は4つのワーカーに分割されていたため、ブラケット表記構文を使ってthread_resultsからそれぞれの値を取得し、すべてを合計します。合計値が求まったら、それをページに返します。
完全なファイルは、以下のようになります。
const express = require("express");
const { Worker } = require("worker_threads");
const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
worker.on("message", (data) => {
resolve(data);
});
worker.on("error", (msg) => {
reject(`An error ocurred: ${msg}`);
});
});
}
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
const thread_results = await Promise.all(workerPromises);
const total =
thread_results[0] +
thread_results[1] +
thread_results[2] +
thread_results[3];
res.status(200).send(`result is ${total}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
ファイルを保存して閉じてください。このファイルを実行する前に、まずindex.jsを実行して応答時間を測定してください。
- node index.js
次に、ローカルのコンピュータで新しいターミナルを開き、以下のcurlコマンドを入力してください。このコマンドは、/blockingルートからの応答までにかかる時間を測定します。
- time curl –get http://localhost:3000/blocking
timeコマンドは、curlコマンドの実行時間を計測します。curlコマンドは指定されたURLにHTTPリクエストを送信し、–getオプションはcurlにGETリクエストを行うよう指示します。
コマンドが実行されると、出力は次のようになります。
real 0m28.882s user 0m0.018s sys 0m0.000s
ハイライトされた出力によると、レスポンスを取得するのに約28秒かかりますが、それはお使いのコンピューターによって異なる可能性があります。
次に、CTRL+Cでサーバーを停止し、index_four_workers.jsファイルを実行してください。
- node index_four_workers.js
二つ目の端末で再度/blockingルートを訪れてください。
- time curl –get http://localhost:3000/blocking
以下のような出力が表示されます。
real 0m8.491s user 0m0.011s sys 0m0.005s
出力結果によると、約8秒かかることがわかります。これは、読み込み時間をおおよそ70%短縮できたことを意味します。
四つのワーカースレッドを使用してCPUによって制約されるタスクを成功裏に最適化しました。もし4つ以上のコアを持つマシンがある場合は、THREAD_COUNTをその数値に更新すれば、さらに読み込み時間を短縮できます。
結論とは、日本語でいうと「結論」となります。
この記事では、メインスレッドをブロックするCPUバウンドタスクを持つNodeアプリを作成しました。その後、非同期処理を行うためにプロミスを使用する試みが失敗しました。その後、CPUバウンドタスクを別のスレッドにオフロードするためにworker_threadsモジュールを使用しました。最後に、CPU集中タスクの処理速度を向上させるために、worker_threadsモジュールを使用して4つのスレッドを作成しました。
次のステップとして、オプションについて詳しく知りたい場合は、Node.jsのワーカースレッドのドキュメンテーションを参照してください。また、CPU集中型のタスクのためのワーカープールを作成することができる「piscina」ライブラリもチェックできます。Node.jsの学習を続けたい場合は、チュートリアルシリーズ「How To Code in Node.js」を参照してください。