From 6e1173e11e59761884ec33133ca11f3efdef8c8b Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Thu, 2 Nov 2023 23:23:50 +0100 Subject: [PATCH] fix(worker): keep extending locks while closing workers --- src/classes/queue-base.ts | 6 ++++-- src/classes/worker.ts | 3 ++- tests/test_worker.ts | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/classes/queue-base.ts b/src/classes/queue-base.ts index 21a5d5ae3b..7134eac661 100644 --- a/src/classes/queue-base.ts +++ b/src/classes/queue-base.ts @@ -20,6 +20,7 @@ export class QueueBase extends EventEmitter implements MinimalQueue { keys: KeysMap; closing: Promise | undefined; + protected closed: boolean = false; protected scripts: Scripts; protected connection: RedisConnection; public readonly qualifiedName: string; @@ -137,11 +138,12 @@ export class QueueBase extends EventEmitter implements MinimalQueue { * * @returns Closes the connection and returns a promise that resolves when the connection is closed. */ - close(): Promise { + async close(): Promise { if (!this.closing) { this.closing = this.connection.close(); } - return this.closing; + await this.closing; + this.closed = true; } /** diff --git a/src/classes/worker.ts b/src/classes/worker.ts index d914d7d069..dbd8a74168 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -779,6 +779,7 @@ export class Worker< .finally(() => client.disconnect()) .finally(() => this.connection.close()) .finally(() => this.emit('closed')); + this.closed = true; })(); return this.closing; } @@ -818,7 +819,7 @@ export class Worker< if (!this.opts.skipLockRenewal) { clearTimeout(this.extendLocksTimer); - if (!this.closing) { + if (!this.closed) { this.extendLocksTimer = setTimeout(async () => { // Get all the jobs whose locks expire in less than 1/2 of the lockRenewTime const now = Date.now(); diff --git a/tests/test_worker.ts b/tests/test_worker.ts index 315b22dfa8..f2e29df9e8 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -1621,6 +1621,39 @@ describe('workers', function () { ).to.throw('stalledInterval must be greater than 0'); }); + it('lock extender continues to run until all active jobs are completed when closing a worker', async function () { + this.timeout(4000); + let worker; + + const startProcessing = new Promise(resolve => { + worker = new Worker( + queueName, + async () => { + resolve(); + return delay(2000); + }, + { + connection, + lockDuration: 1000, + lockRenewTime: 500, + stalledInterval: 1000, + }, + ); + }); + + await queue.add('test', { bar: 'baz' }); + + const completed = new Promise((resolve, reject) => { + worker.on('completed', resolve); + worker.on('failed', reject); + }); + + await startProcessing; + await worker.close(); + + await completed; + }); + describe('Concurrency process', () => { it('should thrown an exception if I specify a concurrency of 0', () => { try {