使用多线程在Node.js中的方法如何?
作者选择将“Open Sourcing Mental Illness”作为“为捐款而写”计划的捐赠对象。
引言
Node.js在单个线程中运行JavaScript代码,这意味着你的代码一次只能执行一个任务。然而,Node.js本身是多线程的,并通过libuv库提供了隐藏线程,用于处理像从磁盘读取文件或网络请求之类的I/O操作。通过使用隐藏线程,Node.js提供了异步方法,允许你的代码在不阻塞主线程的情况下进行I/O请求。
尽管Node.js具有隐藏线程,但不能使用它们来卸载CPU密集型任务,例如复杂计算、图像调整或视频压缩。由于JavaScript是单线程的,当进行CPU密集型任务时,它会阻塞主线程,其他代码在任务完成之前不会执行。没有使用其他线程的情况下,提速CPU绑定任务的唯一方法是增加处理器的速度。
然而,近年来CPU的速度并没有变得更快。相反,计算机现在配备了更多的核心,拥有8个或更多核心的计算机变得更为普遍。尽管如此,由于JavaScript是单线程的,你的代码将无法充分利用计算机上额外的核心来加快CPU限制任务的速度或避免中断主线程。
为了解决这个问题,Node.js 引入了 worker-threads 模块,它允许你创建线程并同时执行多个 JavaScript 任务。一旦一个线程完成一个任务,它会向主线程发送包含操作结果的消息,以便可以与代码的其他部分一起使用。使用 worker 线程的优势在于解决了由于 CPU 密集型任务而阻塞主线程的问题,并且可以将任务分割和分配给多个工作线程进行优化。
在本教程中,您将使用Node.js创建一个占用CPU的任务应用程序,该任务会阻塞主线程。接下来,您将使用worker-threads模块将CPU密集型任务转移到另一个线程以避免阻塞主线程。最后,您将将CPU绑定任务分割,并让四个线程并行处理以加快任务速度。
前提条件
要完成这个教程,你需要:
- A multi-core system with four or more cores. You can still follow the tutorial from Steps 1 through 6 on a dual-core system. However, Step 7 requires four cores to see the performance improvements.
- A Node.js development environment. If you’re on Ubuntu 22.04, install the recent version of Node.js by following step 3 of How To Install Node.js on Ubuntu 22.04. If you’re on another operating system, see How to Install Node.js and Create a Local Development Environment.
- A good understanding of the event loop, callbacks, and promises in JavaScript, which you can find in our tutorial, Understanding the Event Loop, Callbacks, Promises, and Async/Await in JavaScript.
- Basic knowledge of how to use the Express web framework. Check out our guide, How To Get Started with Node.js and Express.
建立项目和安装依赖项
在这一步中,你将创建项目目录,初始化npm,并安装所有必要的依赖项。
首先,创建并进入项目目录。
- mkdir multi-threading_demo
- cd multi-threading_demo
mkdir命令用于创建目录,cd命令则将工作目录更改为新创建的目录。
接下来,使用npm init命令初始化项目目录。
- npm init -y
-y选项接受所有的默认选项。
当命令运行时,您的输出将类似于这样:
Wrote to /home/sammy/multi-threading_demo/package.json:
{
"name": "multi-threading_demo",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC"
}
接下来,在Node.js web框架中安装Express。
- npm install express
你将使用Express创建一个具有阻塞和非阻塞端点的服务器应用程序。
Node.js默认情况下附带了worker-threads模块,因此您无需安装。
你已经安装好了所需的软件包。接下来,你将学习更多关于进程和线程以及它们在计算机上如何执行的内容。
理解进程和线程
在你开始编写与CPU绑定的任务并将其分派给独立线程之前,你首先需要了解进程和线程以及它们之间的区别。最重要的是,你将会回顾进程和线程在单核或多核计算机系统上的执行方式。
处理
一个进程是在操作系统中运行的程序。它有自己的内存,无法看到或访问其他正在运行的程序的内存。它还有一个指令指针,指示当前正在执行的程序的指令。一次只能执行一个任务。
为了理解这个,你需要创建一个带有无限循环的Node.js程序,这样它在运行时不会退出。
使用nano或你首选的文本编辑器创建并打开process.js 文件。
- nano process.js
在你的 process.js 文件中,输入以下代码。
const process_name = process.argv.slice(2)[0];
count = 0;
while (true) {
count++;
if (count == 2000 || count == 4000) {
console.log(`${process_name}: ${count}`);
}
}
在第一行中,process.argv属性返回一个包含程序命令行参数的数组。然后,你使用JavaScript的slice()方法,并提供一个参数2,来创建从索引2开始的数组的浅拷贝。这样做可以跳过前两个参数,即Node.js的路径和程序文件名。接下来,你使用方括号表示法语法来从切片后的数组中检索第一个参数,并将其存储在process_name变量中。
然后,你定义一个while循环,并传入一个始终为真的条件使循环永远执行。在循环中,每次迭代时计数变量增加1。接着是一个if语句,检查计数是否等于2000或4000。如果条件为真,console.log()方法将在终端中记录一条消息。
使用CTRL+X保存并关闭您的文件,然后按Y保存更改。
使用node命令运行该程序。
- node process.js A &
A是传递给程序并存储在process_name变量中的命令行参数。 结尾处的&允许Node程序在后台运行,这样您可以在Shell中输入更多命令。
当你运行程序时,你会看到类似于以下的输出。
[1] 7754 A: 2000 A: 4000
7754是操作系统分配给它的进程ID。程序输出为:A:2000和A:4000。
当您使用node命令运行程序时,您创建了一个进程。操作系统会为程序分配内存,在计算机的磁盘上定位程序可执行文件,并将程序加载到内存中。然后,系统会为其分配一个进程ID并开始执行程序。此时,您的程序已经成为一个进程。
当进程正在运行时,它的进程ID将被添加到操作系统的进程列表中,并可以通过htop、top或ps等工具进行查看。这些工具提供更多关于进程的详细信息,同时还可以选择停止或设置进程的优先级。
要快速获得关于Node进程的摘要,请在终端中按回车键以获得提示符。接下来,运行ps命令以查看Node进程。
- ps |grep node
ps命令列出了系统上与当前用户相关的所有进程。使用管道操作符| 将所有ps输出传递给grep过滤器,以列出仅Node进程。
运行该命令将会产生类似以下的输出结果。
7754 pts/0 00:21:49 node
您可以从一个程序中创建无数个进程。比如,使用以下命令创建三个具有不同参数的进程,并将它们放在后台执行:
- node process.js B & node process.js C & node process.js D &
在该命令中,您创建了三个process.js程序的更多实例。符号&将每个进程放置在后台。
运行该命令后,输出将类似于以下内容(顺序可能会有所不同):
[2] 7821 [3] 7822 [4] 7823 D: 2000 D: 4000 B: 2000 B: 4000 C: 2000 C: 4000
正如您在输出结果中所看到的,当计数达到2000和4000时,每个进程都会将进程名称记录在终端中。每个进程都不知道其他进程正在运行:进程D不知道进程C的存在,反之亦然。无论在哪个进程中发生的任何事情都不会影响其他Node.js进程。
如果您仔细检查输出,您会发现输出的顺序并不是您创建三个进程时的顺序。在运行命令时,进程的参数顺序是B,C和D。但现在,顺序是D,B和C。原因是操作系统具有调度算法,决定在给定时间在CPU上运行哪个进程。
在单核机器上,进程并发执行。也就是说,操作系统在规定的时间间隔内在进程之间切换。例如,进程D执行一段时间后,它的状态被保存在某个地方,操作系统调度进程B执行一段时间,如此往复。直到所有任务都完成为止。从输出结果看,似乎每个进程都已经完成了,但实际上,操作系统的调度器不断在它们之间进行切换。
在多核系统上,假设你有四个核心,操作系统会同时安排每个进程在每个核心上执行。这被称为并行性。然而,如果你创建了四个额外的进程(总共八个),每个核心将同时执行两个进程,直到它们完成。
线程
线程就像进程一样:它们有自己的指令指针,每次只能执行一个JavaScript任务。与进程不同,线程没有自己的内存。相反,它们存在于进程的内存中。创建进程时,可以使用worker_threads模块创建多个线程来并行执行JavaScript代码。此外,线程可以通过消息传递或在进程的内存中共享数据与彼此通信。相比于进程,这使得线程更加轻量级,因为生成线程不会向操作系统请求更多的内存。
当涉及到线程的执行时,它们与进程有着类似的行为。如果在单核系统上运行多个线程,操作系统会定期在它们之间切换,使每个线程有机会直接在单个CPU上执行。在多核系统上,操作系统将线程安排在所有核心上同时执行JavaScript代码。如果创建的线程数超过可用的核心数,每个核心将同时执行多个线程。
在此之后,按下回车键,然后使用kill命令停止当前正在运行的所有Node进程。
- sudo kill -9 `pgrep node`
pgrep命令将返回四个Node进程的进程ID给kill命令。-9选项指示kill发送一个SIGKILL信号。
运行该命令后,您将看到类似以下输出的结果:
[1] Killed node process.js A [2] Killed node process.js B [3] Killed node process.js C [4] Killed node process.js D
有时候,在你稍后运行其他命令时,输出可能会延迟显示出来。
既然你已经了解了进程和线程的区别,接下来你将在下一节中使用Node.js中隐藏的线程进行工作。
理解 Node.js 中的隐藏线程
Node.js提供了额外的线程,这就是为什么它被认为是多线程的原因。在本节中,您将了解Node.js中的隐藏线程,这些线程有助于使I / O操作非阻塞。
如介绍中所提到的,JavaScript 是单线程的,所有的 JavaScript 代码都在一个线程中执行。这包括您的程序源代码以及您在程序中引入的第三方库。当程序执行 I/O 操作读取文件或进行网络请求时,这会阻塞主线程。
然而,Node.js实现了libuv库,为Node.js进程提供了四个额外的线程。通过这些线程,I/O操作被分开处理,当它们完成时,事件循环将与I/O任务相关联的回调添加到微任务队列中。当主线程的调用堆栈清空时,回调被推入调用堆栈,然后执行。为了使这一点清楚,与给定的I/O任务相关联的回调不会并行执行;然而,借助这些线程,读取文件或网络请求的任务本身是并行进行的。一旦I/O任务完成,回调在主线程中运行。
除了这四个线程外,V8引擎还提供了两个线程来处理自动垃圾回收等任务。这样,一个进程中的线程总数就变成了七个:一个主线程,四个Node.js线程以及两个V8线程。
在后台运行process.js文件并重复运行,以确认每个Node.js进程都具有七个线程。
- node process.js A &
终端会记录程序的进程ID和输出内容。
[1] 9933 A: 2000 A: 4000
请在某处记录下进程ID,然后按下回车键以便再次使用提示符。
要查看线程,请运行top命令,并将显示在输出中的进程ID传递给它。
- top -H -p 9933
-H 命令 top 显示进程中的线程。-p 标志指示 top 仅监视给定进程ID中的活动。
当你运行命令时,你的输出将类似于下面的情况:
top – 09:21:11 up 15:00, 1 user, load average: 0.99, 0.60, 0.26 Threads: 7 total, 1 running, 6 sleeping, 0 stopped, 0 zombie %Cpu(s): 24.8 us, 0.3 sy, 0.0 ni, 75.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st MiB Mem : 7951.2 total, 6756.1 free, 248.4 used, 946.7 buff/cache MiB Swap: 0.0 total, 0.0 free, 0.0 used. 7457.4 avail Mem PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 9933 node-us+ 20 0 597936 51864 33956 R 99.9 0.6 4:19.64 node 9934 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node 9935 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.84 node 9936 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node 9937 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.93 node 9938 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node 9939 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node
正如您在输出中所看到的,Node.js进程总共有七个线程:一个用于执行JavaScript的主线程,四个Node.js线程和两个V8线程。
如前所述,四个Node.js线程用于I/O操作,使它们变为非阻塞状态。它们在这方面表现良好,自己为I/O操作创建线程可能会降低应用程序性能。然而,对于CPU密集型任务就无法这样说了。CPU密集型任务不会使用进程中的额外线程,而是会阻塞主线程。
现在按下q键退出top,并使用以下命令停止Node进程:
- kill -9 9933
既然你已经了解了Node.js进程中的线程,接下来你将在下一个部分中编写一个CPU密集型的任务,并观察它对主线程的影响。
在没有工作线程的情况下创建一个CPU密集型任务
在本章中,您将构建一个Express应用程序,其中包含一个非阻塞的路由和一个运行CPU绑定任务的阻塞路由。
首先,在你喜欢的编辑器中打开index.js文件。
- nano index.js
在你的index.js文件中,添加以下代码以创建一个基本的服务器:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
在下面的代码块中,你使用 Express 创建了一个HTTP服务器。在第一行中,你导入了express模块。接下来,你将app变量设置为一个Express实例。然后,你定义了port变量,用于保存服务器应该监听的端口号。
紧接着,你使用app.get(‘/non-blocking’)来定义应该发送GET请求的路由。最后,你调用app.listen()方法来指示服务器开始在3000端口上进行监听。
接下来,定义另一条路由/阻塞/,其中包含一个占用CPU资源的繁重任务。
...
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
你可以使用 app.get(“/blocking”) 来定义“阻塞路由”,该路由接受一个以 async 关键字为前缀的异步回调函数作为第二个参数,用于运行一个耗费 CPU 资源的任务。在回调函数内部,你可以创建一个 for 循环,循环迭代 200 亿次,并且在每次迭代中将计数器变量加1。该任务在 CPU 上运行,需要几秒钟才能完成。
此时,你的index.js文件将以如下方式呈现:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
保存并退出文件,然后使用以下命令启动服务器。
- node index.js
当你运行这个命令时,你将会看到类似于以下的输出。
App listening on port 3000
这显示服务器正在运行并准备服务。
现在,在您喜欢的浏览器中访问http://localhost:3000/non-blocking。您会立即看到一条带有“此页面是非阻塞的”消息的回应。
Note
在Express服务器仍在运行时,在您的本地计算机上打开另一个终端,并输入以下命令:
ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip
连接到服务器后,在本地机器的Web浏览器中导航到http://localhost:3000/non-blocking。在本教程的其余部分中保持第二个终端开启。
接下来,打开一个新的标签页访问http://localhost:3000/blocking。页面加载时,迅速打开另外两个标签页并再次访问http://localhost:3000/non-blocking。你将会发现你不会立即得到响应,并且页面将不断尝试加载。只有在/blocking路径完成加载并返回结果20000000000后,其余的路径才会返回响应。
所有非阻塞路由不起作用的原因是因为有一个占用CPU的循环阻塞了主线程。当主线程被阻塞时,Node.js无法响应任何请求,直到CPU绑定任务完成。因此,如果你的应用程序有成千上万个同时发起GET请求的非阻塞路由,只需访问一次阻塞路由,就足以使得所有应用程序路由无法响应。
根据您所见,阻塞主线程可能会影响用户使用您的应用的体验。为解决此问题,您需要将消耗CPU资源的任务转移到另一个线程,以便主线程可以继续处理其他HTTP请求。
在完成这一步之后,按下CTRL + C停止服务器。在对index.js文件进行更多修改后,您将在下一节重新启动服务器。停止服务器的原因是Node.js在文件进行新的更改时不会自动刷新。
既然你明白了CPU密集型任务对应用程序的负面影响,现在你可以尝试使用Promise来避免阻塞主线程。
使用承诺来卸载一个CPU密集型任务
通常,当开发者了解到由于CPU密集型任务产生的阻塞性能问题时,他们会使用承诺(promises)来使代码变得非阻塞。这种直觉源自于使用非阻塞的基于承诺的I/O方法,如readFile()和writeFile()。但是如你所学,I/O操作利用了Node.js的隐藏线程,而CPU密集型任务则没有使用这些线程。尽管如此,在本节中,你将把CPU密集型任务封装在一个承诺中,以试图将其变为非阻塞。这种方法不会起作用,但它将帮助你看到使用工作线程的价值,而这将会在下一节中介绍。
请再次在编辑器中打开index.js文件。
- nano index.js
在你的index.js文件中,删除包含CPU密集型任务的突出显示代码。
...
app.get("/blocking", async (req, res) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
res.status(200).send(`result is ${counter}`);
});
...
接下来,添加下面突出显示的代码,其中包含一个返回 Promise 的函数。
...
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
res.status(200).send(`result is ${counter}`);
}
函数calculateCount()现在包含了你在/blocking handler函数中的计算。该函数返回一个promise,它使用新的Promise语法进行初始化。这个promise采用一个带有resolve和reject参数的回调函数,用来处理成功或失败。当for循环运行结束时,promise会通过counter变量的值来解析。
接下来,在index.js文件中的/blocking/处理函数中调用calculateCount()函数。
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
在这里,你调用calculateCount()函数,并在前面加上await关键字来等待promise解析。一旦promise解析完成,计数器变量就会被设置为解析后的值。
您的完整代码现在将如下所示:
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
保存并退出文件,然后重新启动服务器。
- node index.js
在您的网络浏览器中,访问 http://localhost:3000/blocking,并在加载时,快速重新加载 http://localhost:3000/non-blocking 标签页。您会发现,非阻塞路由仍然受到影响,它们都将等待 /blocking 路由完成加载。因为路由仍然受到影响,所以承诺无法使JavaScript代码并行执行,也不能用于使CPU密集任务变为非阻塞。
用CTRL+C停止应用服务器。
既然你知道Promise不提供任何机制将CPU绑定任务变为非阻塞,那么你将会使用Node.js的worker-threads模块将CPU绑定任务分离到一个独立的线程中。
使用worker-threads模块来卸载一个CPU绑定的任务。
在这部分中,您将使用worker-threads模块将一个CPU密集型任务转移到另一个线程,以避免阻塞主线程。为了做到这一点,您将创建一个worker.js文件,其中包含CPU密集型任务。在index.js文件中,您将使用worker-threads模块初始化线程并在worker.js文件中启动任务,使其与主线程并行运行。任务完成后,工作线程将向主线程发送包含结果的消息。
首先,使用nproc命令验证您是否拥有2个或更多内核。
- nproc
4
如果显示出两个或更多个核心,你可以进行下一步。
接下来,在你的文本编辑器中创建并打开worker.js文件。
- nano worker.js
在您的 worker.js 文件中,添加以下代码以导入 worker-threads 模块并执行 CPU 密集型任务。
const { parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
第一行加载了`worker_threads`模块,并提取了`parentPort`类。该类提供了可以用来向主线程发送消息的方法。接下来,你有一个在`index.js`文件中当前位于`calculateCount()`函数内的CPU密集型任务。在后面的步骤中,你将从`index.js`中删除这个函数。
按照这个步骤,在下面添加标记的代码:
const { parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
parentPort.postMessage(counter);
在这里,您调用了parentPort类的postMessage()方法,它会向主线程发送一条消息,消息中包含计数器变量中存储的CPU密集型任务的结果。
保存并退出文件。在您的文本编辑器中打开 index.js 文件。
- nano index.js
既然你已经在worker.js中有了CPU绑定的任务,那就从index.js中删除掉被标记的代码。
const express = require("express");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function calculateCount() {
return new Promise((resolve, reject) => {
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
resolve(counter);
});
}
app.get("/blocking", async (req, res) => {
const counter = await calculateCount();
res.status(200).send(`result is ${counter}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
接下来,在app.get(“/blocking”)的回调函数中,添加以下代码来初始化线程:
const express = require("express");
const { Worker } = require("worker_threads");
...
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error occurred: ${msg}`);
});
});
...
首先,您导入worker_threads模块并解压Worker类。在app.get(“/blocking”)回调函数中,您使用new关键字创建一个Worker实例,其后是对Worker的调用,并将worker.js文件路径作为参数传递给它。这将创建一个新线程,worker.js文件中的代码将在另一个核心上的线程中开始运行。
随后,您可以使用on(“message”)方法将事件附加到工作实例上,以监听消息事件。当接收到来自worker.js文件的包含结果的消息时,它将作为参数传递给方法的回调函数,并返回一个包含CPU密集型任务结果的响应给用户。
接下来,您可以使用on(“error”)方法将另一个事件附加到工作实例,以侦听错误事件。如果出现错误,回调函数将返回一个包含错误消息的404响应给用户。
您的完整文件现在将如下所示:
const express = require("express");
const { Worker } = require("worker_threads");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error occurred: ${msg}`);
});
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
保存并退出您的文件,然后运行服务器。
- node index.js
再次在您的网络浏览器中访问http://localhost:3000/blocking标签。在它加载完成之前,刷新所有的http://localhost:3000/non-blocking标签。您现在应该注意到它们会立即加载,而无需等待/blocking路由加载完成。这是因为CPU绑定的任务已经被分配到另一个线程上,主线程处理所有传入的请求。
现在,使用CTRL+C停止您的服务器。
现在,你可以使用工作线程使CPU密集型任务不再阻塞,因此你将使用四个工作线程来改善CPU密集型任务的性能。
使用四个工作线程优化一个CPU密集型任务
在这个部分中,你将把CPU密集任务分配给四个工作线程,以便它们可以更快地完成任务并缩短/blocking路线的加载时间。
为了让更多工作线程同时处理同一任务,你需要将任务分割。由于此任务需要循环200亿次,你将200亿除以你想要使用的线程数量。在本例中,是4个线程。计算200_000_000_000 / 4将得到5_000_000_000。因此每个线程将循环从0到5_000_000_000,并将计数器增加1。当每个线程完成时,它将向主线程发送包含结果的消息。一旦主线程分别接收到来自四个线程的消息,你将合并结果并向用户发送一个响应。
如果你有一个需要迭代大型数组的任务,你也可以使用相同的方法。例如,如果你想要调整目录中的800个图像大小,你可以创建一个包含所有图像文件路径的数组。接下来,将800除以4(线程数),让每个线程处理一个范围。第一个线程将调整数组索引从0到199的图像大小,第二个线程从索引200到399,依此类推。
首先,确认您的电脑拥有四个或更多核心。
- nproc
4
使用cp命令复制worker.js文件。
- cp worker.js four_workers.js
当前的index.js和worker.js文件将保持不变,以便稍后可以再次运行它们以与本节中的更改进行性能比较。
接下来,打开您的文本编辑器中的four_workers.js文件。
- nano four_workers.js
在你的four_workers.js文件中,添加如下突出代码来引入workerData对象:
const { workerData, parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000 / workerData.thread_count; i++) {
counter++;
}
parentPort.postMessage(counter);
首先,你提取WorkerData对象,该对象包含从主线程传递的数据,在初始化线程时(即将在index.js文件中完成)传入。该对象有一个thread_count属性,表示线程数,为4。接下来在for循环中,将20_000_000_000除以4,得到5_000_000_000。
保存并关闭你的文件,然后复制index.js文件。
- cp index.js index_four_workers.js
在你的编辑器中打开index_four_workers.js文件。
- nano index_four_workers.js
在你的index_four_workers.js文件中,添加突出显示的代码来创建一个线程实例。
...
const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;
...
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
});
}
app.get("/blocking", async (req, res) => {
...
})
...
首先,您定义THREAD_COUNT常量,其中包含您想创建的线程数目。之后,当您的服务器上有更多核心时,扩展会涉及将THREAD_COUNT的值更改为您想要使用的线程数目。
接下来,createWorker()函数创建并返回一个Promise。在Promise回调函数中,通过将四_worker.js文件的文件路径作为第一个参数传递给Worker类,初始化一个新的线程。然后,将一个对象作为第二个参数传递。接下来,将该对象分配给workerData属性,其值为另一个对象。最后,将该对象分配给thread_count属性,其值为THREAD_COUNT常量中的线程数。workerData对象是之前在workers.js文件中引用的那个对象。
为了确保承诺能够成功解决或抛出错误,请添加下面突出部分的代码行:
...
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
worker.on("message", (data) => {
resolve(data);
});
worker.on("error", (msg) => {
reject(`An error ocurred: ${msg}`);
});
});
}
...
当工作线程向主线程发送消息时,承诺将通过返回的数据来解决。然而,如果发生错误,承诺将返回一个错误消息。
现在您已经定义了一个初始化新线程并从线程返回数据的函数,您将在app.get(“/blocking”)中使用该函数来创建新的线程。
现在您已经定义了一个初始化新线程并返回线程数据的函数,您将在app.get(“/blocking”)中使用该函数来生成新线程。
但是首先,删除以下突出显示的代码,因为您已经在 createWorker() 函数中定义了此功能。
...
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error ocurred: ${msg}`);
});
});
...
删除代码后,添加以下代码来初始化四个工作线程:
...
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
});
...
首先,您创建了一个名为workerPromises的变量,其中包含一个空数组。接下来,您按照THREAD_COUNT的值(即4)进行相应次数的迭代。在每次迭代中,您调用createWorker()函数创建一个新的线程。然后,使用JavaScript的push方法将函数返回的promise对象推入workerPromises数组中。当循环结束时,workerPromises数组中将包含四个经过createWorker()函数调用返回的promise对象。
现在,在下面添加以下突出显示的代码,等待承诺解决并向用户返回响应。
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
const thread_results = await Promise.all(workerPromises);
const total =
thread_results[0] +
thread_results[1] +
thread_results[2] +
thread_results[3];
res.status(200).send(`result is ${total}`);
});
由于workerPromises数组包含通过调用createWorker()返回的promises,所以您在Promise.all()方法前面加上await语法并调用all()方法,将workerPromises作为其参数。Promise.all()方法等待数组中的所有promises都解决。当这发生时,thread_results变量包含promises解决的值。由于计算任务被分配给了四个workers,您可以使用方括号表示法从thread_results中获取每个值,并将它们全部相加。一旦相加,您将总的值返回给页面。
现在你的完整文件应该是这样的:
const express = require("express");
const { Worker } = require("worker_threads");
const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
worker.on("message", (data) => {
resolve(data);
});
worker.on("error", (msg) => {
reject(`An error ocurred: ${msg}`);
});
});
}
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
const thread_results = await Promise.all(workerPromises);
const total =
thread_results[0] +
thread_results[1] +
thread_results[2] +
thread_results[3];
res.status(200).send(`result is ${total}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
保存并关闭您的文件。在运行此文件之前,请先运行index.js来测量其响应时间。
- node index.js
接下来,在你的本地电脑上打开一个新的终端,并输入以下curl命令来测量从/blocking路由接收到响应所需的时间:
- time curl –get http://localhost:3000/blocking
时间命令用来测量curl命令的运行时间。curl命令向给定的URL发送HTTP请求,–get选项告诉curl进行GET请求。
当命令运行时,你的输出将类似于这样:
real 0m28.882s user 0m0.018s sys 0m0.000s
高亮显示的输出显示,获取响应大约需要28秒,可能因您的计算机而异。
接下来,使用CTRL+C停止服务器,并运行index_four_workers.js文件。
- node index_four_workers.js
再次在你的第二个终端访问“/blocking”路由。
- time curl –get http://localhost:3000/blocking
你会看到与以下相符的输出。
real 0m8.491s user 0m0.011s sys 0m0.005s
输出结果显示,大约需要8秒,这意味着您将加载时间缩短了近70%。
你成功地通过使用四个工作线程来优化了CPU密集型任务。如果你有一台具有超过四个核心的机器,请将THREAD_COUNT更新为该数字,这样你将进一步减少加载时间。
结论
在这篇文章中,你用 Node 应用程序构建了一个会阻塞主线程的 CPU 密集型任务。接着,你尝试使用 promises 使任务变为非阻塞,但未成功。然后,你使用了 worker_threads 模块将 CPU 密集型任务转移到另一个线程中,使其变为非阻塞。最后,你使用 worker_threads 模块创建了四个线程来加速 CPU 密集型任务。
作为下一步,您可以查看Node.js的Worker线程文档,了解更多选项。此外,您还可以查看piscina库,它允许您为CPU密集型任务创建一个工作线程池。如果您想继续学习Node.js,请参阅教程系列《如何在Node.js中编码》。