Skip to content

Commit

Permalink
chore: pass marker score
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Oct 25, 2024
1 parent e544be3 commit 9f629d8
Show file tree
Hide file tree
Showing 15 changed files with 27 additions and 23 deletions.
5 changes: 4 additions & 1 deletion src/commands/addPrioritizedJob-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
[8] parent? {id, queueKey}
[9] repeat job key
[10] deduplication key
[11] marker count
ARGV[2] Json stringified job data
ARGV[3] msgpacked options
Expand Down Expand Up @@ -55,6 +56,7 @@ local parentKey = args[5]
local parent = args[8]
local repeatJobKey = args[9]
local deduplicationKey = args[10]
local markerCount = args[11]
local parentData

-- Includes
Expand Down Expand Up @@ -103,7 +105,8 @@ local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],

-- Add the job to the prioritized set
local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey)
addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed)
local markerScore = (jobCounter or 1) % (markerCount or 1)
addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed, markerScore)

-- Emit waiting event
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
Expand Down
3 changes: 2 additions & 1 deletion src/commands/addStandardJob-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KE

-- LIFO or FIFO
local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId, jobCounter, markerCount)
local markerScore = (jobCounter or 1) % (markerCount or 1)
addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId, markerScore)

-- Emit waiting event
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
Expand Down
4 changes: 2 additions & 2 deletions src/commands/changePriority-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ local function reAddJobWithNewPriority( prioritizedKey, markerKey, targetKey,
priorityCounter, lifo, priority, jobId, isPausedOrMaxed)
if priority == 0 then
local pushCmd = lifo and 'RPUSH' or 'LPUSH'
addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId, '0')
else
if lifo then
pushBackJobWithPriority(prioritizedKey, priority, jobId)
else
addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
priorityCounter, isPausedOrMaxed)
priorityCounter, isPausedOrMaxed, '0')
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions src/commands/includes/addBaseMarkerIfNeeded.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
Add marker if needed when a job is available.
]]

local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, jobCounter, markerCount)
local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, markerScore)
if not isPausedOrMaxed then
rcall("ZADD", markerKey, (jobCounter or 1) % (markerCount or 1), "0")
rcall("ZADD", markerKey, markerScore, "0")
end
end
4 changes: 2 additions & 2 deletions src/commands/includes/addJobInTargetList.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
-- Includes
--- @include "addBaseMarkerIfNeeded"

local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId, jobCounter, markerCount)
local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId, markerScore)
rcall(pushCmd, targetKey, jobId)
addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, jobCounter, markerCount)
addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, markerScore)
end
4 changes: 2 additions & 2 deletions src/commands/includes/addJobWithPriority.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
--- @include "addBaseMarkerIfNeeded"

local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
isPausedOrMaxed)
isPausedOrMaxed, markerScore)
local prioCounter = rcall("INCR", priorityCounterKey)
local score = priority * 0x100000000 + prioCounter % 0x100000000
rcall("ZADD", prioritizedKey, score, jobId)
addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, markerScore)
end
4 changes: 2 additions & 2 deletions src/commands/includes/moveParentToWaitIfNeeded.lua
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey,
getTargetQueueList(parentMetaKey, parentActiveKey, parentWaitKey,
parentPausedKey)
addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed,
parentId)
parentId, '0')
else
local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey)
addJobWithPriority(parentMarkerKey,
parentQueueKey .. ":prioritized", priority,
parentId, parentQueueKey .. ":pc", isPausedOrMaxed)
parentId, parentQueueKey .. ":pc", isPausedOrMaxed, '0')
end

rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting",
Expand Down
4 changes: 2 additions & 2 deletions src/commands/includes/promoteDelayedJobs.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedK

if priority == 0 then
-- LIFO or FIFO
addJobInTargetList(targetKey, markerKey, "LPUSH", isPaused, jobId)
addJobInTargetList(targetKey, markerKey, "LPUSH", isPaused, jobId, '0')
else
addJobWithPriority(markerKey, prioritizedKey, priority,
jobId, priorityCounterKey, isPaused)
jobId, priorityCounterKey, isPaused, '0')
end

-- Emit waiting event
Expand Down
2 changes: 1 addition & 1 deletion src/commands/includes/removeParentDependencyKey.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
local function moveParentToWait(parentPrefix, parentId, emitEvent)
local parentTarget, isPausedOrMaxed = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "active",
parentPrefix .. "wait", parentPrefix .. "paused")
addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId)
addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId, '0')

if emitEvent then
local parentEventStream = parentPrefix .. "events"
Expand Down
2 changes: 1 addition & 1 deletion src/commands/moveJobFromActiveToWait-10.lua
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ if lockToken == token then
if priority > 0 then
pushBackJobWithPriority(KEYS[8], priority, jobId)
else
addJobInTargetList(target, KEYS[9], "RPUSH", isPausedOrMaxed, jobId)
addJobInTargetList(target, KEYS[9], "RPUSH", isPausedOrMaxed, jobId, '0')
end

rcall("DEL", lockKey)
Expand Down
2 changes: 1 addition & 1 deletion src/commands/moveJobsToWait-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ if (#jobs > 0) then
rcall("LPUSH", target, unpack(jobs, from, to))
end

addBaseMarkerIfNeeded(KEYS[8], isPausedOrMaxed)
addBaseMarkerIfNeeded(KEYS[8], isPausedOrMaxed, '0')
end

maxCount = maxCount - #jobs
Expand Down
2 changes: 1 addition & 1 deletion src/commands/moveStalledJobsToWait-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ if (#stalling > 0) then
getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)

-- Move the job back to the wait queue, to immediately be picked up by a waiting worker.
addJobInTargetList(target, markerKey, "RPUSH", isPausedOrMaxed, jobId)
addJobInTargetList(target, markerKey, "RPUSH", isPausedOrMaxed, jobId, '0')

rcall("XADD", eventStreamKey, "*", "event",
"waiting", "jobId", jobId, 'prev', 'active')
Expand Down
4 changes: 2 additions & 2 deletions src/commands/promote-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ if rcall("ZREM", KEYS[1], jobId) == 1 then

if priority == 0 then
-- LIFO or FIFO
addJobInTargetList(target, markerKey, "LPUSH", isPausedOrMaxed, jobId)
addJobInTargetList(target, markerKey, "LPUSH", isPausedOrMaxed, jobId, '0')
else
addJobWithPriority(markerKey, KEYS[5], priority, jobId, KEYS[7], isPausedOrMaxed)
addJobWithPriority(markerKey, KEYS[5], priority, jobId, KEYS[7], isPausedOrMaxed, '0')
end

-- Emit waiting event (wait..ing@token)
Expand Down
2 changes: 1 addition & 1 deletion src/commands/reprocessJob-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ if rcall("EXISTS", KEYS[1]) == 1 then
rcall("HDEL", KEYS[1], "finishedOn", "processedOn", ARGV[3])

local target, isPausedOrMaxed = getTargetQueueList(KEYS[5], KEYS[7], KEYS[4], KEYS[6])
addJobInTargetList(target, KEYS[8], ARGV[2], isPausedOrMaxed, jobId)
addJobInTargetList(target, KEYS[8], ARGV[2], isPausedOrMaxed, jobId, '0')

local maxEvents = getOrSetMaxEvents(KEYS[5])
-- Emit waiting event
Expand Down
4 changes: 2 additions & 2 deletions src/commands/retryJob-11.lua
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ if rcall("EXISTS", KEYS[4]) == 1 then

-- Standard or priority add
if priority == 0 then
addJobInTargetList(target, markerKey, ARGV[3], isPausedOrMaxed, ARGV[4])
addJobInTargetList(target, markerKey, ARGV[3], isPausedOrMaxed, ARGV[4], '0')
else
addJobWithPriority(markerKey, KEYS[8], priority, ARGV[4], KEYS[9], isPausedOrMaxed)
addJobWithPriority(markerKey, KEYS[8], priority, ARGV[4], KEYS[9], isPausedOrMaxed, '0')
end

rcall("HINCRBY", KEYS[4], "atm", 1)
Expand Down

0 comments on commit 9f629d8

Please sign in to comment.