Skip to content

Commit

Permalink
fix: update delay job property when moving to delayed set (#2261)
Browse files Browse the repository at this point in the history
fix(python): update delay job property when moving to delayed set
  • Loading branch information
manast authored Nov 5, 2023
1 parent 0f0c351 commit 69ece08
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 7 deletions.
3 changes: 2 additions & 1 deletion python/bullmq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ async def moveToFailed(self, err, token:str, fetchNext:bool = False):
keys, args = self.scripts.moveToDelayedArgs(
self.id,
round(time.time() * 1000) + delay,
token
token,
delay
)

await self.scripts.commands["moveToDelayed"](keys=keys, args=args, client=pipe)
Expand Down
8 changes: 4 additions & 4 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def retryJobArgs(self, job_id: str, lifo: bool, token: str):

return (keys, args)

def moveToDelayedArgs(self, job_id: str, timestamp: int, token: str):
def moveToDelayedArgs(self, job_id: str, timestamp: int, token: str, delay: int = 0):
max_timestamp = max(0, timestamp or 0)

if timestamp > 0:
Expand All @@ -231,12 +231,12 @@ def moveToDelayedArgs(self, job_id: str, timestamp: int, token: str):
keys.append(self.keys['meta'])

args = [self.keys[''], round(time.time() * 1000), str(max_timestamp),
job_id, token]
job_id, token, delay]

return (keys, args)

async def moveToDelayed(self, job_id: str, timestamp: int, token: str = "0"):
keys, args = self.moveToDelayedArgs(job_id, timestamp, token)
async def moveToDelayed(self, job_id: str, timestamp: int, delay: int, token: str = "0"):
keys, args = self.moveToDelayedArgs(job_id, timestamp, token, delay)

result = await self.commands["moveToDelayed"](keys=keys, args=args)

Expand Down
9 changes: 8 additions & 1 deletion src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ export class Job<
this.id,
Date.now() + delay,
token,
delay,
);
(<any>multi).moveToDelayed(args);
command = 'delayed';
Expand Down Expand Up @@ -1021,7 +1022,13 @@ export class Job<
* @returns
*/
moveToDelayed(timestamp: number, token?: string): Promise<void> {
return this.scripts.moveToDelayed(this.id, timestamp, token);
const delay = timestamp - Date.now();
return this.scripts.moveToDelayed(
this.id,
timestamp,
delay > 0 ? delay : 0,
token,
);
}

/**
Expand Down
5 changes: 4 additions & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ export class Scripts {
jobId: string,
timestamp: number,
token: string,
delay: number,
): (string | number)[] {
//
// Bake in the job id first 12 bits into the timestamp
Expand Down Expand Up @@ -607,6 +608,7 @@ export class Scripts {
JSON.stringify(timestamp),
jobId,
token,
delay,
]);
}

Expand Down Expand Up @@ -646,11 +648,12 @@ export class Scripts {
async moveToDelayed(
jobId: string,
timestamp: number,
delay: number,
token = '0',
): Promise<void> {
const client = await this.queue.client;

const args = this.moveToDelayedArgs(jobId, timestamp, token);
const args = this.moveToDelayedArgs(jobId, timestamp, token, delay);
const result = await (<any>client).moveToDelayed(args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'moveToDelayed', 'active');
Expand Down
3 changes: 3 additions & 0 deletions src/commands/moveToDelayed-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ARGV[3] delayedTimestamp
ARGV[4] the id of the job
ARGV[5] queue token
ARGV[6] delay value
Output:
0 - OK
Expand Down Expand Up @@ -53,6 +54,8 @@ if rcall("EXISTS", jobKey) == 1 then
return -3
end

rcall("HSET", jobKey, "delay", ARGV[6])

local maxEvents = rcall("HGET", KEYS[8], "opts.maxLenEvents") or 10000

rcall("ZADD", delayedKey, score, jobId)
Expand Down
33 changes: 33 additions & 0 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1984,6 +1984,39 @@ describe('workers', function () {
});

describe('Retries and backoffs', () => {
it("updates job's delay property if it fails and backoff is set", async () => {
const worker = new Worker(
queueName,
async () => {
throw new Error('error');
},
{ connection, prefix },
);
await worker.waitUntilReady();

const job = await queue.add(
'test',
{ bar: 'baz' },
{ attempts: 3, backoff: 300 },
);

const failed = new Promise<void>((resolve, reject) => {
worker.once('failed', async () => {
try {
const gotJob = await queue.getJob(job.id!);
expect(gotJob!.delay).to.be.eql(300);
resolve();
} catch (err) {
reject(err);
}
});
});

await failed;

await worker.close();
});

it('deletes token after moving jobs to delayed', async function () {
const worker = new Worker(
queueName,
Expand Down

0 comments on commit 69ece08

Please sign in to comment.