Skip to content

Commit

Permalink
feat(queue): add removeRateLimitKey method
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Oct 9, 2024
1 parent f0cb07e commit 56533c7
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 8 deletions.
21 changes: 15 additions & 6 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -485,12 +485,21 @@ export class Queue<
*
* @param id - identifier
*/
async removeDeduplicationKey(id: string): Promise<number> {
const client = await this.client;

return client.del(`${this.keys.de}:${id}`);
}

async removeDeduplicationKey(id: string): Promise<number> {
const client = await this.client;

return client.del(`${this.keys.de}:${id}`);
}

/**
* Removes rate limit key.
*/
async removeRateLimitKey(): Promise<number> {
const client = await this.client;

return client.del(this.keys.limiter);
}

/**
* Removes a repeatable job by its key. Note that the key is the one used
* to store the repeatable job metadata and not one of the job iterations
Expand Down
2 changes: 0 additions & 2 deletions src/interfaces/worker-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,13 @@ export interface WorkerOptions extends QueueBaseOptions, SandboxedOptions {
skipLockRenewal?: boolean;

/**
*
* Number of seconds to long poll for jobs when the queue is empty.
*
* @default 5
*/
drainDelay?: number;

/**
*
* Duration of the lock for the job in milliseconds. The lock represents that
* a worker is processing the job. If the lock is lost, the job will be eventually
* be picked up by the stalled checker and move back to wait so that another worker
Expand Down
58 changes: 58 additions & 0 deletions tests/test_rate_limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,64 @@ describe('Rate Limiter', function () {
await worker.close();
});
});

describe('when removing rate limit', () => {
it('should process jobs normally', async function () {
this.timeout(5000);

const numJobs = 2;
const dynamicLimit = 10000;
const duration = 1000;

const ttl = await queue.getRateLimitTtl();
expect(ttl).to.be.equal(-2);

const worker = new Worker(queueName, async () => {}, {
autorun: false,
connection,
prefix,
limiter: {
max: 1,
duration,
},
});

await worker.rateLimit(dynamicLimit);

await queue.removeRateLimitKey();
const result = new Promise<void>((resolve, reject) => {
queueEvents.on(
'completed',
// after every job has been completed
after(numJobs, async () => {
try {
const timeDiff = new Date().getTime() - startTime;
expect(timeDiff).to.be.gte((numJobs - 1) * duration);
expect(timeDiff).to.be.lte(numJobs * duration);
resolve();
} catch (err) {
reject(err);
}
}),
);

queueEvents.on('failed', async err => {
reject(err);
});
});

const startTime = new Date().getTime();
const jobs = Array.from(Array(numJobs).keys()).map(() => ({
name: 'rate test',
data: {},
}));
await queue.addBulk(jobs);

worker.run();
await result;
await worker.close();
});
});
});

describe('when there are more added jobs than max limiter', () => {
Expand Down

0 comments on commit 56533c7

Please sign in to comment.