diff --git a/python/bullmq/scripts.py b/python/bullmq/scripts.py index b8e6873863..d45cf099bf 100644 --- a/python/bullmq/scripts.py +++ b/python/bullmq/scripts.py @@ -54,7 +54,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection "removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")), "reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-6.lua")), "retryJob": self.redisClient.register_script(self.getScript("retryJob-10.lua")), - "moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-6.lua")), + "moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-7.lua")), "saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")), "updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")), "updateProgress": self.redisClient.register_script(self.getScript("updateProgress-3.lua")), @@ -419,7 +419,7 @@ async def obliterate(self, count: int, force: bool = False): def moveJobsToWaitArgs(self, state: str, count: int, timestamp: int) -> int: keys = self.getKeys( - ['', 'events', state, 'wait', 'paused', 'meta']) + ['', 'events', state, 'wait', 'paused', 'meta', 'marker']) args = [count or 1000, timestamp or round(time.time()*1000), state] return (keys, args) diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 1964b24011..23e762bb1f 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -848,7 +848,8 @@ export class Scripts { this.queue.toKey(state), this.queue.toKey('wait'), this.queue.toKey('paused'), - this.queue.toKey('meta'), + this.queue.keys.meta, + this.queue.keys.marker, ]; const args = [count, timestamp, state]; diff --git a/src/commands/moveJobsToWait-6.lua b/src/commands/moveJobsToWait-7.lua similarity index 91% rename from src/commands/moveJobsToWait-6.lua rename to src/commands/moveJobsToWait-7.lua index f8141d87d4..21f09e718a 100644 --- a/src/commands/moveJobsToWait-6.lua +++ b/src/commands/moveJobsToWait-7.lua @@ -10,6 +10,7 @@ KEYS[4] 'wait' KEYS[5] 'paused' KEYS[6] 'meta' + KEYS[7] 'marker' ARGV[1] count ARGV[2] timestamp @@ -30,7 +31,7 @@ local rcall = redis.call; --- @include "includes/getTargetQueueList" local metaKey = KEYS[6] -local target = getTargetQueueList(metaKey, KEYS[4], KEYS[5]) +local target, paused = getTargetQueueList(metaKey, KEYS[4], KEYS[5]) local jobs = rcall('ZRANGEBYSCORE', KEYS[3], 0, timestamp, 'LIMIT', 0, maxCount) if (#jobs > 0) then @@ -59,6 +60,10 @@ if (#jobs > 0) then rcall("ZREM", KEYS[3], unpack(jobs, from, to)) rcall("LPUSH", target, unpack(jobs, from, to)) end + + if not paused then + rcall("ZADD", KEYS[7], 0, "0") + end end maxCount = maxCount - #jobs diff --git a/tests/test_events.ts b/tests/test_events.ts index f0c6377c92..9131e33bd1 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -264,7 +264,7 @@ describe('events', function () { { name: 'test', data: { foo: 'baz' } }, ]); - await delay(1000); + await delay(2000); const jobs = await queue.getJobCountByTypes('completed'); expect(jobs).to.be.equal(4); diff --git a/tests/test_queue.ts b/tests/test_queue.ts index 491d709ded..39cc448e6d 100644 --- a/tests/test_queue.ts +++ b/tests/test_queue.ts @@ -468,7 +468,7 @@ describe('queues', function () { }); describe('when completed state is provided', () => { - it('retries all completed jobs', async () => { + it('retries all completed jobs', async function () { await queue.waitUntilReady(); const jobCount = 8; @@ -485,9 +485,11 @@ describe('queues', function () { worker.on('completed', after(jobCount, resolve)); }); - for (const index of Array.from(Array(jobCount).keys())) { - await queue.add('test', { idx: index }); - } + const jobs = Array.from(Array(jobCount).keys()).map(index => ({ + name: 'test', + data: { idx: index }, + })); + await queue.addBulk(jobs); await completing1;