diff --git a/src/classes/job.ts b/src/classes/job.ts index f457b92121..c0de437cbd 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -1001,8 +1001,12 @@ export class Job< * @param token - token to check job is locked by current worker * @returns */ - moveToDelayed(timestamp: number, token?: string): Promise { - return this.scripts.moveToDelayed(this.id, timestamp, token); + moveToDelayed( + timestamp: number, + token?: string, + skipAttempt?: boolean, + ): Promise { + return this.scripts.moveToDelayed(this.id, timestamp, token, skipAttempt); } /** diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 89da747223..a78e3152bd 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -574,6 +574,7 @@ export class Scripts { jobId: string, timestamp: number, token: string, + skipAttempt?: boolean, ): (string | number)[] { // // Bake in the job id first 12 bits into the timestamp @@ -609,6 +610,7 @@ export class Scripts { JSON.stringify(timestamp), jobId, token, + skipAttempt ? '1' : '0', ]); } @@ -649,10 +651,11 @@ export class Scripts { jobId: string, timestamp: number, token = '0', + skipAttempt = false, ): Promise { const client = await this.queue.client; - const args = this.moveToDelayedArgs(jobId, timestamp, token); + const args = this.moveToDelayedArgs(jobId, timestamp, token, skipAttempt); const result = await (client).moveToDelayed(args); if (result < 0) { throw this.finishedErrors(result, jobId, 'moveToDelayed', 'active'); diff --git a/src/commands/moveToDelayed-8.lua b/src/commands/moveToDelayed-8.lua index 77e6cd3488..44ceeda85d 100644 --- a/src/commands/moveToDelayed-8.lua +++ b/src/commands/moveToDelayed-8.lua @@ -16,6 +16,7 @@ ARGV[3] delayedTimestamp ARGV[4] the id of the job ARGV[5] queue token + ARGV[6] skip attempt Output: 0 - OK @@ -53,6 +54,10 @@ if rcall("EXISTS", jobKey) == 1 then return -3 end + if ARGV[6] == "1" then + rcall("HINCRBY", jobKey, "attemptsMade", -1) + end + rcall("ZADD", delayedKey, score, jobId) rcall("XADD", KEYS[6], "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp) diff --git a/tests/test_worker.ts b/tests/test_worker.ts index 315b22dfa8..81a8432a0e 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -2464,6 +2464,68 @@ describe('workers', function () { await worker.close(); }); + + describe('when skip attempt option is provided', () => { + it('should retry job after a delay time, whithout incrementing attemptMade', async function () { + this.timeout(8000); + + enum Step { + Initial, + Second, + Finish, + } + + const worker = new Worker( + queueName, + async (job, token) => { + let step = job.data.step; + while (step !== Step.Finish) { + switch (step) { + case Step.Initial: { + await job.moveToDelayed(Date.now() + 200, token, true); + await job.updateData({ + step: Step.Second, + }); + throw new DelayedError(); + } + case Step.Second: { + await job.updateData({ + step: Step.Finish, + }); + step = Step.Finish; + return Step.Finish; + } + default: { + throw new Error('invalid step'); + } + } + } + }, + { connection }, + ); + + await worker.waitUntilReady(); + + const start = Date.now(); + await queue.add('test', { step: Step.Initial }); + + await new Promise((resolve, reject) => { + worker.on('completed', job => { + const elapse = Date.now() - start; + expect(elapse).to.be.greaterThan(200); + expect(job.returnvalue).to.be.eql(Step.Finish); + expect(job.attemptsMade).to.be.eql(1); + resolve(); + }); + + worker.on('error', () => { + reject(); + }); + }); + + await worker.close(); + }); + }); }); describe('when creating children at runtime', () => {