Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pool.cancelPendingTasks method #53

Merged
merged 1 commit into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,21 @@ export default ({ a, b }) => {

We have a similar API to Piscina, so for more information, you can read Piscina's detailed [documentation](https://github.com/piscinajs/piscina#piscina---the-nodejs-worker-pool) and apply the same techniques here.

###### Additional Options
### Tinypool specific APIs

#### Pool constructor options

- `isolateWorkers`: Default to `false`. Always starts with a fresh worker when running tasks to isolate the environment.
- `workerId`: Each worker now has an id ( <= `maxThreads`) that can be imported
from `tinypool` in the worker itself (or `process.__tinypool_state__.workerId`)
- `terminateTimeout`: Defaults to `null`. If terminating a worker takes `terminateTimeout` amount of milliseconds to execute, an error is raised.

#### Pool methods

- `cancelPendingTasks()`: Gracefully cancels all pending tasks without stopping or interfering with on-going tasks. This method is useful when your tasks may have side effects and should not be terminated forcefully during task execution. If your tasks don't have any side effects you may want to use [`{ signal }`](https://github.com/piscinajs/piscina#cancelable-tasks) option for forcefully terminating all tasks, including the on-going ones, instead.

#### Exports

- `workerId`: Each worker now has an id ( <= `maxThreads`) that can be imported from `tinypool` in the worker itself (or `process.__tinypool_state__.workerId`).

## Authors

| <a href="https://github.com/Aslemammad"> <img width='150' src="https://avatars.githubusercontent.com/u/37929992?v=4" /><br> Mohammad Bagher </a> |
Expand Down
2 changes: 2 additions & 0 deletions src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,15 @@ export interface Transferable {

export interface Task {
readonly [kQueueOptions]: object | null
cancel(): void
}

export interface TaskQueue {
readonly size: number
shift(): Task | null
remove(task: Task): void
push(task: Task): void
cancel(): void
}

export function isTaskQueue(value: any): boolean {
Expand Down
24 changes: 24 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ class AbortError extends Error {
}
}

class CancelError extends Error {
constructor() {
super('The task has been cancelled')
}

get name() {
return 'CancelError'
}
}

type ResourceLimits = Worker extends {
resourceLimits?: infer T
}
Expand Down Expand Up @@ -114,6 +124,13 @@ class ArrayTaskQueue implements TaskQueue {
assert.notStrictEqual(index, -1)
this.tasks.splice(index, 1)
}

cancel(): void {
while (this.tasks.length > 0) {
const task = this.tasks.pop()
task?.cancel()
}
}
}

interface Options {
Expand Down Expand Up @@ -245,6 +262,7 @@ class TaskInfo extends AsyncResource implements Task {
workerInfo: WorkerInfo | null = null
created: number
started: number
cancel: () => void

constructor(
task: any,
Expand All @@ -259,6 +277,7 @@ class TaskInfo extends AsyncResource implements Task {
this.callback = callback
this.task = task
this.transferList = transferList
this.cancel = () => this.callback(new CancelError(), null)

// If the task is a Transferable returned by
// Tinypool.move(), then add it to the transferList
Expand Down Expand Up @@ -1072,6 +1091,11 @@ class Tinypool extends EventEmitterAsyncResource {
return Math.max(pool.taskQueue.size - pool.pendingCapacity(), 0)
}

cancelPendingTasks() {
const pool = this.#pool
pool.taskQueue.cancel()
}

get completed(): number {
return this.#pool.completed
}
Expand Down
48 changes: 48 additions & 0 deletions test/task-queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,51 @@ test('custom task queue works', async () => {
expect(pushCalled).toBeTruthy()
expect(shiftCalled).toBeTruthy()
})

test('queued tasks can be cancelled', async () => {
const pool = new Tinypool({
filename: resolve(__dirname, 'fixtures/sleep.js'),
minThreads: 0,
maxThreads: 1,
})

const time = 500
const taskCount = 10

const promises = []
let finishedTasks = 0
let cancelledTasks = 0

for (const _ of Array(taskCount)) {
const promise = pool
.run({ time })
.then(() => {
finishedTasks++
})
.catch((error) => {
if (error.message !== 'The task has been cancelled') {
throw error
}
cancelledTasks++
})
promises.push(promise)
}

// Wait for the first task to start
await new Promise((resolve) => setTimeout(resolve, time / 2))
expect(pool.queueSize).toBe(taskCount - 1)

// One task is running, cancel the pending ones
pool.cancelPendingTasks()

// The first task should still be on-going, pending ones should have started their cancellation
expect(finishedTasks).toBe(0)
expect(pool.queueSize).toBe(0)

await Promise.all(promises)

expect({ finishedTasks, cancelledTasks }).toEqual({
finishedTasks: 1,
cancelledTasks: taskCount - 1,
})
})