Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(stalled): remove jobId from stalled after removing lock when moved from active #2512

Merged
merged 4 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"isJobInList": self.redisClient.register_script(self.getScript("isJobInList-1.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-9.lua")),
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-11.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-7.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")),
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-4.lua")),
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-5.lua")),
"obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-7.lua")),
"promote": self.redisClient.register_script(self.getScript("promote-8.lua")),
"removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-7.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-10.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-11.lua")),
"moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-7.lua")),
"saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")),
"updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")),
Expand Down Expand Up @@ -171,7 +171,8 @@ def moveToWaitingChildrenArgs(self, job_id, token, opts: dict = {}):
keys = [self.toKey(job_id) + ":lock",
self.keys['active'],
self.keys['waiting-children'],
self.toKey(job_id)]
self.toKey(job_id),
self.keys['stalled']]
child_key = opts.get("child") if opts else None
args = [token, get_parent_key(child_key) or "", round(time.time() * 1000), job_id,
"1" if opts.get("skipAttempt") else "0"]
Expand Down Expand Up @@ -251,6 +252,7 @@ def retryJobArgs(self, job_id: str, lifo: bool, token: str, opts: dict = {}):
keys.append(self.keys['prioritized'])
keys.append(self.keys['pc'])
keys.append(self.keys['marker'])
keys.append(self.keys['stalled'])

push_cmd = "RPUSH" if lifo else "LPUSH"

Expand All @@ -269,6 +271,7 @@ def moveToDelayedArgs(self, job_id: str, timestamp: int, token: str, delay: int
keys.append(self.toKey(job_id))
keys.append(self.keys['events'])
keys.append(self.keys['meta'])
keys.append(self.keys['stalled'])

args = [self.keys[''], round(time.time() * 1000), str(max_timestamp),
job_id, token, delay, "1" if opts.get("skipAttempt") else "0" ]
Expand Down
16 changes: 11 additions & 5 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,7 @@ export class Scripts {
this.queue.toKey(jobId),
queueKeys.events,
queueKeys.meta,
queueKeys.stalled,
];

return keys.concat([
Expand Down Expand Up @@ -810,11 +811,15 @@ export class Scripts {

const childKey = getParentKey(opts.child);

const keys = [`${jobId}:lock`, 'active', 'waiting-children', jobId].map(
name => {
return this.queue.toKey(name);
},
);
const keys = [
`${jobId}:lock`,
'active',
'waiting-children',
jobId,
'stalled',
].map(name => {
return this.queue.toKey(name);
});

return keys.concat([
token,
Expand Down Expand Up @@ -919,6 +924,7 @@ export class Scripts {
this.queue.keys.prioritized,
this.queue.keys.pc,
this.queue.keys.marker,
this.queue.keys.stalled,
];

const pushCmd = (lifo ? 'R' : 'L') + 'PUSH';
Expand Down
19 changes: 19 additions & 0 deletions src/commands/includes/removeLockToken.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
local function removeLockToken(jobKey, stalledKey, token, jobId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the name should just be "removeLock"

if token ~= "0" then
local lockKey = jobKey .. ':lock'
local lockToken = rcall("GET", lockKey)
if lockToken == token then
rcall("DEL", lockKey)
rcall("SREM", stalledKey, jobId)
else
if lockToken then
-- Lock exists but token does not match
return -6
else
-- Lock is missing completely
return -2
end
end
end
return 0
end
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
KEYS[5] job key
KEYS[6] events stream
KEYS[7] meta key
KEYS[8] stalled key

ARGV[1] key prefix
ARGV[2] timestamp
Expand All @@ -32,20 +33,18 @@ local rcall = redis.call
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/isQueuePaused"
--- @include "includes/removeLockToken"

local jobKey = KEYS[5]
local metaKey = KEYS[7]
local token = ARGV[5]
if rcall("EXISTS", jobKey) == 1 then
local delayedKey = KEYS[4]
if ARGV[5] ~= "0" then
local lockKey = jobKey .. ':lock'
if rcall("GET", lockKey) == ARGV[5] then
rcall("DEL", lockKey)
else
return -2
end
local errorCode = removeLockToken(jobKey, KEYS[8], token, ARGV[4])
if errorCode < 0 then
return errorCode
end

local delayedKey = KEYS[4]
local jobId = ARGV[4]
local score = tonumber(ARGV[3])
local delayedTimestamp = (score / 0x1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
KEYS[2] active key
KEYS[3] waitChildrenKey key
KEYS[4] job key
KEYS[5] stalled key

ARGV[1] token
ARGV[2] child key
Expand All @@ -21,16 +22,11 @@
]]
local rcall = redis.call

local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, timestamp,
lockKey, jobKey, token)
if token ~= "0" then
if rcall("GET", lockKey) == token then
rcall("DEL", lockKey)
else
return -2
end
end
-- Includes
--- @include "includes/removeLockToken"

local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId,
timestamp)
local score = tonumber(timestamp)

local numRemovedElements = rcall("LREM", activeKey, -1, jobId)
Expand All @@ -47,15 +43,21 @@ end
if rcall("EXISTS", KEYS[4]) == 1 then
if ARGV[2] ~= "" then
if rcall("SISMEMBER", KEYS[4] .. ":dependencies", ARGV[2]) ~= 0 then
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], KEYS[4],
ARGV[1])
local errorCode = removeLockToken(KEYS[4], KEYS[5], ARGV[1], ARGV[4])
if errorCode < 0 then
return errorCode
end
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3])
end

return 1
else
if rcall("SCARD", KEYS[4] .. ":dependencies") ~= 0 then
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], KEYS[4],
ARGV[1])
local errorCode = removeLockToken(KEYS[4], KEYS[5], ARGV[1], ARGV[4])
if errorCode < 0 then
return errorCode
end
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3])
end

return 1
Expand Down
13 changes: 5 additions & 8 deletions src/commands/retryJob-10.lua → src/commands/retryJob-11.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
KEYS[8] prioritized key
KEYS[9] 'pc' priority counter
KEYS[10] 'marker'
KEYS[11] 'stalled'

ARGV[1] key prefix
ARGV[2] timestamp
Expand All @@ -34,6 +35,7 @@ local rcall = redis.call
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/getTargetQueueList"
--- @include "includes/promoteDelayedJobs"
--- @include "includes/removeLockToken"

local target, paused = getTargetQueueList(KEYS[5], KEYS[2], KEYS[3])
local markerKey = KEYS[10]
Expand All @@ -43,14 +45,9 @@ local markerKey = KEYS[10]
promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[8], KEYS[6], ARGV[1], ARGV[2], KEYS[9], paused)

if rcall("EXISTS", KEYS[4]) == 1 then

if ARGV[5] ~= "0" then
local lockKey = KEYS[4] .. ':lock'
if rcall("GET", lockKey) == ARGV[5] then
rcall("DEL", lockKey)
else
return -2
end
local errorCode = removeLockToken(KEYS[4], KEYS[11], ARGV[5], ARGV[4])
if errorCode < 0 then
return errorCode
end

rcall("LREM", KEYS[1], 0, ARGV[4])
Expand Down
Loading