diff --git a/src/commands/cleanJobsInSet-2.lua b/src/commands/cleanJobsInSet-2.lua index 20f06b9352..c33c64eb11 100644 --- a/src/commands/cleanJobsInSet-2.lua +++ b/src/commands/cleanJobsInSet-2.lua @@ -34,13 +34,19 @@ local result if ARGV[4] == "active" then result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], false) elseif ARGV[4] == "delayed" then - result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"processedOn", "timestamp"}) + rangeEnd = "+inf" + result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit, + {"processedOn", "timestamp"}, false) elseif ARGV[4] == "prioritized" then - result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"timestamp"}) + rangeEnd = "+inf" + result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit, + {"timestamp"}, false) elseif ARGV[4] == "wait" or ARGV[4] == "paused" then result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], true) else - result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"finishedOn"} ) + rangeEnd = ARGV[2] + result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit, + {"finishedOn"}, true) end rcall("XADD", KEYS[2], "*", "event", "cleaned", "count", result[2]) diff --git a/src/commands/includes/cleanList.lua b/src/commands/includes/cleanList.lua index f7e7a65225..99b397ef53 100644 --- a/src/commands/includes/cleanList.lua +++ b/src/commands/includes/cleanList.lua @@ -30,7 +30,7 @@ local function cleanList(listKey, jobKeyPrefix, rangeStart, rangeEnd, -- Fetch all three of these (in that order) and use the first one that is set so that we'll leave jobs -- that have been active within the grace period: jobTS = getTimestamp(jobKey, {"finishedOn", "processedOn", "timestamp"}) - if (not jobTS or jobTS < timestamp) then + if (not jobTS or jobTS <= timestamp) then -- replace the entry with a deletion marker; the actual deletion will -- occur at the end of the script rcall("LSET", listKey, rangeEnd - jobIdsLen + i, deletionMarker) diff --git a/src/commands/includes/cleanSet.lua b/src/commands/includes/cleanSet.lua index 3f795ccdd6..f83393d50a 100644 --- a/src/commands/includes/cleanSet.lua +++ b/src/commands/includes/cleanSet.lua @@ -9,8 +9,8 @@ --- @include "getTimestamp" --- @include "removeJob" -local function cleanSet(setKey, jobKeyPrefix, rangeStart, rangeEnd, timestamp, limit, attributes) - local jobs = getJobsInZset(setKey, rangeStart, rangeEnd, timestamp, limit) +local function cleanSet(setKey, jobKeyPrefix, rangeEnd, timestamp, limit, attributes, isFinished) + local jobs = getJobsInZset(setKey, rangeEnd, limit) local deleted = {} local deletedCount = 0 local jobTS @@ -20,12 +20,18 @@ local function cleanSet(setKey, jobKeyPrefix, rangeStart, rangeEnd, timestamp, l end local jobKey = jobKeyPrefix .. job - -- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed - jobTS = getTimestamp(jobKey, attributes) - if (not jobTS or jobTS < timestamp) then + if isFinished then removeJob(job, true, jobKeyPrefix) deletedCount = deletedCount + 1 table.insert(deleted, job) + else + -- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed + jobTS = getTimestamp(jobKey, attributes) + if (not jobTS or jobTS <= timestamp) then + removeJob(job, true, jobKeyPrefix) + deletedCount = deletedCount + 1 + table.insert(deleted, job) + end end end diff --git a/src/commands/includes/getJobsInZset.lua b/src/commands/includes/getJobsInZset.lua index f2f4ab1f37..a9f90c3960 100644 --- a/src/commands/includes/getJobsInZset.lua +++ b/src/commands/includes/getJobsInZset.lua @@ -2,11 +2,10 @@ -- of items in a sorted set only run a single iteration. If we simply used -- ZRANGE, we may take a long time traversing through jobs that are within the -- grace period. -local function getJobsInZset(zsetKey, rangeStart, rangeEnd, maxTimestamp, limit) +local function getJobsInZset(zsetKey, rangeEnd, limit) if limit > 0 then - return rcall("ZRANGEBYSCORE", zsetKey, 0, maxTimestamp, "LIMIT", 0, limit) + return rcall("ZRANGEBYSCORE", zsetKey, 0, rangeEnd, "LIMIT", 0, limit) else - return rcall("ZRANGE", zsetKey, rangeStart, rangeEnd) + return rcall("ZRANGEBYSCORE", zsetKey, 0, rangeEnd) end end - \ No newline at end of file diff --git a/tests/test_clean.ts b/tests/test_clean.ts index 5415083f30..5106178f3a 100644 --- a/tests/test_clean.ts +++ b/tests/test_clean.ts @@ -175,6 +175,26 @@ describe('Cleaner', () => { expect(count).to.be.eql(0); }); + it('should clean all delayed jobs when limit is given', async () => { + await queue.add('test', { some: 'data' }, { delay: 5000 }); + await queue.add('test', { some: 'data' }, { delay: 5000 }); + await delay(100); + const jobs = await queue.clean(0, 1000, 'delayed'); + expect(jobs.length).to.be.eql(2); + const count = await queue.count(); + expect(count).to.be.eql(0); + }); + + it('should clean all prioritized jobs when limit is given', async () => { + await queue.add('test', { some: 'data' }, { priority: 5000 }); + await queue.add('test', { some: 'data' }, { priority: 5001 }); + await delay(100); + const jobs = await queue.clean(0, 1000, 'prioritized'); + expect(jobs.length).to.be.eql(2); + const count = await queue.count(); + expect(count).to.be.eql(0); + }); + describe('when prioritized state is provided', async () => { it('should clean the number of jobs requested', async () => { await queue.add('test', { some: 'data' }, { priority: 1 }); // as queue is empty, this job will be added to wait