Skip to content

Commit

Permalink
feat: pool.cancelPendingTasks method (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
AriPerkkio authored Apr 13, 2023
1 parent 9a496d6 commit 65c9b36
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 3 deletions.
14 changes: 11 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,21 @@ export default ({ a, b }) => {
We have a similar API to Piscina, so for more information, you can read Piscina's detailed [documentation](https://github.com/piscinajs/piscina#piscina---the-nodejs-worker-pool) and apply the same techniques here.
###### Additional Options
### Tinypool specific APIs
#### Pool constructor options
- `isolateWorkers`: Default to `false`. Always starts with a fresh worker when running tasks to isolate the environment.
- `workerId`: Each worker now has an id ( <= `maxThreads`) that can be imported
from `tinypool` in the worker itself (or `process.__tinypool_state__.workerId`)
- `terminateTimeout`: Defaults to `null`. If terminating a worker takes `terminateTimeout` amount of milliseconds to execute, an error is raised.
#### Pool methods
- `cancelPendingTasks()`: Gracefully cancels all pending tasks without stopping or interfering with on-going tasks. This method is useful when your tasks may have side effects and should not be terminated forcefully during task execution. If your tasks don't have any side effects you may want to use [`{ signal }`](https://github.com/piscinajs/piscina#cancelable-tasks) option for forcefully terminating all tasks, including the on-going ones, instead.
#### Exports
- `workerId`: Each worker now has an id ( <= `maxThreads`) that can be imported from `tinypool` in the worker itself (or `process.__tinypool_state__.workerId`).
## Authors
| <a href="https://github.com/Aslemammad"> <img width='150' src="https://avatars.githubusercontent.com/u/37929992?v=4" /><br> Mohammad Bagher </a> |
Expand Down
2 changes: 2 additions & 0 deletions src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,15 @@ export interface Transferable {

export interface Task {
readonly [kQueueOptions]: object | null
cancel(): void
}

export interface TaskQueue {
readonly size: number
shift(): Task | null
remove(task: Task): void
push(task: Task): void
cancel(): void
}

export function isTaskQueue(value: any): boolean {
Expand Down
24 changes: 24 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ class AbortError extends Error {
}
}

class CancelError extends Error {
constructor() {
super('The task has been cancelled')
}

get name() {
return 'CancelError'
}
}

type ResourceLimits = Worker extends {
resourceLimits?: infer T
}
Expand Down Expand Up @@ -114,6 +124,13 @@ class ArrayTaskQueue implements TaskQueue {
assert.notStrictEqual(index, -1)
this.tasks.splice(index, 1)
}

cancel(): void {
while (this.tasks.length > 0) {
const task = this.tasks.pop()
task?.cancel()
}
}
}

interface Options {
Expand Down Expand Up @@ -245,6 +262,7 @@ class TaskInfo extends AsyncResource implements Task {
workerInfo: WorkerInfo | null = null
created: number
started: number
cancel: () => void

constructor(
task: any,
Expand All @@ -259,6 +277,7 @@ class TaskInfo extends AsyncResource implements Task {
this.callback = callback
this.task = task
this.transferList = transferList
this.cancel = () => this.callback(new CancelError(), null)

// If the task is a Transferable returned by
// Tinypool.move(), then add it to the transferList
Expand Down Expand Up @@ -1072,6 +1091,11 @@ class Tinypool extends EventEmitterAsyncResource {
return Math.max(pool.taskQueue.size - pool.pendingCapacity(), 0)
}

cancelPendingTasks() {
const pool = this.#pool
pool.taskQueue.cancel()
}

get completed(): number {
return this.#pool.completed
}
Expand Down
48 changes: 48 additions & 0 deletions test/task-queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,51 @@ test('custom task queue works', async () => {
expect(pushCalled).toBeTruthy()
expect(shiftCalled).toBeTruthy()
})

test('queued tasks can be cancelled', async () => {
const pool = new Tinypool({
filename: resolve(__dirname, 'fixtures/sleep.js'),
minThreads: 0,
maxThreads: 1,
})

const time = 500
const taskCount = 10

const promises = []
let finishedTasks = 0
let cancelledTasks = 0

for (const _ of Array(taskCount)) {
const promise = pool
.run({ time })
.then(() => {
finishedTasks++
})
.catch((error) => {
if (error.message !== 'The task has been cancelled') {
throw error
}
cancelledTasks++
})
promises.push(promise)
}

// Wait for the first task to start
await new Promise((resolve) => setTimeout(resolve, time / 2))
expect(pool.queueSize).toBe(taskCount - 1)

// One task is running, cancel the pending ones
pool.cancelPendingTasks()

// The first task should still be on-going, pending ones should have started their cancellation
expect(finishedTasks).toBe(0)
expect(pool.queueSize).toBe(0)

await Promise.all(promises)

expect({ finishedTasks, cancelledTasks }).toEqual({
finishedTasks: 1,
cancelledTasks: taskCount - 1,
})
})

0 comments on commit 65c9b36

Please sign in to comment.