From 56533c79721d845ceb534eecf912cf713bcad4f1 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Mon, 7 Oct 2024 14:50:58 -0500 Subject: [PATCH] feat(queue): add removeRateLimitKey method --- src/classes/queue.ts | 21 ++++++++---- src/interfaces/worker-options.ts | 2 -- tests/test_rate_limiter.ts | 58 ++++++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 8 deletions(-) diff --git a/src/classes/queue.ts b/src/classes/queue.ts index d2ff19a2a2..7fefe29aff 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -485,12 +485,21 @@ export class Queue< * * @param id - identifier */ - async removeDeduplicationKey(id: string): Promise { - const client = await this.client; - - return client.del(`${this.keys.de}:${id}`); - } - + async removeDeduplicationKey(id: string): Promise { + const client = await this.client; + + return client.del(`${this.keys.de}:${id}`); + } + + /** + * Removes rate limit key. + */ + async removeRateLimitKey(): Promise { + const client = await this.client; + + return client.del(this.keys.limiter); + } + /** * Removes a repeatable job by its key. Note that the key is the one used * to store the repeatable job metadata and not one of the job iterations diff --git a/src/interfaces/worker-options.ts b/src/interfaces/worker-options.ts index 73324e145d..cb69995f71 100644 --- a/src/interfaces/worker-options.ts +++ b/src/interfaces/worker-options.ts @@ -99,7 +99,6 @@ export interface WorkerOptions extends QueueBaseOptions, SandboxedOptions { skipLockRenewal?: boolean; /** - * * Number of seconds to long poll for jobs when the queue is empty. * * @default 5 @@ -107,7 +106,6 @@ export interface WorkerOptions extends QueueBaseOptions, SandboxedOptions { drainDelay?: number; /** - * * Duration of the lock for the job in milliseconds. The lock represents that * a worker is processing the job. If the lock is lost, the job will be eventually * be picked up by the stalled checker and move back to wait so that another worker diff --git a/tests/test_rate_limiter.ts b/tests/test_rate_limiter.ts index bb25a9d53c..28e4c5c9ad 100644 --- a/tests/test_rate_limiter.ts +++ b/tests/test_rate_limiter.ts @@ -845,6 +845,64 @@ describe('Rate Limiter', function () { await worker.close(); }); }); + + describe('when removing rate limit', () => { + it('should process jobs normally', async function () { + this.timeout(5000); + + const numJobs = 2; + const dynamicLimit = 10000; + const duration = 1000; + + const ttl = await queue.getRateLimitTtl(); + expect(ttl).to.be.equal(-2); + + const worker = new Worker(queueName, async () => {}, { + autorun: false, + connection, + prefix, + limiter: { + max: 1, + duration, + }, + }); + + await worker.rateLimit(dynamicLimit); + + await queue.removeRateLimitKey(); + const result = new Promise((resolve, reject) => { + queueEvents.on( + 'completed', + // after every job has been completed + after(numJobs, async () => { + try { + const timeDiff = new Date().getTime() - startTime; + expect(timeDiff).to.be.gte((numJobs - 1) * duration); + expect(timeDiff).to.be.lte(numJobs * duration); + resolve(); + } catch (err) { + reject(err); + } + }), + ); + + queueEvents.on('failed', async err => { + reject(err); + }); + }); + + const startTime = new Date().getTime(); + const jobs = Array.from(Array(numJobs).keys()).map(() => ({ + name: 'rate test', + data: {}, + })); + await queue.addBulk(jobs); + + worker.run(); + await result; + await worker.close(); + }); + }); }); describe('when there are more added jobs than max limiter', () => {