diff --git a/src/commands/addRepeatableJob-2.lua b/src/commands/addRepeatableJob-2.lua index eda5db3b85..4046df5381 100644 --- a/src/commands/addRepeatableJob-2.lua +++ b/src/commands/addRepeatableJob-2.lua @@ -67,8 +67,10 @@ end local prevMillis = rcall("ZSCORE", repeatKey, customKey) if prevMillis ~= false then local delayedJobId = "repeat:" .. customKey .. ":" .. prevMillis + local nextDelayedJobId = repeatKey .. ":" .. customKey .. ":" .. nextMillis - if rcall("ZSCORE", delayedKey, delayedJobId) ~= false then + if rcall("ZSCORE", delayedKey, delayedJobId) ~= false + and rcall("EXISTS", nextDelayedJobId) ~= 1 then removeJob(delayedJobId, true, prefixKey, true --[[remove debounce key]]) rcall("ZREM", delayedKey, delayedJobId) end diff --git a/tests/test_job.ts b/tests/test_job.ts index 2d5b83a644..7bfb5c6531 100644 --- a/tests/test_job.ts +++ b/tests/test_job.ts @@ -1391,6 +1391,67 @@ describe('Job', function () { const isCompleted = await job.isCompleted(); expect(isCompleted).to.be.equal(true); }); + + describe('when re-adding same repeatable job after previous delayed one is promoted', () => { + it('keep one delayed job', async () => { + const job = await queue.add( + 'test', + { foo: 'bar' }, + { + repeat: { + pattern: '0 0 7 * * *', + }, + }, + ); + const isDelayed = await job.isDelayed(); + expect(isDelayed).to.be.equal(true); + + await queue.add( + 'test', + { foo: 'bar' }, + { + repeat: { + pattern: '0 0 7 * * *', + }, + }, + ); + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.equal(1); + + await job.promote(); + expect(job.delay).to.be.equal(0); + + const worker = new Worker(queueName, null, { connection, prefix }); + const currentJob1 = (await worker.getNextJob('token')) as Job; + expect(currentJob1).to.not.be.undefined; + + await currentJob1.moveToCompleted('succeeded', 'token', true); + const completedCount = await queue.getCompletedCount(); + const delayedCountAfterPromote = await queue.getDelayedCount(); + expect(completedCount).to.be.equal(1); + expect(delayedCountAfterPromote).to.be.equal(1); + + const completedCountAfterRestart = await queue.getCompletedCount(); + const delayedCountAfterRestart = await queue.getDelayedCount(); + expect(completedCountAfterRestart).to.be.equal(1); + expect(delayedCountAfterRestart).to.be.equal(1); + + await queue.add( + 'test', + { foo: 'bar' }, + { + repeat: { + pattern: '0 0 7 * * *', + }, + }, + ); + + const completedCountAfterReAddition = await queue.getCompletedCount(); + const delayedCountAfterReAddition = await queue.getDelayedCount(); + expect(completedCountAfterReAddition).to.be.equal(1); + expect(delayedCountAfterReAddition).to.be.equal(1); + }); + }); }); describe('when queue is paused', () => {