Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(queue): add removeRateLimitKey method #2806

Merged
merged 5 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/gitbook/guide/rate-limiting.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,22 @@ if (ttl > 0) {
}
```

### Remove Rate Limit Key

Sometimes is useful to stop a rate limit delay.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need a bit more of information here, like what happens when this method is used precisely.


For this purpose, you can use the **`removeRateLimitKey`** method like this:

```typescript
import { Queue } from 'bullmq';

const queue = new Queue('myQueue', { connection });

await queue.removeRateLimitKey();
```

## Read more:

- 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Worker.html#rateLimit)
- 💡 [Get Rate Limit Ttl API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#getRateLimitTtl)
- 💡 [Remove Rate Limit Key API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#removeRateLimitKey)
9 changes: 9 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,15 @@ export class Queue<
return client.del(`${this.keys.de}:${id}`);
}

/**
* Removes rate limit key.
*/
async removeRateLimitKey(): Promise<number> {
const client = await this.client;

return client.del(this.keys.limiter);
}

/**
* Removes a repeatable job by its key. Note that the key is the one used
* to store the repeatable job metadata and not one of the job iterations
Expand Down
2 changes: 0 additions & 2 deletions src/interfaces/worker-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,13 @@ export interface WorkerOptions extends QueueBaseOptions, SandboxedOptions {
skipLockRenewal?: boolean;

/**
*
* Number of seconds to long poll for jobs when the queue is empty.
*
* @default 5
*/
drainDelay?: number;

/**
*
* Duration of the lock for the job in milliseconds. The lock represents that
* a worker is processing the job. If the lock is lost, the job will be eventually
* be picked up by the stalled checker and move back to wait so that another worker
Expand Down
58 changes: 58 additions & 0 deletions tests/test_rate_limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,64 @@ describe('Rate Limiter', function () {
await worker.close();
});
});

describe('when removing rate limit', () => {
it('should process jobs normally', async function () {
this.timeout(5000);

const numJobs = 2;
const dynamicLimit = 10000;
const duration = 1000;

const ttl = await queue.getRateLimitTtl();
expect(ttl).to.be.equal(-2);

const worker = new Worker(queueName, async () => {}, {
autorun: false,
connection,
prefix,
limiter: {
max: 1,
duration,
},
});

await worker.rateLimit(dynamicLimit);

await queue.removeRateLimitKey();
const result = new Promise<void>((resolve, reject) => {
queueEvents.on(
'completed',
// after every job has been completed
after(numJobs, async () => {
try {
const timeDiff = new Date().getTime() - startTime;
expect(timeDiff).to.be.gte((numJobs - 1) * duration);
expect(timeDiff).to.be.lte(numJobs * duration);
resolve();
} catch (err) {
reject(err);
}
}),
);

queueEvents.on('failed', async err => {
reject(err);
});
});

const startTime = new Date().getTime();
const jobs = Array.from(Array(numJobs).keys()).map(() => ({
name: 'rate test',
data: {},
}));
await queue.addBulk(jobs);

worker.run();
await result;
await worker.close();
});
});
});

describe('when there are more added jobs than max limiter', () => {
Expand Down
Loading