Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(queue): different score purpose per state #2133

Merged
merged 8 commits into from
Sep 20, 2023
9 changes: 6 additions & 3 deletions src/commands/cleanJobsInSet-2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ local result
if ARGV[4] == "active" then
result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], false)
elseif ARGV[4] == "delayed" then
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"processedOn", "timestamp"})
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit,
{"processedOn", "timestamp"}, false)
elseif ARGV[4] == "prioritized" then
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"timestamp"})
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit,
{"timestamp"}, false)
elseif ARGV[4] == "wait" or ARGV[4] == "paused" then
result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], true)
else
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"finishedOn"} )
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit,
{"finishedOn"}, true )
end

rcall("XADD", KEYS[2], "*", "event", "cleaned", "count", result[2])
Expand Down
7 changes: 4 additions & 3 deletions src/commands/includes/cleanSet.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
--- @include "getTimestamp"
--- @include "removeJob"

local function cleanSet(setKey, jobKeyPrefix, rangeStart, rangeEnd, timestamp, limit, attributes)
local jobs = getJobsInZset(setKey, rangeStart, rangeEnd, timestamp, limit)
local function cleanSet(setKey, jobKeyPrefix, rangeStart, rangeEnd, timestamp, limit, attributes, useTimestampAsScore)
local jobs = getJobsInZset(setKey, rangeStart, rangeEnd, timestamp, limit, useTimestampAsScore)
local deleted = {}
local deletedCount = 0
local jobTS
Expand All @@ -21,8 +21,9 @@ local function cleanSet(setKey, jobKeyPrefix, rangeStart, rangeEnd, timestamp, l

local jobKey = jobKeyPrefix .. job
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
-- TODO: check if in completed/failed this is needed, as ZRANGEBYSCORE could bring all the expected jobs
jobTS = getTimestamp(jobKey, attributes)
if (not jobTS or jobTS < timestamp) then
if (not jobTS or jobTS <= timestamp) then
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think better to consider <= as for completed and failed we can bring all the expected jobs in ZRANGEBYSCORE

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need more context to understand this 😅

removeJob(job, true, jobKeyPrefix)
deletedCount = deletedCount + 1
table.insert(deleted, job)
Expand Down
14 changes: 10 additions & 4 deletions src/commands/includes/getJobsInZset.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@
-- of items in a sorted set only run a single iteration. If we simply used
-- ZRANGE, we may take a long time traversing through jobs that are within the
-- grace period.
local function getJobsInZset(zsetKey, rangeStart, rangeEnd, maxTimestamp, limit)
local function getJobsInZset(zsetKey, rangeStart, rangeEnd, maxTimestamp, limit, useTimestampAsScore)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused, why aren't we using rangeEnd anymore?
Btw, instead of sending useTimeAsScore, couldn't we just set the proper rangeEnd argument before calling this function?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rangeStart is not used anymore either 🤔

local endRange
if useTimestampAsScore then
endRange = maxTimestamp
else
endRange = "+inf"
end

if limit > 0 then
return rcall("ZRANGEBYSCORE", zsetKey, 0, maxTimestamp, "LIMIT", 0, limit)
return rcall("ZRANGEBYSCORE", zsetKey, 0, endRange, "LIMIT", 0, limit)
else
return rcall("ZRANGE", zsetKey, rangeStart, rangeEnd)
return rcall("ZRANGEBYSCORE", zsetKey, 0, endRange)
end
end

20 changes: 20 additions & 0 deletions tests/test_clean.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,26 @@ describe('Cleaner', () => {
expect(count).to.be.eql(0);
});

it('should clean all delayed jobs when limit is given', async () => {
await queue.add('test', { some: 'data' }, { delay: 5000 });
await queue.add('test', { some: 'data' }, { delay: 5000 });
await delay(100);
const jobs = await queue.clean(0, 1000, 'delayed');
expect(jobs.length).to.be.eql(2);
const count = await queue.count();
expect(count).to.be.eql(0);
});

it('should clean all prioritized jobs when limit is given', async () => {
await queue.add('test', { some: 'data' }, { priority: 5000 });
await queue.add('test', { some: 'data' }, { priority: 5001 });
await delay(100);
const jobs = await queue.clean(0, 1000, 'prioritized');
expect(jobs.length).to.be.eql(2);
const count = await queue.count();
expect(count).to.be.eql(0);
});

describe('when prioritized state is provided', async () => {
it('should clean the number of jobs requested', async () => {
await queue.add('test', { some: 'data' }, { priority: 1 }); // as queue is empty, this job will be added to wait
Expand Down