Skip to content

Commit

Permalink
fix(job-scheduler): avoid duplicated delayed jobs when repeatable job…
Browse files Browse the repository at this point in the history
…s are retried
  • Loading branch information
manast committed Dec 13, 2024
1 parent 791ba26 commit af75315
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 21 deletions.
10 changes: 9 additions & 1 deletion src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class JobScheduler extends QueueBase {
jobName: N,
jobData: T,
opts: JobSchedulerTemplateOptions,
{ override }: { override: boolean },
{ override, producerId }: { override: boolean; producerId?: string },
): Promise<Job<T, R, N> | undefined> {
const { every, pattern, offset } = repeatOpts;

Expand Down Expand Up @@ -171,6 +171,7 @@ export class JobScheduler extends QueueBase {
},
jobData,
iterationCount,
producerId,
);

const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][]
Expand Down Expand Up @@ -206,6 +207,8 @@ export class JobScheduler extends QueueBase {
opts: JobsOptions,
data: T,
currentCount: number,
// The job id of the job that produced this next iteration
producerId?: string,
) {
//
// Generate unique job id for this iteration.
Expand All @@ -232,6 +235,11 @@ export class JobScheduler extends QueueBase {
const job = new this.Job<T, R, N>(this, name, data, mergedOpts, jobId);
job.addJob(client);

if (producerId) {
const producerJobKey = this.toKey(producerId);
client.hset(producerJobKey, 'nrjid', job.id);
}

return job;
}

Expand Down
11 changes: 11 additions & 0 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ export class Job<
*/
repeatJobKey?: string;

/**
* Produced next repetable job Id.
*
*/
nextRepeatableJobId?: string;

/**
* The token used for locking this job.
*/
Expand Down Expand Up @@ -384,6 +390,10 @@ export class Job<
job.processedBy = json.pb;
}

if (json.nrjid) {
job.nextRepeatableJobId = json.nrjid;
}

return job;
}

Expand Down Expand Up @@ -493,6 +503,7 @@ export class Job<
deduplicationId: this.deduplicationId,
repeatJobKey: this.repeatJobKey,
returnvalue: JSON.stringify(this.returnvalue),
nrjid: this.nextRepeatableJobId,
});
}

Expand Down
10 changes: 6 additions & 4 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ will never work with more accuracy than 1ms. */
job.token = token;

// Add next scheduled job if necessary.
if (job.opts.repeat) {
if (job.opts.repeat && !job.nextRepeatableJobId) {
// Use new job scheduler if possible
if (job.repeatJobKey) {
const jobScheduler = await this.jobScheduler;
Expand All @@ -798,7 +798,7 @@ will never work with more accuracy than 1ms. */
job.name,
job.data,
job.opts,
{ override: false },
{ override: false, producerId: job.id },
);
} else {
const repeat = await this.repeat;
Expand Down Expand Up @@ -835,6 +835,8 @@ will never work with more accuracy than 1ms. */
});

const handleCompleted = async (result: ResultType) => {
jobsInProgress.delete(inProgressItem);

if (!this.connection.closing) {
const completed = await job.moveToCompleted(
result,
Expand All @@ -855,6 +857,8 @@ will never work with more accuracy than 1ms. */
};

const handleFailed = async (err: Error) => {
jobsInProgress.delete(inProgressItem);

if (!this.connection.closing) {
try {
// Check if the job was manually rate-limited
Expand Down Expand Up @@ -911,8 +915,6 @@ will never work with more accuracy than 1ms. */
[TelemetryAttributes.JobFinishedTimestamp]: Date.now(),
[TelemetryAttributes.JobProcessedTimestamp]: processedOn,
});

jobsInProgress.delete(inProgressItem);
}
},
srcPropagationMedatada,
Expand Down
2 changes: 2 additions & 0 deletions src/interfaces/job-json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export interface JobJson {
parent?: ParentKeys;
parentKey?: string;
repeatJobKey?: string;
nextRepeatableJobKey?: string;
debounceId?: string;
deduplicationId?: string;
processedBy?: string;
Expand All @@ -41,6 +42,7 @@ export interface JobJsonRaw {
parent?: string;
deid?: string;
rjk?: string;
nrjid?: string;
atm?: string;
ats?: string;
pb?: string; // Worker name
Expand Down
65 changes: 49 additions & 16 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1636,20 +1636,24 @@ describe('Job Scheduler', function () {

let isFirstRun = true;

const worker = new Worker(
queueName,
async () => {
this.clock.tick(177);
if (isFirstRun) {
isFirstRun = false;
throw new Error('failed');
}
},
{
connection,
prefix,
},
);
let worker;
const processingAfterFailing = new Promise<void>(resolve => {
worker = new Worker(
queueName,
async () => {
this.clock.tick(177);
if (isFirstRun) {
isFirstRun = false;
throw new Error('failed');
}
resolve();
},
{
connection,
prefix,
},
);
});

const failing = new Promise<void>(resolve => {
worker.on('failed', async () => {
Expand All @@ -1658,26 +1662,40 @@ describe('Job Scheduler', function () {
});

const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts);
const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);

await repeatableJob!.promote();

const delayedCountBeforeFailing = await queue.getDelayedCount();
expect(delayedCountBeforeFailing).to.be.equal(0);

await failing;

const failedCount = await queue.getFailedCount();
expect(failedCount).to.be.equal(1);

const delayedCountAfterFailing = await queue.getDelayedCount();
expect(delayedCountAfterFailing).to.be.equal(1);

// Retry the failed job
this.clock.tick(1143);
await queue.retryJobs({ state: 'failed' });
const failedCountAfterRetry = await queue.getFailedCount();
expect(failedCountAfterRetry).to.be.equal(0);

await processingAfterFailing;

await worker.close();

const delayedCount2 = await queue.getDelayedCount();
expect(delayedCount2).to.be.equal(1);

const waitingCount = await queue.getWaitingCount();
expect(waitingCount).to.be.equal(0);
});

it('should not create a new delayed job if the failed job is retried with Job.retry()', async function () {
let expectError;

const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);

Expand All @@ -1692,6 +1710,13 @@ describe('Job Scheduler', function () {
async () => {
this.clock.tick(177);

try {
const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);
} catch (error) {
expectError = error;
}

if (isFirstRun) {
isFirstRun = false;
throw new Error('failed');
Expand Down Expand Up @@ -1731,6 +1756,14 @@ describe('Job Scheduler', function () {
const failedCountAfterRetry = await queue.getFailedCount();
expect(failedCountAfterRetry).to.be.equal(0);

this.clock.tick(177);

await worker.close();

if (expectError) {
throw expectError;
}

const delayedCount2 = await queue.getDelayedCount();
expect(delayedCount2).to.be.equal(1);
});
Expand Down

0 comments on commit af75315

Please sign in to comment.