Skip to content

Commit

Permalink
perf(events): trim events when removing jobs (#2235) (python)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Oct 18, 2023
1 parent fb7c648 commit 889815c
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 8 deletions.
14 changes: 7 additions & 7 deletions src/commands/addJob-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ local parentData
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/addJobWithPriority"
--- @include "includes/getTargetQueueList"
--- @include "includes/trimEvents"
--- @include "includes/getNextDelayedTimestamp"
--- @include "includes/updateParentDepsIfNeeded"

Expand All @@ -75,8 +74,7 @@ end

local jobCounter = rcall("INCR", KEYS[4])

-- Trim events before emiting them to avoid trimming events emitted in this script
trimEvents(KEYS[3], KEYS[8])
local maxEvents = rcall("HGET", KEYS[3], "opts.maxLenEvents") or 10000

local parentDependenciesKey = args[7]
local timestamp = args[4]
Expand All @@ -99,7 +97,8 @@ else
end
rcall("HMSET", jobIdKey, "parentKey", parentKey, "parent", parentData)
end
rcall("XADD", KEYS[8], "*", "event", "duplicated", "jobId", jobId)
rcall("XADD", KEYS[8], "MAXLEN", "~", maxEvents, "*", "event", "duplicated",
"jobId", jobId)

return jobId .. "" -- convert to string
end
Expand Down Expand Up @@ -139,8 +138,8 @@ if waitChildrenKey ~= nil then
elseif (delayedTimestamp ~= 0) then
local score = delayedTimestamp * 0x1000 + bit.band(jobCounter, 0xfff)
rcall("ZADD", KEYS[5], score, jobId)
rcall("XADD", KEYS[8], "*", "event", "delayed", "jobId", jobId, "delay",
delayedTimestamp)
rcall("XADD", KEYS[8], "MAXLEN", "~", maxEvents, "*", "event", "delayed", "jobId", jobId,
"delay", delayedTimestamp)
-- If wait list is empty, and this delayed job is the next one to be processed,
-- then we need to signal the workers by adding a dummy job (jobId 0:delay) to the wait list.
local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])
Expand All @@ -157,7 +156,8 @@ else
addJobWithPriority(KEYS[1], KEYS[6], priority, paused, jobId, KEYS[9])
end
-- Emit waiting event
rcall("XADD", KEYS[8], "*", "event", "waiting", "jobId", jobId)
rcall("XADD", KEYS[8], "MAXLEN", "~", maxEvents, "*", "event", "waiting",
"jobId", jobId)
end

-- Check if this job is a child of another job, if so add it to the parents dependencies
Expand Down
4 changes: 3 additions & 1 deletion src/commands/removeJob-1.lua
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ local function removeJob( prefix, jobId, parentKey, removeChildren)
local prev = removeJobFromAnyState(prefix, jobId)

if rcall("DEL", jobKey, jobKey .. ":logs", jobKey .. ":dependencies", jobKey .. ":processed") > 0 then
rcall("XADD", prefix .. "events", "*", "event", "removed", "jobId", jobId, "prev", prev)
local maxEvents = rcall("HGET", prefix .. "meta", "opts.maxLenEvents") or 10000
rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed",
"jobId", jobId, "prev", prev)
end
end

Expand Down
34 changes: 34 additions & 0 deletions tests/test_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,40 @@ describe('events', function () {
await removeAllQueueData(new IORedis(), queueName);
});
});

describe('when jobs removal is attempted', async () => {
it('should trim events so its length is at least the threshold', async () => {
const numRemovals = 200;
const trimmedQueue = new Queue(queueName, {
connection,
streams: {
events: {
maxLen: 20,
},
},
});

const client = await trimmedQueue.client;

const jobs = Array.from(Array(numRemovals).keys()).map(() => ({
name: 'test',
data: { foo: 'bar' },
}));
await trimmedQueue.addBulk(jobs);

for (let i = 1; i <= numRemovals; i++) {
await trimmedQueue.remove(i.toString());
}

const eventsLength = await client.xlen(trimmedQueue.keys.events);

expect(eventsLength).to.be.lte(100);
expect(eventsLength).to.be.gte(20);

await trimmedQueue.close();
await removeAllQueueData(new IORedis(), queueName);
});
});
});

it('should trim events manually', async () => {
Expand Down

0 comments on commit 889815c

Please sign in to comment.