Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sandbox): fix issue where job could stay in active forever #2979

Merged
merged 1 commit into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/classes/child-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class ChildPool {
};
}

async retain(processFile: string, exitHandler: any): Promise<Child> {
async retain(processFile: string): Promise<Child> {
let child = this.getFree(processFile).pop();

if (child) {
Expand All @@ -41,10 +41,17 @@ export class ChildPool {
workerThreadsOptions: this.opts.workerThreadsOptions,
});

child.on('exit', exitHandler);
child.on('exit', this.remove.bind(this, child));

try {
await child.init();

// Check status here as well, in case the child exited before we could
// retain it.
if (child.exitCode !== null || child.signalCode !== null) {
throw new Error('Child exited before it could be retained');
}

this.retained[child.pid] = child;

return child;
Expand Down
4 changes: 3 additions & 1 deletion src/classes/child.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ export class Child extends EventEmitter {
if (this.childProcess) {
return this.childProcess.pid;
} else if (this.worker) {
return this.worker.threadId;
// Worker thread ids can become negative once the thread has been terminated,
// so we use the absolute value when indexing the retained object.
return Math.abs(this.worker.threadId);
} else {
throw new Error('No child process or worker thread');
}
Expand Down
7 changes: 3 additions & 4 deletions src/classes/sandbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ const sandbox = <T, R, N extends string>(
);
};

child = await childPool.retain(processFile, exitHandler);
child = await childPool.retain(processFile);
child.on('exit', exitHandler);

msgHandler = async (msg: ChildMessage) => {
switch (msg.cmd) {
Expand Down Expand Up @@ -76,9 +77,7 @@ const sandbox = <T, R, N extends string>(
if (child) {
child.off('message', msgHandler);
child.off('exit', exitHandler);
if (child.exitCode !== null || /SIG.*/.test(`${child.signalCode}`)) {
childPool.remove(child);
} else {
if (child.exitCode === null && child.signalCode === null) {
childPool.release(child);
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures/fixture_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
const delay = require('./delay');

module.exports = function (/*job*/) {
return delay(500).then(() => {
return delay(1000).then(() => {
return 42;
});
};
2 changes: 1 addition & 1 deletion tests/fixtures/fixture_processor_exit.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
const delay = require('./delay');

module.exports = function (/*job*/) {
return delay(500).then(() => {
return delay(200).then(() => {
delay(100).then(() => {
process.exit(0);
});
Expand Down
153 changes: 133 additions & 20 deletions tests/test_sandboxed_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ describe('Sandboxed process using child processes', () => {
it('should allow to pass workerForkOptions with timeout', async function () {
const processFile = __dirname + '/fixtures/fixture_processor.js';

// Note that this timeout will not kill the child process immediately, but
// will wait for the child process to resolve all its promises before killing it.
// Therefore the job will not be "cancelled" but will be completed.
const workerForkOptions = {
timeout: 50,
timeout: 1000,
} as any;
const worker = new Worker(queueName, processFile, {
autorun: false,
Expand All @@ -100,25 +103,29 @@ describe('Sandboxed process using child processes', () => {
workerForkOptions,
});

const failing = new Promise<void>((resolve, reject) => {
worker.on('failed', async (job, error) => {
const completing = new Promise<void>((resolve, reject) => {
worker.on('completed', async (job, error) => {
try {
expect([
'Unexpected exit code: null signal: SIGTERM',
'Unexpected exit code: 0 signal: null',
]).to.include(error.message);
const retainedChild = Object.values(
worker['childPool'].retained,
)[0];
expect(retainedChild).to.be.undefined;
resolve();
} catch (err) {
reject(err);
}
});
});

await delay(500);

await queue.add('test', { foo: 'bar' });

worker.run();

await failing;
await delay(600);

await completing;

await worker.close();
});
Expand Down Expand Up @@ -1123,6 +1130,116 @@ function sandboxProcessTests(
await worker.close();
});

/**
 * Verifies that when a pooled child is killed right after finishing a job,
 * the next job is processed by a different child instance.
 */
describe('when child process a job and its killed direcly after completing', () => {
  it('should process the next job in a new child process', async () => {
    const processFile = __dirname + '/fixtures/fixture_processor.js';
    const worker = new Worker(queueName, processFile, {
      connection,
      prefix,
      drainDelay: 1,
      useWorkerThreads,
    });

    try {
      // One settle-pair per expected completion. The Promise executor runs
      // synchronously, so `settlers` is fully populated before any event fires.
      const settlers: {
        resolve: () => void;
        reject: (err: unknown) => void;
      }[] = [];
      const [firstCompleted, secondCompleted] = [0, 1].map(
        () =>
          new Promise<void>((resolve, reject) => {
            settlers.push({ resolve, reject });
          }),
      );

      // Only advances after the first successful completion; every later
      // completion (successful or not) is routed to the second promise.
      let firstDone = false;

      worker.on('completed', async (job: Job, value: any) => {
        const current = firstDone ? settlers[1] : settlers[0];
        try {
          expect(job.data).to.be.eql({ foo: 'bar' });
          expect(value).to.be.eql(42);
          // After completion the child must be back in the free list and no
          // longer retained.
          expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(
            0,
          );
          expect(worker['childPool'].free[processFile]).to.have.lengthOf(1);
          firstDone = true;
          current.resolve();
        } catch (err) {
          current.reject(err);
        }
      });

      await queue.add('foobar', { foo: 'bar' });

      await firstCompleted;

      const child1 = worker['childPool'].free[processFile][0];

      // Kill the idle child that just processed the first job.
      await child1.kill('SIGTERM');

      await queue.add('foobar', { foo: 'bar' });

      await secondCompleted;

      const child2 = worker['childPool'].free[processFile][0];

      expect(child1).to.not.equal(child2);
    } finally {
      // Always close the worker so a failed assertion does not leak it.
      await worker.close();
    }
  });
});

/**
 * Verifies that a job fails with an "Unexpected exit code" error when the
 * child processing it is killed with SIGKILL mid-job.
 */
describe('when child process a job and its killed with SIGKILL while processing', () => {
  it('should fail with an unexpected error', async function () {
    const processFile = __dirname + '/fixtures/fixture_processor.js';

    const worker = new Worker(queueName, processFile, {
      autorun: false,
      connection,
      prefix,
      drainDelay: 1,
    });

    try {
      const started = new Promise<void>((resolve, reject) => {
        worker.on('active', (job, prev) => {
          // Guard the assertion so a mismatch rejects `started` instead of
          // surfacing as an unhandled rejection plus a test timeout.
          try {
            expect(prev).to.be.equal('waiting');
            resolve();
          } catch (err) {
            reject(err);
          }
        });
      });

      const failing = new Promise<void>((resolve, reject) => {
        worker.on('failed', (job, error) => {
          try {
            expect([
              'Unexpected exit code: null signal: SIGKILL',
              'Unexpected exit code: 0 signal: null',
            ]).to.include(error.message);
            resolve();
          } catch (err) {
            reject(err);
          }
        });
      });

      await queue.add('test', { foo: 'bar' });

      worker.run();

      await started;

      // Need some time to create the child job and start processing
      await delay(250);

      const retainedChild = Object.values(worker['childPool'].retained)[0];
      await retainedChild.kill('SIGKILL');

      await failing;
    } finally {
      // Always close the worker so a failed assertion does not leak it.
      await worker.close();
    }
  });
});

describe('when function is not exported', () => {
it('throws an error', async () => {
const processFile =
Expand Down Expand Up @@ -1158,28 +1275,24 @@ function sandboxProcessTests(
const completing = new Promise<void>((resolve, reject) => {
worker.on('completed', async job => {
try {
expect(job.returnvalue).to.be.undefined;
expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(
0,
);
expect(worker['childPool'].getAllFree()).to.have.lengthOf(1);
await delay(500);
expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(
0,
);
expect(worker['childPool'].getAllFree()).to.have.lengthOf(1);
expect(job!.returnvalue).to.be.undefined;
resolve();
} catch (err) {
reject(err);
} finally {
await worker.close();
}
});
});

await queue.add('test', { foo: 'bar' });

await completing;

await delay(200);

expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(0);
expect(worker['childPool'].getAllFree()).to.have.lengthOf(0);

await worker.close();
});

it('should allow the job to complete and then exit on worker close', async function () {
Expand Down
Loading