Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(queue-getters): consider passing maxJobs when calling getRateLimitTtl #2631

Merged
merged 1 commit into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,14 @@ export class QueueGetters<

/**
* Returns the time to live for a rate limited key in milliseconds.
* @param maxJobs - max jobs to be considered in rate limit state. If not passed
* it will return the remaining ttl without considering if max jobs is excedeed.
* @returns -2 if the key does not exist.
* -1 if the key exists but has no associated expire.
* @see {@link https://redis.io/commands/pttl/}
*/
async getRateLimitTtl(): Promise<number> {
const client = await this.client;
return client.pttl(this.keys.limiter);
async getRateLimitTtl(maxJobs?: number): Promise<number> {
return this.scripts.getRateLimitTtl(maxJobs);
}

/**
Expand Down Expand Up @@ -148,6 +149,7 @@ export class QueueGetters<
/**
* Get current job state.
*
* @param jobId - job identifier.
* @returns Returns one of these values:
* 'completed', 'failed', 'delayed', 'active', 'waiting', 'waiting-children', 'unknown'.
*/
Expand Down
13 changes: 13 additions & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,19 @@ export class Scripts {
}
}

getRateLimitTtlArgs(maxJobs?: number): (string | number)[] {
const keys: (string | number)[] = [this.queue.keys.limiter];

return keys.concat([maxJobs ?? '0']);
}

async getRateLimitTtl(maxJobs?: number): Promise<number> {
const client = await this.queue.client;

const args = this.getRateLimitTtlArgs(maxJobs);
return (<any>client).getRateLimitTtl(args);
}

/**
* Remove jobs in a specific state.
*
Expand Down
20 changes: 20 additions & 0 deletions src/commands/getRateLimitTtl-1.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
--[[
Get rate limit ttl

Input:
KEYS[1] 'limiter'

ARGV[1] maxJobs
]]

local rcall = redis.call

-- Includes
--- @include "includes/getRateLimitTTL"

local rateLimiterKey = KEYS[1]
if ARGV[1] ~= "0" then
return getRateLimitTTL(tonumber(ARGV[1]), rateLimiterKey)
else
return rcall("PTTL", rateLimiterKey)
end
122 changes: 122 additions & 0 deletions tests/test_rate_limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,128 @@ describe('Rate Limiter', function () {
});
});

describe('when passing maxJobs when getting rate limit ttl', () => {
describe('when rate limit counter is lower than maxJobs', () => {
it('should returns 0', async function () {
this.timeout(4000);

const numJobs = 1;
const duration = 100;

const ttl = await queue.getRateLimitTtl();
expect(ttl).to.be.equal(-2);

const worker = new Worker(
queueName,
async job => {
if (job.attemptsStarted === 1) {
delay(50);
const currentTtl = await queue.getRateLimitTtl(2);
expect(currentTtl).to.be.equal(0);
}
},
{
connection,
prefix,
maxStalledCount: 0,
limiter: {
max: 2,
duration,
},
},
);

const result = new Promise<void>((resolve, reject) => {
queueEvents.on(
'completed',
// after every job has been completed
after(numJobs, async () => {
try {
resolve();
} catch (err) {
reject(err);
}
}),
);

queueEvents.on('failed', async err => {
await worker.close();
reject(err);
});
});

const jobs = Array.from(Array(numJobs).keys()).map(() => ({
name: 'rate test',
data: {},
}));
await queue.addBulk(jobs);

await result;
await worker.close();
});
});

describe('when rate limit counter is greater than maxJobs', () => {
it('should returns at least rate limit duration', async function () {
this.timeout(4000);

const numJobs = 10;
const duration = 100;

const ttl = await queue.getRateLimitTtl();
expect(ttl).to.be.equal(-2);

const worker = new Worker(
queueName,
async job => {
if (job.attemptsStarted === 1) {
delay(50);
const currentTtl = await queue.getRateLimitTtl(1);
expect(currentTtl).to.be.lessThanOrEqual(duration);
}
},
{
connection,
prefix,
maxStalledCount: 0,
limiter: {
max: 1,
duration,
},
},
);

const result = new Promise<void>((resolve, reject) => {
queueEvents.on(
'completed',
// after every job has been completed
after(numJobs, async () => {
try {
resolve();
} catch (err) {
reject(err);
}
}),
);

queueEvents.on('failed', async err => {
await worker.close();
reject(err);
});
});

const jobs = Array.from(Array(numJobs).keys()).map(() => ({
name: 'rate test',
data: {},
}));
await queue.addBulk(jobs);

await result;
await worker.close();
});
});
});

describe('when reaching max attempts and we want to move the job to failed', () => {
it('should throw Unrecoverable error', async function () {
const dynamicLimit = 550;
Expand Down
Loading