Skip to content

Commit

Permalink
feat: support Bun Worker runtime
Browse files Browse the repository at this point in the history
- Instead of using Node APIs like in #70, use Bun's native Workers API
  • Loading branch information
AriPerkkio committed Sep 27, 2023
1 parent a5b6669 commit 963e02a
Show file tree
Hide file tree
Showing 10 changed files with 372 additions and 25 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.nyc_output
.vscode
node_modules
bun.lockb
dist
coverage
5 changes: 5 additions & 0 deletions benchmark/fixtures/wrap-add-bun.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import add from './add.mjs'

self.onmessage = (event) => {
postMessage(add(event.data))
}
132 changes: 111 additions & 21 deletions benchmark/isolate-benchmark.mjs
Original file line number Diff line number Diff line change
@@ -1,35 +1,66 @@
/*
* Benchmark for testing whether Tinypool's worker creation and teardown is expensive.
* Benchmark focusing on the performance `isolateWorkers` option
*
* Options:
* - `--rounds` (optional) - Specify how many iterations to run
* - `--threads` (optional) - Specify how many threads to use
*/
import { cpus } from 'node:os'
import { Worker } from 'node:worker_threads'

import * as os from 'node:os'
import * as WorkerThreads from 'node:worker_threads'

import Tinypool from '../dist/esm/index.js'

const THREADS = cpus().length - 1
const ROUNDS = 5_000
const IS_BUN = process.versions.bun !== undefined
const USE_ATOMICS = !IS_BUN
const THREADS = parseInt(getArgument('--threads') ?? getMaxThreads(), 10)
const ROUNDS = parseInt(getArgument('--rounds') ?? '5_000', 10)

console.log('Options:', { THREADS, ROUNDS, IS_BUN }, '\n')

if (IS_BUN) {
await logTime(
"Tinypool { runtime: 'bun_workers' }",
runTinypool('bun_workers')
)

await logTime('Native Bun workers', runBunWorkers())
process.exit(0)
}

await logTime(
"Tinypool { runtime: 'worker_threds' }",
runTinypool('worker_threds')
)
await logTime(
"Tinypool { runtime: 'child_process' }",
runTinypool('child_process')
)

await logTime('Tinypool', runTinypool)
await logTime('Worker threads', runWorkerThreads)
await logTime('Native node:worker_threads', runNodeWorkerThreads())

async function runTinypool() {
function runTinypool(runtime) {
const pool = new Tinypool({
runtime,
filename: new URL('./fixtures/add.mjs', import.meta.url).href,
isolateWorkers: true,
minThreads: THREADS,
maxThreads: THREADS,
useAtomics: USE_ATOMICS,
})

await Promise.all(
Array(ROUNDS)
.fill()
.map(() => pool.run({ a: 1, b: 2 }))
)
return async function run() {
await Promise.all(
Array(ROUNDS)
.fill()
.map(() => pool.run({ a: 1, b: 2 }))
)
}
}

async function runWorkerThreads() {
function runNodeWorkerThreads() {
async function task() {
const worker = new Worker('./fixtures/wrap-add.mjs')
const worker = new WorkerThreads.Worker('./fixtures/wrap-add.mjs')
worker.postMessage({ a: 1, b: 2 })

await new Promise((resolve, reject) =>
Expand All @@ -50,16 +81,75 @@ async function runWorkerThreads() {
}
}

await Promise.all(
Array(THREADS)
.fill(execute)
.map((task) => task())
)
return async function run() {
await Promise.all(
Array(THREADS)
.fill(execute)
.map((task) => task())
)
}
}

function runBunWorkers() {
async function task() {
const worker = new Worker('./fixtures/wrap-add-bun.mjs')
worker.postMessage({ a: 1, b: 2 })

await new Promise((resolve, reject) => {
worker.onmessage = (event) =>
event.data === 3 ? resolve() : reject('Not 3')
})

await worker.terminate()
}

const pool = Array(ROUNDS).fill(task)

async function execute() {
const task = pool.shift()

if (task) {
await task()
return execute()
}
}

return async function run() {
await Promise.all(
Array(THREADS)
.fill(execute)
.map((task) => task())
)
}
}

function getArgument(flag) {
const index = process.argv.indexOf(flag)
if (index === -1) return

return process.argv[index + 1]
}

function getMaxThreads() {
return os.availableParallelism?.() || os.cpus().length - 1
}

async function logTime(label, method) {
console.log(`${label} | START`)

const start = process.hrtime.bigint()
await method()
const end = process.hrtime.bigint()
console.log(label, 'took', ((end - start) / 1_000_000n).toString(), 'ms')

console.log(`${label} | END ${((end - start) / 1_000_000n).toString()} ms`)

console.log('Cooling down for 2s')
const interval = setInterval(() => process.stdout.write('.'), 100)
await sleep(2_000)
clearInterval(interval)
console.log(' ✓\n')
}

async function sleep(ms) {
await new Promise((resolve) => setTimeout(resolve, ms))
}
4 changes: 4 additions & 0 deletions bun-test/fixtures/eval.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// eslint-disable-next-line no-eval
export default function (code) {
return eval(code)
}
59 changes: 59 additions & 0 deletions bun-test/runtime.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import * as path from 'path'
import { fileURLToPath } from 'url'
import { Tinypool } from '../dist/esm'

const __dirname = path.dirname(fileURLToPath(import.meta.url))

describe('Bun Workers', () => {
test('runs code in Bun Worker', async () => {
const pool = createPool({ runtime: 'bun_workers' })

const result = await pool.run(`
(async () => {
return {
sum: 11 + 12,
isMainThread: Bun.isMainThread,
pid: process.pid,
}
})()
`)
expect(result.sum).toBe(23)
expect(result.isMainThread).toBe(false)
expect(result.pid).toBe(process.pid)
})

test('sets tinypool state', async () => {
const pool = createPool({ runtime: 'bun_workers' })

const result = await pool.run('process.__tinypool_state__')
expect(result.isTinypoolWorker).toBe(true)
expect(result.isBunWorker).toBe(true)
expect(result.isWorkerThread).toBe(undefined)
expect(result.isChildProcess).toBe(undefined)
})

test('errors are serialized', async () => {
const pool = createPool({ runtime: 'bun_workers' })

const error = await pool
.run("throw new TypeError('Test message');")
.catch((e: Error) => e)

expect(error.name).toBe('TypeError')
expect(error.message).toBe('Test message')

// Nope Bun does not do this
// expect(error.stack).toMatch('fixtures/eval.js')
})
})

function createPool(options) {
const pool = new Tinypool({
filename: path.resolve(__dirname, './fixtures/eval.js'),
minThreads: 1,
maxThreads: 1,
...options,
})

return pool
}
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"scripts": {
"test:ci": "node --experimental-vm-modules node_modules/jest/bin/jest.js --no-coverage --runInBand",
"test:dev": "node --experimental-vm-modules --trace-warnings node_modules/jest/bin/jest.js --watch --no-coverage",
"test:bun": "bun --bun test bun-test/**",
"dev": "tsup --watch",
"build": "tsup",
"publish": "clean-publish",
Expand Down
78 changes: 78 additions & 0 deletions src/entry/bun-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import {
StartupMessage,
ReadyMessage,
RequestMessage,
ResponseMessage,
} from '../common'
import { getHandler, throwInNextTick } from './utils'
import { stderr, stdout } from 'src/utils'

process.__tinypool_state__ = {
isTinypoolWorker: true,
isBunWorker: true,
workerData: null,
workerId: 1,
}

self.onmessage = onWorkerMessage

function onWorkerMessage(event: MessageEvent<StartupMessage>) {
const { filename, name } = event.data

;(async function () {
if (filename !== null) {
await getHandler(filename, name)
}

const readyMessage: ReadyMessage = { ready: true }
self.postMessage(readyMessage, '')
})().catch(throwInNextTick)

if (event.ports?.[0]) {
event.ports[0].start()
event.ports[0].onmessage = onPortMessage.bind(null, event.ports[0])
}
}

function onPortMessage(port: MessagePort, event: MessageEvent<RequestMessage>) {
const message = event.data
const { taskId, task, filename, name } = message

;(async function () {
let response: ResponseMessage

try {
const handler = await getHandler(filename, name)
if (handler === null) {
throw new Error(`No handler function exported from ${filename}`)
}
let result = await handler(task)
response = {
taskId,
result: result,
error: null,
usedMemory: process.memoryUsage().heapUsed,
}

// If the task used e.g. console.log(), wait for the stream to drain
// before potentially entering the `Atomics.wait()` loop, and before
// returning the result so that messages will always be printed even
// if the process would otherwise be ready to exit.
if (stdout()?.writableLength! > 0) {
await new Promise((resolve) => process.stdout.write('', resolve))
}
if (stderr()?.writableLength! > 0) {
await new Promise((resolve) => process.stderr.write('', resolve))
}
} catch (error) {
response = {
taskId,
result: null,
error,
usedMemory: process.memoryUsage().heapUsed,
}
}

port.postMessage(response)
})().catch(throwInNextTick)
}
7 changes: 6 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
} from './common'
import ThreadWorker from './runtime/thread-worker'
import ProcessWorker from './runtime/process-worker'
import BunWorker from './runtime/bun-worker'

declare global {
namespace NodeJS {
Expand All @@ -44,6 +45,7 @@ declare global {
isTinypoolWorker: boolean
isWorkerThread?: boolean
isChildProcess?: boolean
isBunWorker?: boolean
workerData: any
workerId: number
}
Expand Down Expand Up @@ -135,7 +137,7 @@ class ArrayTaskQueue implements TaskQueue {

interface Options {
filename?: string | null
runtime?: 'worker_threads' | 'child_process'
runtime?: 'worker_threads' | 'child_process' | 'bun_workers'
name?: string
minThreads?: number
maxThreads?: number
Expand Down Expand Up @@ -699,6 +701,8 @@ class ThreadPool {
const worker =
this.options.runtime === 'child_process'
? new ProcessWorker()
: this.options.runtime === 'bun_workers'
? new BunWorker()
: new ThreadWorker()

worker.initialize({
Expand Down Expand Up @@ -740,6 +744,7 @@ class ThreadPool {
}

const { port1, port2 } = new MessageChannel()
port1.start()
const workerInfo = new WorkerInfo(
worker,
port1,
Expand Down
Loading

0 comments on commit 963e02a

Please sign in to comment.