From cbbb502846e1c0e692cb3e708c1e25fbfe71e5b7 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Fri, 19 Jan 2024 21:19:17 -0500 Subject: [PATCH] fix(marker): differentiate standard and delayed markers --- .../includes/addDelayMarkerIfNeeded.lua | 2 +- tests/test_job.ts | 56 +++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/src/commands/includes/addDelayMarkerIfNeeded.lua b/src/commands/includes/addDelayMarkerIfNeeded.lua index 15e6d72d21..0ec017696e 100644 --- a/src/commands/includes/addDelayMarkerIfNeeded.lua +++ b/src/commands/includes/addDelayMarkerIfNeeded.lua @@ -10,6 +10,6 @@ local function addDelayMarkerIfNeeded(markerKey, delayedKey) if nextTimestamp ~= nil then -- Replace the score of the marker with the newest known -- next timestamp. - rcall("ZADD", markerKey, nextTimestamp, "0") + rcall("ZADD", markerKey, nextTimestamp, "1") end end diff --git a/tests/test_job.ts b/tests/test_job.ts index 63555e9db2..3a98ed8949 100644 --- a/tests/test_job.ts +++ b/tests/test_job.ts @@ -16,6 +16,7 @@ import { v4 } from 'uuid'; import { Job, Queue, QueueEvents, Worker } from '../src/classes'; import { JobsOptions } from '../src/types'; import { delay, getParentKey, removeAllQueueData } from '../src/utils'; +import { SSL_OP_NO_TLSv1_2 } from 'constants'; describe('Job', function () { const redisHost = process.env.REDIS_HOST || 'localhost'; @@ -882,6 +883,61 @@ describe('Job', function () { `Job ${job.id} is not in the delayed state. changeDelay`, ); }); + + describe('when adding delayed job after standard one when worker is drained', () => { + it('pick standard job without delay', async function () { + this.timeout(6000); + + await Job.create(queue, 'test1', { foo: 'bar' }); + + const worker = new Worker( + queueName, + async job => { + await delay(1000); + }, + { + connection, + prefix, + }, + ); + await worker.waitUntilReady(); + + // after this event, worker should be drained + const completing = new Promise(resolve => { + worker.once('completed', async () => { + await queue.addBulk([ + { name: 'test1', data: { idx: 0, foo: 'bar' } }, + { + name: 'test2', + data: { idx: 1, foo: 'baz' }, + opts: { delay: 3000 }, + }, + ]); + + resolve(); + }); + }); + + await completing; + + const now = Date.now(); + const completing2 = new Promise(resolve => { + worker.on( + 'completed', + after(2, job => { + const timeDiff = Date.now() - now; + expect(timeDiff).to.be.greaterThanOrEqual(4000); + expect(timeDiff).to.be.lessThan(4500); + expect(job.delay).to.be.equal(0); + resolve(); + }), + ); + }); + + await completing2; + await worker.close(); + }); + }); }); describe('.changePriority', () => {