Web_Advance
  • 本書簡介
  • Node.js 部分
    • Node 版本管理
    • 使用NPM
      • Yarn
    • 開始Node
    • Worker Thread
    • REPL
    • TCP
    • path
    • Cluster and Child_process
    • assert (自訂拋出的錯誤)
    • Stream(流)
    • util (工具類)
    • EventEmitter
    • fs 文件操作
    • Buffer
      • Binary Diff
      • 查看 Binary 檔案內容
    • Process (進程)
    • 錯誤處理
  • OS
  • Async Hook
  • TCP
  • HTTP
    • 有關爬蟲
    • HTTP/2
    • HTTP Protocal
  • HTTPS
    • HTTPS 流程
    • SSL pinning
    • HTTPS 封包解密
    • 建立自簽發 HTTPS 證書
    • 幫網站加上HTTPS
    • HTTPS原理
  • Crypto加密
  • 有關繼承
  • JS 基本
    • JavaScript 迴圈與異步處理
  • 使用 Express
    • 上傳檔案
    • 圖片伺服器
    • 簡單範例
  • 使用 Nest.js
  • 使用MongoDB
    • 設置帳戶登入權限
    • Mongoose 框架
    • 進階Mongo
    • 基本環境操作
    • MongoDB Sharding
  • 使用MySQL
    • Schema 架構設計
    • SQL 語法
    • SQL Procedure
    • Node.js 操作 MySQL
    • 使用 Sequelize
      • DB Migration
      • function
  • 使用PostgreSQL
    • 常見問題
    • replica
    • 基本指令
    • 使用Node.js操控pg
    • SQL 基礎
  • 使用TypeORM
  • RethinkDB
  • CSS 深度探討
    • Width, Height
  • React
    • 第三方組件
      • Formik
    • styled component
    • propTypes
    • React webpack 部署
    • React util
    • 寫component並且publish
    • create-react-app
    • Context API
    • i18n
    • Server side render
    • Next.js教學
    • Higher order component 與 Recompose
    • component 間 互相存取
    • React hook
  • React router
    • 自己寫一個Router
  • Redux
    • Redux Toolkit
    • 小技巧
    • Redux sagas
    • compose
  • React Native
    • adb
    • InApp Billing
    • Icon
    • SVG
    • Firebase
      • Phone Auth
    • 自動化測試
    • Splash screen
    • Websocket
    • Googla OAuth
      • iOS
      • Android
    • Facebook OAuth
      • iOS
      • Android
    • IOS
    • 第三方組件
      • Auth Code Input
      • Country Code Picker
      • onboarding screen
      • Toast
    • ESlint
    • Push notification
    • Android 上架步驟
    • Expo
    • router
      • react-navigation套件
    • 原生組件
      • RefreshControl
      • Modal
      • Alert
      • button
      • KeyboardAvoidingView
      • Drawer
      • Image
    • 限制螢幕垂直與水平
    • NativeBase UI
    • Debug
    • 常見問題
    • Network
    • 硬體操作
      • 隱藏鍵盤
      • 地理位置
      • 相機與圖片庫存取
    • Async Storage
    • Animation
    • Admob
  • JS 模組化
  • 使用 Webpack
  • 使用 Babel
  • JWT Token
  • ES6 ES7 ES8
    • Array method
    • ES8 Async
    • ES6 Proxy
    • ES6 Object
    • ES6 Arrow function
    • ES6 Promise
    • ES6 Symbol
    • ES6 Generator
    • ES6 Set,Map
    • ES6 Class
  • 模板引擎
    • Mustache
    • Handlebars.js
    • EJS
  • ESLint
  • 部屬到OpenShift
  • OpenStack
  • OAuth
    • Twitter OAuth
    • Google authenticator
    • facebook oauth
      • facebook like ,share
    • google oauth
  • Redis
  • 做一個簡單的markdown editor
  • Websocket
    • WebSocket 相關 Protocol
  • Sublime 安裝套件
  • Google api
    • Cloud Run
    • speech api
    • place autocomplete
    • Geocode
    • Map
      • React map
    • vision api
    • Google-recaptcha
    • Google sheet
  • Instagram API
  • Markdown 與 code pretty js
  • HTML5
    • IntersectionObserver
    • HTML5 audio
    • HTML5 Video 與 WebRTC
      • WebRTC 進階
      • WebEX API
    • HTML5 IndexedDB
  • Google Cloud Platform (GCP)
    • Cloud Storage
    • Cloud storage 串接 Cloud CDN
  • Vim 編輯器
  • 使用nginx
    • config
  • Unix 實用指令
    • 新 VPS 安裝流程
    • Ubuntu 22 安裝
    • Shell Script 教學
  • Git 實用指令
    • Git hook
    • 加上 SSH-key 到 GitHub
    • GPG簽名
  • SSH 實用指令
  • 有關Fetch與axios與跨域請求
  • 圖片上傳相關
    • imgur API
  • JS 格式轉換
  • js trick
  • AWS
    • AWS EBS
    • AWS HTTPS 憑證
    • AWS Cloudfront、ELB、ACL
    • AWS Athena
    • AWS CloudWatch、SNS
    • AWS RDS
    • AWS lambda
      • 範例
      • 加上權限控管
    • AWS S3
    • AWS DynamoDB
      • 結合Lambda
    • 快速把 EC2 串上 AWS Cloudfront CDN
    • AWS 證照相關
  • 有關日期Date
  • VS code 編輯器
    • VSCode 外掛 Plugin
  • CI with Gitlab&Jenkins
  • API 測試
    • Postman
      • 設置 Postman 環境變數
    • API Blueprint
    • swagger
      • 註解寫在Code內生成swagger UI
  • Javascript 實用Lib
  • 遠端寫程式
  • Quicktime錄影注意事項
  • Web開發進階Bug
  • Web壓力測試
  • LineBot
  • PM2部署
  • i18n
  • VPN
  • Protocol Buffers
  • Docker教學
    • LXC LXD
    • Docker Compose
    • Docker 原理
    • Docker 指令
  • E2E Testing
    • Cypress
    • PlayWright
    • Puppeteer 與其他 UI 測試工具
  • Unit Test (Jest & enzyme)
    • React Testing Library
    • mocha
  • Cassandra
    • cluster
  • Distribute Web
    • Dat project
    • IPFS project
  • Cluster and Child_process
  • 打包應用程式
  • Java
    • 使用gradle結合docker
  • Debug 頁面
  • Proxy
  • Chrome extension
  • 消息系統
    • RabbitMQ
  • 金流串接
    • Paypal
    • spgateway智富通
    • Stripe 串接
  • 有關Log
  • 設定 feature flag
  • Azure
    • Face API
    • Image Analyze API
    • Azure Serverless
    • Cosmos DB
      • 使用 SDK
      • 以 RESTful 操作 DB
      • 一致性策略與 DB replicate
  • NodeBB 筆記
  • 瀏覽器快取與緩存(Etag, If-None-Match)
  • 瀏覽器快取與緩存(Expires, Last-modified, Cache-Control)
  • Node.js 第三方模組
    • OpenCV
  • Kubernetes
    • 本地測試 MiniKube
  • Ngrok 使用
  • Telegram MiniAPP 開發
  • Firebase 教學
  • 演算法筆記
  • 圖表
    • Echart
    • TradingView 圖表
    • D3
    • 熱力圖 heatmap
  • 後端緩存 Cache
  • 資料一致性
  • Web 安全機制
    • Cookie 與 LocalStorage
  • Vue
    • Element UI
    • Devtool
    • Vuex
    • Vue router
  • 相關網路協議
    • 網路 IP 基礎
    • Google Search 技巧
    • 網路診斷工具
    • IP
    • DNS
  • GitLab 與 Drone
  • SMTP、POP、IMAP
    • SendGrid
  • IPC
  • 串流服務
    • Twilio
    • Agora
  • 其他資源
  • GraphQL
  • Typescript
  • UI 相關資源
  • FFmpeg
  • Unity 遊戲開發筆記
  • Influx DB
  • Windows 相關
  • DALL·E 3
  • Coap
  • Slack API
  • 資訊安全
    • 破解 ZIP 密碼
Powered by GitBook
On this page
  • 範例
  • Worker 搭配 Promise all
  • 然後我們把它改成 worker_thread 版本,並搭配 Promise
  • 使用 microjob module
  • Share memory
  • Atomics
  • Worker 間通訊

Was this helpful?

Edit on GitHub
  1. Node.js 部分

Worker Thread

Previous開始NodeNextREPL

Last updated 4 years ago

Was this helpful?

大部分 API 均與前端 web worker 相同,用來有效處理 CPU bound 的資料。

相關特性

1.多個 worker 不能同時存取主程式相同變數,所以不會有 race condition 問題,透過 postMessage 讓 worker 傳訊息給主程式。

2.假設是通過 postMessage 傳遞 Object ,建議先 JSON.stringify 然後 parse,速度會較快。

範例

app.js

const { Worker } = require('worker_threads');
const path = require('path');

const worker1 = new Worker(path.resolve('./worker.js'));

worker1.on('message', (message) => {
  console.log(`Main thread got message: ${message}`);
});
worker1.postMessage('ping');

worker.js

const { parentPort } = require('worker_threads');

parentPort.on('message', (message) => {
  console.log(`worker got message: ${message}`);
  parentPort.postMessage('pong');
});

之後執行 node app.js

worker got message: ping
Main thread got message: pong

Worker 搭配 Promise all

讓所有 worker 都執行完畢後再繼續程式

我們先看一下原本的 single thread 程式

const crypto = require("crypto");
const sha256 = (s) => crypto.createHash("sha256").update(s).digest();
const totalDataLength = 2000000;
console.time('single thread');
Array.from(Array(totalDataLength)).map(() => sha256(Math.random().toString()));
console.timeEnd('single thread');

耗時大約 10s

然後我們把它改成 worker_thread 版本,並搭配 Promise

把原本的 2000000 個部分,分給四個 thread 計算。

app.js

const { Worker } = require("worker_threads");
const path = require("path");
console.time('thread')
let workerPool = [];
const threadCount = 4;
const totalDataLength = 2000000;
for (let i = 0; i < threadCount; i++) {
  const workerInstance = new Promise((resolve, reject) => {
    const worker = new Worker(path.resolve("./worker.js"));
    worker.on("message", ({ data }) => {
      resolve(data);
    });
    worker.postMessage({
      arrayLength: totalDataLength / threadCount,
    });
  });
  workerPool.push(workerInstance);
}
Promise.all(workerPool)
  .then((values) => {
    console.log(values);
    console.timeEnd('thread')
  })
  .catch((err) => {
    console.log(err);
  });

worker.js

const { parentPort } = require("worker_threads");
const crypto = require("crypto");
const sha256 = (s) => crypto.createHash("sha256").update(s).digest();

parentPort.on("message", ({ arrayLength }) => {
  const shaArray = Array.from(Array(arrayLength)).map((num) => sha256(String(num)));
  parentPort.postMessage({
    data: shaArray,
  });
});

耗時大約 5s

使用 microjob module

(async () => {
  const { job, start, stop } = require("microjob");

  try {
    // start the worker pool
    console.time("microjob");
    await start();

    // this function will be executed in another thread
    const res = await job(() => {
      let i = 0;
      const result = [];
      const threadCount = 4;
      const totalDataLength = 2000000;
      for (i = 0; i < threadCount; i++) {
        // heavy CPU load ...
        const crypto = require("crypto");
        const sha256 = (s) => crypto.createHash("sha256").update(s).digest();
        const shaArray = Array.from(Array(totalDataLength / threadCount)).map((num) =>
          sha256(String(num))
        );
        result.push(shaArray);
      }
      return result;
    });

    console.log(res);
    console.timeEnd("microjob");
  } catch (err) {
    console.error(err);
  } finally {
    // shutdown worker pool
    await stop();
  }
})();

Share memory

可以用 Share memory 的方法來傳遞參數,取代 postMessage,速度會比較快。

app.js

const { Worker } = require("worker_threads");
const path = require("path");
console.time('thread')
let workerPool = [];
const threadCount = 4;
const totalDataLength = 2000000;
const shareMemory = Array(threadCount).fill(new SharedArrayBuffer(totalDataLength / threadCount))

for (let i = 0; i < threadCount; i++) {
  const workerInstance = new Promise((resolve, reject) => {
    const worker = new Worker(path.resolve("./worker.js"));
    worker.on("message", () => {
      resolve();
    });
    worker.postMessage({
      shareMemory: shareMemory[i],
      arrayLength: totalDataLength / threadCount,
    });
  });
  workerPool.push(workerInstance);
}
Promise.all(workerPool)
  .then(() => {
    console.log(shareMemory)
    console.timeEnd('thread')
  })
  .catch((err) => {
    console.log(err);
  });

worker.js

const { parentPort } = require("worker_threads");

parentPort.on("message", ({ arrayLength, shareMemory }) => {
  let uint8Arr = new Uint8Array(shareMemory);
  const shaArray = Array.from(Array(arrayLength)).fill(1).map((num) => Number(num) * 2);
  uint8Arr.set(new Uint8Array(shaArray))
  parentPort.postMessage({});
});

平均速度約 90ms

假設換回原本 postMessage方式

app.js

const { Worker } = require("worker_threads");
const path = require("path");
console.time('thread')
let workerPool = [];
const threadCount = 4;
const totalDataLength = 2000000;

for (let i = 0; i < threadCount; i++) {
  const workerInstance = new Promise((resolve, reject) => {
    const worker = new Worker(path.resolve("./worker.js"));
    worker.on("message", (data) => {
      resolve(data);
    });
    worker.postMessage({
      arrayLength: totalDataLength / threadCount,
    });
  });
  workerPool.push(workerInstance);
}
Promise.all(workerPool)
  .then((data) => {
    console.log(data)
    console.timeEnd('thread')
  })
  .catch((err) => {
    console.log(err);
  });

worker.js

const { parentPort } = require("worker_threads");

parentPort.on("message", ({ arrayLength }) => {
  const shaArray = Array.from(Array(arrayLength)).fill(1).map((num) => Number(num) * 2);
  parentPort.postMessage({
    data: shaArray,
  });
});

可發現需要耗時 150ms 左右,約為 share memory 的兩倍。

資料序列化的時間

假設我們把原本 share memory 範例的 sharedArrayBuffer 在最後拿到資料後全部轉為 array 可發現時間又變回 150ms 左右,所以大部分時先是花在資料的序列化。

    const arr = Array.from(new Uint8Array(shareMemory[0]))
    console.log(arr)

Atomics

使用 share memory 時為了避免存取相同記憶體產生 race condition,可以使用

Atomics.store
Atomics.load
Atomics.notify
Atomics.wait

Worker 間通訊

new MessageChannel()

發現比原本原生的更慢,可以查看issue:

https://github.com/wilk/microjob/issues/65
Worker threads | Node.js v17.4.0 Documentation
Logo