Skip to content

Commit

Permalink
fix: prevent hang when process is overwritten (#83)
Browse files Browse the repository at this point in the history
  • Loading branch information
AriPerkkio authored Mar 25, 2024
1 parent 6ca7bb4 commit 18c8684
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 6 deletions.
11 changes: 7 additions & 4 deletions src/entry/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ process.__tinypool_state__ = {
workerId: Number(process.env.TINYPOOL_WORKER_ID),
}

const memoryUsage = process.memoryUsage.bind(process)
const send = process.send!.bind(process)

process.on('message', (message: IncomingMessage) => {
// Message was not for port or pool
// It's likely end-users own communication between main and worker
Expand All @@ -36,7 +39,7 @@ process.on('message', (message: IncomingMessage) => {
await getHandler(filename, name)
}

process.send!(<OutgoingMessage>{
send!(<OutgoingMessage>{
ready: true,
source: 'pool',
__tinypool_worker_message__: true,
Expand Down Expand Up @@ -69,7 +72,7 @@ async function onMessage(message: IncomingMessage & { source: 'port' }) {
taskId,
result,
error: null,
usedMemory: process.memoryUsage().heapUsed,
usedMemory: memoryUsage().heapUsed,
}

// If the task used e.g. console.log(), wait for the stream to drain
Expand All @@ -89,11 +92,11 @@ async function onMessage(message: IncomingMessage & { source: 'port' }) {
taskId,
result: null,
error: serializeError(error),
usedMemory: process.memoryUsage().heapUsed,
usedMemory: memoryUsage().heapUsed,
}
}

process.send!(response)
send!(response)
}

function serializeError(error: unknown) {
Expand Down
5 changes: 3 additions & 2 deletions src/entry/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ process.__tinypool_state__ = {
workerId: tinypoolPrivateData.workerId,
}

const memoryUsage = process.memoryUsage.bind(process)
let useAtomics: boolean = process.env.PISCINA_DISABLE_ATOMICS !== '1'

// We should only receive this message once, when the Worker starts. It gives
Expand Down Expand Up @@ -110,7 +111,7 @@ function onMessage(
taskId,
result: result,
error: null,
usedMemory: process.memoryUsage().heapUsed,
usedMemory: memoryUsage().heapUsed,
}

// If the task used e.g. console.log(), wait for the stream to drain
Expand All @@ -130,7 +131,7 @@ function onMessage(
// It may be worth taking a look at the error cloning algorithm we
// use in Node.js core here, it's quite a bit more flexible
error,
usedMemory: process.memoryUsage().heapUsed,
usedMemory: memoryUsage().heapUsed,
}
}
currentTasks--
Expand Down
32 changes: 32 additions & 0 deletions test/globals.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import * as path from 'path'
import { fileURLToPath } from 'url'
import { Tinypool } from 'tinypool'

const __dirname = path.dirname(fileURLToPath(import.meta.url))

describe.each(['worker_threads', 'child_process'] as const)('%s', (runtime) => {
test("doesn't hang when process is overwritten", async () => {
const pool = createPool({ runtime })

const result = await pool.run(`
(async () => {
return new Promise(resolve => {
globalThis.process = { exit: resolve };
process.exit("exit() from overwritten process");
});
})();
`)
expect(result).toBe('exit() from overwritten process')
})
})

function createPool(options: Partial<Tinypool['options']>) {
const pool = new Tinypool({
filename: path.resolve(__dirname, 'fixtures/eval.js'),
minThreads: 1,
maxThreads: 1,
...options,
})

return pool
}

0 comments on commit 18c8684

Please sign in to comment.