Skip to content

Commit

Permalink
fix(events): trim events when retrying a job (#2224)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Oct 10, 2023
1 parent a01bb0b commit 1986b05
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 3 deletions.
1 change: 0 additions & 1 deletion src/commands/moveToDelayed-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ if rcall("EXISTS", jobKey) == 1 then

rcall("ZADD", delayedKey, score, jobId)
rcall("XADD", KEYS[6], "MAXLEN", "~", maxEvents, "*", "event", "delayed",
--rcall("XADD", KEYS[6], "*", "event", "delayed",
"jobId", jobId, "delay", delayedTimestamp)

-- Check if we need to push a marker job to wake up sleeping workers.
Expand Down
5 changes: 4 additions & 1 deletion src/commands/retryJob-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ if rcall("EXISTS", KEYS[4]) == 1 then
addJobWithPriority(KEYS[2], KEYS[8], priority, paused, ARGV[4], KEYS[9])
end

local maxEvents = rcall("HGET", KEYS[5], "opts.maxLenEvents") or 10000

-- Emit waiting event
rcall("XADD", KEYS[6], "*", "event", "waiting", "jobId", ARGV[4], "prev", "failed")
rcall("XADD", KEYS[6], "MAXLEN", "~", maxEvents, "*", "event", "waiting",
"jobId", ARGV[4], "prev", "failed")

return 0
else
Expand Down
55 changes: 54 additions & 1 deletion tests/test_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,6 @@ describe('events', function () {
},
});

const i = 0;
const worker = new Worker(
queueName,
async () => {
Expand Down Expand Up @@ -657,6 +656,60 @@ describe('events', function () {
await removeAllQueueData(new IORedis(), queueName);
});
});

describe('when jobs are retried inmediately', function () {
it('should trim events so its length is at least the threshold', async () => {
const numJobs = 80;
const trimmedQueue = new Queue(queueName, {
connection,
streams: {
events: {
maxLen: 20,
},
},
});

const worker = new Worker(
queueName,
async () => {
await delay(25);
throw new Error('error');
},
{ connection },
);

await trimmedQueue.waitUntilReady();
await worker.waitUntilReady();

const client = await trimmedQueue.client;

const waitCompletedEvent = new Promise<void>(resolve => {
queueEvents.on('waiting', async ({ jobId, prev }) => {
if (prev === 'failed' && jobId === numJobs + '') {resolve();}
});
});

const jobs = Array.from(Array(numJobs).keys()).map(() => ({
name: 'test',
data: { foo: 'bar' },
opts: {
attempts: 2,
},
}));
await trimmedQueue.addBulk(jobs);

await waitCompletedEvent;

const eventsLength = await client.xlen(trimmedQueue.keys.events);

expect(eventsLength).to.be.lte(35);
expect(eventsLength).to.be.gte(20);

await worker.close();
await trimmedQueue.close();
await removeAllQueueData(new IORedis(), queueName);
});
});
});

it('should trim events manually', async () => {
Expand Down

0 comments on commit 1986b05

Please sign in to comment.