diff --git a/docs/gitbook/guide/workers/concurrency.md b/docs/gitbook/guide/workers/concurrency.md index 25c01899f9..31bd620d1d 100644 --- a/docs/gitbook/guide/workers/concurrency.md +++ b/docs/gitbook/guide/workers/concurrency.md @@ -2,9 +2,29 @@ There are basically two ways to achieve concurrency with BullMQ. You can run a worker with a concurrency factor larger than 1 \(which is the default value\), or you can run several workers in different node processes. -#### Concurrency factor +#### Global concurrency factor -The concurrency factor is a worker option that determines how many jobs are allowed to be processed in parallel. This means that the same worker is able to process several jobs in parallel, however the queue guarantees such as "at-least-once" and order of processing are still preserved. +The global concurrency factor is a queue option that determines how many jobs are allowed to be processed in parallel across all your worker instances. + +```typescript +import { Queue } from 'bullmq'; + +await queue.setGlobalConcurrency(4); +``` + +And to retrieve the current value: + +```typescript +const globalConcurrency = await queue.getGlobalConcurrency(); +``` + +{% hint style="info" %} +Note that a concurrency level set on a worker does not override the global one; it only caps how many jobs that particular worker may process in parallel, and the total across all workers never exceeds the global limit. +{% endhint %} + +#### Local concurrency factor + +The local concurrency factor is a worker option that determines how many jobs are allowed to be processed in parallel for that instance. This means that the same worker is able to process several jobs in parallel; however, queue guarantees such as "at-least-once" and order of processing are still preserved. ```typescript import { Worker, Job } from 'bullmq'; diff --git a/package.json b/package.json index bee7d073b9..f7c9ede064 100644 --- a/package.json +++ b/package.json @@ -107,6 +107,7 @@ "nyc": "^15.1.0", "prettier": "^2.7.1", "pretty-quick": "^3.1.3", + "progress": "^2.0.3", "rimraf": "^3.0.2", "rrule": "^2.6.9", "semantic-release": "^19.0.3", diff --git a/python/bullmq/scripts.py b/python/bullmq/scripts.py index e48f067216..ef1851bb12 100644 --- a/python/bullmq/scripts.py +++ b/python/bullmq/scripts.py @@ -31,11 +31,11 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection self.redisConnection = redisConnection self.redisClient = redisConnection.conn self.commands = { - "addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-7.lua")), + "addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-8.lua")), "addDelayedJob": self.redisClient.register_script(self.getScript("addDelayedJob-6.lua")), "addParentJob": self.redisClient.register_script(self.getScript("addParentJob-4.lua")), - "addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-7.lua")), - "changePriority": self.redisClient.register_script(self.getScript("changePriority-6.lua")), + "addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-8.lua")), + "changePriority": self.redisClient.register_script(self.getScript("changePriority-7.lua")), "cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-2.lua")), "extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")), "getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")), @@ -51,11 +51,11 @@ def __init__(self, prefix: str,
queueName: str, redisConnection: RedisConnection "moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-5.lua")), "obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")), "pause": self.redisClient.register_script(self.getScript("pause-7.lua")), - "promote": self.redisClient.register_script(self.getScript("promote-8.lua")), - "removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")), - "reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-7.lua")), + "promote": self.redisClient.register_script(self.getScript("promote-9.lua")), + "removeJob": self.redisClient.register_script(self.getScript("removeJob-2.lua")), + "reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-8.lua")), "retryJob": self.redisClient.register_script(self.getScript("retryJob-11.lua")), - "moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-7.lua")), + "moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-8.lua")), "saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")), "updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")), "updateProgress": self.redisClient.register_script(self.getScript("updateProgress-3.lua")), @@ -119,7 +119,7 @@ def addStandardJob(self, job: Job, timestamp: int, pipe = None): Add a standard job to the queue """ keys = self.getKeys(['wait', 'paused', 'meta', 'id', - 'completed', 'events', 'marker']) + 'completed', 'active', 'events', 'marker']) args = self.addJobArgs(job, None) args.append(timestamp) @@ -141,7 +141,7 @@ def addPrioritizedJob(self, job: Job, timestamp: int, pipe = None): Add a prioritized job to the queue """ keys = self.getKeys(['marker', 'meta', 'id', - 'prioritized', 'completed', 'events', 'pc']) + 'prioritized', 'completed', 'active', 'events', 'pc']) args = self.addJobArgs(job, None) args.append(timestamp) @@ -285,7 +285,7 @@ async def moveToDelayed(self, job_id: str, timestamp: int, delay: int, token: st return None def promoteArgs(self, job_id: str): - keys = self.getKeys(['delayed', 'wait', 'paused', 'meta', 'prioritized', 'pc', 'events', 'marker']) + keys = self.getKeys(['delayed', 'wait', 'paused', 'meta', 'prioritized', 'active', 'pc', 'events', 'marker']) keys.append(self.toKey(job_id)) keys.append(self.keys['events']) keys.append(self.keys['paused']) @@ -306,7 +306,7 @@ async def promote(self, job_id: str): return None def remove(self, job_id: str, remove_children: bool): - keys = self.getKeys(['']) + keys = self.getKeys(['', 'meta']) args = [job_id, 1 if remove_children else 0] return self.commands["removeJob"](keys=keys, args=args) @@ -363,6 +363,7 @@ async def changePriority(self, job_id: str, priority:int = 0, lifo:bool = False) self.keys['paused'], self.keys['meta'], self.keys['prioritized'], + self.keys['active'], self.keys['pc'], self.keys['marker']] @@ -394,6 +395,7 @@ async def reprocessJob(self, job: Job, state: str): keys.append(self.keys['wait']) keys.append(self.keys['meta']) keys.append(self.keys['paused']) + keys.append(self.keys['active']) keys.append(self.keys['marker']) args = [ @@ -434,7 +436,7 @@ async def obliterate(self, count: int, force: bool = False): def moveJobsToWaitArgs(self, state: str, count: int, timestamp: int) -> int: keys = self.getKeys( - ['', 'events', state, 'wait', 'paused', 'meta', 'marker']) + ['', 'events', state, 'wait', 'paused', 'meta', 'active', 'marker']) args = [count or 
1000, timestamp or round(time.time()*1000), state] return (keys, args) diff --git a/src/classes/queue.ts b/src/classes/queue.ts index f21310b437..834aa53eab 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -183,6 +183,32 @@ export class Queue< }); } + /** + * Get global concurrency value. + * Returns null in case no value is set. + */ + async getGlobalConcurrency(): Promise<number | null> { + const client = await this.client; + const concurrency = await client.hget(this.keys.meta, 'concurrency'); + if (concurrency) { + return Number(concurrency); + } + return null; + } + + /** + * Enable and set global concurrency value. + * @param concurrency - Maximum number of simultaneous jobs that the workers can handle. + * For instance, setting this value to 1 ensures that no more than one job + * is processed at any given time. If this limit is not defined, there will be no + * restriction on the number of concurrent jobs. + */ + async setGlobalConcurrency(concurrency: number) { + const client = await this.client; + return client.hset(this.keys.meta, 'concurrency', concurrency); + } + + /** * Adds a new job to the queue. * @@ -301,6 +327,13 @@ export class Queue< return pausedKeyExists === 1; } + /** + * Returns true if the queue is currently maxed. + */ + isMaxed(): Promise<boolean> { + return this.scripts.isMaxed(); + } + /** * Get all repeatable meta jobs. * diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index e303168e6f..cdb215839f 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -107,6 +107,7 @@ export class Scripts { queueKeys.id, queueKeys.prioritized, queueKeys.completed, + queueKeys.active, queueKeys.events, queueKeys.pc, ]; @@ -148,6 +149,7 @@ export class Scripts { queueKeys.meta, queueKeys.id, queueKeys.completed, + queueKeys.active, queueKeys.events, queueKeys.marker, ]; @@ -283,7 +285,9 @@ export class Scripts { async remove(jobId: string, removeChildren: boolean): Promise<number> { const client = await this.queue.client; - const keys: (string | number)[] = [''].map(name => this.queue.toKey(name)); + const keys: (string | number)[] = ['', 'meta'].map(name => + this.queue.toKey(name), + ); return (<any>client).removeJob( keys.concat([jobId, removeChildren ?
1 : 0]), ); @@ -614,7 +618,9 @@ export class Scripts { return (<any>client).getCounts(args); } - protected getCountsPerPriorityArgs(priorities: number[]): (string | number)[] { + protected getCountsPerPriorityArgs( + priorities: number[], + ): (string | number)[] { const keys: (string | number)[] = [ this.queue.keys.wait, this.queue.keys.paused, @@ -772,6 +778,7 @@ this.queue.keys.paused, this.queue.keys.meta, this.queue.keys.prioritized, + this.queue.keys.active, this.queue.keys.pc, this.queue.keys.marker, ]; @@ -850,6 +857,20 @@ ]); } + isMaxedArgs(): string[] { + const queueKeys = this.queue.keys; + const keys: string[] = [queueKeys.meta, queueKeys.active]; + + return keys; + } + + async isMaxed(): Promise<boolean> { + const client = await this.queue.client; + + const args = this.isMaxedArgs(); + return !!(await (<any>client).isMaxed(args)); + } + async moveToDelayed( jobId: string, timestamp: number, @@ -984,6 +1005,7 @@ this.queue.toKey('wait'), this.queue.toKey('paused'), this.queue.keys.meta, + this.queue.keys.active, this.queue.keys.marker, ]; @@ -1038,6 +1060,7 @@ this.queue.keys.wait, this.queue.keys.meta, this.queue.keys.paused, + this.queue.keys.active, this.queue.keys.marker, ]; @@ -1108,6 +1131,7 @@ this.queue.keys.paused, this.queue.keys.meta, this.queue.keys.prioritized, + this.queue.keys.active, this.queue.keys.pc, this.queue.keys.events, this.queue.keys.marker, @@ -1179,7 +1203,7 @@ const client = await this.queue.client; const lockKey = `${this.queue.toKey(jobId)}:lock`; - const keys = [ + const keys: (string | number)[] = [ this.queue.keys.active, this.queue.keys.wait, this.queue.keys.stalled, diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 3a68feb40c..a4c40aefda 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -619,7 +619,7 @@ will never work with more accuracy than 1ms.
*/ let timeout: NodeJS.Timeout; try { - if (!this.closing) { + if (!this.closing && !this.limitUntil) { let blockTimeout = this.getBlockTimeout(blockUntil); if (blockTimeout > 0) { diff --git a/src/commands/addPrioritizedJob-7.lua b/src/commands/addPrioritizedJob-8.lua similarity index 89% rename from src/commands/addPrioritizedJob-7.lua rename to src/commands/addPrioritizedJob-8.lua index fa8f9307cc..53b6410c97 100644 --- a/src/commands/addPrioritizedJob-7.lua +++ b/src/commands/addPrioritizedJob-8.lua @@ -10,8 +10,9 @@ KEYS[3] 'id' KEYS[4] 'prioritized' KEYS[5] 'completed' - KEYS[6] events stream key - KEYS[7] 'pc' priority counter + KEYS[6] 'active' + KEYS[7] events stream key + KEYS[8] 'pc' priority counter ARGV[1] msgpacked arguments array [1] key prefix, @@ -36,8 +37,9 @@ local idKey = KEYS[3] local priorityKey = KEYS[4] local completedKey = KEYS[5] -local eventsKey = KEYS[6] -local priorityCounterKey = KEYS[7] +local activeKey = KEYS[6] +local eventsKey = KEYS[7] +local priorityCounterKey = KEYS[8] local jobId local jobIdKey @@ -58,7 +60,7 @@ local parentData --- @include "includes/storeJob" --- @include "includes/getOrSetMaxEvents" --- @include "includes/handleDuplicatedJob" ---- @include "includes/isQueuePaused" +--- @include "includes/isQueuePausedOrMaxed" if parentKey ~= nil then if rcall("EXISTS", parentKey) ~= 1 then return -5 end @@ -91,8 +93,8 @@ local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], repeatJobKey) -- Add the job to the prioritized set -local isPause = isQueuePaused(metaKey) -addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPause) +local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey) +addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed) -- Emit waiting event rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting", diff --git a/src/commands/addStandardJob-7.lua b/src/commands/addStandardJob-8.lua similarity index 91% rename from src/commands/addStandardJob-7.lua rename to src/commands/addStandardJob-8.lua index 9a682261f7..ed07744abe 100644 --- a/src/commands/addStandardJob-7.lua +++ b/src/commands/addStandardJob-8.lua @@ -20,8 +20,9 @@ KEYS[3] 'meta' KEYS[4] 'id' KEYS[5] 'completed' - KEYS[6] events stream key - KEYS[7] marker key + KEYS[6] 'active' + KEYS[7] events stream key + KEYS[8] marker key ARGV[1] msgpacked arguments array [1] key prefix, @@ -41,7 +42,7 @@ jobId - OK -5 - Missing parent key ]] -local eventsKey = KEYS[6] +local eventsKey = KEYS[7] local jobId local jobIdKey @@ -94,11 +95,11 @@ end storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, parentKey, parentData, repeatJobKey) -local target, paused = getTargetQueueList(metaKey, KEYS[1], KEYS[2]) +local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KEYS[2]) -- LIFO or FIFO local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH' -addJobInTargetList(target, KEYS[7], pushCmd, paused, jobId) +addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId) -- Emit waiting event rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting", diff --git a/src/commands/changePriority-6.lua b/src/commands/changePriority-7.lua similarity index 69% rename from src/commands/changePriority-6.lua rename to src/commands/changePriority-7.lua index e89e0b0af0..d23461edb1 100644 --- a/src/commands/changePriority-6.lua +++ b/src/commands/changePriority-7.lua @@ -5,8 +5,9 @@ KEYS[2] 'paused' KEYS[3] 'meta' KEYS[4] 'prioritized' - 
KEYS[5] 'pc' priority counter - KEYS[6] 'marker' + KEYS[5] 'active' + KEYS[6] 'pc' priority counter + KEYS[7] 'marker' ARGV[1] priority value ARGV[2] job key @@ -29,33 +30,34 @@ local rcall = redis.call --- @include "includes/pushBackJobWithPriority" local function reAddJobWithNewPriority( prioritizedKey, markerKey, targetKey, - priorityCounter, lifo, priority, jobId, paused) + priorityCounter, lifo, priority, jobId, isPausedOrMaxed) if priority == 0 then local pushCmd = lifo and 'RPUSH' or 'LPUSH' - addJobInTargetList(targetKey, markerKey, pushCmd, paused, jobId) + addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId) else if lifo then pushBackJobWithPriority(prioritizedKey, priority, jobId) else addJobWithPriority(markerKey, prioritizedKey, priority, jobId, - priorityCounter, paused) + priorityCounter, isPausedOrMaxed) end end end if rcall("EXISTS", jobKey) == 1 then local metaKey = KEYS[3] - local target, isPaused = getTargetQueueList(metaKey, KEYS[1], KEYS[2]) - local markerKey = KEYS[6] + local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[5], KEYS[1], KEYS[2]) local prioritizedKey = KEYS[4] - + local priorityCounterKey = KEYS[6] + local markerKey = KEYS[7] + -- Re-add with the new priority if rcall("ZREM", KEYS[4], jobId) > 0 then reAddJobWithNewPriority( prioritizedKey, markerKey, target, - KEYS[5], ARGV[4] == '1', priority, jobId, isPaused) + priorityCounterKey, ARGV[4] == '1', priority, jobId, isPausedOrMaxed) elseif rcall("LREM", target, -1, jobId) > 0 then reAddJobWithNewPriority( prioritizedKey, markerKey, target, - KEYS[5], ARGV[4] == '1', priority, jobId, isPaused) + priorityCounterKey, ARGV[4] == '1', priority, jobId, isPausedOrMaxed) end rcall("HSET", jobKey, "priority", priority) diff --git a/src/commands/getCountsPerPriority-4.lua b/src/commands/getCountsPerPriority-4.lua index f379c46149..fa24ba2328 100644 --- a/src/commands/getCountsPerPriority-4.lua +++ b/src/commands/getCountsPerPriority-4.lua @@ -16,13 +16,16 @@ local pausedKey = KEYS[2] local prioritizedKey = KEYS[4] -- Includes ---- @include "includes/getTargetQueueList" +--- @include "includes/isQueuePaused" for i = 1, #ARGV do local priority = tonumber(ARGV[i]) if priority == 0 then - local target = getTargetQueueList(KEYS[3], waitKey, pausedKey) - results[#results+1] = rcall("LLEN", target) + if isQueuePaused(KEYS[3]) then + results[#results+1] = rcall("LLEN", pausedKey) + else + results[#results+1] = rcall("LLEN", waitKey) + end else results[#results+1] = rcall("ZCOUNT", prioritizedKey, priority * 0x100000000, (priority + 1) * 0x100000000 - 1) diff --git a/src/commands/includes/addBaseMarkerIfNeeded.lua b/src/commands/includes/addBaseMarkerIfNeeded.lua index 573d3a6a19..af10026589 100644 --- a/src/commands/includes/addBaseMarkerIfNeeded.lua +++ b/src/commands/includes/addBaseMarkerIfNeeded.lua @@ -2,8 +2,8 @@ Add marker if needed when a job is available. 
]] -local function addBaseMarkerIfNeeded(markerKey, isPaused) - if not isPaused then +local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) + if not isPausedOrMaxed then rcall("ZADD", markerKey, 0, "0") end end diff --git a/src/commands/includes/addJobInTargetList.lua b/src/commands/includes/addJobInTargetList.lua index 387629b756..80f7bc0173 100644 --- a/src/commands/includes/addJobInTargetList.lua +++ b/src/commands/includes/addJobInTargetList.lua @@ -5,7 +5,7 @@ -- Includes --- @include "addBaseMarkerIfNeeded" -local function addJobInTargetList(targetKey, markerKey, pushCmd, isPaused, jobId) +local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId) rcall(pushCmd, targetKey, jobId) - addBaseMarkerIfNeeded(markerKey, isPaused) + addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) end diff --git a/src/commands/includes/addJobWithPriority.lua b/src/commands/includes/addJobWithPriority.lua index f83e887368..f5c334f62e 100644 --- a/src/commands/includes/addJobWithPriority.lua +++ b/src/commands/includes/addJobWithPriority.lua @@ -5,9 +5,10 @@ -- Includes --- @include "addBaseMarkerIfNeeded" -local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused) +local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, + isPausedOrMaxed) local prioCounter = rcall("INCR", priorityCounterKey) local score = priority * 0x100000000 + prioCounter % 0x100000000 rcall("ZADD", prioritizedKey, score, jobId) - addBaseMarkerIfNeeded(markerKey, isPaused) + addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) end diff --git a/src/commands/includes/getTargetQueueList.lua b/src/commands/includes/getTargetQueueList.lua index a208fdca6a..2a7b03571a 100644 --- a/src/commands/includes/getTargetQueueList.lua +++ b/src/commands/includes/getTargetQueueList.lua @@ -3,10 +3,20 @@ (since an empty list and !EXISTS are not really the same). ]] -local function getTargetQueueList(queueMetaKey, waitKey, pausedKey) - if rcall("HEXISTS", queueMetaKey, "paused") ~= 1 then - return waitKey, false - else +local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey) + local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency") + + if queueAttributes[1] then return pausedKey, true + else + if queueAttributes[2] then + local activeCount = rcall("LLEN", activeKey) + if activeCount >= tonumber(queueAttributes[2]) then + return waitKey, true + else + return waitKey, false + end + end end + return waitKey, false end diff --git a/src/commands/includes/isQueueMaxed.lua b/src/commands/includes/isQueueMaxed.lua new file mode 100644 index 0000000000..d0a81aedd5 --- /dev/null +++ b/src/commands/includes/isQueueMaxed.lua @@ -0,0 +1,15 @@ +--[[ + Function to check if queue is maxed or not. +]] +local function isQueueMaxed(queueMetaKey, activeKey) + local maxConcurrency = rcall("HGET", queueMetaKey, "concurrency") + + if maxConcurrency then + local activeCount = rcall("LLEN", activeKey) + if activeCount >= tonumber(maxConcurrency) then + return true + end + end + + return false +end diff --git a/src/commands/includes/isQueuePaused.lua b/src/commands/includes/isQueuePaused.lua index 6885245ea5..66d433aa7a 100644 --- a/src/commands/includes/isQueuePaused.lua +++ b/src/commands/includes/isQueuePaused.lua @@ -3,5 +3,5 @@ (since an empty list and !EXISTS are not really the same). 
]] local function isQueuePaused(queueMetaKey) - return rcall("HEXISTS", queueMetaKey, "paused") == 1 + return rcall("HEXISTS", queueMetaKey, "paused") == 1 end diff --git a/src/commands/includes/isQueuePausedOrMaxed.lua b/src/commands/includes/isQueuePausedOrMaxed.lua new file mode 100644 index 0000000000..89d8a0f92c --- /dev/null +++ b/src/commands/includes/isQueuePausedOrMaxed.lua @@ -0,0 +1,18 @@ +--[[ + Function to check if queue is paused or maxed + (since an empty list and !EXISTS are not really the same). +]] + +local function isQueuePausedOrMaxed(queueMetaKey, activeKey) + local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency") + + if queueAttributes[1] then + return true + else + if queueAttributes[2] then + local activeCount = rcall("LLEN", activeKey) + return activeCount >= tonumber(queueAttributes[2]) + end + end + return false +end diff --git a/src/commands/includes/moveParentToWaitIfNeeded.lua b/src/commands/includes/moveParentToWaitIfNeeded.lua index 35673b7a2e..33d9bcb031 100644 --- a/src/commands/includes/moveParentToWaitIfNeeded.lua +++ b/src/commands/includes/moveParentToWaitIfNeeded.lua @@ -6,7 +6,7 @@ --- @include "addDelayMarkerIfNeeded" --- @include "addJobInTargetList" --- @include "addJobWithPriority" ---- @include "isQueuePaused" +--- @include "isQueuePausedOrMaxed" --- @include "getTargetQueueList" local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, @@ -17,6 +17,7 @@ local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, rcall("ZREM", parentQueueKey .. ":waiting-children", parentId) local parentWaitKey = parentQueueKey .. ":wait" local parentPausedKey = parentQueueKey .. ":paused" + local parentActiveKey = parentQueueKey .. ":active" local parentMetaKey = parentQueueKey .. ":meta" local parentMarkerKey = parentQueueKey .. ":marker" @@ -35,15 +36,16 @@ local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey) else if priority == 0 then - local parentTarget, isParentPaused = - getTargetQueueList(parentMetaKey, parentWaitKey, + local parentTarget, isParentPausedOrMaxed = + getTargetQueueList(parentMetaKey, parentActiveKey, parentWaitKey, parentPausedKey) - addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPaused, parentId) + addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed, + parentId) else - local isPaused = isQueuePaused(parentMetaKey) + local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey) addJobWithPriority(parentMarkerKey, parentQueueKey .. ":prioritized", priority, - parentId, parentQueueKey .. ":pc", isPaused) + parentId, parentQueueKey .. ":pc", isPausedOrMaxed) end rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", diff --git a/src/commands/includes/removeParentDependencyKey.lua b/src/commands/includes/removeParentDependencyKey.lua index fb75120cb8..dee16ca994 100644 --- a/src/commands/includes/removeParentDependencyKey.lua +++ b/src/commands/includes/removeParentDependencyKey.lua @@ -11,9 +11,9 @@ --- @include "removeJobKeys" local function moveParentToWait(parentPrefix, parentId, emitEvent) - local parentTarget, isPaused = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "wait", - parentPrefix .. "paused") - addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPaused, parentId) + local parentTarget, isPausedOrMaxed = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. 
"active", + parentPrefix .. "wait", parentPrefix .. "paused") + addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId) if emitEvent then local parentEventStream = parentPrefix .. "events" diff --git a/src/commands/isMaxed-2.lua b/src/commands/isMaxed-2.lua new file mode 100644 index 0000000000..b01e79139a --- /dev/null +++ b/src/commands/isMaxed-2.lua @@ -0,0 +1,17 @@ +--[[ + Checks if queue is maxed. + + Input: + KEYS[1] meta key + KEYS[2] active key + + Output: + 1 if element found in the list. +]] + +local rcall = redis.call + +-- Includes +--- @include "includes/isQueueMaxed" + +return isQueueMaxed(KEYS[1], KEYS[2]) diff --git a/src/commands/moveJobFromActiveToWait-10.lua b/src/commands/moveJobFromActiveToWait-10.lua index a0ed554c2b..e90d6d2d10 100644 --- a/src/commands/moveJobFromActiveToWait-10.lua +++ b/src/commands/moveJobFromActiveToWait-10.lua @@ -35,7 +35,7 @@ if lockToken == token then local metaKey = KEYS[6] local removed = rcall("LREM", KEYS[1], 1, jobId) if removed > 0 then - local target, isPaused = getTargetQueueList(metaKey, KEYS[2], KEYS[5]) + local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[1], KEYS[2], KEYS[5]) rcall("SREM", KEYS[3], jobId) @@ -44,7 +44,7 @@ if lockToken == token then if priority > 0 then pushBackJobWithPriority(KEYS[8], priority, jobId) else - addJobInTargetList(target, KEYS[9], "RPUSH", isPaused, jobId) + addJobInTargetList(target, KEYS[9], "RPUSH", isPausedOrMaxed, jobId) end rcall("DEL", lockKey) diff --git a/src/commands/moveJobsToWait-7.lua b/src/commands/moveJobsToWait-8.lua similarity index 90% rename from src/commands/moveJobsToWait-7.lua rename to src/commands/moveJobsToWait-8.lua index 626cc9b9ad..15e99c6295 100644 --- a/src/commands/moveJobsToWait-7.lua +++ b/src/commands/moveJobsToWait-8.lua @@ -10,7 +10,8 @@ KEYS[4] 'wait' KEYS[5] 'paused' KEYS[6] 'meta' - KEYS[7] 'marker' + KEYS[7] 'active' + KEYS[8] 'marker' ARGV[1] count ARGV[2] timestamp @@ -32,7 +33,7 @@ local rcall = redis.call; --- @include "includes/getTargetQueueList" local metaKey = KEYS[6] -local target, paused = getTargetQueueList(metaKey, KEYS[4], KEYS[5]) +local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[7], KEYS[4], KEYS[5]) local jobs = rcall('ZRANGEBYSCORE', KEYS[3], 0, timestamp, 'LIMIT', 0, maxCount) if (#jobs > 0) then @@ -62,7 +63,7 @@ if (#jobs > 0) then rcall("LPUSH", target, unpack(jobs, from, to)) end - addBaseMarkerIfNeeded(KEYS[7], paused) + addBaseMarkerIfNeeded(KEYS[8], isPausedOrMaxed) end maxCount = maxCount - #jobs diff --git a/src/commands/moveStalledJobsToWait-9.lua b/src/commands/moveStalledJobsToWait-9.lua index df87ca5820..9660e77fcf 100644 --- a/src/commands/moveStalledJobsToWait-9.lua +++ b/src/commands/moveStalledJobsToWait-9.lua @@ -142,11 +142,11 @@ if (#stalling > 0) then table.insert(failed, jobId) else - local target, isPaused= - getTargetQueueList(metaKey, waitKey, pausedKey) + local target, isPausedOrMaxed= + getTargetQueueList(metaKey, activeKey, waitKey, pausedKey) -- Move the job back to the wait queue, to immediately be picked up by a waiting worker. 
- addJobInTargetList(target, markerKey, "RPUSH", isPaused, jobId) + addJobInTargetList(target, markerKey, "RPUSH", isPausedOrMaxed, jobId) rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId", jobId, 'prev', 'active') diff --git a/src/commands/moveToActive-11.lua b/src/commands/moveToActive-11.lua index 7ae202bebf..946091e84a 100644 --- a/src/commands/moveToActive-11.lua +++ b/src/commands/moveToActive-11.lua @@ -50,12 +50,12 @@ local opts = cmsgpack.unpack(ARGV[3]) --- @include "includes/prepareJobForProcessing" --- @include "includes/promoteDelayedJobs" -local target, paused = getTargetQueueList(KEYS[9], waitKey, KEYS[8]) +local target, isPausedOrMaxed = getTargetQueueList(KEYS[9], activeKey, waitKey, KEYS[8]) -- Check if there are delayed jobs that we can move to wait. local markerKey = KEYS[11] promoteDelayedJobs(delayedKey, markerKey, target, KEYS[3], eventStreamKey, ARGV[1], - ARGV[2], KEYS[10], paused) + ARGV[2], KEYS[10], isPausedOrMaxed) local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max']) local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey) @@ -63,8 +63,8 @@ local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey) -- Check if we are rate limited first. if expireTime > 0 then return {0, 0, expireTime, 0} end --- paused queue -if paused then return {0, 0, 0, 0} end +-- paused or maxed queue +if isPausedOrMaxed then return {0, 0, 0, 0} end -- no job ID, try non-blocking move from wait to active local jobId = rcall("RPOPLPUSH", waitKey, activeKey) diff --git a/src/commands/moveToFinished-14.lua b/src/commands/moveToFinished-14.lua index 0f33fda8e1..2edbe03835 100644 --- a/src/commands/moveToFinished-14.lua +++ b/src/commands/moveToFinished-14.lua @@ -202,11 +202,11 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists -- and not rate limited. if (ARGV[6] == "1") then - local target, paused = getTargetQueueList(metaKey, KEYS[1], KEYS[8]) + local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[2], KEYS[1], KEYS[8]) -- Check if there are delayed jobs that can be promoted promoteDelayedJobs(KEYS[7], KEYS[14], target, KEYS[3], eventStreamKey, ARGV[7], - timestamp, KEYS[10], paused) + timestamp, KEYS[10], isPausedOrMaxed) local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max']) -- Check if we are rate limited first. @@ -214,8 +214,8 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists if expireTime > 0 then return {0, 0, expireTime, 0} end - -- paused queue - if paused then return {0, 0, 0, 0} end + -- paused or maxed queue + if isPausedOrMaxed then return {0, 0, 0, 0} end jobId = rcall("RPOPLPUSH", KEYS[1], KEYS[2]) diff --git a/src/commands/moveToWaitingChildren-5.lua b/src/commands/moveToWaitingChildren-5.lua index a7519faeb0..4ee8f29a25 100644 --- a/src/commands/moveToWaitingChildren-5.lua +++ b/src/commands/moveToWaitingChildren-5.lua @@ -21,8 +21,9 @@ -3 - Job not in active set ]] local rcall = redis.call +local stalledKey = KEYS[5] --- Includes +--- Includes --- @include "includes/removeLock" local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, @@ -43,7 +44,7 @@ end if rcall("EXISTS", KEYS[4]) == 1 then if ARGV[2] ~= "" then if rcall("SISMEMBER", KEYS[4] .. ":dependencies", ARGV[2]) ~= 0 then - local errorCode = removeLock(KEYS[4], KEYS[5], ARGV[1], ARGV[4]) + local errorCode = removeLock(KEYS[4], stalledKey, ARGV[1], ARGV[4]) if errorCode < 0 then return errorCode end @@ -53,7 +54,7 @@ if rcall("EXISTS", KEYS[4]) == 1 then return 1 else if rcall("SCARD", KEYS[4] .. 
":dependencies") ~= 0 then - local errorCode = removeLock(KEYS[4], KEYS[5], ARGV[1], ARGV[4]) + local errorCode = removeLock(KEYS[4], stalledKey, ARGV[1], ARGV[4]) if errorCode < 0 then return errorCode end diff --git a/src/commands/promote-8.lua b/src/commands/promote-9.lua similarity index 73% rename from src/commands/promote-8.lua rename to src/commands/promote-9.lua index e8b816383b..2143e52aa0 100644 --- a/src/commands/promote-8.lua +++ b/src/commands/promote-9.lua @@ -7,9 +7,10 @@ KEYS[3] 'paused' KEYS[4] 'meta' KEYS[5] 'prioritized' - KEYS[6] 'pc' priority counter - KEYS[7] 'event stream' - KEYS[8] 'marker' + KEYS[6] 'active' + KEYS[7] 'pc' priority counter + KEYS[8] 'event stream' + KEYS[9] 'marker' ARGV[1] queue.toKey('') ARGV[2] jobId @@ -33,23 +34,24 @@ if rcall("ZREM", KEYS[1], jobId) == 1 then local jobKey = ARGV[1] .. jobId local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0 local metaKey = KEYS[4] + local markerKey = KEYS[9] -- Remove delayed "marker" from the wait list if there is any. -- Since we are adding a job we do not need the marker anymore. -- Markers in waitlist DEPRECATED in v5: Remove in v6. - local target, paused = getTargetQueueList(metaKey, KEYS[2], KEYS[3]) + local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[2], KEYS[3]) local marker = rcall("LINDEX", target, 0) if marker and string.sub(marker, 1, 2) == "0:" then rcall("LPOP", target) end if priority == 0 then -- LIFO or FIFO - addJobInTargetList(target, KEYS[8], "LPUSH", paused, jobId) + addJobInTargetList(target, markerKey, "LPUSH", isPausedOrMaxed, jobId) else - addJobWithPriority(KEYS[8], KEYS[5], priority, jobId, KEYS[6], paused) + addJobWithPriority(markerKey, KEYS[5], priority, jobId, KEYS[7], isPausedOrMaxed) end -- Emit waiting event (wait..ing@token) - rcall("XADD", KEYS[7], "*", "event", "waiting", "jobId", jobId, "prev", + rcall("XADD", KEYS[8], "*", "event", "waiting", "jobId", jobId, "prev", "delayed"); rcall("HSET", jobKey, "delay", 0) diff --git a/src/commands/removeJob-1.lua b/src/commands/removeJob-2.lua similarity index 97% rename from src/commands/removeJob-1.lua rename to src/commands/removeJob-2.lua index 5ce1c01c1e..aaee237727 100644 --- a/src/commands/removeJob-1.lua +++ b/src/commands/removeJob-2.lua @@ -4,6 +4,7 @@ Input: KEYS[1] queue prefix + KEYS[2] meta key ARGV[1] jobId ARGV[2] remove children @@ -66,7 +67,7 @@ local function removeJob( prefix, jobId, parentKey, removeChildren) local prev = removeJobFromAnyState(prefix, jobId) if removeJobKeys(jobKey) > 0 then - local maxEvents = getOrSetMaxEvents(prefix .. "meta") + local maxEvents = getOrSetMaxEvents(KEYS[2]) rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed", "jobId", jobId, "prev", prev) end diff --git a/src/commands/reprocessJob-7.lua b/src/commands/reprocessJob-8.lua similarity index 83% rename from src/commands/reprocessJob-7.lua rename to src/commands/reprocessJob-8.lua index 1489b5fc58..300ab6a1e8 100644 --- a/src/commands/reprocessJob-7.lua +++ b/src/commands/reprocessJob-8.lua @@ -8,7 +8,8 @@ KEYS[4] wait key KEYS[5] meta KEYS[6] paused key - KEYS[7] marker key + KEYS[7] active key + KEYS[8] marker key ARGV[1] job.id ARGV[2] (job.opts.lifo ? 
'R' : 'L') + 'PUSH' @@ -32,8 +33,8 @@ if rcall("EXISTS", KEYS[1]) == 1 then if (rcall("ZREM", KEYS[3], jobId) == 1) then rcall("HDEL", KEYS[1], "finishedOn", "processedOn", ARGV[3]) - local target, isPaused = getTargetQueueList(KEYS[5], KEYS[4], KEYS[6]) - addJobInTargetList(target, KEYS[7], ARGV[2], isPaused, jobId) + local target, isPausedOrMaxed = getTargetQueueList(KEYS[5], KEYS[7], KEYS[4], KEYS[6]) + addJobInTargetList(target, KEYS[8], ARGV[2], isPausedOrMaxed, jobId) local maxEvents = getOrSetMaxEvents(KEYS[5]) -- Emit waiting event diff --git a/src/commands/retryJob-11.lua b/src/commands/retryJob-11.lua index e620728225..33d1f7a85a 100644 --- a/src/commands/retryJob-11.lua +++ b/src/commands/retryJob-11.lua @@ -38,13 +38,14 @@ local rcall = redis.call --- @include "includes/getTargetQueueList" --- @include "includes/promoteDelayedJobs" --- @include "includes/removeLock" +--- @include "includes/isQueuePausedOrMaxed" -local target, paused = getTargetQueueList(KEYS[5], KEYS[2], KEYS[3]) +local target, isPausedOrMaxed = getTargetQueueList(KEYS[5], KEYS[1], KEYS[2], KEYS[3]) local markerKey = KEYS[10] -- Check if there are delayed jobs that we can move to wait. -- test example: when there are delayed jobs between retries -promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[8], KEYS[6], ARGV[1], ARGV[2], KEYS[9], paused) +promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[8], KEYS[6], ARGV[1], ARGV[2], KEYS[9], isPausedOrMaxed) if rcall("EXISTS", KEYS[4]) == 1 then local errorCode = removeLock(KEYS[4], KEYS[11], ARGV[5], ARGV[4]) @@ -57,11 +58,14 @@ if rcall("EXISTS", KEYS[4]) == 1 then local priority = tonumber(rcall("HGET", KEYS[4], "priority")) or 0 + -- need to re-evaluate after removing job from active + isPausedOrMaxed = isQueuePausedOrMaxed(KEYS[5], KEYS[1]) + -- Standard or priority add if priority == 0 then - addJobInTargetList(target, KEYS[10], ARGV[3], paused, ARGV[4]) + addJobInTargetList(target, markerKey, ARGV[3], isPausedOrMaxed, ARGV[4]) else - addJobWithPriority(markerKey, KEYS[8], priority, ARGV[4], KEYS[9], paused) + addJobWithPriority(markerKey, KEYS[8], priority, ARGV[4], KEYS[9], isPausedOrMaxed) end rcall("HINCRBY", KEYS[4], "atm", 1) diff --git a/tests/test_concurrency.ts b/tests/test_concurrency.ts new file mode 100644 index 0000000000..4834aa4bf5 --- /dev/null +++ b/tests/test_concurrency.ts @@ -0,0 +1,637 @@ +import { default as IORedis } from 'ioredis'; +import { FlowProducer, QueueEvents, Queue, Worker } from '../src/classes'; +import { delay, removeAllQueueData } from '../src/utils'; +import { before, beforeEach, afterEach, describe, it, after as afterAll } from 'mocha'; +import { v4 } from 'uuid'; +import { expect } from 'chai'; +import * as ProgressBar from 'progress'; +import { after } from 'lodash'; + +describe('Concurrency', () => { + const redisHost = process.env.REDIS_HOST || 'localhost'; + const prefix = process.env.BULLMQ_TEST_PREFIX || 'bull'; + let queueName: string; + + let connection; + before(async function () { + connection = new IORedis(redisHost, { maxRetriesPerRequest: null }); + }); + + beforeEach(async () => { + queueName = `test-${v4()}`; + await new IORedis().flushall(); + }); + + afterEach(async () => { + await removeAllQueueData(new IORedis(redisHost), queueName); + }); + + afterAll(async function () { + await connection.quit(); + }); + + it('should run max concurrency for jobs added', async () => { + const queue = new Queue(queueName, { connection, prefix }); + const numJobs = 15; + const jobsData: { name: string; data: any }[] = []; + for (let j = 0;
j < numJobs; j++) { + jobsData.push({ + name: 'test', + data: { foo: `bar${j}` }, + }); + } + + const noConcurrency = await queue.getGlobalConcurrency(); + expect(noConcurrency).to.be.null; + + await queue.addBulk(jobsData); + await queue.setGlobalConcurrency(1); + const bar = new ProgressBar(':bar', { total: numJobs }); + + let count = 0; + let parallelJobs = 0; + let lastJobId = 0; + let worker: Worker; + const processing = new Promise<void>((resolve, reject) => { + worker = new Worker( + queueName, + async job => { + try { + // Check order is correct + expect(job.id).to.be.eq(`${++lastJobId}`); + count++; + parallelJobs++; + await delay(100); + bar.tick(); + parallelJobs--; + expect(parallelJobs).to.be.eql(0); + if (count == numJobs) { + resolve(); + } + } catch (err) { + console.log(err); + reject(err); + throw err; + } + }, + { + autorun: false, + concurrency: 10, + drainDelay: 10, // If test hangs, 10 seconds here helps to fail quicker. + connection, + prefix, + }, + ); + worker.on('error', err => { + console.error(err); + }); + }); + await worker.waitUntilReady(); + + worker.run(); + + await processing; + + const globalConcurrency = await queue.getGlobalConcurrency(); + expect(globalConcurrency).to.be.eql(1); + + await worker.close(); + await queue.close(); + }).timeout(16000); + + it('emits drained global event only once when worker is idle', async function () { + const queue = new Queue(queueName, { connection, prefix }); + const worker = new Worker( + queueName, + async () => { + await delay(25); + }, + { + concurrency: 10, + drainDelay: 1, + connection, + prefix, + }, + ); + + let counterDrainedEvents = 0; + + const queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queueEvents.waitUntilReady(); + queueEvents.on('drained', () => { + counterDrainedEvents++; + }); + + await queue.addBulk([ + { name: 'test', data: { foo: 'bar' } }, + { name: 'test', data: { foo: 'baz' } }, + ]); + await queue.setGlobalConcurrency(1); + + await delay(4000); + + const jobs = await queue.getJobCountByTypes('completed'); + expect(jobs).to.be.equal(2); + expect(counterDrainedEvents).to.be.equal(1); + + await worker.close(); + await queue.close(); + }).timeout(6000); + + describe('when global dynamic limit is used', () => { + it('should run max concurrency for jobs added respecting global dynamic limit', async () => { + const numJobs = 5; + const dynamicLimit = 250; + const duration = 100; + + const queue = new Queue(queueName, { + connection, + prefix, + }); + const queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queueEvents.waitUntilReady(); + await queue.setGlobalConcurrency(1); + + const worker = new Worker( + queueName, + async job => { + if (job.attemptsStarted === 1) { + await worker.rateLimit(dynamicLimit); + throw Worker.RateLimitError(); + } + }, + { + autorun: false, + concurrency: 10, + drainDelay: 10, // If test hangs, 10 seconds here helps to fail quicker.
+ limiter: { + max: 1, + duration, + }, + connection, + prefix, + }, + ); + worker.on('error', err => { + console.error(err); + }); + await worker.waitUntilReady(); + + const startTime = new Date().getTime(); + + const result = new Promise<void>((resolve, reject) => { + queueEvents.on( + 'completed', + // after every job has been completed + after(numJobs, async () => { + await worker.close(); + + try { + const timeDiff = new Date().getTime() - startTime; + expect(timeDiff).to.be.gte( + numJobs * (dynamicLimit + duration) - duration, + ); + resolve(); + } catch (err) { + reject(err); + } + }), + ); + + queueEvents.on('failed', async err => { + await worker.close(); + reject(err); + }); + }); + + const jobsData: { name: string; data: any }[] = []; + for (let j = 0; j < numJobs; j++) { + jobsData.push({ + name: 'test', + data: { foo: `bar${j}` }, + }); + } + + await queue.addBulk(jobsData); + + worker.run(); + + await result; + await queueEvents.close(); + await worker.close(); + await queue.close(); + }); + + describe('when max limiter is greater than 1', () => { + it('should run max concurrency for jobs added first processed', async () => { + const numJobs = 10; + const dynamicLimit = 250; + const duration = 100; + + const queue = new Queue(queueName, { + connection, + prefix, + }); + const queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queueEvents.waitUntilReady(); + await queue.setGlobalConcurrency(1); + + const worker = new Worker( + queueName, + async job => { + if (job.attemptsStarted === 1) { + await worker.rateLimit(dynamicLimit); + throw Worker.RateLimitError(); + } + }, + { + autorun: false, + concurrency: 10, + drainDelay: 10, // If test hangs, 10 seconds here helps to fail quicker. + limiter: { + max: 2, + duration, + }, + connection, + prefix, + }, + ); + worker.on('error', err => { + console.error(err); + }); + await worker.waitUntilReady(); + + const startTime = new Date().getTime(); + + const result = new Promise<void>((resolve, reject) => { + queueEvents.on( + 'completed', + // after every job has been completed + after(numJobs, async () => { + await worker.close(); + + try { + const timeDiff = new Date().getTime() - startTime; + expect(timeDiff).to.be.gte(numJobs * dynamicLimit); + resolve(); + } catch (err) { + reject(err); + } + }), + ); + + queueEvents.on('failed', async err => { + await worker.close(); + reject(err); + }); + }); + + const jobsData: { name: string; data: any }[] = []; + for (let j = 0; j < numJobs; j++) { + jobsData.push({ + name: 'test', + data: { foo: `bar${j}` }, + }); + } + + await queue.addBulk(jobsData); + + worker.run(); + + await result; + await queueEvents.close(); + await worker.close(); + await queue.close(); + }).timeout(4000); + }); + }); + + describe('when moving job to waiting-children', () => { + it('should run max concurrency for jobs added first processed', async () => { + const numJobs = 5; + const flow = new FlowProducer({ connection, prefix }); + const queue = new Queue(queueName, { + connection, + prefix, + }); + + const jobsData: { name: string; data: any }[] = []; + for (let j = 0; j < numJobs; j++) { + jobsData.push({ + name: 'test', + data: { foo: `bar${j}` }, + }); + } + + await queue.addBulk(jobsData); + await queue.setGlobalConcurrency(1); + + const name = 'child-job'; + + await flow.add({ + name: 'parent-job', + queueName, + data: {}, + children: [ + { + name, + data: { idx: 0, foo: 'bar' }, + queueName, + children: [ + { + name, + data: { idx: 0, foo: 'bar' }, + queueName, + }, + ], + }, + ], + }); + + const
bar = new ProgressBar(':bar', { + total: numJobs + 3, + }); + + let count = 0; + let parallelJobs = 0; + let worker: Worker; + const processing = new Promise<void>((resolve, reject) => { + worker = new Worker( + queueName, + async (job, token) => { + try { + count++; + parallelJobs++; + await delay(100); + bar.tick(); + parallelJobs--; + expect(parallelJobs).to.be.eql(0); + await job.moveToWaitingChildren(token!); + if (count == numJobs + 3) { + resolve(); + } + } catch (err) { + reject(err); + throw err; + } + }, + { + autorun: false, + concurrency: 10, + drainDelay: 10, // If test hangs, 10 seconds here helps to fail quicker. + connection, + prefix, + }, + ); + worker.on('error', err => { + console.error(err); + }); + }); + await worker.waitUntilReady(); + + worker.run(); + + await processing; + await flow.close(); + await worker.close(); + await queue.close(); + }).timeout(16000); + }); + + it('should automatically process stalled jobs respecting group order', async () => { + const numJobs = 4; + const globalConcurrency = 2; + const queue = new Queue(queueName, { + connection, + prefix, + }); + + for (let j = 0; j < numJobs; j++) { + await queue.add('test-stalled', { foo: j % 2 }); + } + await queue.setGlobalConcurrency(globalConcurrency); + + const concurrency = 4; + + const worker = new Worker( + queueName, + async () => { + return delay(10000); + }, + { + autorun: false, + connection, + lockDuration: 1000, + stalledInterval: 100, + concurrency, + prefix, + }, + ); + + const allActive = new Promise(resolve => { + worker.on('active', after(globalConcurrency, resolve)); + }); + + worker.run(); + + await allActive; + + await worker.close(true); + + const processedJobs: { data: any }[] = []; + + const worker2 = new Worker( + queueName, + async job => { + await delay(10); + processedJobs.push({ data: job.data.foo }); + }, + { + autorun: false, + connection, + concurrency, + stalledInterval: 100, + prefix, + }, + ); + + const allCompleted = new Promise(resolve => { + worker2.on('completed', after(numJobs, resolve)); + }); + + worker2.on('error', error => { + console.log(error); + }); + + const allStalled = new Promise<void>(resolve => { + worker2.on( + 'stalled', + after(globalConcurrency, (jobId, prev) => { + expect(prev).to.be.equal('active'); + resolve(); + }), + ); + }); + + worker2.run(); + await allStalled; + + await allCompleted; + + await worker2.close(); + await queue.close(); + + let index = 0, + sum = 0; + for (let i = 1; i <= numJobs; i++) { + const job = processedJobs[index++]; + sum += Number(job.data); + if (i % 2 == 0) { + expect(sum).to.be.equal(1); + sum = 0; + } + } + }); + + describe('when jobs use backoff strategy', () => { + it('processes jobs without getting stuck', async () => { + const numJobs = 2; + const globalConcurrency = 1; + const queue = new Queue(queueName, { + connection, + prefix, + }); + + for (let j = 0; j < numJobs; j++) { + await queue.add( + 'test', + { foo: `bar${j}` }, + { attempts: 2, backoff: 100 }, + ); + } + await queue.setGlobalConcurrency(globalConcurrency); + + const concurrency = 10; + + let worker: Worker; + const processedJobs: { data: any }[] = []; + const processing = new Promise<void>(resolve => { + worker = new Worker( + queueName, + async job => { + await delay(25); + if (job.attemptsStarted == 1) { + throw new Error('Not yet!'); + } + + processedJobs.push({ data: job.data }); + if (processedJobs.length == numJobs) { + resolve(); + } + }, + { + connection, + concurrency, + prefix, + }, + ); + }); + + await processing; +
expect(processedJobs.length).to.be.equal(numJobs); + + await worker.close(); + await queue.close(); + }).timeout(20000); + + describe('when backoff is 0', () => { + it('processes jobs without getting stuck', async () => { + const numJobs = 7; + const globalConcurrency = 1; + const queue = new Queue(queueName, { + connection, + prefix, + }); + + for (let j = 0; j < numJobs; j++) { + await queue.add( + 'test', + { foo: `bar${j}` }, + { attempts: 2, backoff: 0 }, + ); + } + await queue.setGlobalConcurrency(globalConcurrency); + + const concurrency = 4; + + let worker: Worker; + const processedJobs: { data: any }[] = []; + const processing = new Promise<void>(resolve => { + worker = new Worker( + queueName, + async job => { + await delay(20); + if (job.attemptsStarted == 1) { + throw new Error('Not yet!'); + } + + processedJobs.push({ data: job.data }); + if (processedJobs.length == numJobs) { + resolve(); + } + }, + { + connection, + concurrency, + prefix, + }, + ); + }); + + await processing; + + expect(processedJobs.length).to.be.equal(numJobs); + + await worker.close(); + await queue.close(); + }); + }); + }); + + describe('when lock is expired and removing a job in active state', () => { + it('does not get stuck in max state', async function () { + const globalConcurrency = 1; + const queue = new Queue(queueName, { + connection, + prefix, + }); + await queue.waitUntilReady(); + await queue.setGlobalConcurrency(globalConcurrency); + const worker = new Worker(queueName, null, { + connection, + lockRenewTime: 200, + lockDuration: 20, + skipStalledCheck: true, + skipLockRenewal: true, + prefix, + }); + const token = 'my-token'; + const numJobs = 4; + for (let j = 0; j < numJobs; j++) { + await queue.add('test', { foo: `bar${j}` }); + } + const job1 = await worker.getNextJob(token); + const state = await job1!.getState(); + expect(state).to.be.equal('active'); + await delay(50); + let isMaxed = await queue.isMaxed(); + expect(isMaxed).to.be.true; + await job1!.remove(); + isMaxed = await queue.isMaxed(); + expect(isMaxed).to.be.false; + await worker.close(); + await queue.close(); + }); + }); +}); diff --git a/yarn.lock b/yarn.lock index 2760d06dad..64aa585a25 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6161,7 +6161,7 @@ process-on-spawn@^1.0.0: dependencies: fromentries "^1.2.0" -progress@^2.0.0: +progress@^2.0.0, progress@^2.0.3: version "2.0.3" resolved "https://registry.yarnpkg.com/progress/-/progress-2.0.3.tgz#7e8cf8d8f5b8f239c1bc68beb4eb78567d572ef8" integrity sha512-7PiHtLll5LdnKIMw100I+8xJXR5gW2QwWYkT6iJva0bXitZKa/XMrSbdmg3r2Xnaidz9Qumd0VPaMrZlF9V9sA==
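For reference, a minimal usage sketch of the queue-level API this diff introduces (`setGlobalConcurrency`, `getGlobalConcurrency`, `isMaxed`). The queue name, job payload, and Redis connection options below are illustrative assumptions, not part of the change:

```typescript
// Sketch only: exercises the API added in src/classes/queue.ts above.
import { Queue, Worker } from 'bullmq';

const connection = { host: 'localhost', port: 6379 };

async function main() {
  const queue = new Queue('my-queue', { connection });

  // Cap processing at 4 jobs in parallel across *all* workers of this queue.
  // Internally this sets the 'concurrency' field on the queue's meta hash.
  await queue.setGlobalConcurrency(4);

  // Reads the same meta field back; resolves to null when no cap is set.
  const cap = await queue.getGlobalConcurrency(); // 4

  // True once the active list has grown to the global concurrency value.
  const maxed = await queue.isMaxed();
  console.log({ cap, maxed });

  // A worker may still declare its own (local) concurrency, but the number
  // of jobs in the active state never exceeds the global cap of 4.
  const worker = new Worker('my-queue', async job => job.data, {
    connection,
    concurrency: 10,
  });

  await queue.add('example', { hello: 'world' });

  // ... wait for processing, then shut down.
  await worker.close();
  await queue.close();
}

main().catch(console.error);
```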