diff --git a/src/classes/worker.ts b/src/classes/worker.ts index d929eb0069..da45dadc96 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -595,37 +595,30 @@ export class Worker< } try { - const opts: WorkerOptions = this.opts; - if (!this.closing) { - let blockTimeout = Math.max( - blockUntil ? (blockUntil - Date.now()) / 1000 : opts.drainDelay, - 0, - ); + let blockTimeout = this.getBlockTimeout(blockUntil); - // Blocking for less than 50ms is useless. - if (blockTimeout > 0.05) { - blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout - ? blockTimeout - : Math.ceil(blockTimeout); + blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout + ? blockTimeout + : Math.ceil(blockTimeout); - // We restrict the maximum block timeout to 10 second to avoid - // blocking the connection for too long in the case of reconnections - // reference: https://github.com/taskforcesh/bullmq/issues/1658 - blockTimeout = Math.min(blockTimeout, maximumBlockTimeout); + // We restrict the maximum block timeout to 10 second to avoid + // blocking the connection for too long in the case of reconnections + // reference: https://github.com/taskforcesh/bullmq/issues/1658 + blockTimeout = Math.min(blockTimeout, maximumBlockTimeout); - // Markers should only be used for un-blocking, so we will handle them in this - // function only. - const result = await bclient.bzpopmin(this.keys.marker, blockTimeout); + // Markers should only be used for un-blocking, so we will handle them in this + // function only. + const result = await bclient.bzpopmin(this.keys.marker, blockTimeout); - if (result) { - const [_key, member, score] = result; + if (result) { + const [_key, member, score] = result; - if (member) { - return parseInt(score); - } + if (member) { + return parseInt(score); } } + return 0; } } catch (error) { @@ -641,6 +634,23 @@ export class Worker< return Infinity; } + protected getBlockTimeout(blockUntil: number): number { + const opts: WorkerOptions = this.opts; + + // when there are delayed jobs + if (blockUntil) { + const blockDelay = blockUntil - Date.now(); + // when we reach the time to get new jobs + if (blockDelay < 1) { + return 0.001; + } else { + return blockDelay / 1000; + } + } else { + return Math.max(opts.drainDelay, 0); + } + } + /** * * This function is exposed only for testing purposes.