diff --git a/src/classes/queue-getters.ts b/src/classes/queue-getters.ts index ba3690e89a..c910f6028c 100644 --- a/src/classes/queue-getters.ts +++ b/src/classes/queue-getters.ts @@ -103,13 +103,14 @@ export class QueueGetters< /** * Returns the time to live for a rate limited key in milliseconds. + * @param maxJobs - max jobs to be considered in rate limit state. If not passed + * it will return the remaining ttl without considering if max jobs is excedeed. * @returns -2 if the key does not exist. * -1 if the key exists but has no associated expire. * @see {@link https://redis.io/commands/pttl/} */ - async getRateLimitTtl(): Promise { - const client = await this.client; - return client.pttl(this.keys.limiter); + async getRateLimitTtl(maxJobs?: number): Promise { + return this.scripts.getRateLimitTtl(maxJobs); } /** @@ -148,6 +149,7 @@ export class QueueGetters< /** * Get current job state. * + * @param jobId - job identifier. * @returns Returns one of these values: * 'completed', 'failed', 'delayed', 'active', 'waiting', 'waiting-children', 'unknown'. */ diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 43c776ad5e..b0fd95f8b2 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -933,6 +933,19 @@ export class Scripts { } } + getRateLimitTtlArgs(maxJobs?: number): (string | number)[] { + const keys: (string | number)[] = [this.queue.keys.limiter]; + + return keys.concat([maxJobs ?? '0']); + } + + async getRateLimitTtl(maxJobs?: number): Promise { + const client = await this.queue.client; + + const args = this.getRateLimitTtlArgs(maxJobs); + return (client).getRateLimitTtl(args); + } + /** * Remove jobs in a specific state. * diff --git a/src/commands/getRateLimitTtl-1.lua b/src/commands/getRateLimitTtl-1.lua new file mode 100644 index 0000000000..afcf794ae2 --- /dev/null +++ b/src/commands/getRateLimitTtl-1.lua @@ -0,0 +1,20 @@ +--[[ + Get rate limit ttl + + Input: + KEYS[1] 'limiter' + + ARGV[1] maxJobs +]] + +local rcall = redis.call + +-- Includes +--- @include "includes/getRateLimitTTL" + +local rateLimiterKey = KEYS[1] +if ARGV[1] ~= "0" then + return getRateLimitTTL(tonumber(ARGV[1]), rateLimiterKey) +else + return rcall("PTTL", rateLimiterKey) +end diff --git a/tests/test_rate_limiter.ts b/tests/test_rate_limiter.ts index 33914ca922..bb25a9d53c 100644 --- a/tests/test_rate_limiter.ts +++ b/tests/test_rate_limiter.ts @@ -486,6 +486,128 @@ describe('Rate Limiter', function () { }); }); + describe('when passing maxJobs when getting rate limit ttl', () => { + describe('when rate limit counter is lower than maxJobs', () => { + it('should returns 0', async function () { + this.timeout(4000); + + const numJobs = 1; + const duration = 100; + + const ttl = await queue.getRateLimitTtl(); + expect(ttl).to.be.equal(-2); + + const worker = new Worker( + queueName, + async job => { + if (job.attemptsStarted === 1) { + delay(50); + const currentTtl = await queue.getRateLimitTtl(2); + expect(currentTtl).to.be.equal(0); + } + }, + { + connection, + prefix, + maxStalledCount: 0, + limiter: { + max: 2, + duration, + }, + }, + ); + + const result = new Promise((resolve, reject) => { + queueEvents.on( + 'completed', + // after every job has been completed + after(numJobs, async () => { + try { + resolve(); + } catch (err) { + reject(err); + } + }), + ); + + queueEvents.on('failed', async err => { + await worker.close(); + reject(err); + }); + }); + + const jobs = Array.from(Array(numJobs).keys()).map(() => ({ + name: 'rate test', + data: {}, + })); + await queue.addBulk(jobs); + + await result; + await worker.close(); + }); + }); + + describe('when rate limit counter is greater than maxJobs', () => { + it('should returns at least rate limit duration', async function () { + this.timeout(4000); + + const numJobs = 10; + const duration = 100; + + const ttl = await queue.getRateLimitTtl(); + expect(ttl).to.be.equal(-2); + + const worker = new Worker( + queueName, + async job => { + if (job.attemptsStarted === 1) { + delay(50); + const currentTtl = await queue.getRateLimitTtl(1); + expect(currentTtl).to.be.lessThanOrEqual(duration); + } + }, + { + connection, + prefix, + maxStalledCount: 0, + limiter: { + max: 1, + duration, + }, + }, + ); + + const result = new Promise((resolve, reject) => { + queueEvents.on( + 'completed', + // after every job has been completed + after(numJobs, async () => { + try { + resolve(); + } catch (err) { + reject(err); + } + }), + ); + + queueEvents.on('failed', async err => { + await worker.close(); + reject(err); + }); + }); + + const jobs = Array.from(Array(numJobs).keys()).map(() => ({ + name: 'rate test', + data: {}, + })); + await queue.addBulk(jobs); + + await result; + await worker.close(); + }); + }); + }); + describe('when reaching max attempts and we want to move the job to failed', () => { it('should throw Unrecoverable error', async function () { const dynamicLimit = 550;