From 308db7f58758a72b8abb272da8e92509813a2178 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Wed, 25 Oct 2023 19:24:38 -0700 Subject: [PATCH 1/8] fix(sandbox): do not return empty object result when it is undefined (#2247) --- src/classes/child-processor.ts | 4 ++-- tests/test_sandboxed_process.ts | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/classes/child-processor.ts b/src/classes/child-processor.ts index 44bf586d28..ceb990109e 100644 --- a/src/classes/child-processor.ts +++ b/src/classes/child-processor.ts @@ -71,10 +71,10 @@ export class ChildProcessor { this.currentJobPromise = (async () => { try { const job = this.wrapJob(jobJson, this.send); - const result = (await this.processor(job, token)) || {}; + const result = await this.processor(job, token); await this.send({ cmd: ParentCommand.Completed, - value: result, + value: typeof result === 'undefined' ? null : result, }); } catch (err) { await this.send({ diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index 3fdf1cd7ca..53ab2a16e8 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -51,6 +51,7 @@ function sandboxProcessTests( const completing = new Promise((resolve, reject) => { worker.on('completed', async (job: Job, value: any) => { try { + expect(job.returnvalue).to.be.eql(42); expect(job.data).to.be.eql({ foo: 'bar' }); expect(value).to.be.eql(42); expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf( @@ -770,8 +771,9 @@ function sandboxProcessTests( }); const completing = new Promise((resolve, reject) => { - worker.on('completed', async () => { + worker.on('completed', async job => { try { + expect(job.returnvalue).to.be.undefined; expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf( 0, ); From 14176c1846c507da411495ddba40996a48e585b6 Mon Sep 17 00:00:00 2001 From: semantic-release-bot Date: Thu, 26 Oct 2023 02:25:51 +0000 Subject: [PATCH 2/8] chore(release): 4.12.6 [skip ci] ## [4.12.6](https://github.com/taskforcesh/bullmq/compare/v4.12.5...v4.12.6) (2023-10-26) ### Bug Fixes * **sandbox:** do not return empty object result when it is undefined ([#2247](https://github.com/taskforcesh/bullmq/issues/2247)) ([308db7f](https://github.com/taskforcesh/bullmq/commit/308db7f58758a72b8abb272da8e92509813a2178)) --- docs/gitbook/changelog.md | 7 +++++++ package.json | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/gitbook/changelog.md b/docs/gitbook/changelog.md index 8a211e1458..36a4b8046c 100644 --- a/docs/gitbook/changelog.md +++ b/docs/gitbook/changelog.md @@ -1,3 +1,10 @@ +## [4.12.6](https://github.com/taskforcesh/bullmq/compare/v4.12.5...v4.12.6) (2023-10-26) + + +### Bug Fixes + +* **sandbox:** do not return empty object result when it is undefined ([#2247](https://github.com/taskforcesh/bullmq/issues/2247)) ([308db7f](https://github.com/taskforcesh/bullmq/commit/308db7f58758a72b8abb272da8e92509813a2178)) + ## [4.12.5](https://github.com/taskforcesh/bullmq/compare/v4.12.4...v4.12.5) (2023-10-18) diff --git a/package.json b/package.json index d557b1bc0b..d1d6d2b3b5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bullmq", - "version": "4.12.5", + "version": "4.12.6", "description": "Queue for messages and jobs based on Redis", "homepage": "https://bullmq.io/", "main": "./dist/cjs/index.js", From a09b15af0d5dedfa83bce7130ee9094f3fb69e10 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Sat, 28 Oct 2023 23:35:15 -0700 Subject: [PATCH 3/8] perf(redis-connection): check redis version greater or equal than v6 only once (#2252) --- src/classes/redis-connection.ts | 14 +++++++++++- src/classes/worker.ts | 11 ++++------ tests/test_repeat.ts | 38 +++++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 8 deletions(-) diff --git a/src/classes/redis-connection.ts b/src/classes/redis-connection.ts index ca169de3e2..0051024353 100644 --- a/src/classes/redis-connection.ts +++ b/src/classes/redis-connection.ts @@ -22,6 +22,10 @@ const deprecationMessage = [ 'On the next versions having this settings will throw an exception', ].join(' '); +interface RedisCapabilities { + canDoubleTimeout: boolean; +} + export interface RawCommand { content: string; name: string; @@ -33,11 +37,14 @@ export class RedisConnection extends EventEmitter { static recommendedMinimumVersion = '6.2.0'; closing: boolean; + capabilities: RedisCapabilities = { + canDoubleTimeout: false, + }; protected _client: RedisClient; private readonly opts: RedisOptions; - private initializing: Promise; + private readonly initializing: Promise; private version: string; private skipVersionCheck: boolean; @@ -207,6 +214,11 @@ export class RedisConnection extends EventEmitter { ); } } + + this.capabilities = { + canDoubleTimeout: !isRedisVersionLowerThan(this.version, '6.0.0'), + }; + return this._client; } diff --git a/src/classes/worker.ts b/src/classes/worker.ts index e2c1554038..d914d7d069 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -181,7 +181,7 @@ export class Worker< private stalledCheckTimer: NodeJS.Timeout; private waiting: Promise | null = null; private _repeat: Repeat; - + protected paused: Promise; protected processFn: Processor; protected running = false; @@ -540,12 +540,9 @@ export class Worker< ); // Only Redis v6.0.0 and above supports doubles as block time - blockTimeout = isRedisVersionLowerThan( - this.blockingConnection.redisVersion, - '6.0.0', - ) - ? Math.ceil(blockTimeout) - : blockTimeout; + blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout + ? blockTimeout + : Math.ceil(blockTimeout); // We restrict the maximum block timeout to 10 second to avoid // blocking the connection for too long in the case of reconnections diff --git a/tests/test_repeat.ts b/tests/test_repeat.ts index 88367a80fe..364a6623e9 100644 --- a/tests/test_repeat.ts +++ b/tests/test_repeat.ts @@ -1053,6 +1053,44 @@ describe('repeat', function () { }); }); + describe('when repeatable job is promoted', function () { + it('keeps one repeatable and one delayed after being processed', async function () { + const options = { + repeat: { + pattern: '0 * 1 * *', + }, + }; + + const worker = new Worker(queueName, async () => {}, { connection }); + + const completing = new Promise(resolve => { + worker.on('completed', () => { + resolve(); + }); + }); + + const repeatableJob = await queue.add('test', { foo: 'bar' }, options); + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.equal(1); + + await repeatableJob.promote(); + await completing; + + const delayedCount2 = await queue.getDelayedCount(); + expect(delayedCount2).to.be.equal(1); + + const configs = await repeat.getRepeatableJobs(0, -1, true); + + expect(delayedCount).to.be.equal(1); + + const count = await queue.count(); + + expect(count).to.be.equal(1); + expect(configs).to.have.length(1); + await worker.close(); + }); + }); + it('should allow removing a named repeatable job', async function () { const numJobs = 3; const date = new Date('2017-02-07 9:24:00'); From a7f7080622d9689b1a62cb280df412520b3c1e3a Mon Sep 17 00:00:00 2001 From: semantic-release-bot Date: Sun, 29 Oct 2023 06:36:33 +0000 Subject: [PATCH 4/8] chore(release): 4.12.7 [skip ci] ## [4.12.7](https://github.com/taskforcesh/bullmq/compare/v4.12.6...v4.12.7) (2023-10-29) ### Performance Improvements * **redis-connection:** check redis version greater or equal than v6 only once ([#2252](https://github.com/taskforcesh/bullmq/issues/2252)) ([a09b15a](https://github.com/taskforcesh/bullmq/commit/a09b15af0d5dedfa83bce7130ee9094f3fb69e10)) --- docs/gitbook/changelog.md | 7 +++++++ package.json | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/gitbook/changelog.md b/docs/gitbook/changelog.md index 36a4b8046c..64f859088d 100644 --- a/docs/gitbook/changelog.md +++ b/docs/gitbook/changelog.md @@ -1,3 +1,10 @@ +## [4.12.7](https://github.com/taskforcesh/bullmq/compare/v4.12.6...v4.12.7) (2023-10-29) + + +### Performance Improvements + +* **redis-connection:** check redis version greater or equal than v6 only once ([#2252](https://github.com/taskforcesh/bullmq/issues/2252)) ([a09b15a](https://github.com/taskforcesh/bullmq/commit/a09b15af0d5dedfa83bce7130ee9094f3fb69e10)) + ## [4.12.6](https://github.com/taskforcesh/bullmq/compare/v4.12.5...v4.12.6) (2023-10-26) diff --git a/package.json b/package.json index d1d6d2b3b5..5b35aed3c4 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bullmq", - "version": "4.12.6", + "version": "4.12.7", "description": "Queue for messages and jobs based on Redis", "homepage": "https://bullmq.io/", "main": "./dist/cjs/index.js", From 07855ad9c84e943b8300df26a2e0304c7535c391 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Tue, 31 Oct 2023 21:23:41 -0700 Subject: [PATCH 5/8] docs(retry): extend description (#2164) --- docs/gitbook/guide/retrying-failing-jobs.md | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/docs/gitbook/guide/retrying-failing-jobs.md b/docs/gitbook/guide/retrying-failing-jobs.md index fc4ff481de..753deca232 100644 --- a/docs/gitbook/guide/retrying-failing-jobs.md +++ b/docs/gitbook/guide/retrying-failing-jobs.md @@ -1,6 +1,6 @@ # Retrying failing jobs -As your queues processes jobs, it is inevitable that over time some of these jobs will fail. In BullMQ, a job is considered failed in the following scenarios: +As your queues process jobs, it is inevitable that over time some of these jobs will fail. In BullMQ, a job is considered failed in the following scenarios: - The processor function defined in your [Worker](https://docs.bullmq.io/guide/workers) has thrown an exception. - The job has become [stalled](https://docs.bullmq.io/guide/jobs/stalled) and it has consumed the "max stalled count" setting. @@ -19,6 +19,10 @@ Often it is desirable to automatically retry failed jobs so that we do not give BullMQ supports retries of failed jobs using back-off functions. It is possible to use the **built-in** backoff functions or provide **custom** ones. If you do not specify a back-off function, the jobs will be retried without delay as soon as they fail. +{% hint style="info" %} +Retried jobs will respect their priority when they are moved back to waiting state. +{% endhint %} + #### Built-in backoff strategies The current built-in backoff functions are "exponential" and "fixed". @@ -81,6 +85,12 @@ const worker = new Worker('foo', async job => doSomeProcessing(), { }); ``` +{% hint style="info" %} +If your backoffStrategy returns 0, jobs will be moved at the end of our waiting list (priority 0) or moved back to prioritized state (priority > 0). + +If your backoffStrategy returns -1, jobs won't be retried, instead they will be moved to failed state. +{% endhint %} + You can then use your custom strategy when adding jobs: ```typescript @@ -128,3 +138,7 @@ const worker = new Worker('foo', async job => doSomeProcessing(), { }, }); ``` + +## Read more: + +- 💡 [Stop Retrying Jobs](../patterns/stop-retrying-jobs.md) From bc2d980e3409884a29220cd74cca3bbe5cacc9e6 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Tue, 31 Oct 2023 22:17:41 -0700 Subject: [PATCH 6/8] docs(repeatable): add example when removing repeatable jobs (#2257) --- docs/gitbook/guide/jobs/repeatable.md | 14 ++++++++++++++ src/classes/queue.ts | 4 ++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/docs/gitbook/guide/jobs/repeatable.md b/docs/gitbook/guide/jobs/repeatable.md index 1dfbf8de16..24a4fc1153 100644 --- a/docs/gitbook/guide/jobs/repeatable.md +++ b/docs/gitbook/guide/jobs/repeatable.md @@ -52,6 +52,20 @@ There are some important considerations regarding repeatable jobs: - If there are no workers running, repeatable jobs will not accumulate next time a worker is online. - repeatable jobs can be removed using the [removeRepeatable](https://api.docs.bullmq.io/classes/v4.Queue.html#removeRepeatable) method or [removeRepeatableByKey](https://api.docs.bullmq.io/classes/v4.Queue.html#removeRepeatableByKey). +```typescript +import { Queue } from 'bullmq'; + +const repeat = { pattern: '*/1 * * * * *' }; + +const myQueue = new Queue('Paint'); + +const job1 = await myQueue.add('red', { foo: 'bar' }, { repeat }); +const job2 = await myQueue.add('blue', { foo: 'baz' }, { repeat }); + +const isRemoved1 = await myQueue.removeRepeatableByKey(job1.repeatJobKey); +const isRemoved2 = await queue.removeRepeatable('blue', repeat); +``` + All repeatable jobs have a repeatable job key that holds some metadata of the repeatable job itself. It is possible to retrieve all the current repeatable jobs in the queue calling [getRepeatableJobs](https://api.docs.bullmq.io/classes/v4.Queue.html#getRepeatableJobs): ```typescript diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 4f845b6676..74d45dfcb0 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -314,7 +314,7 @@ export class Queue< * * @see removeRepeatableByKey * - * @param name - + * @param name - job name * @param repeatOpts - * @param jobId - * @returns @@ -337,7 +337,7 @@ export class Queue< * * @see getRepeatableJobs * - * @param key - to the repeatable job. + * @param repeatJobKey - to the repeatable job. * @returns */ async removeRepeatableByKey(key: string): Promise { From c4d12ea3a9837ffd7f58e2134796137c4181c3de Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Fri, 3 Nov 2023 03:44:58 +0100 Subject: [PATCH 7/8] fix(worker): keep extending locks while closing workers (#2259) --- src/classes/queue-base.ts | 6 ++++-- src/classes/worker.ts | 3 ++- tests/test_worker.ts | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/classes/queue-base.ts b/src/classes/queue-base.ts index 21a5d5ae3b..7134eac661 100644 --- a/src/classes/queue-base.ts +++ b/src/classes/queue-base.ts @@ -20,6 +20,7 @@ export class QueueBase extends EventEmitter implements MinimalQueue { keys: KeysMap; closing: Promise | undefined; + protected closed: boolean = false; protected scripts: Scripts; protected connection: RedisConnection; public readonly qualifiedName: string; @@ -137,11 +138,12 @@ export class QueueBase extends EventEmitter implements MinimalQueue { * * @returns Closes the connection and returns a promise that resolves when the connection is closed. */ - close(): Promise { + async close(): Promise { if (!this.closing) { this.closing = this.connection.close(); } - return this.closing; + await this.closing; + this.closed = true; } /** diff --git a/src/classes/worker.ts b/src/classes/worker.ts index d914d7d069..dbd8a74168 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -779,6 +779,7 @@ export class Worker< .finally(() => client.disconnect()) .finally(() => this.connection.close()) .finally(() => this.emit('closed')); + this.closed = true; })(); return this.closing; } @@ -818,7 +819,7 @@ export class Worker< if (!this.opts.skipLockRenewal) { clearTimeout(this.extendLocksTimer); - if (!this.closing) { + if (!this.closed) { this.extendLocksTimer = setTimeout(async () => { // Get all the jobs whose locks expire in less than 1/2 of the lockRenewTime const now = Date.now(); diff --git a/tests/test_worker.ts b/tests/test_worker.ts index 315b22dfa8..f2e29df9e8 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -1621,6 +1621,39 @@ describe('workers', function () { ).to.throw('stalledInterval must be greater than 0'); }); + it('lock extender continues to run until all active jobs are completed when closing a worker', async function () { + this.timeout(4000); + let worker; + + const startProcessing = new Promise(resolve => { + worker = new Worker( + queueName, + async () => { + resolve(); + return delay(2000); + }, + { + connection, + lockDuration: 1000, + lockRenewTime: 500, + stalledInterval: 1000, + }, + ); + }); + + await queue.add('test', { bar: 'baz' }); + + const completed = new Promise((resolve, reject) => { + worker.on('completed', resolve); + worker.on('failed', reject); + }); + + await startProcessing; + await worker.close(); + + await completed; + }); + describe('Concurrency process', () => { it('should thrown an exception if I specify a concurrency of 0', () => { try { From edd4b6b6cf714d1e95c43b61df7e363409e85ba4 Mon Sep 17 00:00:00 2001 From: semantic-release-bot Date: Fri, 3 Nov 2023 02:46:27 +0000 Subject: [PATCH 8/8] chore(release): 4.12.8 [skip ci] ## [4.12.8](https://github.com/taskforcesh/bullmq/compare/v4.12.7...v4.12.8) (2023-11-03) ### Bug Fixes * **worker:** keep extending locks while closing workers ([#2259](https://github.com/taskforcesh/bullmq/issues/2259)) ([c4d12ea](https://github.com/taskforcesh/bullmq/commit/c4d12ea3a9837ffd7f58e2134796137c4181c3de)) --- docs/gitbook/changelog.md | 7 +++++++ package.json | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/gitbook/changelog.md b/docs/gitbook/changelog.md index 64f859088d..24946b7865 100644 --- a/docs/gitbook/changelog.md +++ b/docs/gitbook/changelog.md @@ -1,3 +1,10 @@ +## [4.12.8](https://github.com/taskforcesh/bullmq/compare/v4.12.7...v4.12.8) (2023-11-03) + + +### Bug Fixes + +* **worker:** keep extending locks while closing workers ([#2259](https://github.com/taskforcesh/bullmq/issues/2259)) ([c4d12ea](https://github.com/taskforcesh/bullmq/commit/c4d12ea3a9837ffd7f58e2134796137c4181c3de)) + ## [4.12.7](https://github.com/taskforcesh/bullmq/compare/v4.12.6...v4.12.7) (2023-10-29) diff --git a/package.json b/package.json index 5b35aed3c4..4d7e99a9a4 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bullmq", - "version": "4.12.7", + "version": "4.12.8", "description": "Queue for messages and jobs based on Redis", "homepage": "https://bullmq.io/", "main": "./dist/cjs/index.js",