From 9a43a3f45080fbf6d4c4e1a4f99f83ebe0d238ce Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 5 Oct 2024 12:08:43 -0500 Subject: [PATCH 1/8] fix(sandbox): catch exit errors --- src/classes/child-pool.ts | 10 ++- src/classes/sandbox.ts | 105 +++++++++++++++++--------------- tests/test_child-pool.ts | 47 +++++++------- tests/test_sandboxed_process.ts | 50 +++++++++++++-- 4 files changed, 133 insertions(+), 79 deletions(-) diff --git a/src/classes/child-pool.ts b/src/classes/child-pool.ts index 61434c0c39..a5da54742d 100644 --- a/src/classes/child-pool.ts +++ b/src/classes/child-pool.ts @@ -27,7 +27,7 @@ export class ChildPool { }; } - async retain(processFile: string): Promise { + async retain(processFile: string, rejectCb: Function): Promise { let child = this.getFree(processFile).pop(); if (child) { @@ -40,7 +40,13 @@ export class ChildPool { workerForkOptions: this.opts.workerForkOptions, workerThreadsOptions: this.opts.workerThreadsOptions, }); - child.on('exit', this.remove.bind(this, child)); + + child.on('exit', (exitCode: any, signal: any) => { + this.remove.bind(this, child); + rejectCb( + new Error('Unexpected exit code: ' + exitCode + ' signal: ' + signal), + ); + }); try { await child.init(); diff --git a/src/classes/sandbox.ts b/src/classes/sandbox.ts index c07e14cb93..3c6bdafbee 100644 --- a/src/classes/sandbox.ts +++ b/src/classes/sandbox.ts @@ -1,5 +1,6 @@ import { ChildCommand, ParentCommand } from '../enums'; import { ChildMessage } from '../interfaces'; +import { Child } from './child'; import { ChildPool } from './child-pool'; import { Job } from './job'; @@ -8,65 +9,69 @@ const sandbox = ( childPool: ChildPool, ) => { return async function process(job: Job, token?: string): Promise { - const child = await childPool.retain(processFile); + let child: Child; let msgHandler: any; - let exitHandler: any; - - await child.send({ - cmd: ChildCommand.Start, - job: job.asJSONSandbox(), - token, - }); + try { + const done: Promise = new Promise((resolve, reject) => { + const initChild = async () => { + try { + child = await childPool.retain(processFile, reject); - const done: Promise = new Promise((resolve, reject) => { - msgHandler = async (msg: ChildMessage) => { - switch (msg.cmd) { - case ParentCommand.Completed: - resolve(msg.value); - break; - case ParentCommand.Failed: - case ParentCommand.Error: { - const err = new Error(); - Object.assign(err, msg.value); - reject(err); - break; - } - case ParentCommand.Progress: - await job.updateProgress(msg.value); - break; - case ParentCommand.Log: - await job.log(msg.value); - break; - case ParentCommand.MoveToDelayed: - await job.moveToDelayed(msg.value?.timestamp, msg.value?.token); - break; - case ParentCommand.Update: - await job.updateData(msg.value); - break; - } - }; + msgHandler = async (msg: ChildMessage) => { + switch (msg.cmd) { + case ParentCommand.Completed: + resolve(msg.value); + break; + case ParentCommand.Failed: + case ParentCommand.Error: { + const err = new Error(); + Object.assign(err, msg.value); + reject(err); + break; + } + case ParentCommand.Progress: + await job.updateProgress(msg.value); + break; + case ParentCommand.Log: + await job.log(msg.value); + break; + case ParentCommand.MoveToDelayed: + await job.moveToDelayed( + msg.value?.timestamp, + msg.value?.token, + ); + break; + case ParentCommand.Update: + await job.updateData(msg.value); + break; + } + }; - exitHandler = (exitCode: any, signal: any) => { - reject( - new Error('Unexpected exit code: ' + exitCode + ' signal: ' + signal), - ); - }; + child.on('message', msgHandler); - child.on('message', msgHandler); - child.on('exit', exitHandler); - }); + child.send({ + cmd: ChildCommand.Start, + job: job.asJSONSandbox(), + token, + }); + } catch (error) { + reject(error); + } + }; + initChild(); + }); - try { await done; return done; } finally { - child.off('message', msgHandler); - child.off('exit', exitHandler); + if (child) { + child.off('message', msgHandler); - if (child.exitCode !== null || /SIG.*/.test(`${child.signalCode}`)) { - childPool.remove(child); - } else { - childPool.release(child); + if (child.exitCode !== null || /SIG.*/.test(`${child.signalCode}`)) { + childPool.remove(child); + } else { + childPool.release(child); + } } } }; diff --git a/tests/test_child-pool.ts b/tests/test_child-pool.ts index 99ec80064d..c53769a447 100644 --- a/tests/test_child-pool.ts +++ b/tests/test_child-pool.ts @@ -2,6 +2,7 @@ import { expect } from 'chai'; import { ChildPool } from '../src/classes'; import { join } from 'path'; +const NoopProc = () => {}; describe('Child pool for Child Processes', () => { sandboxProcessTests(); }); @@ -32,70 +33,70 @@ function sandboxProcessTests( it('should return same child if free', async () => { const processor = __dirname + '/fixtures/fixture_processor_bar.js'; - const child = await pool.retain(processor); + const child = await pool.retain(processor, NoopProc); expect(child).to.be.ok; pool.release(child); expect(pool.retained).to.be.empty; - const newChild = await pool.retain(processor); + const newChild = await pool.retain(processor, NoopProc); expect(child).to.be.eql(newChild); }); it('should return a new child if reused the last free one', async () => { const processor = __dirname + '/fixtures/fixture_processor_bar.js'; - let child = await pool.retain(processor); + let child = await pool.retain(processor, NoopProc); expect(child).to.be.ok; pool.release(child); expect(pool.retained).to.be.empty; - let newChild = await pool.retain(processor); + let newChild = await pool.retain(processor, NoopProc); expect(child).to.be.eql(newChild); child = newChild; - newChild = await pool.retain(processor); + newChild = await pool.retain(processor, NoopProc); expect(child).not.to.be.eql(newChild); }); it('should return a new child if none free', async () => { const processor = __dirname + '/fixtures/fixture_processor_bar.js'; - const child = await pool.retain(processor); + const child = await pool.retain(processor, NoopProc); expect(child).to.be.ok; expect(pool.retained).not.to.be.empty; - const newChild = await pool.retain(processor); + const newChild = await pool.retain(processor, NoopProc); expect(child).to.not.be.eql(newChild); }); it('should return a new child if killed', async () => { const processor = __dirname + '/fixtures/fixture_processor_bar.js'; - const child = await pool.retain(processor); + const child = await pool.retain(processor, NoopProc); expect(child).to.be.ok; await pool.kill(child); expect(pool.retained).to.be.empty; - const newChild = await pool.retain(processor); + const newChild = await pool.retain(processor, NoopProc); expect(child).to.not.be.eql(newChild); }); it('should return a new child if many retained and none free', async () => { const processor = __dirname + '/fixtures/fixture_processor_bar.js'; const children = await Promise.all([ - pool.retain(processor), - pool.retain(processor), - pool.retain(processor), - pool.retain(processor), - pool.retain(processor), - pool.retain(processor), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), ]); expect(children).to.have.length(6); - const child = await pool.retain(processor); + const child = await pool.retain(processor, NoopProc); expect(children).not.to.include(child); }).timeout(10000); it('should return an old child if many retained and one free', async () => { const processor = __dirname + '/fixtures/fixture_processor_bar.js'; const children = await Promise.all([ - pool.retain(processor), - pool.retain(processor), - pool.retain(processor), - pool.retain(processor), - pool.retain(processor), - pool.retain(processor), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), ]); expect(children).to.have.length(6); @@ -108,7 +109,7 @@ function sandboxProcessTests( const processor = __dirname + '/fixtures/fixture_processor_bar.js'; process.execArgv.push('--no-warnings'); - const child = await pool.retain(processor); + const child = await pool.retain(processor, NoopProc); expect(child).to.be.ok; if (!useWorkerThreads) { expect(child.childProcess.spawnargs).to.include('--no-warnings'); diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index da6e13e015..c24a8ce2b7 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -77,6 +77,40 @@ describe('Sandboxed process using child processes', () => { await worker.close(); }); + + it('should allow to pass workerForkOptions with timeout', async function () { + const processFile = __dirname + '/fixtures/fixture_processor.js'; + + const workerForkOptions = { + timeout: 100, + } as any; + const worker = new Worker(queueName, processFile, { + connection, + prefix, + drainDelay: 1, + useWorkerThreads: false, + workerForkOptions, + }); + + const failing = new Promise((resolve, reject) => { + worker.on('failed', async (job, error) => { + try { + expect(error.message).to.be.equal( + 'Unexpected exit code: null signal: SIGTERM', + ); + resolve(); + } catch (err) { + reject(err); + } + }); + }); + + await queue.add('test', { foo: 'bar' }); + + await failing; + + await worker.close(); + }); }); }); @@ -1019,12 +1053,20 @@ function sandboxProcessTests( useWorkerThreads, }); - const job = await queue.add('test', { exitCode: 1 }); + const failing = new Promise((resolve, reject) => { + worker.on('failed', async (job, error) => { + try { + expect(error.message).to.be.equal('Broken file processor'); + resolve(); + } catch (err) { + reject(err); + } + }); + }); - await expect(job.waitUntilFinished(queueEvents)).to.be.rejectedWith( - 'Broken file processor', - ); + await queue.add('test', { exitCode: 1 }); + await failing; await worker.close(); }); From 6e02943c01b8c837132450f51afd75f3c00d0c44 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 5 Oct 2024 12:34:43 -0500 Subject: [PATCH 2/8] test: fix test cases --- src/classes/child-pool.ts | 9 ++------- src/classes/sandbox.ts | 13 +++++++++++-- tests/test_sandboxed_process.ts | 9 ++++++--- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/classes/child-pool.ts b/src/classes/child-pool.ts index a5da54742d..66147977aa 100644 --- a/src/classes/child-pool.ts +++ b/src/classes/child-pool.ts @@ -27,7 +27,7 @@ export class ChildPool { }; } - async retain(processFile: string, rejectCb: Function): Promise { + async retain(processFile: string, exitHandler: any): Promise { let child = this.getFree(processFile).pop(); if (child) { @@ -41,12 +41,7 @@ export class ChildPool { workerThreadsOptions: this.opts.workerThreadsOptions, }); - child.on('exit', (exitCode: any, signal: any) => { - this.remove.bind(this, child); - rejectCb( - new Error('Unexpected exit code: ' + exitCode + ' signal: ' + signal), - ); - }); + child.on('exit', exitHandler); try { await child.init(); diff --git a/src/classes/sandbox.ts b/src/classes/sandbox.ts index 3c6bdafbee..e9793714d3 100644 --- a/src/classes/sandbox.ts +++ b/src/classes/sandbox.ts @@ -11,11 +11,20 @@ const sandbox = ( return async function process(job: Job, token?: string): Promise { let child: Child; let msgHandler: any; + let exitHandler: any; try { const done: Promise = new Promise((resolve, reject) => { const initChild = async () => { try { - child = await childPool.retain(processFile, reject); + exitHandler = (exitCode: any, signal: any) => { + reject( + new Error( + 'Unexpected exit code: ' + exitCode + ' signal: ' + signal, + ), + ); + }; + + child = await childPool.retain(processFile, exitHandler); msgHandler = async (msg: ChildMessage) => { switch (msg.cmd) { @@ -66,7 +75,7 @@ const sandbox = ( } finally { if (child) { child.off('message', msgHandler); - + child.off('exit', exitHandler); if (child.exitCode !== null || /SIG.*/.test(`${child.signalCode}`)) { childPool.remove(child); } else { diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index c24a8ce2b7..a9d98379b2 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -1092,7 +1092,7 @@ function sandboxProcessTests( }); }); - it('should remove exited process', async () => { + it('should release exited process', async () => { const processFile = __dirname + '/fixtures/fixture_processor_exit.js'; const worker = new Worker(queueName, processFile, { @@ -1114,7 +1114,7 @@ function sandboxProcessTests( expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf( 0, ); - expect(worker['childPool'].getAllFree()).to.have.lengthOf(0); + expect(worker['childPool'].getAllFree()).to.have.lengthOf(1); resolve(); } catch (err) { reject(err); @@ -1139,7 +1139,10 @@ function sandboxProcessTests( }); // acquire and release a child here so we know it has it's full termination handler setup - const initializedChild = await worker['childPool'].retain(processFile); + const initializedChild = await worker['childPool'].retain( + processFile, + () => {}, + ); await worker['childPool'].release(initializedChild); // await this After we've added the job From 0fb71546a3da7a3d2edfd472c8ba531be2a4a7a5 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 5 Oct 2024 12:56:36 -0500 Subject: [PATCH 3/8] test: try to fix a test case --- tests/test_sandboxed_process.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index a9d98379b2..61907d1d56 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -85,6 +85,7 @@ describe('Sandboxed process using child processes', () => { timeout: 100, } as any; const worker = new Worker(queueName, processFile, { + autorun: false, connection, prefix, drainDelay: 1, @@ -107,6 +108,8 @@ describe('Sandboxed process using child processes', () => { await queue.add('test', { foo: 'bar' }); + worker.run(); + await failing; await worker.close(); From 3144a771b6ea43daab22a9586e03e9463896b6f6 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 5 Oct 2024 23:57:35 -0500 Subject: [PATCH 4/8] test: fix test case --- tests/test_sandboxed_process.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index 61907d1d56..3783d14753 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -97,7 +97,7 @@ describe('Sandboxed process using child processes', () => { worker.on('failed', async (job, error) => { try { expect(error.message).to.be.equal( - 'Unexpected exit code: null signal: SIGTERM', + 'Unexpected exit code: 0 signal: null', ); resolve(); } catch (err) { From d69ce0136cc1f0d4ebda9f22630472cce332872b Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sun, 6 Oct 2024 00:08:17 -0500 Subject: [PATCH 5/8] test: fix test case --- tests/test_sandboxed_process.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index 3783d14753..54e07d1030 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -82,7 +82,7 @@ describe('Sandboxed process using child processes', () => { const processFile = __dirname + '/fixtures/fixture_processor.js'; const workerForkOptions = { - timeout: 100, + timeout: 150, } as any; const worker = new Worker(queueName, processFile, { autorun: false, @@ -96,9 +96,10 @@ describe('Sandboxed process using child processes', () => { const failing = new Promise((resolve, reject) => { worker.on('failed', async (job, error) => { try { - expect(error.message).to.be.equal( + expect([ + 'Unexpected exit code: null signal: SIGTERM', 'Unexpected exit code: 0 signal: null', - ); + ]).to.include(error.message); resolve(); } catch (err) { reject(err); From fd848d767a69d003b327d44916d206b77389c0a7 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sun, 6 Oct 2024 00:15:31 -0500 Subject: [PATCH 6/8] test: 2nd try --- tests/test_sandboxed_process.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index 54e07d1030..165760cfa5 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -82,7 +82,7 @@ describe('Sandboxed process using child processes', () => { const processFile = __dirname + '/fixtures/fixture_processor.js'; const workerForkOptions = { - timeout: 150, + timeout: 250, } as any; const worker = new Worker(queueName, processFile, { autorun: false, From ff91794eb86972504c172d15723a3d84283b047d Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sun, 6 Oct 2024 00:21:32 -0500 Subject: [PATCH 7/8] chore: add console for testing --- tests/test_sandboxed_process.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index 165760cfa5..91c4b652b3 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -82,7 +82,7 @@ describe('Sandboxed process using child processes', () => { const processFile = __dirname + '/fixtures/fixture_processor.js'; const workerForkOptions = { - timeout: 250, + timeout: 300, } as any; const worker = new Worker(queueName, processFile, { autorun: false, @@ -96,6 +96,7 @@ describe('Sandboxed process using child processes', () => { const failing = new Promise((resolve, reject) => { worker.on('failed', async (job, error) => { try { + console.log(error.message); expect([ 'Unexpected exit code: null signal: SIGTERM', 'Unexpected exit code: 0 signal: null', From aef88393e2375b4e88cf851a95b359104c834d35 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sun, 6 Oct 2024 00:44:40 -0500 Subject: [PATCH 8/8] chore: try new test change --- tests/test_sandboxed_process.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index 91c4b652b3..4cf155dafb 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -82,7 +82,7 @@ describe('Sandboxed process using child processes', () => { const processFile = __dirname + '/fixtures/fixture_processor.js'; const workerForkOptions = { - timeout: 300, + timeout: 50, } as any; const worker = new Worker(queueName, processFile, { autorun: false, @@ -96,7 +96,6 @@ describe('Sandboxed process using child processes', () => { const failing = new Promise((resolve, reject) => { worker.on('failed', async (job, error) => { try { - console.log(error.message); expect([ 'Unexpected exit code: null signal: SIGTERM', 'Unexpected exit code: 0 signal: null',