
[Bug]: Missing lock with Upstash #2969

Closed
1 task done
gorango opened this issue Dec 14, 2024 · 10 comments · Fixed by #2971
Comments

gorango commented Dec 14, 2024

Version

v5.34.1

Platform

NodeJS

What happened?

Upgrading from 5.29.1 is throwing the "Missing lock for job" error after each job successfully completes. Unable to reproduce with a local instance of Redis - only occurs on Upstash (disabled eviction, works with <5.29).

Single queue with a single worker processing short jobs.

How to reproduce.

No response

Relevant log output

Error: Missing lock for job xxx. moveToFinished
    at Scripts.finishedErrors (/node_modules/.pnpm/bullmq@5.34.1/node_modules/bullmq/dist/cjs/classes/scripts.js:1005:24)
    at Scripts.moveToFinished (/node_modules/.pnpm/bullmq@5.34.1/node_modules/bullmq/dist/cjs/classes/scripts.js:391:24)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
    at async /node_modules/.pnpm/bullmq@5.34.1/node_modules/bullmq/dist/cjs/classes/job.js:492:26
    at async handleFailed (/node_modules/.pnpm/bullmq@5.34.1/node_modules/bullmq/dist/cjs/classes/worker.js:491:40)
    at async /node_modules/.pnpm/bullmq@5.34.1/node_modules/bullmq/dist/cjs/classes/worker.js:520:32
    at async Worker.retryIfFailed (/node_modules/.pnpm/bullmq@5.34.1/node_modules/bullmq/dist/cjs/classes/worker.js:743:24)

Code of Conduct

  • I agree to follow this project's Code of Conduct
gorango added the bug label Dec 14, 2024
manast removed the bug label Dec 14, 2024

manast commented Dec 14, 2024

You must provide some code that reproduces this issue. If it only happens with Upstash, that's fine, but we need to know what you are running so that we have any chance of fixing it.

manast commented Dec 14, 2024

Also, could you verify the exact version from which the issue can be reproduced? That may help in narrowing down the origin of the issue.

gorango commented Dec 14, 2024

Can confirm v5.30.0 introduces the bug. The following code should suffice to reproduce.

import { Redis } from 'ioredis'
import { Queue, Worker } from 'bullmq'

const REDIS_URL = 'redis://default:<password>@<endpoint>.upstash.io:6379' // Upstash credentials redacted

export const connection = new Redis(REDIS_URL, { maxRetriesPerRequest: null })

async function main() {
	const myQueue = new Queue('myQueue', { connection })

	const worker = new Worker('myQueue', async (job) => {
		console.log('Processing job:', job.id)
		console.log('Job data:', job.data)
		return job.data
	}, { connection })

	await myQueue.add('testJob', { message: 'Hello BullMQ!' })
}

main().catch(console.error)
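
For what it's worth, attaching the worker's error listeners makes the failure visible when running the snippet above (a minimal sketch to drop inside main() after the worker is created; 'failed' and 'error' are standard BullMQ worker events, and the handler bodies are only illustrative):

worker.on('failed', (job, err) => {
	// The processor resolves, yet the job is then moved to failed with the missing-lock error
	console.error('Job failed:', job?.id, err.message)
})
worker.on('error', (err) => {
	// Errors not tied to a specific job (e.g. lock renewal problems) surface here
	console.error('Worker error:', err.message)
})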

manast commented Dec 14, 2024

v5.30.0 only has a small change for job schedulers; I cannot imagine that it would introduce an issue like this.

gorango commented Dec 14, 2024

Sorry, it's v5.33.0:

➜  cat package.json | jq '.dependencies'
{
  "bullmq": "5.32.0",
  "ioredis": "^5.3.2"
}
➜  node index.js
Processing job: 1
Job data: { message: 'Hello BullMQ!' }

➜  pnpm i bullmq@5.33.0
Packages: +1 -1
+-
dependencies:
- bullmq 5.32.0
+ bullmq 5.33.0 (5.34.1 is available)
➜  node index.js
Processing job: 2
Job data: { message: 'Hello BullMQ!' }
Error: Missing lock for job 2. moveToFinished

manast commented Dec 14, 2024

Could you possibly run "redis-cli monitor" against the Upstash instance when you run the test that reproduces the issue? The output of the monitor would be very useful for finding the issue.
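
For reference, one way to capture such a trace against a TLS endpoint like Upstash (a sketch only; the URL placeholders are illustrative, not actual credentials):

redis-cli -u 'rediss://default:<password>@<endpoint>.upstash.io:6379' monitor | tee redis-monitor.log

MONITOR echoes every command the server processes and adds measurable overhead, so it is best left running only for the duration of the test.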

gorango commented Dec 14, 2024

Here it is, redis-monitor.csv:

13:01:15.215 [0 76.67.116.197:45098] AUTH "(redacted)" "(redacted)"
13:01:15.217 [0 76.67.116.197:45098] INFO
13:01:15.223 [0 76.67.116.197:45108] AUTH "(redacted)" "(redacted)"
13:01:15.224 [0 76.67.116.197:45108] INFO
13:01:15.245 [0 76.67.116.197:45098] INFO
13:01:15.249 [0 76.67.116.197:45108] INFO
13:01:15.249 [0 76.67.116.197:45108] INFO
13:01:15.276 [0 76.67.116.197:45108] HMSET "bull:myQueue:meta" "opts.maxLenEvents" "10000" "version" "bullmq:5.33.0"
13:01:15.300 [0 76.67.116.197:45108] EVAL "<bullmq addJob Lua script, source elided>" "8" "bull:myQueue:wait" "bull:myQueue:paused" "bull:myQueue:meta" "bull:myQueue:id" "bull:myQueue:completed" "bull:myQueue:active" "bull:myQueue:events" "bull:myQueue:marker" "\x9a\xadbull:myQueue:\xa0\xa7testJob\xcbBy<eN\xef\xf0\x00\xc0\xc0\xc0\xc0\xc0\xc0" "{\"message\":\"Hello BullMQ!\"}" "\xde\x00\x01\xa8attempts\x00"
13:01:15.300 [0 lua] INCR "bull:myQueue:id"
13:01:15.300 [0 lua] HGET "bull:myQueue:meta" "opts.maxLenEvents"
13:01:15.300 [0 lua] HMSET "bull:myQueue:1" "name" "testJob" "data" "{\"message\":\"Hello BullMQ!\"}" "opts" "{\"attempts\":0}" "timestamp" "1734199275263" "delay" "0" "priority" "0"
13:01:15.300 [0 lua] XADD "bull:myQueue:events" "*" "event" "added" "jobId" "1" "name" "testJob"
13:01:15.301 [0 lua] HMGET "bull:myQueue:meta" "paused" "concurrency"
13:01:15.301 [0 lua] LPUSH "bull:myQueue:wait" "1"
13:01:15.301 [0 lua] ZADD "bull:myQueue:marker" "0" "0"
13:01:15.301 [0 lua] XADD "bull:myQueue:events" "MAXLEN" "~" "10000" "*" "event" "waiting" "jobId" "1"
13:01:15.341 [0 76.67.116.197:45108] EVAL "<bullmq moveStalledJobsToWait Lua script, source elided>" "9" "bull:myQueue:stalled" "bull:myQueue:wait" "bull:myQueue:active" "bull:myQueue:failed" "bull:myQueue:stalled-check" "bull:myQueue:meta" "bull:myQueue:paused" "bull:myQueue:marker" "bull:myQueue:events" "1" "bull:myQueue:" "1734199275267" "30000"
13:01:15.341 [0 lua] EXISTS "bull:myQueue:stalled-check"
13:01:15.341 [0 lua] SET "bull:myQueue:stalled-check" "1734199275267" "PX" "30000"
13:01:15.341 [0 lua] HGET "bull:myQueue:meta" "opts.maxLenEvents"
13:01:15.341 [0 lua] XTRIM "bull:myQueue:events" "MAXLEN" "~" "10000"
13:01:15.341 [0 lua] SMEMBERS "bull:myQueue:stalled"
13:01:15.341 [0 lua] LRANGE "bull:myQueue:active" "0" "-1"
13:01:15.341 [0 76.67.116.197:45108] EVAL "<bullmq moveToActive Lua script, source elided>" "11" "bull:myQueue:wait" "bull:myQueue:active" "bull:myQueue:prioritized" "bull:myQueue:events" "bull:myQueue:stalled" "bull:myQueue:limiter" "bull:myQueue:delayed" "bull:myQueue:paused" "bull:myQueue:meta" "bull:myQueue:pc" "bull:myQueue:marker" "bull:myQueue:" "1734199275268" "\xde\x00\x04\xa5token\xd9&6faea81d-b31e-4213-9ed1-f5e317b9e65c:0\xaclockDuration\xcdu0\xa7limiter\xc0\xa4name\xc0"
13:01:15.342 [0 lua] HMGET "bull:myQueue:meta" "paused" "concurrency"
13:01:15.342 [0 lua] ZRANGEBYSCORE "bull:myQueue:delayed" "0" "7103280231501823" "LIMIT" "0" "1000"
13:01:15.342 [0 lua] RPOPLPUSH "bull:myQueue:wait" "bull:myQueue:active"
13:01:15.342 [0 lua] SET "bull:myQueue:1:lock" "6faea81d-b31e-4213-9ed1-f5e317b9e65c:0" "PX" "30000"
13:01:15.342 [0 lua] XADD "bull:myQueue:events" "*" "event" "active" "jobId" "1" "prev" "waiting"
13:01:15.342 [0 lua] HMSET "bull:myQueue:1" "processedOn" "1734199275268"
13:01:15.342 [0 lua] HINCRBY "bull:myQueue:1" "ats" "1"
13:01:15.342 [0 lua] ZADD "bull:myQueue:marker" "0" "0"
13:01:15.342 [0 lua] HGETALL "bull:myQueue:1"
13:01:15.421 [0 76.67.116.197:45108] EVAL "<bullmq moveToFinished Lua script, source elided>"
':lock'\n local lockToken = rcall(\"GET\", lockKey)\n if lockToken == token then\n rcall(\"DEL\", lockKey)\n rcall(\"SREM\", stalledKey, jobId)\n else\n if lockToken then\n -- Lock exists but token does not match\n return -6\n else\n -- Lock is missing completely\n return -2\n end\n end\n end\n return 0\nend\n--[[\n Function to trim events, default 10000.\n]]\n-- Includes\n--[[\n Function to get max events value or set by default 10000.\n]]\nlocal function getOrSetMaxEvents(metaKey)\n local maxEvents = rcall(\"HGET\", metaKey, \"opts.maxLenEvents\")\n if not maxEvents then\n maxEvents = 10000\n rcall(\"HSET\", metaKey, \"opts.maxLenEvents\", maxEvents)\n end\n return maxEvents\nend\nlocal function trimEvents(metaKey, eventStreamKey)\n local maxEvents = getOrSetMaxEvents(metaKey)\n if maxEvents = false then\n rcall(\"XTRIM\", eventStreamKey, \"MAXLEN\", \"\", maxEvents)\n else\n rcall(\"XTRIM\", eventStreamKey, \"MAXLEN\", \"\", 10000)\n end\nend\n--[[\n Validate and move or add dependencies to parent.\n]]\n-- Includes\nlocal function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,\n parentId, jobIdKey, returnvalue, timestamp )\n local processedSet = parentKey .. \":processed\"\n rcall(\"HSET\", processedSet, jobIdKey, returnvalue)\n moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp)\nend\n--[[\n Function to update a bunch of fields in a job.\n]]\nlocal function updateJobFields(jobKey, msgpackedFields)\n if msgpackedFields then\n local fieldsToUpdate = cmsgpack.unpack(msgpackedFields)\n if fieldsToUpdate then\n redis.call(\"HMSET\", jobKey, unpack(fieldsToUpdate))\n end\n end\nend\nlocal jobIdKey = KEYS[12]\nif rcall(\"EXISTS\", jobIdKey) == 1 then -- // Make sure job exists\n local opts = cmsgpack.unpack(ARGV[8])\n local token = opts['token']\n local errorCode = removeLock(jobIdKey, KEYS[5], token, ARGV[1])\n if errorCode < 0 then\n return errorCode\n end\n updateJobFields(jobIdKey, ARGV[9]);\n local attempts = opts['attempts']\n local maxMetricsSize = opts['maxMetricsSize']\n local maxCount = opts['keepJobs']['count']\n local maxAge = opts['keepJobs']['age']\n if rcall(\"SCARD\", jobIdKey .. \":dependencies\") ~= 0 then -- // Make sure it does not have pending dependencies\n return -4\n end\n local jobAttributes = rcall(\"HMGET\", jobIdKey, \"parentKey\", \"parent\", \"deid\")\n local parentKey = jobAttributes[1] or \"\"\n local parentId = \"\"\n local parentQueueKey = \"\"\n if jobAttributes[2] ~= false then\n local jsonDecodedParent = cjson.decode(jobAttributes[2])\n parentId = jsonDecodedParent['id']\n parentQueueKey = jsonDecodedParent['queueKey']\n end\n local jobId = ARGV[1]\n local timestamp = ARGV[2]\n -- Remove from active list (if not active we shall return error)\n local numRemovedElements = rcall(\"LREM\", KEYS[2], -1, jobId)\n if (numRemovedElements < 1) then return -3 end\n local eventStreamKey = KEYS[4]\n local metaKey = KEYS[9]\n -- Trim events before emiting them to avoid trimming events emitted in this script\n trimEvents(metaKey, eventStreamKey)\n local prefix = ARGV[7]\n removeDeduplicationKeyIfNeeded(prefix, jobAttributes[3])\n -- If job has a parent we need to\n -- 1) remove this job id from parents dependencies\n -- 2) move the job Id to parent \"processed\" set\n -- 3) push the results into parent \"results\" list\n -- 4) if parent's dependencies is empty, then move parent to \"wait/paused\". 
Note it may be a different queue!.\n if parentId == \"\" and parentKey ~= \"\" then\n parentId = getJobIdFromKey(parentKey)\n parentQueueKey = getJobKeyPrefix(parentKey, \":\" .. parentId)\n end\n if parentId ~= \"\" then\n if ARGV[5] == \"completed\" then\n local dependenciesSet = parentKey .. \":dependencies\"\n if rcall(\"SREM\", dependenciesSet, jobIdKey) == 1 then\n updateParentDepsIfNeeded(parentKey, parentQueueKey,\n dependenciesSet, parentId, jobIdKey,\n ARGV[4], timestamp)\n end\n else\n if opts['fpof'] then\n moveParentFromWaitingChildrenToFailed(parentQueueKey, parentKey,\n parentId, jobIdKey,\n timestamp)\n elseif opts['idof'] or opts['rdof'] then\n local dependenciesSet = parentKey .. \":dependencies\"\n if rcall(\"SREM\", dependenciesSet, jobIdKey) == 1 then\n moveParentToWaitIfNeeded(parentQueueKey, dependenciesSet,\n parentKey, parentId, timestamp)\n if opts['idof'] then\n local failedSet = parentKey .. \":failed\"\n rcall(\"HSET\", failedSet, jobIdKey, ARGV[4])\n end\n end\n end\n end\n end\n local attemptsMade = rcall(\"HINCRBY\", jobIdKey, \"atm\", 1)\n -- Remove job?\n if maxCount ~= 0 then\n local targetSet = KEYS[11]\n -- Add to complete/failed set\n rcall(\"ZADD\", targetSet, timestamp, jobId)\n rcall(\"HMSET\", jobIdKey, ARGV[3], ARGV[4], \"finishedOn\", timestamp)\n -- \"returnvalue\" / \"failedReason\" and \"finishedOn\"\n -- Remove old jobs?\n if maxAge ~= nil then\n removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix)\n end\n if maxCount ~= nil and maxCount > 0 then\n removeJobsByMaxCount(maxCount, targetSet, prefix)\n end\n else\n removeJobKeys(jobIdKey)\n if parentKey ~= \"\" then\n -- TODO: when a child is removed when finished, result or failure in parent\n -- must not be deleted, those value references should be deleted when the parent\n -- is deleted\n removeParentDependencyKey(jobIdKey, false, parentKey, jobAttributes[3])\n end\n end\n rcall(\"XADD\", eventStreamKey, \"*\", \"event\", ARGV[5], \"jobId\", jobId, ARGV[3],\n ARGV[4])\n if ARGV[5] == \"failed\" then\n if tonumber(attemptsMade) >= tonumber(attempts) then\n rcall(\"XADD\", eventStreamKey, \"*\", \"event\", \"retries-exhausted\", \"jobId\",\n jobId, \"attemptsMade\", attemptsMade)\n end\n end\n -- Collect metrics\n if maxMetricsSize ~= \"\" then\n collectMetrics(KEYS[13], KEYS[13] .. 
':data', maxMetricsSize, timestamp)\n end\n -- Try to get next job to avoid an extra roundtrip if the queue is not closing,\n -- and not rate limited.\n if (ARGV[6] == \"1\") then\n local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[2], KEYS[1], KEYS[8])\n local markerKey = KEYS[14]\n -- Check if there are delayed jobs that can be promoted\n promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[3], eventStreamKey, prefix,\n timestamp, KEYS[10], isPausedOrMaxed)\n local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max'])\n -- Check if we are rate limited first.\n local expireTime = getRateLimitTTL(maxJobs, KEYS[6])\n if expireTime > 0 then return {0, 0, expireTime, 0} end\n -- paused or maxed queue\n if isPausedOrMaxed then return {0, 0, 0, 0} end\n jobId = rcall(\"RPOPLPUSH\", KEYS[1], KEYS[2])\n if jobId then\n -- Markers in waitlist DEPRECATED in v5: Remove in v6.\n if string.sub(jobId, 1, 2) == \"0:\" then\n rcall(\"LREM\", KEYS[2], 1, jobId)\n -- If jobId is special ID 0:delay (delay greater than 0), then there is no job to process\n -- but if ID is 0:0, then there is at least 1 prioritized job to process\n if jobId == \"0:0\" then\n jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2],\n KEYS[10])\n return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId,\n timestamp, maxJobs, markerKey,\n opts)\n end\n else\n return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId,\n timestamp, maxJobs, markerKey,\n opts)\n end\n else\n jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10])\n if jobId then\n return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId,\n timestamp, maxJobs, markerKey,\n opts)\n end\n end\n -- Return the timestamp for the next delayed job if any.\n local nextTimestamp = getNextDelayedTimestamp(KEYS[7])\n if nextTimestamp ~= nil then\n -- The result is guaranteed to be positive, since the\n -- ZRANGEBYSCORE command would have return a job otherwise.\n return {0, 0, 0, nextTimestamp}\n end\n end\n local waitLen = rcall(\"LLEN\", KEYS[1])\n if waitLen == 0 then\n local activeLen = rcall(\"LLEN\", KEYS[2])\n if activeLen == 0 then\n local prioritizedLen = rcall(\"ZCARD\", KEYS[3])\n if prioritizedLen == 0 then\n rcall(\"XADD\", eventStreamKey, \"*\", \"event\", \"drained\")\n end\n end\n end\n return 0\nelse\n return -1\nend\n" "14" "bull:myQueue:wait" "bull:myQueue:active" "bull:myQueue:prioritized" "bull:myQueue:events" "bull:myQueue:stalled" "bull:myQueue:limiter" "bull:myQueue:delayed" "bull:myQueue:paused" "bull:myQueue:meta" "bull:myQueue:pc" "bull:myQueue:completed" "bull:myQueue:1" "bull:myQueue:metrics:completed" "bull:myQueue:marker" "1" "1734199275407" "returnvalue" "{\"message\":\"Hello BullMQ!\"}" "completed" "1" "bull:myQueue:" "\xde\x00\t\xa5token\xd9&6faea81d-b31e-4213-9ed1-f5e317b9e65c:0\xa8keepJobs\xde\x00\x01\xa5count\xff\xa7limiter\xc0\xaclockDuration\xcdu0\xa8attempts\x00\xaemaxMetricsSize\xa0\xa4fpof¤idof¤rdof\xc2" ""
13:01:15.422 [0 lua] EXISTS "bull:myQueue:1"
13:01:15.422 [0 lua] GET "bull:myQueue:1:lock"
13:01:15.422 [0 lua] DEL "bull:myQueue:1:lock"
13:01:15.422 [0 lua] SREM "bull:myQueue:stalled" "1"
13:01:15.489 [0 76.67.116.197:45108] EVALSHA "732e4299a1b577e428d6a3794d5ca9104e175f64" "14" "bull:myQueue:wait" "bull:myQueue:active" "bull:myQueue:prioritized" "bull:myQueue:events" "bull:myQueue:stalled" "bull:myQueue:limiter" "bull:myQueue:delayed" "bull:myQueue:paused" "bull:myQueue:meta" "bull:myQueue:pc" "bull:myQueue:failed" "bull:myQueue:1" "bull:myQueue:metrics:failed" "bull:myQueue:marker" "1" "1734199275477" "failedReason" "ERR Error running script: @cmsgpack:813: missing bytes stack traceback: [G]: in function 'error' @cmsgpack:813: in function 'underflow' @cmsgpack:529: in function 'unpack_cursor' @cmsgpack:843: in function 'unpack' @user_script:613: in function 'updateJobFields' @user_script:627: in main chunk [G]: ?" "failed" "1" "bull:myQueue:" "\xde\x00\t\xa5token\xd9&6faea81d-b31e-4213-9ed1-f5e317b9e65c:0\xa8keepJobs\xde\x00\x01\xa5count\xff\xa7limiter\xc0\xaclockDuration\xcdu0\xa8attempts\x00\xaemaxMetricsSize\xa0\xa4fpof¤idof¤rdof\xc2" "\x94\xacfailedReason\xda\x014ERR Error running script: @cmsgpack:813: missing bytes stack traceback: [G]: in function 'error' @cmsgpack:813: in function 'underflow' @cmsgpack:529: in function 'unpack_cursor' @cmsgpack:843: in function 'unpack' @user_script:613: in function 'updateJobFields' @user_script:627: in main chunk [G]: ?\xaastacktrace\xda\x02;[\"ReplyError: ERR Error running script: @cmsgpack:813: missing bytes stack traceback: [G]: in function 'error' @cmsgpack:813: in function 'underflow' @cmsgpack:529: in function 'unpack_cursor' @cmsgpack:843: in function 'unpack' @user_script:613: in function 'updateJobFields' @user_script:627: in main chunk [G]: ?\\n at parseError (/home/go/r/bullmq/node_modules/.pnpm/[email protected]/node_modules/redis-parser/lib/parser.js:179:12)\\n at parseType (/home/go/r/bullmq/node_modules/.pnpm/[email protected]/node_modules/redis-parser/lib/parser.js:302:14)\"]"
13:01:15.489 [0 lua] EXISTS "bull:myQueue:1"
13:01:15.489 [0 lua] GET "bull:myQueue:1:lock"
13:01:15.513 [0 76.67.116.197:45108] EVALSHA "c7dfdf24d9f50df790149cf0ca449bb227913dbc" "11" "bull:myQueue:wait" "bull:myQueue:active" "bull:myQueue:prioritized" "bull:myQueue:events" "bull:myQueue:stalled" "bull:myQueue:limiter" "bull:myQueue:delayed" "bull:myQueue:paused" "bull:myQueue:meta" "bull:myQueue:pc" "bull:myQueue:marker" "bull:myQueue:" "1734199275503" "\xde\x00\x04\xa5token\xd9&6faea81d-b31e-4213-9ed1-f5e317b9e65c:1\xaclockDuration\xcdu0\xa7limiter\xc0\xa4name\xc0"
13:01:15.514 [0 lua] HMGET "bull:myQueue:meta" "paused" "concurrency"
13:01:15.514 [0 lua] ZRANGEBYSCORE "bull:myQueue:delayed" "0" "7103280232464383" "LIMIT" "0" "1000"
13:01:15.514 [0 lua] RPOPLPUSH "bull:myQueue:wait" "bull:myQueue:active"
13:01:15.514 [0 lua] ZPOPMIN "bull:myQueue:prioritized"
13:01:15.514 [0 lua] DEL "bull:myQueue:pc"
13:01:15.514 [0 lua] ZRANGE "bull:myQueue:delayed" "0" "0" "WITHSCORES"
13:01:15.538 [0 76.67.116.197:45098] BZPOPMIN "bull:myQueue:marker" "5"
13:01:15.561 [0 76.67.116.197:45108] EVALSHA "c7dfdf24d9f50df790149cf0ca449bb227913dbc" "11" "bull:myQueue:wait" "bull:myQueue:active" "bull:myQueue:prioritized" "bull:myQueue:events" "bull:myQueue:stalled" "bull:myQueue:limiter" "bull:myQueue:delayed" "bull:myQueue:paused" "bull:myQueue:meta" "bull:myQueue:pc" "bull:myQueue:marker" "bull:myQueue:" "1734199275550" "\xde\x00\x04\xa5token\xd9&6faea81d-b31e-4213-9ed1-f5e317b9e65c:2\xaclockDuration\xcdu0\xa7limiter\xc0\xa4name\xc0"
13:01:15.561 [0 lua] HMGET "bull:myQueue:meta" "paused" "concurrency"
13:01:15.561 [0 lua] ZRANGEBYSCORE "bull:myQueue:delayed" "0" "7103280232656895" "LIMIT" "0" "1000"
13:01:15.561 [0 lua] RPOPLPUSH "bull:myQueue:wait" "bull:myQueue:active"
13:01:15.561 [0 lua] ZPOPMIN "bull:myQueue:prioritized"
13:01:15.561 [0 lua] DEL "bull:myQueue:pc"
13:01:15.561 [0 lua] ZRANGE "bull:myQueue:delayed" "0" "0" "WITHSCORES"
13:01:20.584 [0 76.67.116.197:45098] BZPOPMIN "bull:myQueue:marker" "5"
Pause

@manast
Copy link
Contributor

manast commented Dec 14, 2024

Thanks, this was useful.

@manast
Copy link
Contributor

manast commented Dec 14, 2024

I have a fix that I hope will solve this for you. The reason I think this fails in Upstash is because they use a different LUA engine with a different version of msgpack.

@manast
Copy link
Contributor

manast commented Dec 14, 2024

Please try again with v5.34.2

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants