diff --git a/src/commands/removeJob-1.lua b/src/commands/removeJob-1.lua index ac8a3cedc3..3ef586ba22 100644 --- a/src/commands/removeJob-1.lua +++ b/src/commands/removeJob-1.lua @@ -53,9 +53,9 @@ local function removeJob( prefix, jobId, parentKey, removeChildren) local prev = removeJobFromAnyState(prefix, jobId) - rcall("DEL", jobKey, jobKey .. ":logs", jobKey .. ":dependencies", jobKey .. ":processed") - - rcall("XADD", prefix .. "events", "*", "event", "removed", "jobId", jobId, "prev", prev); + if rcall("DEL", jobKey, jobKey .. ":logs", jobKey .. ":dependencies", jobKey .. ":processed") > 0 then + rcall("XADD", prefix .. "events", "*", "event", "removed", "jobId", jobId, "prev", prev) + end end local prefix = KEYS[1] diff --git a/tests/test_events.ts b/tests/test_events.ts index 8eeb3c769f..8cbeac29bb 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -505,6 +505,28 @@ describe('events', function () { await worker.close(); }); + describe('when jobs removal is attempted on non-existed records', async () => { + it('should not publish removed events', async () => { + const numRemovals = 100; + const trimmedQueue = new Queue(queueName, { + connection, + }); + + const client = await trimmedQueue.client; + + for (let i = 0; i < numRemovals; i++) { + await trimmedQueue.remove(i.toString()); + } + + const eventsLength = await client.xlen(trimmedQueue.keys.events); + + expect(eventsLength).to.be.eql(0); + + await trimmedQueue.close(); + await removeAllQueueData(new IORedis(), queueName); + }); + }); + describe('when maxLen is 0', function () { it('should trim events automatically', async () => { const trimmedQueue = new Queue(queueName, { @@ -679,7 +701,7 @@ describe('events', function () { }); }); - describe('when jobs are retried inmediately', function () { + describe('when jobs are retried immediately', function () { it('should trim events so its length is at least the threshold', async () => { const numJobs = 80; const trimmedQueue = new Queue(queueName, {