Worker Thread
Node.js 的 worker_threads 大部分 API 都與前端的 Web Worker 相同,適合用來處理 CPU-bound(CPU 密集)的運算。
1. 預設情況下,各個 worker 無法直接存取主程式的變數(透過 postMessage 傳遞的資料是複製而非共用),所以不會有 race condition 的問題;worker 與主程式之間透過 postMessage 互相傳遞訊息。
2. 如果要透過 postMessage 傳遞 Object,可以考慮先用 JSON.stringify 轉成字串,接收端再用 JSON.parse 還原,某些情況下速度會較快(實際效果依資料結構而定,建議自行量測)。
app.js
const { Worker } = require('worker_threads');
const path = require('path');

// Resolve worker.js relative to this file rather than process.cwd(), so the
// example also works when started from another directory.
const worker1 = new Worker(path.join(__dirname, 'worker.js'));

worker1.once('message', (message) => {
  console.log(`Main thread got message: ${message}`);
  // worker.js keeps listening on parentPort forever, which keeps both threads
  // alive; terminate it once the reply arrives so the process can exit.
  void worker1.terminate();
});

worker1.postMessage('ping');
worker.js
const { parentPort } = require('worker_threads');

/**
 * Log every message received from the main thread and answer it with 'pong'.
 * @param {*} message - whatever the main thread posted (e.g. 'ping')
 */
function replyWithPong(message) {
  console.log(`worker got message: ${message}`);
  parentPort.postMessage('pong');
}

parentPort.on('message', replyWithPong);
接著執行 node app.js,輸出如下:
worker got message: ping
Main thread got message: pong
讓所有 worker 都執行完畢後再繼續程式
const crypto = require("crypto");

// Baseline: hash `totalDataLength` random strings on the main thread only,
// timing the whole run under the 'single thread' label.
const totalDataLength = 2000000;

/** Return the raw SHA-256 digest (a Buffer) of `input`. */
const sha256 = (input) => {
  const hash = crypto.createHash("sha256");
  hash.update(input);
  return hash.digest();
};

console.time('single thread');
Array.from({ length: totalDataLength }, () => sha256(Math.random().toString()));
console.timeEnd('single thread');
耗時大約 10s
把原本 2000000 筆的雜湊計算平均分給四個 thread 執行。
app.js
const { Worker } = require("worker_threads");
const path = require("path");

// Split `totalDataLength` hashing tasks across `threadCount` workers and wait
// for every worker to report back before printing the combined result.
console.time('thread');
const threadCount = 4;
const totalDataLength = 2000000;

/**
 * Spawn one worker, ask it to process `arrayLength` items and resolve with the
 * `data` field of the message it posts back.
 * @param {number} arrayLength - number of items this worker should process
 * @returns {Promise<*>} the worker's `data` payload
 * Rejects if the worker emits 'error' or exits before replying.
 */
const runWorker = (arrayLength) =>
  new Promise((resolve, reject) => {
    // Resolve relative to this file, not process.cwd().
    const worker = new Worker(path.join(__dirname, "worker.js"));
    worker.once("message", ({ data }) => {
      resolve(data);
      // The worker keeps listening forever; terminate it so the process can exit.
      void worker.terminate();
    });
    worker.once("error", reject);
    worker.once("exit", (code) => {
      // No-op when the promise already settled via "message" or "error".
      reject(new Error(`worker exited with code ${code} before replying`));
    });
    worker.postMessage({ arrayLength });
  });

// Integer chunk sizes that always sum to totalDataLength: the first
// `remainder` workers take one extra item. A fractional length would make
// Array(n) throw a RangeError inside the worker.
const baseChunk = Math.floor(totalDataLength / threadCount);
const remainder = totalDataLength % threadCount;
const workerPool = Array.from({ length: threadCount }, (_, i) =>
  runWorker(baseChunk + (i < remainder ? 1 : 0)),
);

Promise.all(workerPool)
  .then((values) => {
    console.log(values);
    console.timeEnd('thread');
  })
  .catch((err) => {
    console.log(err);
  });
worker.js
const { parentPort } = require("worker_threads");
const crypto = require("crypto");

/** Return the raw SHA-256 digest (a Buffer) of `s`. */
const sha256 = (s) => crypto.createHash("sha256").update(s).digest();

// For each request, hash `arrayLength` random strings and post the digests
// back to the main thread as `{ data }`.
parentPort.on("message", ({ arrayLength }) => {
  // Mapping over an empty Array(n) would pass `undefined` for every element and
  // hash the same string "undefined" each time; hash random strings instead so
  // the workload matches the single-thread baseline.
  const shaArray = Array.from({ length: arrayLength }, () => sha256(Math.random().toString()));
  parentPort.postMessage({
    data: shaArray,
  });
});
耗時大約 5s。

改用 microjob 套件的寫法:
(async () => {
  const { job, start, stop } = require("microjob");
  const threadCount = 4;
  const totalDataLength = 2000000;
  // Integer chunk sizes that always sum to totalDataLength.
  const baseChunk = Math.floor(totalDataLength / threadCount);
  const remainder = totalDataLength % threadCount;
  try {
    console.time("microjob");
    // start the worker pool with one worker per chunk
    await start({ maxWorkers: threadCount });
    // Submit one job per chunk before awaiting any of them so the pool can run
    // them in parallel. A single job containing the whole loop would process
    // every chunk sequentially on one pool thread.
    const jobs = Array.from({ length: threadCount }, (_, i) =>
      job(
        (chunkLength) => {
          // microjob serializes the handler, so closure variables are not
          // available here: dependencies are required inside the handler and
          // the chunk size arrives through the `data` option.
          const crypto = require("crypto");
          const sha256 = (s) => crypto.createHash("sha256").update(s).digest();
          return Array.from({ length: chunkLength }, () => sha256(Math.random().toString()));
        },
        { data: baseChunk + (i < remainder ? 1 : 0) },
      ),
    );
    // Same shape as before: one array of digests per chunk.
    const res = await Promise.all(jobs);
    console.log(res);
    console.timeEnd("microjob");
  } catch (err) {
    console.error(err);
  } finally {
    // shutdown worker pool
    await stop();
  }
})();
結果發現比原生 worker_threads 的寫法還慢,相關討論可以參考這個 issue: https://github.com/wilk/microjob/issues/65
可以改用 shared memory(SharedArrayBuffer)來傳遞資料,取代 postMessage 的複製,速度會比較快。
app.js
const { Worker } = require("worker_threads");
const path = require("path");

// Each worker writes its results directly into its own SharedArrayBuffer and
// only posts an empty "done" message, avoiding serialization of the data.
console.time('thread');
const threadCount = 4;
const totalDataLength = 2000000;

// Integer chunk sizes (in bytes) that always sum to totalDataLength: the first
// `remainder` workers take one extra byte.
const baseChunk = Math.floor(totalDataLength / threadCount);
const remainder = totalDataLength % threadCount;
const chunkLengths = Array.from({ length: threadCount }, (_, i) =>
  baseChunk + (i < remainder ? 1 : 0),
);

// One *separate* buffer per worker. `Array(n).fill(new SharedArrayBuffer(...))`
// would store the same buffer in every slot, so all workers would overwrite a
// single shared region.
const shareMemory = chunkLengths.map((length) => new SharedArrayBuffer(length));

/**
 * Spawn one worker, hand it `buffer` to fill with `arrayLength` bytes and
 * resolve once it signals completion.
 * Rejects if the worker emits 'error' or exits before replying.
 */
const runWorker = (buffer, arrayLength) =>
  new Promise((resolve, reject) => {
    // Resolve relative to this file, not process.cwd().
    const worker = new Worker(path.join(__dirname, "worker.js"));
    worker.once("message", () => {
      resolve();
      // The worker keeps listening forever; terminate it so the process can exit.
      void worker.terminate();
    });
    worker.once("error", reject);
    worker.once("exit", (code) => {
      // No-op when the promise already settled via "message" or "error".
      reject(new Error(`worker exited with code ${code} before replying`));
    });
    worker.postMessage({
      shareMemory: buffer,
      arrayLength,
    });
  });

const workerPool = shareMemory.map((buffer, i) => runWorker(buffer, chunkLengths[i]));

Promise.all(workerPool)
  .then(() => {
    console.log(shareMemory);
    console.timeEnd('thread');
  })
  .catch((err) => {
    console.log(err);
  });
worker.js
const { parentPort } = require("worker_threads");

/**
 * Write `arrayLength` bytes, each the value 2 (1 doubled), into the received
 * SharedArrayBuffer, then post an empty object to signal completion.
 * The main thread reads the result straight from the shared buffer.
 */
function fillSharedBuffer({ arrayLength, shareMemory }) {
  const view = new Uint8Array(shareMemory);
  const values = new Array(arrayLength).fill(1).map((value) => Number(value) * 2);
  view.set(Uint8Array.from(values));
  parentPort.postMessage({});
}

parentPort.on("message", fillSharedBuffer);
平均耗時約 90ms。

對照組:改用 postMessage 把相同的資料傳回主程式:
app.js
const { Worker } = require("worker_threads");
const path = require("path");

// Comparison run: each worker sends its result back through postMessage, so
// the data is serialized and copied to the main thread.
console.time('thread');
const threadCount = 4;
const totalDataLength = 2000000;

/**
 * Spawn one worker, ask it to build `arrayLength` items and resolve with the
 * whole message object it posts back.
 * Rejects if the worker emits 'error' or exits before replying.
 */
const runWorker = (arrayLength) =>
  new Promise((resolve, reject) => {
    // Resolve relative to this file, not process.cwd().
    const worker = new Worker(path.join(__dirname, "worker.js"));
    worker.once("message", (data) => {
      resolve(data);
      // The worker keeps listening forever; terminate it so the process can exit.
      void worker.terminate();
    });
    worker.once("error", reject);
    worker.once("exit", (code) => {
      // No-op when the promise already settled via "message" or "error".
      reject(new Error(`worker exited with code ${code} before replying`));
    });
    worker.postMessage({ arrayLength });
  });

// Integer chunk sizes that always sum to totalDataLength. A fractional length
// would make Array(n) throw a RangeError inside the worker.
const baseChunk = Math.floor(totalDataLength / threadCount);
const remainder = totalDataLength % threadCount;
const workerPool = Array.from({ length: threadCount }, (_, i) =>
  runWorker(baseChunk + (i < remainder ? 1 : 0)),
);

Promise.all(workerPool)
  .then((data) => {
    console.log(data);
    console.timeEnd('thread');
  })
  .catch((err) => {
    console.log(err);
  });
worker.js
const { parentPort } = require("worker_threads");

/**
 * Build `arrayLength` entries of the value 2 (1 doubled) and post them back to
 * the main thread as `{ data }`, which forces the array to be serialized.
 */
function buildAndSend({ arrayLength }) {
  const ones = new Array(arrayLength).fill(1);
  const doubled = ones.map((value) => Number(value) * 2);
  parentPort.postMessage({ data: doubled });
}

parentPort.on("message", buildAndSend);
可以發現需要耗時 150ms 左右,將近 share memory 版本(約 90ms)的兩倍。
如果在 share memory 範例最後拿到資料後,把 SharedArrayBuffer 的內容轉為一般 array(如下),會發現時間又變回 150ms 左右,可見 postMessage 版本大部分的時間是花在資料的序列化與轉換上。
// Copy the bytes of the first shared buffer (shareMemory[0]) into a plain
// JavaScript array and print it.
const arr = [...new Uint8Array(shareMemory[0])];
console.log(arr);
使用 share memory 時,為了避免多個 thread 同時存取同一塊記憶體而產生 race condition,可以使用以下 API:
Atomics.store
Atomics.load
Atomics.notify
Atomics.wait
new MessageChannel()