Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(queue): add removeRateLimitKey method #2806

Merged
merged 5 commits into from
Nov 19, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat(queue): add removeRateLimitKey method
roggervalf committed Oct 9, 2024
commit 56533c79721d845ceb534eecf912cf713bcad4f1
21 changes: 15 additions & 6 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
@@ -485,12 +485,21 @@ export class Queue<
*
* @param id - identifier
*/
async removeDeduplicationKey(id: string): Promise<number> {
const client = await this.client;

return client.del(`${this.keys.de}:${id}`);
}

async removeDeduplicationKey(id: string): Promise<number> {
const client = await this.client;

return client.del(`${this.keys.de}:${id}`);
}

/**
* Removes rate limit key.
*/
async removeRateLimitKey(): Promise<number> {
const client = await this.client;

return client.del(this.keys.limiter);
}

/**
* Removes a repeatable job by its key. Note that the key is the one used
* to store the repeatable job metadata and not one of the job iterations
2 changes: 0 additions & 2 deletions src/interfaces/worker-options.ts
Original file line number Diff line number Diff line change
@@ -99,15 +99,13 @@ export interface WorkerOptions extends QueueBaseOptions, SandboxedOptions {
skipLockRenewal?: boolean;

/**
*
* Number of seconds to long poll for jobs when the queue is empty.
*
* @default 5
*/
drainDelay?: number;

/**
*
* Duration of the lock for the job in milliseconds. The lock represents that
* a worker is processing the job. If the lock is lost, the job will be eventually
* be picked up by the stalled checker and move back to wait so that another worker
58 changes: 58 additions & 0 deletions tests/test_rate_limiter.ts
Original file line number Diff line number Diff line change
@@ -845,6 +845,64 @@ describe('Rate Limiter', function () {
await worker.close();
});
});

describe('when removing rate limit', () => {
it('should process jobs normally', async function () {
this.timeout(5000);

const numJobs = 2;
const dynamicLimit = 10000;
const duration = 1000;

const ttl = await queue.getRateLimitTtl();
expect(ttl).to.be.equal(-2);

const worker = new Worker(queueName, async () => {}, {
autorun: false,
connection,
prefix,
limiter: {
max: 1,
duration,
},
});

await worker.rateLimit(dynamicLimit);

await queue.removeRateLimitKey();
const result = new Promise<void>((resolve, reject) => {
queueEvents.on(
'completed',
// after every job has been completed
after(numJobs, async () => {
try {
const timeDiff = new Date().getTime() - startTime;
expect(timeDiff).to.be.gte((numJobs - 1) * duration);
expect(timeDiff).to.be.lte(numJobs * duration);
resolve();
} catch (err) {
reject(err);
}
}),
);

queueEvents.on('failed', async err => {
reject(err);
});
});

const startTime = new Date().getTime();
const jobs = Array.from(Array(numJobs).keys()).map(() => ({
name: 'rate test',
data: {},
}));
await queue.addBulk(jobs);

worker.run();
await result;
await worker.close();
});
});
});

describe('when there are more added jobs than max limiter', () => {