Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(worker): fix close sequence to reduce risk for open handlers #2656

Merged
merged 10 commits into from
Aug 12, 2024
4 changes: 2 additions & 2 deletions src/classes/redis-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ export class RedisConnection extends EventEmitter {
return client.connect();
}

async close(): Promise<void> {
async close(force = false): Promise<void> {
if (!this.closing) {
const status = this.status;
this.status = 'closing';
Expand All @@ -310,7 +310,7 @@ export class RedisConnection extends EventEmitter {
await this.initializing;
}
if (!this.shared) {
if (status == 'initializing') {
if (status == 'initializing' || force || process.env.CI) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to not add a process.env value here, in case users use the same name

// If we have not still connected to Redis, we need to disconnect.
this._client.disconnect();
} else {
Expand Down
50 changes: 23 additions & 27 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -877,38 +877,34 @@ will never work with more accuracy than 1ms. */
}
this.closing = (async () => {
this.emit('closing', 'closing queue');

this.abortDelayController?.abort();

const client =
this.blockingConnection.status == 'ready'
? await this.blockingConnection.client
: null;

this.resume();
await Promise.resolve()
.finally(() => {

// Define the async cleanup functions
const asyncCleanups = [
() => {
return force || this.whenCurrentJobsFinished(false);
})
.finally(() => {
const closePoolPromise = this.childPool?.clean();

if (force) {
// since we're not waiting for the job to end attach
// an error handler to avoid crashing the whole process
closePoolPromise?.catch(err => {
console.error(err); // TODO: emit error in next breaking change version
});
return;
}
return closePoolPromise;
})
.finally(() => clearTimeout(this.extendLocksTimer))
.finally(() => clearTimeout(this.stalledCheckTimer))
.finally(() => client && client.disconnect())
.finally(() => this.connection.close())
.finally(() => this.emit('closed'));
},
() => this.childPool?.clean(),
() => this.blockingConnection.close(force),
() => this.connection.close(force),
];

// Run cleanup functions sequentially and make sure all are run despite any errors
for (const cleanup of asyncCleanups) {
try {
await cleanup();
} catch (err) {
this.emit('error', <Error>err);
}
}

clearTimeout(this.extendLocksTimer);
clearTimeout(this.stalledCheckTimer);

this.closed = true;
this.emit('closed');
})();
return this.closing;
}
Expand Down
Loading