Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/abort listener execution #484

Open
wants to merge 32 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
b274b8e
feat: add std stream event emit for abort handlers
joshLong145 Nov 30, 2024
ba38228
ref: remove log
joshLong145 Nov 30, 2024
f9b4919
feat: add event emit from abort handler
joshLong145 Dec 2, 2024
1f235f3
ref: lower test timeouts
joshLong145 Dec 2, 2024
9ac04b4
ref: change option assignment to tracked task
joshLong145 Dec 3, 2024
650128d
fmt
joshLong145 Dec 4, 2024
85de985
ref: change abort listener execution to fuffil on first promise to re…
joshLong145 Dec 7, 2024
18e4853
feat: add tracking tasks to busy check
joshLong145 Dec 8, 2024
0685ae3
feat: add event handlers for abort start and resolution
joshLong145 Dec 11, 2024
4a892d2
test: abort test refactor (wip)
joshLong145 Dec 11, 2024
e824a79
ref: add default for tracking task options
joshLong145 Dec 11, 2024
efb12c6
ref: rename event handler
joshLong145 Dec 11, 2024
8987fa3
ref: refactor std handler
joshLong145 Dec 12, 2024
f5c38fd
ref: move tracking task deletion after task check
joshLong145 Dec 12, 2024
622ed7d
Merge branch 'feat/std-streams-task-abort' of github.com:joshLong145/…
joshLong145 Dec 12, 2024
9d85663
ref: move abort resolver handler call to worker termination if rejecting
joshLong145 Dec 13, 2024
7c1e099
dev: refactor abort example to show implemented changes
joshLong145 Dec 13, 2024
29c2a51
Merge branch 'master' of github.com:josdejong/workerpool into fix/abo…
joshLong145 Jan 1, 2025
ad9534d
ref: add default for abort resolution handler
joshLong145 Jan 1, 2025
7d24650
move to terminationHandler over pool controlled worker cleanup
joshLong145 Feb 6, 2025
0e0448a
clarify example
joshLong145 Feb 6, 2025
e183786
refactor event listeners for abort op lifecycle
joshLong145 Feb 9, 2025
7e9fa7e
cleanup tests
joshLong145 Feb 9, 2025
90c86ac
update example
joshLong145 Feb 12, 2025
518e7ad
cleanup WorkerHandler
joshLong145 Feb 16, 2025
700be64
rename taskResolver abortResolver change type to promise
joshLong145 Feb 17, 2025
d331a58
remove event argument types for test compilation
joshLong145 Feb 17, 2025
a1c7ccf
update new event handler argument types
joshLong145 Feb 17, 2025
e44d73a
ref: delete tracking task on abort timeout
joshLong145 Feb 23, 2025
0b1b9f9
feat: add abortResolver passthrough to WorkerHandler from pool exec o…
joshLong145 Feb 23, 2025
9e9f7a9
test: abortResolver tests
joshLong145 Feb 23, 2025
1023cc8
ref: rename AbortStartArgs.abortResolver to abortPromise
joshLong145 Feb 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: add std stream event emit for abort handlers
joshLong145 committed Nov 30, 2024
commit b274b8ef5ef3403aca17130a39986b89995ae5f7
13 changes: 8 additions & 5 deletions src/WorkerHandler.js
Original file line number Diff line number Diff line change
@@ -220,13 +220,15 @@ function objectToError (obj) {

function handleEmittedStdPayload(handler, payload) {
// TODO: refactor if parallel task execution gets added
if (Object.keys(handler.processing).length !== 1) {
return;
}
var task = Object.values(handler.processing)[0]
if (task.options && typeof task.options.on === 'function') {
if (task && task.options && typeof task.options.on === 'function') {
task.options.on(payload);
}

task = Object.values(handler.tracking)[0];
if (task && task.options && typeof task.options.on === "function") {
task.options.on(payload);
}
}

/**
@@ -422,7 +424,8 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) {
if (error instanceof Promise.CancellationError || error instanceof Promise.TimeoutError) {
me.tracking[id] = {
id,
resolver: Promise.defer()
resolver: Promise.defer(),
options: me.processing[id].options,
};

// remove this task from the queue. It is already rejected (hence this
21 changes: 20 additions & 1 deletion test/Pool.test.js
Original file line number Diff line number Diff line change
@@ -1354,7 +1354,7 @@ describe('Pool', function () {
maxWorkers: 1,
onCreateWorker: () => {
workerCount += 1;
}
},
});

let task = pool.exec('asyncTimeout', [], {});
@@ -1559,6 +1559,25 @@ describe('Pool', function () {
});
});
});

it('should trigger event stdout in abort handler', function (done) {
var pool = createPool(__dirname + '/workers/cleanup-abort.js', {
maxWorkers: 1,
workerType: 'process',
emitStdStreams: true,
workerTerminateTimeout: 1000,
});

pool.exec('stdoutStreamOnAbort', [], {
on: function (payload) {
assert.strictEqual(payload.stdout.trim(), "Hello, world!");
console.log(payload);
pool.terminate();
done();
}
}).timeout(100);

});
});

describe('validate', () => {
18 changes: 16 additions & 2 deletions test/workers/cleanup-abort.js
Original file line number Diff line number Diff line change
@@ -5,8 +5,7 @@ function asyncTimeout() {
return new Promise(function (resolve) {
let timeout = setTimeout(() => {
resolve();
}, 5000);

}, 5000);
me.worker.addAbortListener(async function () {
clearTimeout(timeout);
resolve();
@@ -34,11 +33,26 @@ function asyncAbortHandlerNeverResolves() {
});
}

function stdoutStreamOnAbort() {
var me = this;
return new Promise(function (resolve) {
let timeout = setTimeout(() => {
resolve();
}, 5000);

me.worker.addAbortListener(async function () {
console.log("Hello, world!");
resolve();
});
});
}

// create a worker and register public functions
workerpool.worker(
{
asyncTimeout: asyncTimeout,
asyncAbortHandlerNeverResolves: asyncAbortHandlerNeverResolves,
stdoutStreamOnAbort: stdoutStreamOnAbort,
},
{
abortListenerTimeout: 1000