Skip to content

Commit

Permalink
fix(worker): keep extending locks while closing workers
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Nov 2, 2023
1 parent bc2d980 commit 6e1173e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 3 deletions.
6 changes: 4 additions & 2 deletions src/classes/queue-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class QueueBase extends EventEmitter implements MinimalQueue {
keys: KeysMap;
closing: Promise<void> | undefined;

protected closed: boolean = false;
protected scripts: Scripts;
protected connection: RedisConnection;
public readonly qualifiedName: string;
Expand Down Expand Up @@ -137,11 +138,12 @@ export class QueueBase extends EventEmitter implements MinimalQueue {
*
* @returns Closes the connection and returns a promise that resolves when the connection is closed.
*/
close(): Promise<void> {
async close(): Promise<void> {
if (!this.closing) {
this.closing = this.connection.close();
}
return this.closing;
await this.closing;
this.closed = true;
}

/**
Expand Down
3 changes: 2 additions & 1 deletion src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ export class Worker<
.finally(() => client.disconnect())
.finally(() => this.connection.close())
.finally(() => this.emit('closed'));
this.closed = true;
})();
return this.closing;
}
Expand Down Expand Up @@ -818,7 +819,7 @@ export class Worker<
if (!this.opts.skipLockRenewal) {
clearTimeout(this.extendLocksTimer);

if (!this.closing) {
if (!this.closed) {
this.extendLocksTimer = setTimeout(async () => {
// Get all the jobs whose locks expire in less than 1/2 of the lockRenewTime
const now = Date.now();
Expand Down
33 changes: 33 additions & 0 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1621,6 +1621,39 @@ describe('workers', function () {
).to.throw('stalledInterval must be greater than 0');
});

it('lock extender continues to run until all active jobs are completed when closing a worker', async function () {
this.timeout(4000);
let worker;

const startProcessing = new Promise<void>(resolve => {
worker = new Worker(
queueName,
async () => {
resolve();
return delay(2000);
},
{
connection,
lockDuration: 1000,
lockRenewTime: 500,
stalledInterval: 1000,
},
);
});

await queue.add('test', { bar: 'baz' });

const completed = new Promise((resolve, reject) => {
worker.on('completed', resolve);
worker.on('failed', reject);
});

await startProcessing;
await worker.close();

await completed;
});

describe('Concurrency process', () => {
it('should thrown an exception if I specify a concurrency of 0', () => {
try {
Expand Down

0 comments on commit 6e1173e

Please sign in to comment.