Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: terminateTimeout option #50

Merged
merged 1 commit into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ We have a similar API to Piscina, so for more information, you can read Piscina'
- `isolateWorkers`: Default to `false`. Always starts with a fresh worker when running tasks to isolate the environment.
- `workerId`: Each worker now has an id ( <= `maxThreads`) that can be imported
from `tinypool` in the worker itself (or `process.__tinypool_state__.workerId`)
- `terminateTimeout`: Defaults to `null`. If terminating a worker takes `terminateTimeout` amount of milliseconds to execute, an error is raised.

## Authors

Expand Down
55 changes: 41 additions & 14 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ interface Options {
minThreads?: number
maxThreads?: number
idleTimeout?: number
terminateTimeout?: number
maxQueue?: number | 'auto'
concurrentTasksPerWorker?: number
useAtomics?: boolean
Expand Down Expand Up @@ -447,14 +448,38 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
)
}

async destroy(): Promise<void> {
await this.worker.terminate()
this.port.close()
this.clearIdleTimeout()
for (const taskInfo of this.taskInfos.values()) {
taskInfo.done(Errors.ThreadTermination())
}
this.taskInfos.clear()
async destroy(timeout?: number): Promise<void> {
let resolve: () => void
let reject: (err: Error) => void

const ret = new Promise<void>((res, rej) => {
resolve = res
reject = rej
})

const timer = timeout
? setTimeout(
() => reject(new Error('Failed to terminate worker')),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One idea I had was that here we could tell the worker to process.exit if main thread's worker.terminate was taking too long or got stuck.

setTimeout(() => {
  this.worker.postMessage('TERMINATE_NOW')
  reject(new Error('Failed to terminate worker')
})

Worker would catch this:

// worker.ts
parentPort.on('message', (message) => {
  // Main thread tried to terminate this worker but failed, let's force the exit here in worker
  if (message === 'TERMINATE_NOW') {
    process.exit()
  }
})

But it seems that this does not work. The worker never receives the message. I guess the worker.terminate() has already closed the message channel even though it has not yet resolved completely.

timeout
)
: null

this.worker.terminate().then(() => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we just await the terminate instead of using the then keyword? if not, why?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot await as the outer Promise has to be the return value.

This outer Promise is required since setTimeout is used to decide when destroy function should reject. This would not work:

async function destroy() {
  setTimeout(() => {
    throw new Error("Timeout error")
  }, 1000);

  await sleep(2000);
}
destroy().then(() => console.log("success")).catch(() => console.log("Failed"))

> Uncaught Error: Timeout error
> success

Wrapping everything in a outer Promise helps:

async function destroy() {
  let resolve, reject;
  const outerPromise = new Promise((res, rej) => {
    resolve = res
    reject = rej
  })
  
  setTimeout(() => {
    reject(new Error("Timeout error"))
  }, 1000);

  sleep(2000).then(resolve);

  return outerPromise;
}
destroy().then(() => console.log("success")).catch(() => console.log("Failed"))

> Failed

Similar pattern is used in runTask:

tinypool/src/index.ts

Lines 832 to 838 in d5e5738

let resolve: (result: any) => void
let reject: (err: Error) => void
// eslint-disable-next-line
const ret = new Promise((res, rej) => {
resolve = res
reject = rej
})

if (timer !== null) {
clearTimeout(timer)
}

this.port.close()
this.clearIdleTimeout()
for (const taskInfo of this.taskInfos.values()) {
taskInfo.done(Errors.ThreadTermination())
}
this.taskInfos.clear()

resolve()
})

return ret
}

clearIdleTimeout(): void {
Expand Down Expand Up @@ -771,12 +796,12 @@ class ThreadPool {
}
}

_removeWorker(workerInfo: WorkerInfo): void {
_removeWorker(workerInfo: WorkerInfo): Promise<void> {
workerInfo.freeWorkerId()

workerInfo.destroy()

this.workers.delete(workerInfo)

return workerInfo.destroy(this.options.terminateTimeout)
}

_onWorkerAvailable(workerInfo: WorkerInfo): void {
Expand Down Expand Up @@ -845,14 +870,16 @@ class ThreadPool {
this.completed++
if (err !== null) {
reject(err)
} else {
resolve(result)
}

// When `isolateWorkers` is enabled, remove the worker after task is finished
if (this.options.isolateWorkers && taskInfo.workerInfo) {
this._removeWorker(taskInfo.workerInfo)
this._ensureEnoughWorkersForTaskQueue()
.then(() => this._ensureEnoughWorkersForTaskQueue())
.then(() => resolve(result))
.catch(reject)
} else {
resolve(result)
}
},
signal,
Expand Down
32 changes: 32 additions & 0 deletions test/termination-timeout.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { dirname, resolve } from 'path'
import { Tinypool } from 'tinypool'
import { fileURLToPath } from 'url'

const __dirname = dirname(fileURLToPath(import.meta.url))
const cleanups: (() => Promise<unknown>)[] = []

afterEach(async () => {
await Promise.all(cleanups.splice(0).map((cleanup) => cleanup()))
})

test('termination timeout throws when worker does not terminate in time', async () => {
const pool = new Tinypool({
filename: resolve(__dirname, 'fixtures/sleep.js'),
terminateTimeout: 10,
minThreads: 1,
maxThreads: 2,
isolateWorkers: true,
})

expect(pool.threads.length).toBe(1)

const worker = pool.threads[0]
expect(worker).toBeTruthy()

cleanups.push(worker!.terminate.bind(worker))
worker!.terminate = () => new Promise(() => {})

await expect(pool.run('default')).rejects.toThrowError(
'Failed to terminate worker'
)
})