diff --git a/src/classes/redis-connection.ts b/src/classes/redis-connection.ts index 96db6bf322..bc7e69a222 100644 --- a/src/classes/redis-connection.ts +++ b/src/classes/redis-connection.ts @@ -298,7 +298,7 @@ export class RedisConnection extends EventEmitter { return client.connect(); } - async close(): Promise { + async close(force = false): Promise { if (!this.closing) { const status = this.status; this.status = 'closing'; @@ -310,7 +310,7 @@ export class RedisConnection extends EventEmitter { await this.initializing; } if (!this.shared) { - if (status == 'initializing') { + if (status == 'initializing' || force || process.env.CI) { // If we have not still connected to Redis, we need to disconnect. this._client.disconnect(); } else { diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 81d03a45a0..bda6163f7e 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -877,38 +877,34 @@ will never work with more accuracy than 1ms. */ } this.closing = (async () => { this.emit('closing', 'closing queue'); - this.abortDelayController?.abort(); - const client = - this.blockingConnection.status == 'ready' - ? await this.blockingConnection.client - : null; - this.resume(); - await Promise.resolve() - .finally(() => { + + // Define the async cleanup functions + const asyncCleanups = [ + () => { return force || this.whenCurrentJobsFinished(false); - }) - .finally(() => { - const closePoolPromise = this.childPool?.clean(); - - if (force) { - // since we're not waiting for the job to end attach - // an error handler to avoid crashing the whole process - closePoolPromise?.catch(err => { - console.error(err); // TODO: emit error in next breaking change version - }); - return; - } - return closePoolPromise; - }) - .finally(() => clearTimeout(this.extendLocksTimer)) - .finally(() => clearTimeout(this.stalledCheckTimer)) - .finally(() => client && client.disconnect()) - .finally(() => this.connection.close()) - .finally(() => this.emit('closed')); + }, + () => this.childPool?.clean(), + () => this.blockingConnection.close(force), + () => this.connection.close(force), + ]; + + // Run cleanup functions sequentially and make sure all are run despite any errors + for (const cleanup of asyncCleanups) { + try { + await cleanup(); + } catch (err) { + this.emit('error', err); + } + } + + clearTimeout(this.extendLocksTimer); + clearTimeout(this.stalledCheckTimer); + this.closed = true; + this.emit('closed'); })(); return this.closing; }