Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(job): set delay value on current job instance when it is retried #2266

Merged
merged 2 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions python/bullmq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ async def moveToFailed(self, err, token:str, fetchNext:bool = False):

move_to_failed = False
finished_on = 0
delay = 0
command = 'moveToFailed'

async with self.queue.redisConnection.conn.pipeline(transaction=True) as pipe:
Expand Down Expand Up @@ -137,6 +138,9 @@ async def moveToFailed(self, err, token:str, fetchNext:bool = False):
if finished_on and type(finished_on) == int:
self.finishedOn = finished_on

if delay and type(delay) == int:
self.delay = delay

async def saveStacktrace(self, pipe, err:str):
stacktrace = traceback.format_exc()
stackTraceLimit = self.opts.get("stackTraceLimit")
Expand Down
8 changes: 6 additions & 2 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ export class Job<
// Check if an automatic retry should be performed
//
let moveToFailed = false;
let finishedOn;
let finishedOn, delay;
if (
this.attemptsMade < this.opts.attempts &&
!this.discarded &&
Expand All @@ -627,7 +627,7 @@ export class Job<
const opts = queue.opts as WorkerOptions;

// Check if backoff is needed
const delay = await Backoffs.calculate(
delay = await Backoffs.calculate(
<BackoffOptions>this.opts.backoff,
this.attemptsMade,
err,
Expand Down Expand Up @@ -687,6 +687,10 @@ export class Job<
if (finishedOn && typeof finishedOn === 'number') {
this.finishedOn = finishedOn;
}

if (delay && typeof delay === 'number') {
this.delay = delay;
}
}

/**
Expand Down
55 changes: 49 additions & 6 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2004,15 +2004,12 @@ describe('workers', function () {
);
await worker.waitUntilReady();

const job = await queue.add(
'test',
{ bar: 'baz' },
{ attempts: 3, backoff: 300 },
);
await queue.add('test', { bar: 'baz' }, { attempts: 3, backoff: 300 });

const failed = new Promise<void>((resolve, reject) => {
worker.once('failed', async () => {
worker.once('failed', async job => {
try {
expect(job?.delay).to.be.eql(300);
const gotJob = await queue.getJob(job.id!);
expect(gotJob!.delay).to.be.eql(300);
resolve();
Expand Down Expand Up @@ -2073,6 +2070,52 @@ describe('workers', function () {
await worker.close();
});

describe('when backoff type is exponential', () => {
it("updates job's delay property if it fails and backoff is set", async () => {
const worker = new Worker(
queueName,
async () => {
await delay(100);
throw new Error('error');
},
{ connection, prefix },
);
await worker.waitUntilReady();

await queue.add(
'test',
{ bar: 'baz' },
{
attempts: 3,
backoff: {
type: 'exponential',
delay: 200,
},
},
);

const failed = new Promise<void>((resolve, reject) => {
worker.on('failed', async job => {
try {
const attemptsMade = job?.attemptsMade;
if (attemptsMade! > 2) {
expect(job!.delay).to.be.eql(0);
const gotJob = await queue.getJob(job.id!);
expect(gotJob!.delay).to.be.eql(0);
resolve();
}
} catch (err) {
reject(err);
}
});
});

await failed;

await worker.close();
});
});

describe('when attempts is 1 and job fails', () => {
it('should execute job only once and emits retries-exhausted event', async () => {
const worker = new Worker(
Expand Down
Loading