Skip to content

Commit

Permalink
feat(job): provide skip attempt option in moveToDelayed
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Sep 30, 2023
1 parent ba77e06 commit b2288f8
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 3 deletions.
8 changes: 6 additions & 2 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1001,8 +1001,12 @@ export class Job<
* @param token - token to check job is locked by current worker
* @returns
*/
moveToDelayed(timestamp: number, token?: string): Promise<void> {
return this.scripts.moveToDelayed(this.id, timestamp, token);
moveToDelayed(
timestamp: number,
token?: string,
skipAttempt?: boolean,
): Promise<void> {
return this.scripts.moveToDelayed(this.id, timestamp, token, skipAttempt);
}

/**
Expand Down
5 changes: 4 additions & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ export class Scripts {
jobId: string,
timestamp: number,
token: string,
skipAttempt?: boolean,
): (string | number)[] {
//
// Bake in the job id first 12 bits into the timestamp
Expand Down Expand Up @@ -609,6 +610,7 @@ export class Scripts {
JSON.stringify(timestamp),
jobId,
token,
skipAttempt ? '1' : '0',
]);
}

Expand Down Expand Up @@ -649,10 +651,11 @@ export class Scripts {
jobId: string,
timestamp: number,
token = '0',
skipAttempt = false,
): Promise<void> {
const client = await this.queue.client;

const args = this.moveToDelayedArgs(jobId, timestamp, token);
const args = this.moveToDelayedArgs(jobId, timestamp, token, skipAttempt);
const result = await (<any>client).moveToDelayed(args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'moveToDelayed', 'active');
Expand Down
5 changes: 5 additions & 0 deletions src/commands/moveToDelayed-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ARGV[3] delayedTimestamp
ARGV[4] the id of the job
ARGV[5] queue token
ARGV[6] skip attempt
Output:
0 - OK
Expand Down Expand Up @@ -53,6 +54,10 @@ if rcall("EXISTS", jobKey) == 1 then
return -3
end

if ARGV[6] == "1" then
rcall("HINCRBY", jobKey, "attemptsMade", -1)
end

rcall("ZADD", delayedKey, score, jobId)
rcall("XADD", KEYS[6], "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp)

Expand Down
62 changes: 62 additions & 0 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2464,6 +2464,68 @@ describe('workers', function () {

await worker.close();
});

describe('when skip attempt option is provided', () => {
it('should retry job after a delay time, whithout incrementing attemptMade', async function () {
this.timeout(8000);

enum Step {
Initial,
Second,
Finish,
}

const worker = new Worker(
queueName,
async (job, token) => {
let step = job.data.step;
while (step !== Step.Finish) {
switch (step) {
case Step.Initial: {
await job.moveToDelayed(Date.now() + 200, token, true);
await job.updateData({
step: Step.Second,
});
throw new DelayedError();
}
case Step.Second: {
await job.updateData({
step: Step.Finish,
});
step = Step.Finish;
return Step.Finish;
}
default: {
throw new Error('invalid step');
}
}
}
},
{ connection },
);

await worker.waitUntilReady();

const start = Date.now();
await queue.add('test', { step: Step.Initial });

await new Promise<void>((resolve, reject) => {
worker.on('completed', job => {
const elapse = Date.now() - start;
expect(elapse).to.be.greaterThan(200);
expect(job.returnvalue).to.be.eql(Step.Finish);
expect(job.attemptsMade).to.be.eql(1);
resolve();
});

worker.on('error', () => {
reject();
});
});

await worker.close();
});
});
});

describe('when creating children at runtime', () => {
Expand Down

0 comments on commit b2288f8

Please sign in to comment.