From 56533c79721d845ceb534eecf912cf713bcad4f1 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Mon, 7 Oct 2024 14:50:58 -0500 Subject: [PATCH 1/3] feat(queue): add removeRateLimitKey method --- src/classes/queue.ts | 21 ++++++++---- src/interfaces/worker-options.ts | 2 -- tests/test_rate_limiter.ts | 58 ++++++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 8 deletions(-) diff --git a/src/classes/queue.ts b/src/classes/queue.ts index d2ff19a2a2..7fefe29aff 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -485,12 +485,21 @@ export class Queue< * * @param id - identifier */ - async removeDeduplicationKey(id: string): Promise { - const client = await this.client; - - return client.del(`${this.keys.de}:${id}`); - } - + async removeDeduplicationKey(id: string): Promise { + const client = await this.client; + + return client.del(`${this.keys.de}:${id}`); + } + + /** + * Removes rate limit key. + */ + async removeRateLimitKey(): Promise { + const client = await this.client; + + return client.del(this.keys.limiter); + } + /** * Removes a repeatable job by its key. Note that the key is the one used * to store the repeatable job metadata and not one of the job iterations diff --git a/src/interfaces/worker-options.ts b/src/interfaces/worker-options.ts index 73324e145d..cb69995f71 100644 --- a/src/interfaces/worker-options.ts +++ b/src/interfaces/worker-options.ts @@ -99,7 +99,6 @@ export interface WorkerOptions extends QueueBaseOptions, SandboxedOptions { skipLockRenewal?: boolean; /** - * * Number of seconds to long poll for jobs when the queue is empty. * * @default 5 @@ -107,7 +106,6 @@ export interface WorkerOptions extends QueueBaseOptions, SandboxedOptions { drainDelay?: number; /** - * * Duration of the lock for the job in milliseconds. The lock represents that * a worker is processing the job. If the lock is lost, the job will be eventually * be picked up by the stalled checker and move back to wait so that another worker diff --git a/tests/test_rate_limiter.ts b/tests/test_rate_limiter.ts index bb25a9d53c..28e4c5c9ad 100644 --- a/tests/test_rate_limiter.ts +++ b/tests/test_rate_limiter.ts @@ -845,6 +845,64 @@ describe('Rate Limiter', function () { await worker.close(); }); }); + + describe('when removing rate limit', () => { + it('should process jobs normally', async function () { + this.timeout(5000); + + const numJobs = 2; + const dynamicLimit = 10000; + const duration = 1000; + + const ttl = await queue.getRateLimitTtl(); + expect(ttl).to.be.equal(-2); + + const worker = new Worker(queueName, async () => {}, { + autorun: false, + connection, + prefix, + limiter: { + max: 1, + duration, + }, + }); + + await worker.rateLimit(dynamicLimit); + + await queue.removeRateLimitKey(); + const result = new Promise((resolve, reject) => { + queueEvents.on( + 'completed', + // after every job has been completed + after(numJobs, async () => { + try { + const timeDiff = new Date().getTime() - startTime; + expect(timeDiff).to.be.gte((numJobs - 1) * duration); + expect(timeDiff).to.be.lte(numJobs * duration); + resolve(); + } catch (err) { + reject(err); + } + }), + ); + + queueEvents.on('failed', async err => { + reject(err); + }); + }); + + const startTime = new Date().getTime(); + const jobs = Array.from(Array(numJobs).keys()).map(() => ({ + name: 'rate test', + data: {}, + })); + await queue.addBulk(jobs); + + worker.run(); + await result; + await worker.close(); + }); + }); }); describe('when there are more added jobs than max limiter', () => { From a1e5906ca13e764c3184df54346c256ab553e596 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Mon, 7 Oct 2024 14:55:24 -0500 Subject: [PATCH 2/3] docs(guide): add description --- docs/gitbook/guide/rate-limiting.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/docs/gitbook/guide/rate-limiting.md b/docs/gitbook/guide/rate-limiting.md index a06d98997e..c282b7ac3c 100644 --- a/docs/gitbook/guide/rate-limiting.md +++ b/docs/gitbook/guide/rate-limiting.md @@ -112,7 +112,22 @@ if (ttl > 0) { } ``` +### Remove Rate Limit Key + +Sometimes is useful to stop a rate limit delay. + +For this purpose, you can use the **`removeRateLimitKey`** method like this: + +```typescript +import { Queue } from 'bullmq'; + +const queue = new Queue('myQueue', { connection }); + +await queue.removeRateLimitKey(); +``` + ## Read more: - 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Worker.html#rateLimit) - 💡 [Get Rate Limit Ttl API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#getRateLimitTtl) +- 💡 [Remove Rate Limit Key API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#removeRateLimitKey) From 9be5ba5bd11b0f144c659bba6785d111027b8703 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Mon, 14 Oct 2024 22:26:34 -0500 Subject: [PATCH 3/3] docs: clarify removeRateLimitKey consequences --- docs/gitbook/guide/rate-limiting.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/gitbook/guide/rate-limiting.md b/docs/gitbook/guide/rate-limiting.md index c282b7ac3c..49e155e528 100644 --- a/docs/gitbook/guide/rate-limiting.md +++ b/docs/gitbook/guide/rate-limiting.md @@ -126,6 +126,8 @@ const queue = new Queue('myQueue', { connection }); await queue.removeRateLimitKey(); ``` +By removing rate limit key, workers will be able to pick jobs again and your rate limit counter is reset to zero. + ## Read more: - 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Worker.html#rateLimit)