Skip to content

Commit

Permalink
fix: destroy issue and use maximum workers #44
Browse files Browse the repository at this point in the history
fix: destroy issue and use maximum workers
  • Loading branch information
Aslemammad authored Jan 26, 2023
2 parents c5705f6 + 2e304d0 commit 03fbfdf
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 20 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
],
"scripts": {
"test:ci": "node --experimental-vm-modules node_modules/jest/bin/jest.js --no-coverage --runInBand",
"test:dev": "node --experimental-vm-modules node_modules/jest/bin/jest.js --watch --no-coverage",
"test:dev": "node --experimental-vm-modules --trace-warnings node_modules/jest/bin/jest.js --watch --no-coverage",
"dev": "tsup --watch",
"build": "tsup",
"publish": "clean-publish",
Expand Down
17 changes: 13 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -447,10 +447,9 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
)
}

destroy(): void {
this.worker.terminate()
async destroy(): Promise<void> {
await this.worker.terminate()
this.port.close()
this.freeWorkerId()
this.clearIdleTimeout()
for (const taskInfo of this.taskInfos.values()) {
taskInfo.done(Errors.ThreadTermination())
Expand Down Expand Up @@ -603,6 +602,14 @@ class ThreadPool {
this._ensureMinimumWorkers()
this.startingUp = false
}
_ensureEnoughWorkersForTaskQueue(): void {
while (
this.workers.size < this.taskQueue.size &&
this.workers.size < this.options.maxThreads
) {
this._addNewWorker()
}
}

_ensureMaximumWorkers(): void {
while (this.workers.size < this.options.maxThreads) {
Expand Down Expand Up @@ -765,6 +772,8 @@ class ThreadPool {
}

_removeWorker(workerInfo: WorkerInfo): void {
workerInfo.freeWorkerId()

workerInfo.destroy()

this.workers.delete(workerInfo)
Expand Down Expand Up @@ -843,7 +852,7 @@ class ThreadPool {
// When `isolateWorkers` is enabled, remove the worker after task is finished
if (this.options.isolateWorkers && taskInfo.workerInfo) {
this._removeWorker(taskInfo.workerInfo)
this._ensureMinimumWorkers()
this._ensureEnoughWorkersForTaskQueue()
}
},
signal,
Expand Down
4 changes: 1 addition & 3 deletions test/fixtures/sleep.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
'use strict'

const { promisify } = require('util')
import { promisify } from 'util'
const sleep = promisify(setTimeout)

const buf = new Uint32Array(new SharedArrayBuffer(4))
Expand Down
13 changes: 13 additions & 0 deletions test/pool-destroy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ import { dirname, resolve } from 'path'
import { Tinypool } from 'tinypool'
import { fileURLToPath } from 'url'

const sleep = async (num: number) =>
await new Promise((res) => setTimeout(res, num))

const __dirname = dirname(fileURLToPath(import.meta.url))

test('can destroy pool while tasks are running', async () => {
Expand All @@ -11,3 +14,13 @@ test('can destroy pool while tasks are running', async () => {
setImmediate(() => pool.destroy())
expect(pool.run('while(1){}')).rejects.toThrow(/Terminating worker thread/)
})

test('destroy after initializing should work (#43)', async () => {
const pool = new Tinypool({
filename: resolve(__dirname, 'fixtures/sleep.js'),
isolateWorkers: true,
})

expect(pool.run({})).rejects.toThrow(/Terminating worker thread/)
setImmediate(() => pool.destroy())
})
29 changes: 18 additions & 11 deletions test/simple.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,6 @@ test('named tasks work', async () => {
expect(await worker.run({})).toBe('b')
})

test('can destroy pool while tasks are running', async () => {
const pool = new Tinypool({
filename: resolve(__dirname, 'fixtures/eval.js'),
})
setImmediate(() => pool.destroy())
expect(async () => await pool.run('while(1){}')).rejects.toThrow(
/Terminating worker thread/
)
})

test('isolateWorkers: false', async () => {
const pool = new Tinypool({
filename: resolve(__dirname, 'fixtures/isolated.js'),
Expand Down Expand Up @@ -261,5 +251,22 @@ test('workerId should never be duplicated', async () => {
}

await pool.destroy()
await sleep(5000)
await sleep(3000)
}, 30000)

test('isolateWorkers: true with minThreads of 0 should not halt (#42)', async () => {
const minThreads = 0,
maxThreads = 6
const pool = new Tinypool({
filename: resolve(__dirname, 'fixtures/isolated.js'),
minThreads,
maxThreads,
isolateWorkers: true,
})
// https://github.com/tinylibs/tinypool/pull/44#discussion_r1070169279
const promises = []
for (let i = 0; i < maxThreads + 1; i++) {
promises.push(pool.run({}))
}
await Promise.all(promises)
})
5 changes: 4 additions & 1 deletion test/uncaught-exception-from-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { dirname, resolve } from 'path'
import { Tinypool } from 'tinypool'
import { fileURLToPath } from 'url'
import { once } from 'events'
const sleep = async (num: number) =>
await new Promise((res) => setTimeout(res, num))

const __dirname = dirname(fileURLToPath(import.meta.url))

Expand All @@ -23,7 +25,7 @@ test('uncaught exception in immediate resets Worker', async () => {
await expect(
pool.run(`
setImmediate(() => { throw new Error("not_caught") });
new Promise(() => {}) /* act as if we were doing some work */
new Promise(() => {}) // act as if we were doing some work
`)
).rejects.toThrow(/not_caught/)
})
Expand Down Expand Up @@ -59,6 +61,7 @@ test('using parentPort is treated as an error', async () => {
await expect(
pool.run(`
(async () => {
console.log();
const parentPort = (await import('worker_threads')).parentPort;
parentPort.postMessage("some message");
new Promise(() => {}) /* act as if we were doing some work */
Expand Down

0 comments on commit 03fbfdf

Please sign in to comment.