From 17aa2eb74ee47c9ff2b1ae19e47806aaa975ca9c Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 27 Feb 2024 23:57:57 -0500 Subject: [PATCH 1/3] fix(worker): consider block timeouts less than 50ms --- src/classes/worker.ts | 40 +++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/src/classes/worker.ts b/src/classes/worker.ts index d929eb0069..96d4ab0a02 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -603,29 +603,27 @@ export class Worker< 0, ); - // Blocking for less than 50ms is useless. - if (blockTimeout > 0.05) { - blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout - ? blockTimeout - : Math.ceil(blockTimeout); - - // We restrict the maximum block timeout to 10 second to avoid - // blocking the connection for too long in the case of reconnections - // reference: https://github.com/taskforcesh/bullmq/issues/1658 - blockTimeout = Math.min(blockTimeout, maximumBlockTimeout); - - // Markers should only be used for un-blocking, so we will handle them in this - // function only. - const result = await bclient.bzpopmin(this.keys.marker, blockTimeout); - - if (result) { - const [_key, member, score] = result; - - if (member) { - return parseInt(score); - } + blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout + ? blockTimeout + : Math.ceil(blockTimeout); + + // We restrict the maximum block timeout to 10 second to avoid + // blocking the connection for too long in the case of reconnections + // reference: https://github.com/taskforcesh/bullmq/issues/1658 + blockTimeout = Math.min(blockTimeout, maximumBlockTimeout); + + // Markers should only be used for un-blocking, so we will handle them in this + // function only. + const result = await bclient.bzpopmin(this.keys.marker, blockTimeout); + + if (result) { + const [_key, member, score] = result; + + if (member) { + return parseInt(score); } } + return 0; } } catch (error) { From 5e6eb043830c2ef8b2756e83b1b05a269a28e5a1 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Wed, 28 Feb 2024 23:51:51 -0500 Subject: [PATCH 2/3] fix: set blockTimeout as 0.001 when reach the time to get new jobs --- src/classes/worker.ts | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 96d4ab0a02..2b2c5a17fa 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -595,13 +595,8 @@ export class Worker< } try { - const opts: WorkerOptions = this.opts; - if (!this.closing) { - let blockTimeout = Math.max( - blockUntil ? (blockUntil - Date.now()) / 1000 : opts.drainDelay, - 0, - ); + let blockTimeout = this.getBlockTimeout(blockUntil); blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout ? blockTimeout @@ -639,6 +634,23 @@ export class Worker< return Infinity; } + getBlockTimeout(blockUntil: number) { + const opts: WorkerOptions = this.opts; + + // when there are delayed jobs + if (blockUntil) { + const blockDelay = blockUntil - Date.now(); + // when we reach the time to get new jobs + if (blockDelay < 1) { + return 0.001; + } else { + return blockDelay / 1000; + } + } else { + return Math.max(opts.drainDelay, 0); + } + } + /** * * This function is exposed only for testing purposes. From cedf0ade975cfe1cd64504cc74c858a5bdebb05f Mon Sep 17 00:00:00 2001 From: roggervalf Date: Wed, 28 Feb 2024 23:53:02 -0500 Subject: [PATCH 3/3] chore: add missing types --- src/classes/worker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 2b2c5a17fa..da45dadc96 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -634,7 +634,7 @@ export class Worker< return Infinity; } - getBlockTimeout(blockUntil: number) { + protected getBlockTimeout(blockUntil: number): number { const opts: WorkerOptions = this.opts; // when there are delayed jobs