From c1346064c6cd9f93c59b184f150eac11d51c91b4 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Fri, 13 Oct 2023 06:45:58 -0700 Subject: [PATCH] fix(events): do not publish removed event on non-existent jobs (#2227) --- src/commands/removeJob-1.lua | 6 +++--- tests/test_events.ts | 28 ++++++++++++++++++++++++++-- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/src/commands/removeJob-1.lua b/src/commands/removeJob-1.lua index ac8a3cedc3..3ef586ba22 100644 --- a/src/commands/removeJob-1.lua +++ b/src/commands/removeJob-1.lua @@ -53,9 +53,9 @@ local function removeJob( prefix, jobId, parentKey, removeChildren) local prev = removeJobFromAnyState(prefix, jobId) - rcall("DEL", jobKey, jobKey .. ":logs", jobKey .. ":dependencies", jobKey .. ":processed") - - rcall("XADD", prefix .. "events", "*", "event", "removed", "jobId", jobId, "prev", prev); + if rcall("DEL", jobKey, jobKey .. ":logs", jobKey .. ":dependencies", jobKey .. ":processed") > 0 then + rcall("XADD", prefix .. "events", "*", "event", "removed", "jobId", jobId, "prev", prev) + end end local prefix = KEYS[1] diff --git a/tests/test_events.ts b/tests/test_events.ts index 0cac989ef2..4edbdb1e4c 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -486,6 +486,28 @@ describe('events', function () { await worker.close(); }); + describe('when jobs removal is attempted on non-existed records', async () => { + it('should not publish removed events', async () => { + const numRemovals = 100; + const trimmedQueue = new Queue(queueName, { + connection, + }); + + const client = await trimmedQueue.client; + + for (let i = 0; i < numRemovals; i++) { + await trimmedQueue.remove(i.toString()); + } + + const eventsLength = await client.xlen(trimmedQueue.keys.events); + + expect(eventsLength).to.be.eql(0); + + await trimmedQueue.close(); + await removeAllQueueData(new IORedis(), queueName); + }); + }); + describe('when maxLen is 0', function () { it('should trim events automatically', async () => { const trimmedQueue = new Queue(queueName, { @@ -657,7 +679,7 @@ describe('events', function () { }); }); - describe('when jobs are retried inmediately', function () { + describe('when jobs are retried immediately', function () { it('should trim events so its length is at least the threshold', async () => { const numJobs = 80; const trimmedQueue = new Queue(queueName, { @@ -685,7 +707,9 @@ describe('events', function () { const waitCompletedEvent = new Promise(resolve => { queueEvents.on('waiting', async ({ jobId, prev }) => { - if (prev === 'failed' && jobId === numJobs + '') {resolve();} + if (prev === 'failed' && jobId === numJobs + '') { + resolve(); + } }); });