Skip to content

Commit

Permalink
fix: guarantee every repeatable jobs are slotted
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Dec 13, 2024
1 parent 3b218ff commit 9917df1
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 65 deletions.
28 changes: 21 additions & 7 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ export class JobScheduler extends QueueBase {

async upsertJobScheduler<T = any, R = any, N extends string = string>(
jobSchedulerId: string,
repeatOpts: Omit<RepeatOptions, 'key' | 'prevMillis' | 'offset'>,
repeatOpts: Omit<RepeatOptions, 'key' | 'prevMillis'>,
jobName: N,
jobData: T,
opts: JobSchedulerTemplateOptions,
{ override }: { override: boolean },
): Promise<Job<T, R, N> | undefined> {
const { every, pattern } = repeatOpts;
const { every, pattern, offset } = repeatOpts;

if (pattern && every) {
throw new Error(
Expand All @@ -59,6 +59,12 @@ export class JobScheduler extends QueueBase {
);
}

if (repeatOpts.immediately && repeatOpts.every) {
console.warn(
"Using option immediately with every does not affect the job's schedule. Job will run immediately anyway.",
);
}

// Check if we reached the limit of the repeatable job's iterations
const iterationCount = repeatOpts.count ? repeatOpts.count + 1 : 1;
if (
Expand All @@ -75,24 +81,32 @@ export class JobScheduler extends QueueBase {
return;
}

const prevMillis = opts.prevMillis || 0;

// Check if we have a start date for the repeatable job
const { startDate, immediately, ...filteredRepeatOpts } = repeatOpts;
if (startDate) {
const startMillis = new Date(startDate).getTime();
now = startMillis > now ? startMillis : now;
}

const prevMillis = opts.prevMillis || 0;
now = prevMillis < now ? now : prevMillis;

let nextMillis: number;
let newOffset = offset;

if (every) {
nextMillis = prevMillis + every;
const nextSlot = Math.floor(now / every) * every + every;
if (prevMillis || offset) {
nextMillis = nextSlot + (offset || 0);
} else {
nextMillis = now;
newOffset = every - (nextSlot - now);
}

if (nextMillis < now) {
nextMillis = now;
}
} else if (pattern) {
now = prevMillis < now ? now : prevMillis;
nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);
}

Expand Down Expand Up @@ -149,7 +163,7 @@ export class JobScheduler extends QueueBase {
jobSchedulerId,
{
...opts,
repeat: filteredRepeatOpts,
repeat: { ...filteredRepeatOpts, offset: newOffset },
telemetry,
},
jobData,
Expand Down
15 changes: 7 additions & 8 deletions src/interfaces/repeat-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export interface RepeatOptions extends Omit<ParserOptions, 'iterator'> {

/**
* Repeated job should start right now
* ( work only with every settings)
* ( work only with cron settings)
*/
immediately?: boolean;

Expand All @@ -42,16 +42,15 @@ export interface RepeatOptions extends Omit<ParserOptions, 'iterator'> {
count?: number;

/**
* Internal property to store the previous time the job was executed.
*/
prevMillis?: number;
* Offset in milliseconds to affect the next iteration time
*
* */
offset?: number;

/**
* Internal property to store the offset to apply to the next iteration.
*
* @deprecated
* Internal property to store the previous time the job was executed.
*/
offset?: number;
prevMillis?: number;

/**
* Internal property to store de job id
Expand Down
Loading

0 comments on commit 9917df1

Please sign in to comment.