From ba77e067a5e367a24f6dba02f26ffa78f0d876b7 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Tue, 26 Sep 2023 14:50:22 +0200 Subject: [PATCH] refactor: simplify code in moveToActive (#2195) --- src/commands/moveToActive-10.lua | 80 ++++++++++++++------------------ 1 file changed, 34 insertions(+), 46 deletions(-) diff --git a/src/commands/moveToActive-10.lua b/src/commands/moveToActive-10.lua index 51426984ec..1c93db0185 100644 --- a/src/commands/moveToActive-10.lua +++ b/src/commands/moveToActive-10.lua @@ -32,8 +32,12 @@ opts - lockDuration opts - limiter ]] -local jobId local rcall = redis.call +local waitKey = KEYS[1] +local activeKey = KEYS[2] +local rateLimiterKey = KEYS[6] +local delayedKey = KEYS[7] +local opts = cmsgpack.unpack(ARGV[4]) -- Includes --- @include "includes/getNextDelayedTimestamp" @@ -43,73 +47,57 @@ local rcall = redis.call --- @include "includes/prepareJobForProcessing" --- @include "includes/promoteDelayedJobs" -local target, paused = getTargetQueueList(KEYS[9], KEYS[1], KEYS[8]) +local target, paused = getTargetQueueList(KEYS[9], waitKey, KEYS[8]) -- Check if there are delayed jobs that we can move to wait. -promoteDelayedJobs(KEYS[7], KEYS[1], target, KEYS[3], KEYS[4], ARGV[1], ARGV[2], paused, KEYS[10]) +promoteDelayedJobs(delayedKey, waitKey, target, KEYS[3], KEYS[4], ARGV[1], + ARGV[2], paused, KEYS[10]) -local opts = cmsgpack.unpack(ARGV[4]) local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max']) -local expireTime = getRateLimitTTL(maxJobs, KEYS[6]) -if (ARGV[3] ~= "") then - jobId = ARGV[3] - -- clean stalled key - rcall("SREM", KEYS[5], jobId) -else - -- Check if we are rate limited first. - if expireTime > 0 then - return { 0, 0, expireTime, 0 } - end +local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey) - -- paused queue - if paused then return {0, 0, 0, 0} end +local jobId = nil +if ARGV[3] ~= "" then + jobId = ARGV[3] - -- no job ID, try non-blocking move from wait to active - jobId = rcall("RPOPLPUSH", KEYS[1], KEYS[2]) + -- clean stalled key + rcall("SREM", KEYS[5], jobId) end --- If jobId is special ID 0:delay, then there is no job to process -if jobId then - if string.sub(jobId, 1, 2) == "0:" then - rcall("LREM", KEYS[2], 1, jobId) +if not jobId or (jobId and string.sub(jobId, 1, 2) == "0:") then + -- If jobId is special ID 0:delay, then there is no job to process + if jobId then rcall("LREM", activeKey, 1, jobId) end - if expireTime > 0 then - return { 0, 0, expireTime, 0 } - end + -- Check if we are rate limited first. + if expireTime > 0 then return {0, 0, expireTime, 0} end -- paused queue if paused then return {0, 0, 0, 0} end - -- Move again since we just got the marker job. - jobId = rcall("RPOPLPUSH", KEYS[1], KEYS[2]) + -- no job ID, try non-blocking move from wait to active + jobId = rcall("RPOPLPUSH", waitKey, activeKey) -- Since it is possible that between a call to BRPOPLPUSH and moveToActive -- another script puts a new maker in wait, we need to check again. if jobId and string.sub(jobId, 1, 2) == "0:" then - rcall("LREM", KEYS[2], 1, jobId) - jobId = rcall("RPOPLPUSH", KEYS[1], KEYS[2]) + rcall("LREM", activeKey, 1, jobId) + jobId = rcall("RPOPLPUSH", waitKey, activeKey) end - end +end - if jobId then - return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, opts) - else - jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10]) +if jobId then + return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2], + maxJobs, expireTime, opts) +else + jobId = moveJobFromPriorityToActive(KEYS[3], activeKey, KEYS[10]) if jobId then - return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, opts) + return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2], + maxJobs, expireTime, opts) end - end -else - jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10]) - if jobId then - return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, opts) - end end -- Return the timestamp for the next delayed job if any. -local nextTimestamp = getNextDelayedTimestamp(KEYS[7]) -if (nextTimestamp ~= nil) then - return { 0, 0, 0, nextTimestamp } -end +local nextTimestamp = getNextDelayedTimestamp(delayedKey) +if (nextTimestamp ~= nil) then return {0, 0, 0, nextTimestamp} end -return { 0, 0, 0, 0} +return {0, 0, 0, 0}