From 1986b05ac03fe4ee48861aa60caadcc9df8170a6 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Mon, 9 Oct 2023 19:15:42 -0700 Subject: [PATCH] fix(events): trim events when retrying a job (#2224) --- src/commands/moveToDelayed-8.lua | 1 - src/commands/retryJob-9.lua | 5 ++- tests/test_events.ts | 55 +++++++++++++++++++++++++++++++- 3 files changed, 58 insertions(+), 3 deletions(-) diff --git a/src/commands/moveToDelayed-8.lua b/src/commands/moveToDelayed-8.lua index f9673f4a70..c1695141a1 100644 --- a/src/commands/moveToDelayed-8.lua +++ b/src/commands/moveToDelayed-8.lua @@ -57,7 +57,6 @@ if rcall("EXISTS", jobKey) == 1 then rcall("ZADD", delayedKey, score, jobId) rcall("XADD", KEYS[6], "MAXLEN", "~", maxEvents, "*", "event", "delayed", - --rcall("XADD", KEYS[6], "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp) -- Check if we need to push a marker job to wake up sleeping workers. diff --git a/src/commands/retryJob-9.lua b/src/commands/retryJob-9.lua index 0fcb3a11ed..5711f17b9a 100644 --- a/src/commands/retryJob-9.lua +++ b/src/commands/retryJob-9.lua @@ -60,8 +60,11 @@ if rcall("EXISTS", KEYS[4]) == 1 then addJobWithPriority(KEYS[2], KEYS[8], priority, paused, ARGV[4], KEYS[9]) end + local maxEvents = rcall("HGET", KEYS[5], "opts.maxLenEvents") or 10000 + -- Emit waiting event - rcall("XADD", KEYS[6], "*", "event", "waiting", "jobId", ARGV[4], "prev", "failed") + rcall("XADD", KEYS[6], "MAXLEN", "~", maxEvents, "*", "event", "waiting", + "jobId", ARGV[4], "prev", "failed") return 0 else diff --git a/tests/test_events.ts b/tests/test_events.ts index 2fd012f93f..0cac989ef2 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -611,7 +611,6 @@ describe('events', function () { }, }); - const i = 0; const worker = new Worker( queueName, async () => { @@ -657,6 +656,60 @@ describe('events', function () { await removeAllQueueData(new IORedis(), queueName); }); }); + + describe('when jobs are retried inmediately', function () { + it('should trim events so its length is at least the threshold', async () => { + const numJobs = 80; + const trimmedQueue = new Queue(queueName, { + connection, + streams: { + events: { + maxLen: 20, + }, + }, + }); + + const worker = new Worker( + queueName, + async () => { + await delay(25); + throw new Error('error'); + }, + { connection }, + ); + + await trimmedQueue.waitUntilReady(); + await worker.waitUntilReady(); + + const client = await trimmedQueue.client; + + const waitCompletedEvent = new Promise(resolve => { + queueEvents.on('waiting', async ({ jobId, prev }) => { + if (prev === 'failed' && jobId === numJobs + '') {resolve();} + }); + }); + + const jobs = Array.from(Array(numJobs).keys()).map(() => ({ + name: 'test', + data: { foo: 'bar' }, + opts: { + attempts: 2, + }, + })); + await trimmedQueue.addBulk(jobs); + + await waitCompletedEvent; + + const eventsLength = await client.xlen(trimmedQueue.keys.events); + + expect(eventsLength).to.be.lte(35); + expect(eventsLength).to.be.gte(20); + + await worker.close(); + await trimmedQueue.close(); + await removeAllQueueData(new IORedis(), queueName); + }); + }); }); it('should trim events manually', async () => {