Node.js 多线程 —— worker_threads 初体验

Node.js 多线程 —— worker_threads 初体验

技术向约 3.3 千字

JavaScript 是一个具有「单线程」特性的「简单」语言,适合完成一些「简单的任务」,比如验证表单、当访客离开页面时改变页面标题和 favicon、或者渲染整个页面、双向绑定与视图更新(等等,这听起来可不像是什么「简单的任务」)。

支持多线程的后端语言常常具有专门的机制在进程之间同步数据,而 Node.js 决定添加多线程支持,就需要一些变通方案了。

Node.js 是「单线程」的吗?

在讨论如何编写 Node.js 多线程应用之前,先谈谈 Node.js 本身是不是「单线程」的。答案:是,也可以说不是。

打开你的终端,输入 node 然后回车,进入 Node.js 的 REPL 模式。接着打开你的「任务管理器」或者「活动监视器」或者 top (取决于你正在使用的系统),你会发现 node 的进程数可不是 1。

nodejs_thread

Node.js 启动后的进程数不是 1 甚至远大于 1,不是因为进程池,而是因为 V8 实例是多线程的:

  • 主线程:获取、编译、执行代码
  • 编译线程:当主线程在执行时,编译线程可以优化代码
  • Profiler 线程:记录方法耗时的线程
  • 其它线程:比如支持并行 GC 的多线程

Node.js:Event Loop 和 Worker Pool

众所周知,JavaScript 使用 Event Loop 处理主线程。对于 Node.js,耗时操作如 cryptofs(前者 CPU 密集,后者 I/O 密集)在 Worker Pool 中进行。Worker Pool 是 libuv 实现的,其执行模型是单独创建一个进程,在这个进程中同步执行任务,然后将结果返回到 Event Loop 中,Event Loop 可以通过 callback 获取并使用结果。基于这个执行模式我们可以写出这样的代码:

const { readFile } = require('fs');

fs.readFile('path/to/something', (err, content) => {
  if (err) console.error(err);
  console.log(content);
});

以上是一个非阻塞代码的示例,我们不必同步等待某事的发生。只需告诉 Worker Pool 去读取文件,并用结果去调用提供的函数即可。由于 Worker Pool 也有自己的线程,因此 Event Loop 不会被阻塞。

在需要对数据进行复杂计算的时候(比如 AI、机器学习、大数据、或者 OI-Wiki 的 MathJax 公式渲染),主线程(同时也是唯一的线程)会被阻塞以至于不能执行其他任务,因此 Node.js 一般不适合用于执行耗时函数。

如果想要了解更多,你应该看看 Node.js 官网上的这篇文章:「Don't Block the Event Loop (or the Worker Pool)

好耶,是 Worker Threads

从 Node.js 10.5.0 开始,Node.js 提出了真正意义的多线程支持。

让我们新建一个 Worker,然后看看 worker_threads 可以拿来做什么:

const { Worker } = require('worker_threads');

const worker = new Worker(path, { data });
worker.on('message', msg => { /* ... */ });
worker.on('error', err => { /* ... */ });
worker.on('exit', exitcode => { /* ... */ });
worker.on('online', () => { /* ... */ });

通过创建一个 Worker 类的实例,我们获得了一个 Worker。Worker 实例的第一个参数 path 是 Workers 的路径,第二个参数 data 是这个 Worker 在启动时可以读到的数据。正如你所见,Worker 之间的通信基于事件,因此我们为事件设置了 Listener 和回调。使用回调是因为 Worker 可以发送不止一个 message。当然,如果你的 Worker 只会发送一个 message,那么用 Promise.resolve 也是可以的。

Worker 支持监听四种事件:

  • message:Worker 将数据发送到主线程时就会触发 message 事件
  • error:Worker 中任何没有被 catch 的 Error 都会触发这一事件,同时 Worker 会被中止
  • exit:Worker 退出时触发。在 Worker 中 process.exit() 得到的 exitCode 是 0,worker.terminate() 得到的 exitCode 是 1
  • online:当 Worker 解析完毕开始执行时触发

进程间通信

如果需要将数据发送到另一个线程,可以使用 postMessage 方法:

port.postMessage(data[, transferList]);

第一个参数是一个被「The structured clone algorithm」复制到另一个线程的对象。一般的,这部分内存会被主线程和 Worker 共用。

一般最常见的用法是将 Worker 的数据发送到主线程:

// worker.js
const { parentPort } = require('worker_threads');
parentPort.postMessage(data);

同样,通过 parentPort 也可以监听从主线程发过来的信息:

// worker.js
const { parentPort } = require('worker_threads');
parentPort.on('message', data => { /* ... */ });

如果要在 Worker 中使用主线程中 new Worker 时第二个参数传入的数据,应该使用 workerData 属性:

// worker.js
const { workerData } = require('worker_threads');

关于 worker_threads 的其它属性、如何修改线程间的共用内存,以及如何在线程之间使用 MessageChanel 通信,请参考 Node.js Worker Threads 的相关文档

实现一个 Worker Pool

需要注意的是,创建、执行、销毁一个 Worker 的开销是很大的。频繁创建 Worker 消耗的 CPU 算力很快就会抵消多线程带来的好处,越来越多的监听器甚至可能会导致 OOM。所以为每一个任务创建一个 Workers 是很不现实的,在实践中应该实现一个 Worker Pool,在初始化时将会创建 有限数量 的 Worker 并加载单一的 worker.js,主线程通过进程间通信的方法将要执行的任务传给 Worker,而 Worker 也通过进程间通信的方法将任务的结果回传给主线程,当所有任务完成后,这些 Worker 将会被统一销毁。

以上是一个简单的 Worker Pool 的使用逻辑,你可以查看 我在 Hexo 中提出的 Worker Thread 提案 查看设计思路和实例代码。是的 Hexo 用户们,你们朝思暮想的 Hexo 多核渲染终于要来了!

现在,让我们开始编写一个 WorkerPool 类,初始化时它应该做这些事情:

  • 创建指定数量的 Workers
  • 创建一个任务队列
  • 创建一个 Workers 索引,以及用于追踪他们是否处于激活状态的索引
// 获取当前设备的 CPU 线程数目,作为 numberOfThreads 的默认值。
const { length: cpusLength } = require('os').cpus();

class WorkerPool = {
  constructor(workerPath, numberOfThreads = cpusLength) {
    if (numberOfThreads < 1) {
      throw new Error('Number of threads should be greater or equal than 1!');
    }

    this.workerPath = workerPath;
    this.numberOfThreads = numberOfThreads;

    // 任务队列
    this._queue = [];
    // Worker 索引
    this._workersById = {};
    // Worker 激活状态索引
    this._activeWorkersById = {};

    // 创建 Workers
    for (let i = 0; i < this.numberOfThreads; i++) {
      const worker = new Worker(workerPath);

      this._workersById[i] = worker;
      // 将这些 Worker 设置为未激活状态
      this._activeWorkersById[i] = false;
    }
  }
}

在添加 Worker 执行之前,我们还需要做两件事情,检查是否有空闲的 Worker 用于执行任务、以及 Worker 本身的执行。

首先是检查空闲的 Worker,这个并不难:

getInactiveWorkerId() {
  for (let i = 0; i < this.numberOfThreads; i++) {
    if (!this._activeWorkersById[i]) return i;
  }
  return -1;
}

接下来是调用 Worker 执行,目的是在指定的 Worker 里执行指定的任务:

runWorker(workerId, taskObj) {
  const worker = this._workersById[workerId];

  // 当任务执行完毕后执行
  const doAfterTaskIsFinished = () => {
    // 去除所有的 Listener,不然一次次添加不同的 Listener 会 OOM 的
    worker.removeAllListeners('message');
    worker.removeAllListeners('error');
    // 将这个 Worker 设为未激活状态
    this._activeWorkersById[workerId] = false;

    if (this._queue.length) {
      // 任务队列非空,使用该 Worker 执行任务队列中第一个任务
      this.runWorker(workerId, this._queue.shift());
    }
  };

  // 将这个 Worker 设置为激活状态
  this._activeWorkersById[workerId] = true;
  // 设置两个回调,用于 Worker 的监听器
  const messageCallback = result => {
    taskObj.cb(null, result);
    doAfterTaskIsFinished();
  };
  const errorCallback = error => {
    taskObj.cb(error);
    doAfterTaskIsFinished();
  };

  // 为 Worker 添加 'message' 和 'error' 两个 Listener
  worker.once('message', messageCallback);
  worker.once('error', errorCallback);
  // 将数据传给 Worker 供其获取和执行
  worker.postMessage(taskObj.data);
}

有了这两个方法,我们就可以实现 run() 方法了。你应该通过上述代码看出 taskObj 的结构了,如果没有,那么接下来的代码也会告诉你答案。

run(data) {
  // Promise 是个好东西
  return new Promise((resolve, reject) => {
    // 调用 getInactiveWorkerId() 获取一个空闲的 Worker
    const availableWorkerId = this.getInactiveWorkerId();

    const taskObj = {
      data,
      cb: (error, result) => {
        // 虽然 Workers 需要使用 Listener 和 Callback,但这不能阻止我们使用 Promise,对吧?
        // 不,你不能 util.promisify(taskObj) 。人不能,至少不应该。
        if (error) reject(error);
        return resolve(result);
      }
    };
    
    if (availableWorkerId === -1) {
      // 当前没有空闲的 Workers 了,把任务丢进队列里,这样一旦有 Workers 空闲时就会开始执行。
      this._queue.push(taskObj);
      return null;
    }

    // 有一个空闲的 Worker,用它执行任务
    this.runWorker(availableWorkerId, taskObj);
  })
}

剩下的就是 destroy 方法了,调用 worker.terminate() 销毁所有创建的 Worker:

destroy(force = false) {
    for (let i = 0; i < this.numberOfThreads; i++) {
      if (this._activeWorkersById[i] && !force) {
        // 通常情况下,不应该在还有 Worker 在执行的时候就销毁它,这一定是什么地方出了问题,所以还是抛个 Error 比较好
        // 不过保留一个 force 参数,总有人用得到的
        throw new Error(`The worker ${i} is still runing!`);
      }

      // 销毁这个 Worker
      this._workersById[i].terminate();
    }
  }
}

如果你想看完整的代码的话,请参考 我给 hexo-util 编写的 WorkerPool 以及相关的文档。

实践:使用 Worker Threads 加速 OI-Wiki 的 MathJax 渲染

上述的 WorkerPool 实现适用于以下的执行模式:

我们拿到一个独立的 Input => 交给 Worker 进行 CPU 密集计算 => Worker 吐出来一个独立的 Output

虽然看过了 Worker Pool 的实现,你可能还是不知道这个 Worker Pool 应该怎么用(你甚至还不知道 worker.js 该怎么写)。所以接下来让我们看一个 Worker Pool 的实践。

OI-Wiki 是 OI 竞赛的知识整合站点,其中大部分数学公式依赖 MathJax 进行预渲染。过去他们使用 shell 和 find 遍历 mkdocs 生成的 300 多个 HTML 文件、为每个 HTML 文件起一个 Node.js Shell 进行渲染,完全渲染一次的耗时将近 550 秒。

首先让我们把整个过程拆开,看看有什么可以放在 Worker 中执行:

遍历生成的 HTML 文件 => 读取每一个 HTML 文件 => 渲染 MathJax => 将渲染的结果覆写到 HTML 文件

虽然没有明显的 Input 和 Output,但是我们可以把「读取、渲染、写入」看成一个独立的任务,因为遍历到的每一个 HTML 文件都需要执行这一任务。所以,我们的 worker.js 可以这么写:

// worker.js
const { isMainThread, parentPort } = require('worker_threads');

if (isMainThread) throw new Error('嘿!你为什么在主线程跑这个玩意!!??');

const { readFile, writeFile } = require('fs').promises;
const { mjpage } = require('mathjax-node-page');
// 因为 mathjax-node-page 是一个基于回调的同步函数,我们得把它 promisify 一下
const { promisify } = require('util');
// mjpage 方法的回调中,result 是第一个参数,而不存在 error 参数,因此我们不得不写一个 Custom Promisify
mjpage[promisify.custom] = input => {
  return new Promise((resolve, reject) => {
    try {
      mjpage(
        // mjpage 参数 start
        input,
        { format: ["TeX"] },
        { svg: true, ex: 8, cjkCharWidth: 18, linebreaks: true },
        // mjpage 参数 end
        resolve // 将 Promise.resolve 作为原始函数的回调
      );
    } catch (e) {
      reject(e);
    }
  });
};
const mathJaxRenderer = promisify(mjpage);

async function renderer(filename) {
  // 从参数中获取文件名并读取文件内容
  const content = await readFile(filename);
  // 预处理读到的文件内容
  const preProcesed = content
    .replace(/<span class="MathJax_Preview">.+?<\/span><script type="math\/tex">/gi, '<script type="math/tex">')
    .replace(/<div class="MathJax_Preview">[\s\S]*?<\/div>/gi, '');

  try {
    // MathJax 渲染
    result = await mathJaxRenderer(preProcesed);
  } catch (e) {
    console.error(`${filename} rendered failed, detailed error see below:`);
    console.error(e);
  }

  if (result) {
    console.log(`${filename} rendered finished.`);
    // 将结果写入文件
    return writeFile(filename, result);
  }

  return;
}

// 以上是主函数定义,接下来是 Worker 相关
// 从主线程获取数据,传入的数据为需要渲染的文件名
parentPort.on('message', async filename => {
  await renderer(filename);
  // 渲染完成后,向主线程发送「Done」
  // 虽然最终结果被写入文件,不需要返回,但是 WorkerPool 仍然需要一个 Worker 执行完毕的信号
  parentPort.postMessage('Done');
});

接下来是主线程的代码了:

// render_mathjax.js
const { WorkerPool } = require('path/to/workerpool');
// bluebird 的 Promise 不仅特别快,还特别实用
const Promise = require('bluebird');
const { join, dirname } = require('path');
// hexo-fs 是个好东西,包含了很多实用的 fs API 封装,比如递归遍历目录的方法 listDir
const { listDir } = require('hexo-fs');
// mkdocs 生成的 HTML 目录
const distDir = join(dirname(__dirname) + '/site');
// worker.js 路径
const workerPath = join(__dirname + '/worker.js');
// 初始化一个 Worker Pool
const pool = new WorkerPool(workerPath);

// 使用 Promise.all 确保当所有任务都 fullfill 后再执行 then。
Promise.all(listDir(distDir).map(async item => {
  // 遍历目录下每一个文件,如果文件的后缀是 .html 则新增一个任务
  if (item.endsWith('.html')) {
    const filename = join(distDir, item);
    // 将 HTML 的绝对路径作为任务 data 传给 Worker
    await pool.run(filename);
  }
})).then(() => {
  // 所有的 HTML 都渲染完了,这时候可以将 WorkerPool 销毁了
  pool.destroy();
});

在使用时,在 shell 中执行下述命令即可:

node ./render_mathjax.js

使用 Node.js 的 Worker Threads 重写以后,由于任务并行和多核心利用,在双核的 Travis CI 上 OI Wiki 的 MathJax 渲染用时从 550 秒减少到了 160 秒,减少了 70%。

你可以在 OI-wiki/OI-wiki#2288 中看到相关的代码改动。

Node.js 多线程 —— worker_threads 初体验
本文作者
Sukka
发布于
2020-06-12
许可协议
转载或引用本文时请遵守许可协议,注明出处、不得用于商业用途!
如果你喜欢我的文章,或者我的文章有帮到你,可以考虑一下打赏作者
评论加载中...