Skip to content

Commit

Permalink
perf(events): trim events when removing jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Oct 17, 2023
1 parent fb7c648 commit 1719752
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
4 changes: 3 additions & 1 deletion src/commands/removeJob-1.lua
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ local function removeJob( prefix, jobId, parentKey, removeChildren)
local prev = removeJobFromAnyState(prefix, jobId)

if rcall("DEL", jobKey, jobKey .. ":logs", jobKey .. ":dependencies", jobKey .. ":processed") > 0 then
rcall("XADD", prefix .. "events", "*", "event", "removed", "jobId", jobId, "prev", prev)
local maxEvents = rcall("HGET", prefix .. "meta", "opts.maxLenEvents") or 10000
rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed",
"jobId", jobId, "prev", prev)
end
end

Expand Down
34 changes: 34 additions & 0 deletions tests/test_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,40 @@ describe('events', function () {
await removeAllQueueData(new IORedis(), queueName);
});
});

describe('when jobs removal is attempted', async () => {
it('should trim events so its length is at least the threshold', async () => {
const numRemovals = 200;
const trimmedQueue = new Queue(queueName, {
connection,
streams: {
events: {
maxLen: 20,
},
},
});

const client = await trimmedQueue.client;

const jobs = Array.from(Array(numRemovals).keys()).map(() => ({
name: 'test',
data: { foo: 'bar' },
}));
await trimmedQueue.addBulk(jobs);

for (let i = 1; i <= numRemovals; i++) {
await trimmedQueue.remove(i.toString());
}

const eventsLength = await client.xlen(trimmedQueue.keys.events);

expect(eventsLength).to.be.lte(100);
expect(eventsLength).to.be.gte(20);

await trimmedQueue.close();
await removeAllQueueData(new IORedis(), queueName);
});
});
});

it('should trim events manually', async () => {
Expand Down

0 comments on commit 1719752

Please sign in to comment.