
Node.js 多线程 —— worker_threads 初体验
JavaScript 是一个具有「单线程」特性的「简单」语言,适合完成一些「简单的任务」,比如验证表单、当访客离开页面时改变页面标题和 favicon、或者渲染整个页面、双向绑定与视图更新(等等,这听起来可不像是什么「简单的任务」)。
支持多线程的后端语言常常具有专门的机制在进程之间同步数据,而 Node.js 决定添加多线程支持,就需要一些变通方案了。
Node.js 是「单线程」的吗?
在讨论如何编写 Node.js 多线程应用之前,先谈谈 Node.js 本身是不是「单线程」的。答案:是,也可以说不是。
打开你的终端,输入 node
然后回车,进入 Node.js 的 REPL 模式。接着打开你的「任务管理器」或者「活动监视器」或者 top
(取决于你正在使用的系统),你会发现 node
的进程数可不是 1。
Node.js 启动后的进程数不是 1 甚至远大于 1,不是因为进程池,而是因为 V8 实例是多线程的:
- 主线程:获取、编译、执行代码
- 编译线程:当主线程在执行时,编译线程可以优化代码
- Profiler 线程:记录方法耗时的线程
- 其它线程:比如支持并行 GC 的多线程
Node.js:Event Loop 和 Worker Pool
众所周知,JavaScript 使用 Event Loop 处理主线程。对于 Node.js,耗时操作如 crypto
和 fs
(前者 CPU 密集,后者 I/O 密集)在 Worker Pool 中进行。Worker Pool 是 libuv 实现的,其执行模型是单独创建一个进程,在这个进程中同步执行任务,然后将结果返回到 Event Loop 中,Event Loop 可以通过 callback 获取并使用结果。基于这个执行模式我们可以写出这样的代码:
const { readFile } = require('fs');
fs.readFile('path/to/something', (err, content) => {
if (err) console.error(err);
console.log(content);
});
以上是一个非阻塞代码的示例,我们不必同步等待某事的发生。只需告诉 Worker Pool 去读取文件,并用结果去调用提供的函数即可。由于 Worker Pool 也有自己的线程,因此 Event Loop 不会被阻塞。
在需要对数据进行复杂计算的时候(比如 AI、机器学习、大数据、或者 OI-Wiki 的 MathJax 公式渲染),主线程(同时也是唯一的线程)会被阻塞以至于不能执行其他任务,因此 Node.js 一般不适合用于执行耗时函数。
如果想要了解更多,你应该看看 Node.js 官网上的这篇文章:「Don't Block the Event Loop (or the Worker Pool)」
好耶,是 Worker Threads
从 Node.js 10.5.0 开始,Node.js 提出了真正意义的多线程支持。
让我们新建一个 Worker,然后看看 worker_threads
可以拿来做什么:
const { Worker } = require('worker_threads');
const worker = new Worker(path, { data });
worker.on('message', msg => { /* ... */ });
worker.on('error', err => { /* ... */ });
worker.on('exit', exitcode => { /* ... */ });
worker.on('online', () => { /* ... */ });
通过创建一个 Worker
类的实例,我们获得了一个 Worker。Worker
实例的第一个参数 path
是 Workers 的路径,第二个参数 data
是这个 Worker 在启动时可以读到的数据。正如你所见,Worker 之间的通信基于事件,因此我们为事件设置了 Listener 和回调。使用回调是因为 Worker 可以发送不止一个 message
。当然,如果你的 Worker 只会发送一个 message,那么用 Promise.resolve
也是可以的。
Worker 支持监听四种事件:
- message:Worker 将数据发送到主线程时就会触发
message
事件 - error:Worker 中任何没有被 catch 的 Error 都会触发这一事件,同时 Worker 会被中止
- exit:Worker 退出时触发。在 Worker 中
process.exit()
得到的 exitCode 是 0,worker.terminate()
得到的 exitCode 是 1 - online:当 Worker 解析完毕开始执行时触发
进程间通信
如果需要将数据发送到另一个线程,可以使用 postMessage
方法:
port.postMessage(data[, transferList]);
第一个参数是一个被「The structured clone algorithm」复制到另一个线程的对象。一般的,这部分内存会被主线程和 Worker 共用。
一般最常见的用法是将 Worker 的数据发送到主线程:
// worker.js
const { parentPort } = require('worker_threads');
parentPort.postMessage(data);
同样,通过 parentPort
也可以监听从主线程发过来的信息:
// worker.js
const { parentPort } = require('worker_threads');
parentPort.on('message', data => { /* ... */ });
如果要在 Worker 中使用主线程中 new Worker
时第二个参数传入的数据,应该使用 workerData
属性:
// worker.js
const { workerData } = require('worker_threads');
关于
worker_threads
的其它属性、如何修改线程间的共用内存,以及如何在线程之间使用 MessageChanel 通信,请参考 Node.js Worker Threads 的相关文档。
实现一个 Worker Pool
需要注意的是,创建、执行、销毁一个 Worker 的开销是很大的。频繁创建 Worker 消耗的 CPU 算力很快就会抵消多线程带来的好处,越来越多的监听器甚至可能会导致 OOM。所以为每一个任务创建一个 Workers 是很不现实的,在实践中应该实现一个 Worker Pool,在初始化时将会创建 有限数量 的 Worker 并加载单一的 worker.js
,主线程通过进程间通信的方法将要执行的任务传给 Worker,而 Worker 也通过进程间通信的方法将任务的结果回传给主线程,当所有任务完成后,这些 Worker 将会被统一销毁。
以上是一个简单的 Worker Pool 的使用逻辑,你可以查看 我在 Hexo 中提出的 Worker Thread 提案 查看设计思路和实例代码。是的 Hexo 用户们,你们朝思暮想的 Hexo 多核渲染终于要来了!
现在,让我们开始编写一个 WorkerPool
类,初始化时它应该做这些事情:
- 创建指定数量的 Workers
- 创建一个任务队列
- 创建一个 Workers 索引,以及用于追踪他们是否处于激活状态的索引
// 获取当前设备的 CPU 线程数目,作为 numberOfThreads 的默认值。
const { length: cpusLength } = require('os').cpus();
class WorkerPool = {
constructor(workerPath, numberOfThreads = cpusLength) {
if (numberOfThreads < 1) {
throw new Error('Number of threads should be greater or equal than 1!');
}
this.workerPath = workerPath;
this.numberOfThreads = numberOfThreads;
// 任务队列
this._queue = [];
// Worker 索引
this._workersById = {};
// Worker 激活状态索引
this._activeWorkersById = {};
// 创建 Workers
for (let i = 0; i < this.numberOfThreads; i++) {
const worker = new Worker(workerPath);
this._workersById[i] = worker;
// 将这些 Worker 设置为未激活状态
this._activeWorkersById[i] = false;
}
}
}
在添加 Worker 执行之前,我们还需要做两件事情,检查是否有空闲的 Worker 用于执行任务、以及 Worker 本身的执行。
首先是检查空闲的 Worker,这个并不难:
getInactiveWorkerId() {
for (let i = 0; i < this.numberOfThreads; i++) {
if (!this._activeWorkersById[i]) return i;
}
return -1;
}
接下来是调用 Worker 执行,目的是在指定的 Worker 里执行指定的任务:
runWorker(workerId, taskObj) {
const worker = this._workersById[workerId];
// 当任务执行完毕后执行
const doAfterTaskIsFinished = () => {
// 去除所有的 Listener,不然一次次添加不同的 Listener 会 OOM 的
worker.removeAllListeners('message');
worker.removeAllListeners('error');
// 将这个 Worker 设为未激活状态
this._activeWorkersById[workerId] = false;
if (this._queue.length) {
// 任务队列非空,使用该 Worker 执行任务队列中第一个任务
this.runWorker(workerId, this._queue.shift());
}
};
// 将这个 Worker 设置为激活状态
this._activeWorkersById[workerId] = true;
// 设置两个回调,用于 Worker 的监听器
const messageCallback = result => {
taskObj.cb(null, result);
doAfterTaskIsFinished();
};
const errorCallback = error => {
taskObj.cb(error);
doAfterTaskIsFinished();
};
// 为 Worker 添加 'message' 和 'error' 两个 Listener
worker.once('message', messageCallback);
worker.once('error', errorCallback);
// 将数据传给 Worker 供其获取和执行
worker.postMessage(taskObj.data);
}
有了这两个方法,我们就可以实现 run()
方法了。你应该通过上述代码看出 taskObj
的结构了,如果没有,那么接下来的代码也会告诉你答案。
run(data) {
// Promise 是个好东西
return new Promise((resolve, reject) => {
// 调用 getInactiveWorkerId() 获取一个空闲的 Worker
const availableWorkerId = this.getInactiveWorkerId();
const taskObj = {
data,
cb: (error, result) => {
// 虽然 Workers 需要使用 Listener 和 Callback,但这不能阻止我们使用 Promise,对吧?
// 不,你不能 util.promisify(taskObj) 。人不能,至少不应该。
if (error) reject(error);
return resolve(result);
}
};
if (availableWorkerId === -1) {
// 当前没有空闲的 Workers 了,把任务丢进队列里,这样一旦有 Workers 空闲时就会开始执行。
this._queue.push(taskObj);
return null;
}
// 有一个空闲的 Worker,用它执行任务
this.runWorker(availableWorkerId, taskObj);
})
}
剩下的就是 destroy
方法了,调用 worker.terminate()
销毁所有创建的 Worker:
destroy(force = false) {
for (let i = 0; i < this.numberOfThreads; i++) {
if (this._activeWorkersById[i] && !force) {
// 通常情况下,不应该在还有 Worker 在执行的时候就销毁它,这一定是什么地方出了问题,所以还是抛个 Error 比较好
// 不过保留一个 force 参数,总有人用得到的
throw new Error(`The worker ${i} is still runing!`);
}
// 销毁这个 Worker
this._workersById[i].terminate();
}
}
}
如果你想看完整的代码的话,请参考 我给 hexo-util
编写的 WorkerPool
以及相关的文档。
实践:使用 Worker Threads 加速 OI-Wiki 的 MathJax 渲染
上述的 WorkerPool 实现适用于以下的执行模式:
我们拿到一个独立的 Input => 交给 Worker 进行 CPU 密集计算 => Worker 吐出来一个独立的 Output
虽然看过了 Worker Pool 的实现,你可能还是不知道这个 Worker Pool 应该怎么用(你甚至还不知道 worker.js
该怎么写)。所以接下来让我们看一个 Worker Pool 的实践。
OI-Wiki 是 OI 竞赛的知识整合站点,其中大部分数学公式依赖 MathJax 进行预渲染。过去他们使用 shell 和 find
遍历 mkdocs 生成的 300 多个 HTML 文件、为每个 HTML 文件起一个 Node.js Shell 进行渲染,完全渲染一次的耗时将近 550 秒。
首先让我们把整个过程拆开,看看有什么可以放在 Worker 中执行:
遍历生成的 HTML 文件 => 读取每一个 HTML 文件 => 渲染 MathJax => 将渲染的结果覆写到 HTML 文件
虽然没有明显的 Input 和 Output,但是我们可以把「读取、渲染、写入」看成一个独立的任务,因为遍历到的每一个 HTML 文件都需要执行这一任务。所以,我们的 worker.js
可以这么写:
// worker.js
const { isMainThread, parentPort } = require('worker_threads');
if (isMainThread) throw new Error('嘿!你为什么在主线程跑这个玩意!!??');
const { readFile, writeFile } = require('fs').promises;
const { mjpage } = require('mathjax-node-page');
// 因为 mathjax-node-page 是一个基于回调的同步函数,我们得把它 promisify 一下
const { promisify } = require('util');
// mjpage 方法的回调中,result 是第一个参数,而不存在 error 参数,因此我们不得不写一个 Custom Promisify
mjpage[promisify.custom] = input => {
return new Promise((resolve, reject) => {
try {
mjpage(
// mjpage 参数 start
input,
{ format: ["TeX"] },
{ svg: true, ex: 8, cjkCharWidth: 18, linebreaks: true },
// mjpage 参数 end
resolve // 将 Promise.resolve 作为原始函数的回调
);
} catch (e) {
reject(e);
}
});
};
const mathJaxRenderer = promisify(mjpage);
async function renderer(filename) {
// 从参数中获取文件名并读取文件内容
const content = await readFile(filename);
// 预处理读到的文件内容
const preProcesed = content
.replace(/<span class="MathJax_Preview">.+?<\/span><script type="math\/tex">/gi, '<script type="math/tex">')
.replace(/<div class="MathJax_Preview">[\s\S]*?<\/div>/gi, '');
try {
// MathJax 渲染
result = await mathJaxRenderer(preProcesed);
} catch (e) {
console.error(`${filename} rendered failed, detailed error see below:`);
console.error(e);
}
if (result) {
console.log(`${filename} rendered finished.`);
// 将结果写入文件
return writeFile(filename, result);
}
return;
}
// 以上是主函数定义,接下来是 Worker 相关
// 从主线程获取数据,传入的数据为需要渲染的文件名
parentPort.on('message', async filename => {
await renderer(filename);
// 渲染完成后,向主线程发送「Done」
// 虽然最终结果被写入文件,不需要返回,但是 WorkerPool 仍然需要一个 Worker 执行完毕的信号
parentPort.postMessage('Done');
});
接下来是主线程的代码了:
// render_mathjax.js
const { WorkerPool } = require('path/to/workerpool');
// bluebird 的 Promise 不仅特别快,还特别实用
const Promise = require('bluebird');
const { join, dirname } = require('path');
// hexo-fs 是个好东西,包含了很多实用的 fs API 封装,比如递归遍历目录的方法 listDir
const { listDir } = require('hexo-fs');
// mkdocs 生成的 HTML 目录
const distDir = join(dirname(__dirname) + '/site');
// worker.js 路径
const workerPath = join(__dirname + '/worker.js');
// 初始化一个 Worker Pool
const pool = new WorkerPool(workerPath);
// 使用 Promise.all 确保当所有任务都 fullfill 后再执行 then。
Promise.all(listDir(distDir).map(async item => {
// 遍历目录下每一个文件,如果文件的后缀是 .html 则新增一个任务
if (item.endsWith('.html')) {
const filename = join(distDir, item);
// 将 HTML 的绝对路径作为任务 data 传给 Worker
await pool.run(filename);
}
})).then(() => {
// 所有的 HTML 都渲染完了,这时候可以将 WorkerPool 销毁了
pool.destroy();
});
在使用时,在 shell 中执行下述命令即可:
node ./render_mathjax.js
使用 Node.js 的 Worker Threads 重写以后,由于任务并行和多核心利用,在双核的 Travis CI 上 OI Wiki 的 MathJax 渲染用时从 550 秒减少到了 160 秒,减少了 70%。
你可以在 OI-wiki/OI-wiki#2288 中看到相关的代码改动。