Skip to content

Commit

Permalink
fix(sandbox): fix issue where job could stay in active forever (#2979)
Browse files Browse the repository at this point in the history
  • Loading branch information
manast authored Dec 21, 2024
1 parent 347b618 commit c0a6bcd
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 29 deletions.
11 changes: 9 additions & 2 deletions src/classes/child-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class ChildPool {
};
}

async retain(processFile: string, exitHandler: any): Promise<Child> {
async retain(processFile: string): Promise<Child> {
let child = this.getFree(processFile).pop();

if (child) {
Expand All @@ -41,10 +41,17 @@ export class ChildPool {
workerThreadsOptions: this.opts.workerThreadsOptions,
});

child.on('exit', exitHandler);
child.on('exit', this.remove.bind(this, child));

try {
await child.init();

// Check status here as well, in case the child exited before we could
// retain it.
if (child.exitCode !== null || child.signalCode !== null) {
throw new Error('Child exited before it could be retained');
}

this.retained[child.pid] = child;

return child;
Expand Down
4 changes: 3 additions & 1 deletion src/classes/child.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ export class Child extends EventEmitter {
if (this.childProcess) {
return this.childProcess.pid;
} else if (this.worker) {
return this.worker.threadId;
// A worker thread's id can become negative once the thread has been
// terminated, so we use its absolute value to index the retained object
return Math.abs(this.worker.threadId);
} else {
throw new Error('No child process or worker thread');
}
Expand Down
7 changes: 3 additions & 4 deletions src/classes/sandbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ const sandbox = <T, R, N extends string>(
);
};

child = await childPool.retain(processFile, exitHandler);
child = await childPool.retain(processFile);
child.on('exit', exitHandler);

msgHandler = async (msg: ChildMessage) => {
switch (msg.cmd) {
Expand Down Expand Up @@ -76,9 +77,7 @@ const sandbox = <T, R, N extends string>(
if (child) {
child.off('message', msgHandler);
child.off('exit', exitHandler);
if (child.exitCode !== null || /SIG.*/.test(`${child.signalCode}`)) {
childPool.remove(child);
} else {
if (child.exitCode === null && child.signalCode === null) {
childPool.release(child);
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures/fixture_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
const delay = require('./delay');

module.exports = function (/*job*/) {
return delay(500).then(() => {
return delay(1000).then(() => {
return 42;
});
};
2 changes: 1 addition & 1 deletion tests/fixtures/fixture_processor_exit.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
const delay = require('./delay');

module.exports = function (/*job*/) {
return delay(500).then(() => {
return delay(200).then(() => {
delay(100).then(() => {
process.exit(0);
});
Expand Down
153 changes: 133 additions & 20 deletions tests/test_sandboxed_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ describe('Sandboxed process using child processes', () => {
it('should allow to pass workerForkOptions with timeout', async function () {
const processFile = __dirname + '/fixtures/fixture_processor.js';

// Note that this timeout will not kill the child process immediately, but
// will wait for the child process to resolve all its promises before killing it.
// Therefore the job will not be "cancelled" but will be completed.
const workerForkOptions = {
timeout: 50,
timeout: 1000,
} as any;
const worker = new Worker(queueName, processFile, {
autorun: false,
Expand All @@ -100,25 +103,29 @@ describe('Sandboxed process using child processes', () => {
workerForkOptions,
});

const failing = new Promise<void>((resolve, reject) => {
worker.on('failed', async (job, error) => {
const completing = new Promise<void>((resolve, reject) => {
worker.on('completed', async (job, error) => {
try {
expect([
'Unexpected exit code: null signal: SIGTERM',
'Unexpected exit code: 0 signal: null',
]).to.include(error.message);
const retainedChild = Object.values(
worker['childPool'].retained,
)[0];
expect(retainedChild).to.be.undefined;
resolve();
} catch (err) {
reject(err);
}
});
});

await delay(500);

await queue.add('test', { foo: 'bar' });

worker.run();

await failing;
await delay(600);

await completing;

await worker.close();
});
Expand Down Expand Up @@ -1123,6 +1130,116 @@ function sandboxProcessTests(
await worker.close();
});

describe('when child process a job and its killed direcly after completing', () => {
  it('should process the next job in a new child process', async () => {
    const processFile = __dirname + '/fixtures/fixture_processor.js';
    const worker = new Worker(queueName, processFile, {
      connection,
      prefix,
      drainDelay: 1,
      useWorkerThreads,
    });

    // Settle callbacks for the two expected 'completed' events. They are
    // captured from the promise executors so that one shared listener can
    // settle whichever promise belongs to the job that just finished.
    let resolveFirst!: () => void;
    let rejectFirst!: (reason?: unknown) => void;
    let resolveSecond!: () => void;
    let rejectSecond!: (reason?: unknown) => void;

    const completing2 = new Promise<void>((resolve, reject) => {
      resolveSecond = resolve;
      rejectSecond = reject;
    });
    const completing: Promise<void> = new Promise<void>((resolve, reject) => {
      resolveFirst = resolve;
      rejectFirst = reject;
    });

    // Number of jobs whose assertions have passed so far (0 or 1).
    let passedCompletions = 0;

    worker.on('completed', async (job: Job, value: any) => {
      const isFirstCompletion = passedCompletions === 0;

      try {
        expect(job.data).to.be.eql({ foo: 'bar' });
        expect(value).to.be.eql(42);
        // When 'completed' fires, no child may still be retained and exactly
        // one child must be sitting in the free list for this processor.
        expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(0);
        expect(worker['childPool'].free[processFile]).to.have.lengthOf(1);
      } catch (err) {
        // Route the failure to the promise the test is currently awaiting.
        if (isFirstCompletion) {
          rejectFirst(err);
        } else {
          rejectSecond(err);
        }
        return;
      }

      if (isFirstCompletion) {
        passedCompletions++;
        resolveFirst();
      } else {
        resolveSecond();
      }
    });

    await queue.add('foobar', { foo: 'bar' });
    await completing;

    // Kill the idle child that processed the first job.
    const firstChild = worker['childPool'].free[processFile][0];
    await firstChild.kill('SIGTERM');

    await queue.add('foobar', { foo: 'bar' });
    await completing2;

    // The second job must have been handled by a different child.
    const secondChild = worker['childPool'].free[processFile][0];
    expect(firstChild).to.not.equal(secondChild);

    await worker.close();
  });
});

describe('when child process a job and its killed with SIGKILL while processing', () => {
  it('should fail with an unexpected error', async function () {
    const processFile = __dirname + '/fixtures/fixture_processor.js';

    const worker = new Worker(queueName, processFile, {
      autorun: false,
      connection,
      prefix,
      drainDelay: 1,
    });

    // Settles once the worker reports the job as active. A failed assertion
    // rejects the promise so the test fails immediately; otherwise it would
    // surface as an unhandled rejection and leave `started` pending forever.
    const started = new Promise<void>((resolve, reject) => {
      worker.on('active', async (job, prev) => {
        try {
          expect(prev).to.be.equal('waiting');
          resolve();
        } catch (err) {
          reject(err);
        }
      });
    });

    // Settles once the worker reports the job as failed with one of the
    // accepted "unexpected exit" messages.
    const failing = new Promise<void>((resolve, reject) => {
      worker.on('failed', async (job, error) => {
        try {
          expect([
            'Unexpected exit code: null signal: SIGKILL',
            'Unexpected exit code: 0 signal: null',
          ]).to.include(error.message);
          resolve();
        } catch (err) {
          reject(err);
        }
      });
    });

    await queue.add('test', { foo: 'bar' });

    worker.run();

    await started;

    // Need some time to create the child job and start processing
    await delay(250);

    const retainedChild = Object.values(worker['childPool'].retained)[0];
    // Fail with a clear message instead of a TypeError on `.kill` when no
    // child has been retained yet.
    expect(retainedChild, 'expected a retained child to kill').to.not.be
      .undefined;
    await retainedChild.kill('SIGKILL');

    await failing;

    await worker.close();
  });
});

describe('when function is not exported', () => {
it('throws an error', async () => {
const processFile =
Expand Down Expand Up @@ -1158,28 +1275,24 @@ function sandboxProcessTests(
const completing = new Promise<void>((resolve, reject) => {
worker.on('completed', async job => {
try {
expect(job.returnvalue).to.be.undefined;
expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(
0,
);
expect(worker['childPool'].getAllFree()).to.have.lengthOf(1);
await delay(500);
expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(
0,
);
expect(worker['childPool'].getAllFree()).to.have.lengthOf(1);
expect(job!.returnvalue).to.be.undefined;
resolve();
} catch (err) {
reject(err);
} finally {
await worker.close();
}
});
});

await queue.add('test', { foo: 'bar' });

await completing;

await delay(200);

expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(0);
expect(worker['childPool'].getAllFree()).to.have.lengthOf(0);

await worker.close();
});

it('should allow the job to complete and then exit on worker close', async function () {
Expand Down

0 comments on commit c0a6bcd

Please sign in to comment.