Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stalled): consider adding marker when moving job back to wait #2384

Merged
merged 12 commits into from
Jan 20, 2024
4 changes: 2 additions & 2 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"getState": self.redisClient.register_script(self.getScript("getState-8.lua")),
"getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")),
"isJobInList": self.redisClient.register_script(self.getScript("isJobInList-1.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-9.lua")),
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-11.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-7.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")),
Expand Down Expand Up @@ -554,7 +554,7 @@ def extendLock(self, jobId: str, token: str, duration: int, client: Redis = None

def moveStalledJobsToWait(self, maxStalledCount: int, stalledInterval: int):
keys = self.getKeys(['stalled', 'wait', 'active', 'failed',
'stalled-check', 'meta', 'paused', 'events'])
'stalled-check', 'meta', 'paused', 'marker', 'events'])
args = [maxStalledCount, self.keys[''], round(
time.time() * 1000), stalledInterval]
return self.commands["moveStalledJobsToWait"](keys, args)
Expand Down
1 change: 1 addition & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,7 @@ export class Scripts {
this.queue.keys['stalled-check'],
this.queue.keys.meta,
this.queue.keys.paused,
this.queue.keys.marker,
this.queue.keys.events,
];
const args = [
Expand Down
8 changes: 2 additions & 6 deletions src/commands/addStandardJob-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ local parent = args[8]
local parentData

-- Includes
--- @include "includes/addJobInTargetList"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/getTargetQueueList"
--- @include "includes/storeJob"
Expand Down Expand Up @@ -100,14 +101,9 @@ storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,

local target, paused = getTargetQueueList(metaKey, KEYS[1], KEYS[2])

if not paused then
-- mark that a job is available
rcall("ZADD", KEYS[7], 0, "0")
end

-- LIFO or FIFO
local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
rcall(pushCmd, target, jobId)
addJobInTargetList(target, KEYS[7], pushCmd, paused, jobId)

-- Emit waiting event
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
Expand Down
9 changes: 9 additions & 0 deletions src/commands/includes/addBaseMarkerIfNeeded.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
--[[
Add marker if needed when a job is available.
]]

local function addBaseMarkerIfNeeded(markerKey, isPaused)
if not isPaused then
rcall("ZADD", markerKey, 0, "0")
end
end
11 changes: 11 additions & 0 deletions src/commands/includes/addJobInTargetList.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
--[[
Function to add job in target list and add marker if needed.
]]

-- Includes
--- @include "addBaseMarkerIfNeeded"

local function addJobInTargetList(targetKey, markerKey, pushCmd, isPaused, jobId)
rcall(pushCmd, targetKey, jobId)
addBaseMarkerIfNeeded(markerKey, isPaused)
end
7 changes: 4 additions & 3 deletions src/commands/includes/addJobWithPriority.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
Function to add job considering priority.
]]

-- Includes
--- @include "addBaseMarkerIfNeeded"

local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused)
local prioCounter = rcall("INCR", priorityCounterKey)
local score = priority * 0x100000000 + bit.band(prioCounter, 0xffffffffffff)
rcall("ZADD", prioritizedKey, score, jobId)
if not isPaused then
rcall("ZADD", markerKey, 0, "0")
end
addBaseMarkerIfNeeded(markerKey, isPaused)
end
7 changes: 4 additions & 3 deletions src/commands/includes/moveParentToWaitIfNeeded.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

-- Includes
--- @include "addDelayMarkerIfNeeded"
--- @include "addJobInTargetList"
--- @include "addJobWithPriority"
--- @include "isQueuePaused"
--- @include "getTargetQueueList"

local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey,
parentKey, parentId, timestamp)
local isParentActive = rcall("ZSCORE",
Expand All @@ -33,11 +35,10 @@ local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey,
addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey)
else
if priority == 0 then
local parentTarget, _paused =
local parentTarget, isParentPaused =
getTargetQueueList(parentMetaKey, parentWaitKey,
parentPausedKey)
rcall("RPUSH", parentTarget, parentId)
rcall("ZADD", parentMarkerKey, 0, "0")
addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPaused, parentId)
else
local isPaused = isQueuePaused(parentMetaKey)
addJobWithPriority(parentMarkerKey,
Expand Down
6 changes: 2 additions & 4 deletions src/commands/includes/promoteDelayedJobs.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
]]

-- Includes
--- @include "addJobInTargetList"
--- @include "addJobWithPriority"

-- Try to get as much as 1000 jobs at once
Expand All @@ -24,10 +25,7 @@ local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedK

if priority == 0 then
-- LIFO or FIFO
rcall("LPUSH", targetKey, jobId)
if not isPaused then
rcall("ZADD", markerKey, 0, "0")
end
addJobInTargetList(targetKey, markerKey, "LPUSH", isPaused, jobId)
else
addJobWithPriority(markerKey, prioritizedKey, priority,
jobId, priorityCounterKey, isPaused)
Expand Down
5 changes: 2 additions & 3 deletions src/commands/moveJobsToWait-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ local timestamp = tonumber(ARGV[2])
local rcall = redis.call;

-- Includes
--- @include "includes/addBaseMarkerIfNeeded"
--- @include "includes/batches"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/getTargetQueueList"
Expand Down Expand Up @@ -61,9 +62,7 @@ if (#jobs > 0) then
rcall("LPUSH", target, unpack(jobs, from, to))
end

if not paused then
rcall("ZADD", KEYS[7], 0, "0")
end
addBaseMarkerIfNeeded(KEYS[7], paused)
end

maxCount = maxCount - #jobs
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
--[[
Move stalled jobs to wait.

Input:
KEYS[1] 'stalled' (SET)
KEYS[2] 'wait', (LIST)
Expand All @@ -8,19 +9,22 @@
KEYS[5] 'stalled-check', (KEY)
KEYS[6] 'meta', (KEY)
KEYS[7] 'paused', (LIST)
KEYS[8] 'event stream' (STREAM)
KEYS[8] 'marker'
KEYS[9] 'event stream' (STREAM)

ARGV[1] Max stalled job count
ARGV[2] queue.toKey('')
ARGV[3] timestamp
ARGV[4] max check time

Events:
'stalled' with stalled job id.
]]

local rcall = redis.call

-- Includes
--- @include "includes/addJobInTargetList"
--- @include "includes/batches"
--- @include "includes/getTargetQueueList"
--- @include "includes/removeJob"
Expand All @@ -35,7 +39,8 @@ local failedKey = KEYS[4]
local stalledCheckKey = KEYS[5]
local metaKey = KEYS[6]
local pausedKey = KEYS[7]
local eventStreamKey = KEYS[8]
local markerKey = KEYS[8]
local eventStreamKey = KEYS[9]
local maxStalledJobCount = ARGV[1]
local queueKeyPrefix = ARGV[2]
local timestamp = ARGV[3]
Expand Down Expand Up @@ -113,11 +118,12 @@ if (#stalling > 0) then

table.insert(failed, jobId)
else
local target =
local target, isPaused=
getTargetQueueList(metaKey, waitKey, pausedKey)

-- Move the job back to the wait queue, to immediately be picked up by a waiting worker.
rcall("RPUSH", target, jobId)
addJobInTargetList(target, markerKey, "RPUSH", isPaused, jobId)

rcall("XADD", eventStreamKey, "*", "event",
"waiting", "jobId", jobId, 'prev', 'active')

Expand Down
4 changes: 2 additions & 2 deletions src/commands/promote-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ local rcall = redis.call
local jobId = ARGV[2]

-- Includes
--- @include "includes/addJobInTargetList"
--- @include "includes/addJobWithPriority"
--- @include "includes/getTargetQueueList"

Expand All @@ -42,8 +43,7 @@ if rcall("ZREM", KEYS[1], jobId) == 1 then

if priority == 0 then
-- LIFO or FIFO
rcall("LPUSH", target, jobId)
if not paused then rcall("ZADD", KEYS[8], 0, "0") end
addJobInTargetList(target, KEYS[8], "LPUSH", paused, jobId)
else
addJobWithPriority(KEYS[8], KEYS[5], priority, jobId, KEYS[6], paused)
end
Expand Down
4 changes: 2 additions & 2 deletions tests/test_stalled_jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ describe('stalled jobs', function () {
});

it('process stalled jobs when starting a queue', async function () {
this.timeout(10000);
this.timeout(5000);
Copy link
Collaborator Author

@roggervalf roggervalf Jan 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these timeout changes are done because these tests will fail if markers are not added when stalled jobs are moved to wait


const queueEvents = new QueueEvents(queueName, { connection, prefix });
await queueEvents.waitUntilReady();
Expand Down Expand Up @@ -298,7 +298,7 @@ describe('stalled jobs', function () {
});

it('moves jobs to failed with maxStalledCount > 1', async function () {
this.timeout(60000);
this.timeout(8000);

const queueEvents = new QueueEvents(queueName, { connection, prefix });
await queueEvents.waitUntilReady();
Expand Down