Skip to content

Commit

Permalink
fix: remove jobId from stalled when it's moved from active
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Apr 7, 2024
1 parent fe4a4e0 commit 3a694da
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 20 deletions.
8 changes: 5 additions & 3 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-11.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")),
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-4.lua")),
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-5.lua")),
"obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-7.lua")),
"promote": self.redisClient.register_script(self.getScript("promote-8.lua")),
"removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-7.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-10.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-11.lua")),
"moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-7.lua")),
"saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")),
"updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")),
Expand Down Expand Up @@ -171,7 +171,8 @@ def moveToWaitingChildrenArgs(self, job_id, token, opts: dict = {}):
keys = [self.toKey(job_id) + ":lock",
self.keys['active'],
self.keys['waiting-children'],
self.toKey(job_id)]
self.toKey(job_id),
self.keys['stalled']]
child_key = opts.get("child") if opts else None
args = [token, get_parent_key(child_key) or "", round(time.time() * 1000), job_id,
"1" if opts.get("skipAttempt") else "0"]
Expand Down Expand Up @@ -251,6 +252,7 @@ def retryJobArgs(self, job_id: str, lifo: bool, token: str, opts: dict = {}):
keys.append(self.keys['prioritized'])
keys.append(self.keys['pc'])
keys.append(self.keys['marker'])
keys.append(self.keys['stalled'])

push_cmd = "RPUSH" if lifo else "LPUSH"

Expand Down
15 changes: 10 additions & 5 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -811,11 +811,15 @@ export class Scripts {

const childKey = getParentKey(opts.child);

const keys = [`${jobId}:lock`, 'active', 'waiting-children', jobId].map(
name => {
return this.queue.toKey(name);
},
);
const keys = [
`${jobId}:lock`,
'active',
'waiting-children',
jobId,
'stalled',
].map(name => {
return this.queue.toKey(name);
});

return keys.concat([
token,
Expand Down Expand Up @@ -920,6 +924,7 @@ export class Scripts {
this.queue.keys.prioritized,
this.queue.keys.pc,
this.queue.keys.marker,
this.queue.keys.stalled,
];

const pushCmd = (lifo ? 'R' : 'L') + 'PUSH';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
KEYS[2] active key
KEYS[3] waitChildrenKey key
KEYS[4] job key
KEYS[5] stalled key
ARGV[1] token
ARGV[2] child key
Expand All @@ -21,13 +22,21 @@
]]
local rcall = redis.call

local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, timestamp,
lockKey, jobKey, token)
local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId,
timestamp, stalledKey, lockKey, jobKey, token)
if token ~= "0" then
if rcall("GET", lockKey) == token then
local lockToken = rcall("GET", lockKey)
if lockToken == token then
rcall("DEL", lockKey)
rcall("SREM", stalledKey, jobId)
else
return -2
if lockToken then
-- Lock exists but token does not match
return -6
else
-- Lock is missing completely
return -2
end
end
end

Expand All @@ -47,15 +56,15 @@ end
if rcall("EXISTS", KEYS[4]) == 1 then
if ARGV[2] ~= "" then
if rcall("SISMEMBER", KEYS[4] .. ":dependencies", ARGV[2]) ~= 0 then
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], KEYS[4],
ARGV[1])
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[5], KEYS[1],
KEYS[4], ARGV[1])
end

return 1
else
if rcall("SCARD", KEYS[4] .. ":dependencies") ~= 0 then
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], KEYS[4],
ARGV[1])
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[5], KEYS[1],
KEYS[4], ARGV[1])
end

return 1
Expand Down
18 changes: 14 additions & 4 deletions src/commands/retryJob-10.lua → src/commands/retryJob-11.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
KEYS[8] prioritized key
KEYS[9] 'pc' priority counter
KEYS[10] 'marker'
KEYS[11] 'stalled'
ARGV[1] key prefix
ARGV[2] timestamp
Expand Down Expand Up @@ -43,13 +44,22 @@ local markerKey = KEYS[10]
promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[8], KEYS[6], ARGV[1], ARGV[2], KEYS[9], paused)

if rcall("EXISTS", KEYS[4]) == 1 then

if ARGV[5] ~= "0" then
--TODO: to be refactored using an include
local token = ARGV[5]
if token ~= "0" then
local lockKey = KEYS[4] .. ':lock'
if rcall("GET", lockKey) == ARGV[5] then
local lockToken = rcall("GET", lockKey)
if locakToken == token then
rcall("DEL", lockKey)
rcall("SREM", KEYS[11], ARGV[4])
else
return -2
if lockToken then
-- Lock exists but token does not match
return -6
else
-- Lock is missing completely
return -2
end
end
end

Expand Down

0 comments on commit 3a694da

Please sign in to comment.