Skip to content

Commit

Permalink
refactor: simplify code in moveToActive (#2195)
Browse files Browse the repository at this point in the history
  • Loading branch information
manast authored Sep 26, 2023
1 parent b886d1d commit ba77e06
Showing 1 changed file with 34 additions and 46 deletions.
80 changes: 34 additions & 46 deletions src/commands/moveToActive-10.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@
opts - lockDuration
opts - limiter
]]
local jobId
local rcall = redis.call
local waitKey = KEYS[1]
local activeKey = KEYS[2]
local rateLimiterKey = KEYS[6]
local delayedKey = KEYS[7]
local opts = cmsgpack.unpack(ARGV[4])

-- Includes
--- @include "includes/getNextDelayedTimestamp"
Expand All @@ -43,73 +47,57 @@ local rcall = redis.call
--- @include "includes/prepareJobForProcessing"
--- @include "includes/promoteDelayedJobs"

local target, paused = getTargetQueueList(KEYS[9], KEYS[1], KEYS[8])
local target, paused = getTargetQueueList(KEYS[9], waitKey, KEYS[8])

-- Check if there are delayed jobs that we can move to wait.
promoteDelayedJobs(KEYS[7], KEYS[1], target, KEYS[3], KEYS[4], ARGV[1], ARGV[2], paused, KEYS[10])
promoteDelayedJobs(delayedKey, waitKey, target, KEYS[3], KEYS[4], ARGV[1],
ARGV[2], paused, KEYS[10])

local opts = cmsgpack.unpack(ARGV[4])
local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max'])
local expireTime = getRateLimitTTL(maxJobs, KEYS[6])
if (ARGV[3] ~= "") then
jobId = ARGV[3]
-- clean stalled key
rcall("SREM", KEYS[5], jobId)
else
-- Check if we are rate limited first.
if expireTime > 0 then
return { 0, 0, expireTime, 0 }
end
local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey)

-- paused queue
if paused then return {0, 0, 0, 0} end
local jobId = nil
if ARGV[3] ~= "" then
jobId = ARGV[3]

-- no job ID, try non-blocking move from wait to active
jobId = rcall("RPOPLPUSH", KEYS[1], KEYS[2])
-- clean stalled key
rcall("SREM", KEYS[5], jobId)
end

-- If jobId is special ID 0:delay, then there is no job to process
if jobId then
if string.sub(jobId, 1, 2) == "0:" then
rcall("LREM", KEYS[2], 1, jobId)
if not jobId or (jobId and string.sub(jobId, 1, 2) == "0:") then
-- If jobId is special ID 0:delay, then there is no job to process
if jobId then rcall("LREM", activeKey, 1, jobId) end

if expireTime > 0 then
return { 0, 0, expireTime, 0 }
end
-- Check if we are rate limited first.
if expireTime > 0 then return {0, 0, expireTime, 0} end

-- paused queue
if paused then return {0, 0, 0, 0} end

-- Move again since we just got the marker job.
jobId = rcall("RPOPLPUSH", KEYS[1], KEYS[2])
-- no job ID, try non-blocking move from wait to active
jobId = rcall("RPOPLPUSH", waitKey, activeKey)

-- Since it is possible that between a call to BRPOPLPUSH and moveToActive
-- another script puts a new maker in wait, we need to check again.
if jobId and string.sub(jobId, 1, 2) == "0:" then
rcall("LREM", KEYS[2], 1, jobId)
jobId = rcall("RPOPLPUSH", KEYS[1], KEYS[2])
rcall("LREM", activeKey, 1, jobId)
jobId = rcall("RPOPLPUSH", waitKey, activeKey)
end
end
end

if jobId then
return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, opts)
else
jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10])
if jobId then
return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2],
maxJobs, expireTime, opts)
else
jobId = moveJobFromPriorityToActive(KEYS[3], activeKey, KEYS[10])
if jobId then
return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, opts)
return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2],
maxJobs, expireTime, opts)
end
end
else
jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10])
if jobId then
return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, opts)
end
end

-- Return the timestamp for the next delayed job if any.
local nextTimestamp = getNextDelayedTimestamp(KEYS[7])
if (nextTimestamp ~= nil) then
return { 0, 0, 0, nextTimestamp }
end
local nextTimestamp = getNextDelayedTimestamp(delayedKey)
if (nextTimestamp ~= nil) then return {0, 0, 0, nextTimestamp} end

return { 0, 0, 0, 0}
return {0, 0, 0, 0}

0 comments on commit ba77e06

Please sign in to comment.