Skip to content

Commit

Permalink
feat: add isolateWorkers support (#4)
Browse files Browse the repository at this point in the history
* feat: add `isolateWorkers` support

* chore: update

* chore: add test
  • Loading branch information
antfu authored Dec 21, 2021
1 parent f812015 commit 0adf67b
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 3 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ export default ({ a, b }) => {
We have a similar API to Piscina, so for more information, you can read Piscina's detailed [documentation](https://github.com/piscinajs/piscina#piscina---the-nodejs-worker-pool) and apply the same techniques here.
###### Additional Options
- `isolateWorkers`: Default to `false`. Always starts with a fresh worker when running tasks to isolate the environment.
## Credits
[The Vitest team](https://vitest.dev/) for giving me the chance of creating and maintaing this project for vitest.
Expand Down
11 changes: 8 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ interface Options {
workerData?: any
taskQueue?: TaskQueue
trackUnmanagedFds?: boolean
isolateWorkers?: boolean
}

interface FilledOptions extends Options {
Expand Down Expand Up @@ -774,6 +775,9 @@ class ThreadPool {
}
filename = maybeFileURLToPath(filename)

// Look for a Worker with a minimum number of tasks it is currently running.
let workerInfo: WorkerInfo | null = this.workers.findAvailable()

let resolve: (result: any) => void
let reject: (err: Error) => void
// eslint-disable-next-line
Expand All @@ -788,6 +792,9 @@ class ThreadPool {
name,
(err: Error | null, result: any) => {
this.completed++
if (workerInfo && this.options.isolateWorkers) {
this._removeWorker(workerInfo)
}
if (err !== null) {
reject(err)
} else {
Expand Down Expand Up @@ -842,9 +849,6 @@ class ThreadPool {
return ret
}

// Look for a Worker with a minimum number of tasks it is currently running.
let workerInfo: WorkerInfo | null = this.workers.findAvailable()

// If we want the ability to abort this task, use only workers that have
// no running tasks.
if (workerInfo !== null && workerInfo.currentUsage() > 0 && signal) {
Expand Down Expand Up @@ -877,6 +881,7 @@ class ThreadPool {
taskInfo.started = now
workerInfo.postTask(taskInfo)
this._maybeDrain()

return ret
}

Expand Down
3 changes: 3 additions & 0 deletions test/fixtures/isolated.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
let count = 0

export default () => count++
20 changes: 20 additions & 0 deletions test/simple.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,23 @@ test('can destroy pool while tasks are running', async () => {
/Terminating worker thread/
)
})

test('isolateWorkers: false', async () => {
const pool = new Tinypool({
filename: resolve(__dirname, 'fixtures/isolated.js'),
isolateWorkers: false,
})
expect(await pool.run({})).toBe(1)
expect(await pool.run({})).toBe(2)
expect(await pool.run({})).toBe(3)
})

test('isolateWorkers: true', async () => {
const pool = new Tinypool({
filename: resolve(__dirname, 'fixtures/isolated.js'),
isolateWorkers: true,
})
expect(await pool.run({})).toBe(1)
expect(await pool.run({})).toBe(1)
expect(await pool.run({})).toBe(1)
})

0 comments on commit 0adf67b

Please sign in to comment.