From c8b4c20efdfdb09560d4161d5df7b1e039f3fbc7 Mon Sep 17 00:00:00 2001 From: AriPerkkio Date: Wed, 12 Apr 2023 11:50:18 +0300 Subject: [PATCH] feat: `pool.cancelPendingTasks` method --- README.md | 14 +++++++++--- src/common.ts | 2 ++ src/index.ts | 24 +++++++++++++++++++++ test/task-queue.test.ts | 48 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 85 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 1f7e4a2..7b7a6b5 100644 --- a/README.md +++ b/README.md @@ -42,13 +42,21 @@ export default ({ a, b }) => { We have a similar API to Piscina, so for more information, you can read Piscina's detailed [documentation](https://github.com/piscinajs/piscina#piscina---the-nodejs-worker-pool) and apply the same techniques here. -###### Additional Options +### Tinypool specific APIs + +#### Pool constructor options - `isolateWorkers`: Default to `false`. Always starts with a fresh worker when running tasks to isolate the environment. -- `workerId`: Each worker now has an id ( <= `maxThreads`) that can be imported - from `tinypool` in the worker itself (or `process.__tinypool_state__.workerId`) - `terminateTimeout`: Defaults to `null`. If terminating a worker takes `terminateTimeout` amount of milliseconds to execute, an error is raised. +#### Pool methods + +- `cancelPendingTasks()`: Gracefully cancels all pending tasks without stopping or interfering with on-going tasks. This method is useful when your tasks may have side effects and should not be terminated forcefully during task execution. If your tasks don't have any side effects you may want to use [`{ signal }`](https://github.com/piscinajs/piscina#cancelable-tasks) option for forcefully terminating all tasks, including the on-going ones, instead. + +#### Exports + +- `workerId`: Each worker now has an id ( <= `maxThreads`) that can be imported from `tinypool` in the worker itself (or `process.__tinypool_state__.workerId`). + ## Authors |
Mohammad Bagher
| diff --git a/src/common.ts b/src/common.ts index fcf9c09..13cc6d2 100644 --- a/src/common.ts +++ b/src/common.ts @@ -70,6 +70,7 @@ export interface Transferable { export interface Task { readonly [kQueueOptions]: object | null + cancel(): void } export interface TaskQueue { @@ -77,6 +78,7 @@ export interface TaskQueue { shift(): Task | null remove(task: Task): void push(task: Task): void + cancel(): void } export function isTaskQueue(value: any): boolean { diff --git a/src/index.ts b/src/index.ts index a63382e..7bf54f8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -83,6 +83,16 @@ class AbortError extends Error { } } +class CancelError extends Error { + constructor() { + super('The task has been cancelled') + } + + get name() { + return 'CancelError' + } +} + type ResourceLimits = Worker extends { resourceLimits?: infer T } @@ -114,6 +124,13 @@ class ArrayTaskQueue implements TaskQueue { assert.notStrictEqual(index, -1) this.tasks.splice(index, 1) } + + cancel(): void { + while (this.tasks.length > 0) { + const task = this.tasks.pop() + task?.cancel() + } + } } interface Options { @@ -245,6 +262,7 @@ class TaskInfo extends AsyncResource implements Task { workerInfo: WorkerInfo | null = null created: number started: number + cancel: () => void constructor( task: any, @@ -259,6 +277,7 @@ class TaskInfo extends AsyncResource implements Task { this.callback = callback this.task = task this.transferList = transferList + this.cancel = () => this.callback(new CancelError(), null) // If the task is a Transferable returned by // Tinypool.move(), then add it to the transferList @@ -1072,6 +1091,11 @@ class Tinypool extends EventEmitterAsyncResource { return Math.max(pool.taskQueue.size - pool.pendingCapacity(), 0) } + cancelPendingTasks() { + const pool = this.#pool + pool.taskQueue.cancel() + } + get completed(): number { return this.#pool.completed } diff --git a/test/task-queue.test.ts b/test/task-queue.test.ts index 9686628..9257867 100644 --- a/test/task-queue.test.ts +++ b/test/task-queue.test.ts @@ -260,3 +260,51 @@ test('custom task queue works', async () => { expect(pushCalled).toBeTruthy() expect(shiftCalled).toBeTruthy() }) + +test('queued tasks can be cancelled', async () => { + const pool = new Tinypool({ + filename: resolve(__dirname, 'fixtures/sleep.js'), + minThreads: 0, + maxThreads: 1, + }) + + const time = 500 + const taskCount = 10 + + const promises = [] + let finishedTasks = 0 + let cancelledTasks = 0 + + for (const _ of Array(taskCount)) { + const promise = pool + .run({ time }) + .then(() => { + finishedTasks++ + }) + .catch((error) => { + if (error.message !== 'The task has been cancelled') { + throw error + } + cancelledTasks++ + }) + promises.push(promise) + } + + // Wait for the first task to start + await new Promise((resolve) => setTimeout(resolve, time / 2)) + expect(pool.queueSize).toBe(taskCount - 1) + + // One task is running, cancel the pending ones + pool.cancelPendingTasks() + + // The first task should still be on-going, pending ones should have started their cancellation + expect(finishedTasks).toBe(0) + expect(pool.queueSize).toBe(0) + + await Promise.all(promises) + + expect({ finishedTasks, cancelledTasks }).toEqual({ + finishedTasks: 1, + cancelledTasks: taskCount - 1, + }) +})