大部分 API 均與前端 web worker 相同,用來有效處理 CPU bound 的資料。
相關特性
1.多個 worker 不能同時存取主程式相同變數,所以不會有 race condition 問題,透過 postMessage
讓 worker 傳訊息給主程式。
2.假設是通過 postMessage 傳遞 Object ,建議先 JSON.stringify 然後 parse,速度會較快。
範例
app.js
const { Worker } = require('worker_threads');
const path = require('path');
const worker1 = new Worker(path.resolve('./worker.js'));
worker1.on('message', (message) => {
console.log(`Main thread got message: ${message}`);
});
worker1.postMessage('ping');
worker.js
const { parentPort } = require('worker_threads');
parentPort.on('message', (message) => {
console.log(`worker got message: ${message}`);
parentPort.postMessage('pong');
});
之後執行 node app.js
worker got message: ping
Main thread got message: pong
Worker 搭配 Promise all
讓所有 worker 都執行完畢後再繼續程式
我們先看一下原本的 single thread 程式
const crypto = require("crypto");
const sha256 = (s) => crypto.createHash("sha256").update(s).digest();
const totalDataLength = 2000000;
console.time('single thread');
Array.from(Array(totalDataLength)).map(() => sha256(Math.random().toString()));
console.timeEnd('single thread');
耗時大約 10s
然後我們把它改成 worker_thread 版本,並搭配 Promise
把原本的 2000000 個部分,分給四個 thread 計算。
app.js
const { Worker } = require("worker_threads");
const path = require("path");
console.time('thread')
let workerPool = [];
const threadCount = 4;
const totalDataLength = 2000000;
for (let i = 0; i < threadCount; i++) {
const workerInstance = new Promise((resolve, reject) => {
const worker = new Worker(path.resolve("./worker.js"));
worker.on("message", ({ data }) => {
resolve(data);
});
worker.postMessage({
arrayLength: totalDataLength / threadCount,
});
});
workerPool.push(workerInstance);
}
Promise.all(workerPool)
.then((values) => {
console.log(values);
console.timeEnd('thread')
})
.catch((err) => {
console.log(err);
});
worker.js
const { parentPort } = require("worker_threads");
const crypto = require("crypto");
const sha256 = (s) => crypto.createHash("sha256").update(s).digest();
parentPort.on("message", ({ arrayLength }) => {
const shaArray = Array.from(Array(arrayLength)).map((num) => sha256(String(num)));
parentPort.postMessage({
data: shaArray,
});
});
耗時大約 5s
使用 microjob module
(async () => {
const { job, start, stop } = require("microjob");
try {
// start the worker pool
console.time("microjob");
await start();
// this function will be executed in another thread
const res = await job(() => {
let i = 0;
const result = [];
const threadCount = 4;
const totalDataLength = 2000000;
for (i = 0; i < threadCount; i++) {
// heavy CPU load ...
const crypto = require("crypto");
const sha256 = (s) => crypto.createHash("sha256").update(s).digest();
const shaArray = Array.from(Array(totalDataLength / threadCount)).map((num) =>
sha256(String(num))
);
result.push(shaArray);
}
return result;
});
console.log(res);
console.timeEnd("microjob");
} catch (err) {
console.error(err);
} finally {
// shutdown worker pool
await stop();
}
})();
發現比原本原生的更慢,可以查看issue: https://github.com/wilk/microjob/issues/65
Share memory
可以用 Share memory 的方法來傳遞參數,取代 postMessage,速度會比較快。
app.js
const { Worker } = require("worker_threads");
const path = require("path");
console.time('thread')
let workerPool = [];
const threadCount = 4;
const totalDataLength = 2000000;
const shareMemory = Array(threadCount).fill(new SharedArrayBuffer(totalDataLength / threadCount))
for (let i = 0; i < threadCount; i++) {
const workerInstance = new Promise((resolve, reject) => {
const worker = new Worker(path.resolve("./worker.js"));
worker.on("message", () => {
resolve();
});
worker.postMessage({
shareMemory: shareMemory[i],
arrayLength: totalDataLength / threadCount,
});
});
workerPool.push(workerInstance);
}
Promise.all(workerPool)
.then(() => {
console.log(shareMemory)
console.timeEnd('thread')
})
.catch((err) => {
console.log(err);
});
worker.js
const { parentPort } = require("worker_threads");
parentPort.on("message", ({ arrayLength, shareMemory }) => {
let uint8Arr = new Uint8Array(shareMemory);
const shaArray = Array.from(Array(arrayLength)).fill(1).map((num) => Number(num) * 2);
uint8Arr.set(new Uint8Array(shaArray))
parentPort.postMessage({});
});
平均速度約 90ms
假設換回原本 postMessage方式
app.js
const { Worker } = require("worker_threads");
const path = require("path");
console.time('thread')
let workerPool = [];
const threadCount = 4;
const totalDataLength = 2000000;
for (let i = 0; i < threadCount; i++) {
const workerInstance = new Promise((resolve, reject) => {
const worker = new Worker(path.resolve("./worker.js"));
worker.on("message", (data) => {
resolve(data);
});
worker.postMessage({
arrayLength: totalDataLength / threadCount,
});
});
workerPool.push(workerInstance);
}
Promise.all(workerPool)
.then((data) => {
console.log(data)
console.timeEnd('thread')
})
.catch((err) => {
console.log(err);
});
worker.js
const { parentPort } = require("worker_threads");
parentPort.on("message", ({ arrayLength }) => {
const shaArray = Array.from(Array(arrayLength)).fill(1).map((num) => Number(num) * 2);
parentPort.postMessage({
data: shaArray,
});
});
可發現需要耗時 150ms 左右,約為 share memory 的兩倍。
資料序列化的時間
假設我們把原本 share memory 範例的 sharedArrayBuffer 在最後拿到資料後全部轉為 array 可發現時間又變回 150ms 左右,所以大部分時先是花在資料的序列化。
const arr = Array.from(new Uint8Array(shareMemory[0]))
console.log(arr)
Atomics
使用 share memory 時為了避免存取相同記憶體產生 race condition,可以使用
Atomics.store
Atomics.load
Atomics.notify
Atomics.wait
Worker 間通訊