From 2219498723f641f2379b7acfb98a0a37a1a274d0 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Fri, 11 Aug 2023 20:41:56 -0500 Subject: [PATCH 1/6] fix(queue): different score purpose per state --- src/commands/cleanJobsInSet-2.lua | 10 +++++++--- src/commands/includes/cleanSet.lua | 5 +++-- src/commands/includes/getJobsInZset.lua | 14 ++++++++++---- src/commands/includes/getTimestamp.lua | 2 ++ tests/test_clean.ts | 20 ++++++++++++++++++++ 5 files changed, 42 insertions(+), 9 deletions(-) diff --git a/src/commands/cleanJobsInSet-2.lua b/src/commands/cleanJobsInSet-2.lua index 20f06b9352..719320bd05 100644 --- a/src/commands/cleanJobsInSet-2.lua +++ b/src/commands/cleanJobsInSet-2.lua @@ -34,13 +34,17 @@ local result if ARGV[4] == "active" then result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], false) elseif ARGV[4] == "delayed" then - result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"processedOn", "timestamp"}) + rcall("SET", "DEBUG0", ARGV[2]) + result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, + {"processedOn", "timestamp"}, false) elseif ARGV[4] == "prioritized" then - result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"timestamp"}) + result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, + {"timestamp"}, false) elseif ARGV[4] == "wait" or ARGV[4] == "paused" then result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], true) else - result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"finishedOn"} ) + result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, + {"finishedOn"}, true ) end rcall("XADD", KEYS[2], "*", "event", "cleaned", "count", result[2]) diff --git a/src/commands/includes/cleanSet.lua b/src/commands/includes/cleanSet.lua index 3f795ccdd6..abcba2aca4 100644 --- a/src/commands/includes/cleanSet.lua +++ b/src/commands/includes/cleanSet.lua @@ -9,8 +9,9 @@ --- @include "getTimestamp" --- @include "removeJob" -local function cleanSet(setKey, jobKeyPrefix, rangeStart, rangeEnd, timestamp, limit, attributes) - local jobs = getJobsInZset(setKey, rangeStart, rangeEnd, timestamp, limit) +local function cleanSet(setKey, jobKeyPrefix, rangeStart, rangeEnd, timestamp, limit, attributes, useTimestampAsScore) + local jobs = getJobsInZset(setKey, rangeStart, rangeEnd, timestamp, limit, useTimestampAsScore) + rcall("SET", "DEBUG4", #jobs) local deleted = {} local deletedCount = 0 local jobTS diff --git a/src/commands/includes/getJobsInZset.lua b/src/commands/includes/getJobsInZset.lua index f2f4ab1f37..a5cd2bafee 100644 --- a/src/commands/includes/getJobsInZset.lua +++ b/src/commands/includes/getJobsInZset.lua @@ -2,11 +2,17 @@ -- of items in a sorted set only run a single iteration. If we simply used -- ZRANGE, we may take a long time traversing through jobs that are within the -- grace period. -local function getJobsInZset(zsetKey, rangeStart, rangeEnd, maxTimestamp, limit) +local function getJobsInZset(zsetKey, rangeStart, rangeEnd, maxTimestamp, limit, useTimestampAsScore) + local endRange + if useTimestampAsScore then + endRange = maxTimestamp + else + endRange = "+inf" + end + if limit > 0 then - return rcall("ZRANGEBYSCORE", zsetKey, 0, maxTimestamp, "LIMIT", 0, limit) + return rcall("ZRANGEBYSCORE", zsetKey, 0, endRange, "LIMIT", 0, limit) else - return rcall("ZRANGE", zsetKey, rangeStart, rangeEnd) + return rcall("ZRANGEBYSCORE", zsetKey, 0, endRange) end end - \ No newline at end of file diff --git a/src/commands/includes/getTimestamp.lua b/src/commands/includes/getTimestamp.lua index 6cb70b5523..ad0a74b28d 100644 --- a/src/commands/includes/getTimestamp.lua +++ b/src/commands/includes/getTimestamp.lua @@ -9,7 +9,9 @@ local function getTimestamp(jobKey, attributes) local jobTs for _, ts in ipairs(rcall("HMGET", jobKey, unpack(attributes))) do + rcall("SET", "DEBUG1", type(ts)) if (ts) then + rcall("SET", "DEBUG2", ts) jobTs = ts break end diff --git a/tests/test_clean.ts b/tests/test_clean.ts index 5415083f30..5106178f3a 100644 --- a/tests/test_clean.ts +++ b/tests/test_clean.ts @@ -175,6 +175,26 @@ describe('Cleaner', () => { expect(count).to.be.eql(0); }); + it('should clean all delayed jobs when limit is given', async () => { + await queue.add('test', { some: 'data' }, { delay: 5000 }); + await queue.add('test', { some: 'data' }, { delay: 5000 }); + await delay(100); + const jobs = await queue.clean(0, 1000, 'delayed'); + expect(jobs.length).to.be.eql(2); + const count = await queue.count(); + expect(count).to.be.eql(0); + }); + + it('should clean all prioritized jobs when limit is given', async () => { + await queue.add('test', { some: 'data' }, { priority: 5000 }); + await queue.add('test', { some: 'data' }, { priority: 5001 }); + await delay(100); + const jobs = await queue.clean(0, 1000, 'prioritized'); + expect(jobs.length).to.be.eql(2); + const count = await queue.count(); + expect(count).to.be.eql(0); + }); + describe('when prioritized state is provided', async () => { it('should clean the number of jobs requested', async () => { await queue.add('test', { some: 'data' }, { priority: 1 }); // as queue is empty, this job will be added to wait From 7695e435e5fe95c0de9341ff58938d79d4c9afc6 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Fri, 11 Aug 2023 20:44:17 -0500 Subject: [PATCH 2/6] chore: remove debug statements --- src/commands/cleanJobsInSet-2.lua | 1 - src/commands/includes/cleanSet.lua | 1 - src/commands/includes/getTimestamp.lua | 2 -- 3 files changed, 4 deletions(-) diff --git a/src/commands/cleanJobsInSet-2.lua b/src/commands/cleanJobsInSet-2.lua index 719320bd05..22cce04a28 100644 --- a/src/commands/cleanJobsInSet-2.lua +++ b/src/commands/cleanJobsInSet-2.lua @@ -34,7 +34,6 @@ local result if ARGV[4] == "active" then result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], false) elseif ARGV[4] == "delayed" then - rcall("SET", "DEBUG0", ARGV[2]) result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"processedOn", "timestamp"}, false) elseif ARGV[4] == "prioritized" then diff --git a/src/commands/includes/cleanSet.lua b/src/commands/includes/cleanSet.lua index abcba2aca4..047de0a4af 100644 --- a/src/commands/includes/cleanSet.lua +++ b/src/commands/includes/cleanSet.lua @@ -11,7 +11,6 @@ local function cleanSet(setKey, jobKeyPrefix, rangeStart, rangeEnd, timestamp, limit, attributes, useTimestampAsScore) local jobs = getJobsInZset(setKey, rangeStart, rangeEnd, timestamp, limit, useTimestampAsScore) - rcall("SET", "DEBUG4", #jobs) local deleted = {} local deletedCount = 0 local jobTS diff --git a/src/commands/includes/getTimestamp.lua b/src/commands/includes/getTimestamp.lua index ad0a74b28d..6cb70b5523 100644 --- a/src/commands/includes/getTimestamp.lua +++ b/src/commands/includes/getTimestamp.lua @@ -9,9 +9,7 @@ local function getTimestamp(jobKey, attributes) local jobTs for _, ts in ipairs(rcall("HMGET", jobKey, unpack(attributes))) do - rcall("SET", "DEBUG1", type(ts)) if (ts) then - rcall("SET", "DEBUG2", ts) jobTs = ts break end From 6d90230052e2b2f11286c98366c2bc9ae1f81d1b Mon Sep 17 00:00:00 2001 From: roggervalf Date: Fri, 11 Aug 2023 20:59:32 -0500 Subject: [PATCH 3/6] docs: add todo comment for perf change --- src/commands/includes/cleanSet.lua | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/commands/includes/cleanSet.lua b/src/commands/includes/cleanSet.lua index 047de0a4af..25b3f685e4 100644 --- a/src/commands/includes/cleanSet.lua +++ b/src/commands/includes/cleanSet.lua @@ -21,8 +21,9 @@ local function cleanSet(setKey, jobKeyPrefix, rangeStart, rangeEnd, timestamp, l local jobKey = jobKeyPrefix .. job -- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed + -- TODO: check if in completed/failed this is needed, as ZRANGEBYSCORE could bring all the expected jobs jobTS = getTimestamp(jobKey, attributes) - if (not jobTS or jobTS < timestamp) then + if (not jobTS or jobTS <= timestamp) then removeJob(job, true, jobKeyPrefix) deletedCount = deletedCount + 1 table.insert(deleted, job) From 32ee8cf85b542c826dc70b5eb05241010894b134 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 12 Aug 2023 08:52:24 -0500 Subject: [PATCH 4/6] refactor(clean): set rangeEnd outside cleanSet --- src/commands/cleanJobsInSet-2.lua | 15 +++++++++------ src/commands/includes/cleanSet.lua | 4 ++-- src/commands/includes/getJobsInZset.lua | 13 +++---------- 3 files changed, 14 insertions(+), 18 deletions(-) diff --git a/src/commands/cleanJobsInSet-2.lua b/src/commands/cleanJobsInSet-2.lua index 22cce04a28..1d76e285bc 100644 --- a/src/commands/cleanJobsInSet-2.lua +++ b/src/commands/cleanJobsInSet-2.lua @@ -34,16 +34,19 @@ local result if ARGV[4] == "active" then result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], false) elseif ARGV[4] == "delayed" then - result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, - {"processedOn", "timestamp"}, false) + rangeEnd = "+inf" + result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit, + {"processedOn", "timestamp"}) elseif ARGV[4] == "prioritized" then - result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, - {"timestamp"}, false) + rangeEnd = "+inf" + result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit, + {"timestamp"}) elseif ARGV[4] == "wait" or ARGV[4] == "paused" then result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], true) else - result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, - {"finishedOn"}, true ) + rangeEnd = ARGV[2] + result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit, + {"finishedOn"}) end rcall("XADD", KEYS[2], "*", "event", "cleaned", "count", result[2]) diff --git a/src/commands/includes/cleanSet.lua b/src/commands/includes/cleanSet.lua index 25b3f685e4..dffeef873c 100644 --- a/src/commands/includes/cleanSet.lua +++ b/src/commands/includes/cleanSet.lua @@ -9,8 +9,8 @@ --- @include "getTimestamp" --- @include "removeJob" -local function cleanSet(setKey, jobKeyPrefix, rangeStart, rangeEnd, timestamp, limit, attributes, useTimestampAsScore) - local jobs = getJobsInZset(setKey, rangeStart, rangeEnd, timestamp, limit, useTimestampAsScore) +local function cleanSet(setKey, jobKeyPrefix, rangeEnd, timestamp, limit, attributes) + local jobs = getJobsInZset(setKey, rangeEnd, limit) local deleted = {} local deletedCount = 0 local jobTS diff --git a/src/commands/includes/getJobsInZset.lua b/src/commands/includes/getJobsInZset.lua index a5cd2bafee..a9f90c3960 100644 --- a/src/commands/includes/getJobsInZset.lua +++ b/src/commands/includes/getJobsInZset.lua @@ -2,17 +2,10 @@ -- of items in a sorted set only run a single iteration. If we simply used -- ZRANGE, we may take a long time traversing through jobs that are within the -- grace period. -local function getJobsInZset(zsetKey, rangeStart, rangeEnd, maxTimestamp, limit, useTimestampAsScore) - local endRange - if useTimestampAsScore then - endRange = maxTimestamp - else - endRange = "+inf" - end - +local function getJobsInZset(zsetKey, rangeEnd, limit) if limit > 0 then - return rcall("ZRANGEBYSCORE", zsetKey, 0, endRange, "LIMIT", 0, limit) + return rcall("ZRANGEBYSCORE", zsetKey, 0, rangeEnd, "LIMIT", 0, limit) else - return rcall("ZRANGEBYSCORE", zsetKey, 0, endRange) + return rcall("ZRANGEBYSCORE", zsetKey, 0, rangeEnd) end end From 35f4a447147d3002787ed4bb8878331c55370289 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 12 Aug 2023 09:06:01 -0500 Subject: [PATCH 5/6] perf(clean): do not compare timestamp for finished states as we are doing it when getting jobs --- src/commands/cleanJobsInSet-2.lua | 6 +++--- src/commands/includes/cleanSet.lua | 15 ++++++++++----- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/commands/cleanJobsInSet-2.lua b/src/commands/cleanJobsInSet-2.lua index 1d76e285bc..c33c64eb11 100644 --- a/src/commands/cleanJobsInSet-2.lua +++ b/src/commands/cleanJobsInSet-2.lua @@ -36,17 +36,17 @@ if ARGV[4] == "active" then elseif ARGV[4] == "delayed" then rangeEnd = "+inf" result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit, - {"processedOn", "timestamp"}) + {"processedOn", "timestamp"}, false) elseif ARGV[4] == "prioritized" then rangeEnd = "+inf" result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit, - {"timestamp"}) + {"timestamp"}, false) elseif ARGV[4] == "wait" or ARGV[4] == "paused" then result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], true) else rangeEnd = ARGV[2] result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit, - {"finishedOn"}) + {"finishedOn"}, true) end rcall("XADD", KEYS[2], "*", "event", "cleaned", "count", result[2]) diff --git a/src/commands/includes/cleanSet.lua b/src/commands/includes/cleanSet.lua index dffeef873c..f83393d50a 100644 --- a/src/commands/includes/cleanSet.lua +++ b/src/commands/includes/cleanSet.lua @@ -9,7 +9,7 @@ --- @include "getTimestamp" --- @include "removeJob" -local function cleanSet(setKey, jobKeyPrefix, rangeEnd, timestamp, limit, attributes) +local function cleanSet(setKey, jobKeyPrefix, rangeEnd, timestamp, limit, attributes, isFinished) local jobs = getJobsInZset(setKey, rangeEnd, limit) local deleted = {} local deletedCount = 0 @@ -20,13 +20,18 @@ local function cleanSet(setKey, jobKeyPrefix, rangeEnd, timestamp, limit, attrib end local jobKey = jobKeyPrefix .. job - -- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed - -- TODO: check if in completed/failed this is needed, as ZRANGEBYSCORE could bring all the expected jobs - jobTS = getTimestamp(jobKey, attributes) - if (not jobTS or jobTS <= timestamp) then + if isFinished then removeJob(job, true, jobKeyPrefix) deletedCount = deletedCount + 1 table.insert(deleted, job) + else + -- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed + jobTS = getTimestamp(jobKey, attributes) + if (not jobTS or jobTS <= timestamp) then + removeJob(job, true, jobKeyPrefix) + deletedCount = deletedCount + 1 + table.insert(deleted, job) + end end end From 1cb3163b4b2f88b230c2ecb805313a187eb30d63 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 12 Aug 2023 09:09:51 -0500 Subject: [PATCH 6/6] refactor: consider timestamp value when comparing grace period --- src/commands/includes/cleanList.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/commands/includes/cleanList.lua b/src/commands/includes/cleanList.lua index f7e7a65225..99b397ef53 100644 --- a/src/commands/includes/cleanList.lua +++ b/src/commands/includes/cleanList.lua @@ -30,7 +30,7 @@ local function cleanList(listKey, jobKeyPrefix, rangeStart, rangeEnd, -- Fetch all three of these (in that order) and use the first one that is set so that we'll leave jobs -- that have been active within the grace period: jobTS = getTimestamp(jobKey, {"finishedOn", "processedOn", "timestamp"}) - if (not jobTS or jobTS < timestamp) then + if (not jobTS or jobTS <= timestamp) then -- replace the entry with a deletion marker; the actual deletion will -- occur at the end of the script rcall("LSET", listKey, rangeEnd - jobIdsLen + i, deletionMarker)