diff --git a/src/classes/redis-connection.ts b/src/classes/redis-connection.ts index ca169de3e2..0051024353 100644 --- a/src/classes/redis-connection.ts +++ b/src/classes/redis-connection.ts @@ -22,6 +22,10 @@ const deprecationMessage = [ 'On the next versions having this settings will throw an exception', ].join(' '); +interface RedisCapabilities { + canDoubleTimeout: boolean; +} + export interface RawCommand { content: string; name: string; @@ -33,11 +37,14 @@ export class RedisConnection extends EventEmitter { static recommendedMinimumVersion = '6.2.0'; closing: boolean; + capabilities: RedisCapabilities = { + canDoubleTimeout: false, + }; protected _client: RedisClient; private readonly opts: RedisOptions; - private initializing: Promise; + private readonly initializing: Promise; private version: string; private skipVersionCheck: boolean; @@ -207,6 +214,11 @@ export class RedisConnection extends EventEmitter { ); } } + + this.capabilities = { + canDoubleTimeout: !isRedisVersionLowerThan(this.version, '6.0.0'), + }; + return this._client; } diff --git a/src/classes/worker.ts b/src/classes/worker.ts index e2c1554038..d914d7d069 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -181,7 +181,7 @@ export class Worker< private stalledCheckTimer: NodeJS.Timeout; private waiting: Promise | null = null; private _repeat: Repeat; - + protected paused: Promise; protected processFn: Processor; protected running = false; @@ -540,12 +540,9 @@ export class Worker< ); // Only Redis v6.0.0 and above supports doubles as block time - blockTimeout = isRedisVersionLowerThan( - this.blockingConnection.redisVersion, - '6.0.0', - ) - ? Math.ceil(blockTimeout) - : blockTimeout; + blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout + ? blockTimeout + : Math.ceil(blockTimeout); // We restrict the maximum block timeout to 10 second to avoid // blocking the connection for too long in the case of reconnections diff --git a/tests/test_repeat.ts b/tests/test_repeat.ts index 88367a80fe..364a6623e9 100644 --- a/tests/test_repeat.ts +++ b/tests/test_repeat.ts @@ -1053,6 +1053,44 @@ describe('repeat', function () { }); }); + describe('when repeatable job is promoted', function () { + it('keeps one repeatable and one delayed after being processed', async function () { + const options = { + repeat: { + pattern: '0 * 1 * *', + }, + }; + + const worker = new Worker(queueName, async () => {}, { connection }); + + const completing = new Promise(resolve => { + worker.on('completed', () => { + resolve(); + }); + }); + + const repeatableJob = await queue.add('test', { foo: 'bar' }, options); + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.equal(1); + + await repeatableJob.promote(); + await completing; + + const delayedCount2 = await queue.getDelayedCount(); + expect(delayedCount2).to.be.equal(1); + + const configs = await repeat.getRepeatableJobs(0, -1, true); + + expect(delayedCount).to.be.equal(1); + + const count = await queue.count(); + + expect(count).to.be.equal(1); + expect(configs).to.have.length(1); + await worker.close(); + }); + }); + it('should allow removing a named repeatable job', async function () { const numJobs = 3; const date = new Date('2017-02-07 9:24:00');