diff --git a/.husky/pre-commit b/.husky/pre-commit index 336cbd628b..48087ddca7 100755 --- a/.husky/pre-commit +++ b/.husky/pre-commit @@ -1,4 +1,4 @@ #!/usr/bin/env sh . "$(dirname -- "$0")/_/husky.sh" -yarn npm-run-all pretty:quick lint:staged +npx npm-run-all pretty:quick lint:staged diff --git a/package.json b/package.json index aa71199f30..9387aafee6 100644 --- a/package.json +++ b/package.json @@ -28,7 +28,7 @@ "copy:lua": "copyfiles -f ./src/commands/*.lua ./dist/cjs/commands && copyfiles -f ./src/commands/*.lua ./dist/esm/commands", "copy:lua:python": "copyfiles -f ./rawScripts/*.lua ./python/bullmq/commands", "copy:main:type": "copyfiles -f ./dist/esm/classes/main.d.ts ./dist/esm/classes/main-worker.d.ts ./dist/cjs/classes", - "coverage": "nyc --reporter=text --reporter=lcovonly yarn test", + "coverage": "nyc --reporter=text --reporter=lcovonly npm run test", "cm": "git cz", "docs": "typedoc --excludeExternals --excludeProtected --excludePrivate --readme none src/index.ts", "docs:json": "typedoc --excludeExternals --excludeProtected --excludePrivate --readme none src/index.ts --json ./apiVersions/v5.json --name v5", @@ -39,7 +39,7 @@ "generate:raw:scripts": "ts-node --project tsconfig-cjs.json generateRawScripts.ts", "lint": "./node_modules/.bin/eslint . --ignore-path ./.eslintignore", "lint:staged": "lint-staged", - "prepublishOnly": "yarn build", + "prepublishOnly": "npm run build", "prepare": "husky install", "pretest": "npm-run-all clean:scripts generate:raw:scripts transform:commands", "prettier": "prettier --config package.json src/**/*.ts", @@ -162,7 +162,7 @@ ] }, "lint-staged": { - "*.{js,ts}": "yarn eslint:fix" + "*.{js,ts}": "npm run eslint:fix" }, "repository": { "type": "git", diff --git a/src/classes/child-processor.ts b/src/classes/child-processor.ts index 98ea85b0e5..5ae835bc6d 100644 --- a/src/classes/child-processor.ts +++ b/src/classes/child-processor.ts @@ -114,7 +114,7 @@ export class ChildProcessor { job: JobJsonSandbox, send: (msg: any) => Promise, ): SandboxedJob { - return { + const wrappedJob = { ...job, data: JSON.parse(job.data || '{}'), opts: job.opts, @@ -136,7 +136,7 @@ export class ChildProcessor { * Emulate the real job `log` function. */ log: async (row: any) => { - send({ + await send({ cmd: ParentCommand.Log, value: row, }); @@ -145,7 +145,7 @@ export class ChildProcessor { * Emulate the real job `moveToDelayed` function. */ moveToDelayed: async (timestamp: number, token?: string) => { - send({ + await send({ cmd: ParentCommand.MoveToDelayed, value: { timestamp, token }, }); @@ -154,11 +154,14 @@ export class ChildProcessor { * Emulate the real job `updateData` function. */ updateData: async (data: any) => { - send({ + await send({ cmd: ParentCommand.Update, value: data, }); + wrappedJob.data = data; }, }; + + return wrappedJob; } } diff --git a/tests/fixtures/fixture_processor_steps.js b/tests/fixtures/fixture_processor_steps.js new file mode 100644 index 0000000000..74adca9517 --- /dev/null +++ b/tests/fixtures/fixture_processor_steps.js @@ -0,0 +1,37 @@ +'use strict'; + +const delay = require('./delay'); + +module.exports = async function (job) { + let step = job.data.step; + while (step !== 'FINISH') { + switch (step) { + case 'INITIAL': { + await delay(200); + const data = { + ...job.data, + step: 'SECOND', + extraDataSecondStep: 'second data', + }; + await job.updateData(data); + step = 'SECOND'; + break; + } + case 'SECOND': { + await delay(200); + const data = { + ...job.data, + extraDataFinishedStep: 'finish data', + step: 'FINISH', + }; + + await job.updateData(data); + step = 'FINISH'; + return; + } + default: { + throw new Error('invalid step'); + } + } + } +}; diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index 30dd6940ab..df2595d8da 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -545,6 +545,37 @@ function sandboxProcessTests( await worker.close(); }); + it('should process steps and complete', async () => { + const processFile = __dirname + '/fixtures/fixture_processor_steps.js'; + + const worker = new Worker(queueName, processFile, { + connection, + prefix, + drainDelay: 1, + }); + + const completing = new Promise((resolve, reject) => { + worker.on('completed', async (job: Job) => { + try { + expect(job.data).to.be.eql({ + step: 'FINISH', + extraDataSecondStep: 'second data', + extraDataFinishedStep: 'finish data', + }); + resolve(); + } catch (err) { + reject(err); + } + }); + }); + + await queue.add('test', { step: 'INITIAL' }); + + await completing; + + await worker.close(); + }); + it('should process and move to delayed', async () => { const processFile = __dirname + '/fixtures/fixture_processor_move_to_delayed.js';