diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 2e41591e45..b913af1405 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -33,13 +33,13 @@ export class JobScheduler extends QueueBase { async upsertJobScheduler( jobSchedulerId: string, - repeatOpts: Omit, + repeatOpts: Omit, jobName: N, jobData: T, opts: JobSchedulerTemplateOptions, { override }: { override: boolean }, ): Promise | undefined> { - const { every, pattern } = repeatOpts; + const { every, pattern, offset } = repeatOpts; if (pattern && every) { throw new Error( @@ -59,6 +59,12 @@ export class JobScheduler extends QueueBase { ); } + if (repeatOpts.immediately && repeatOpts.every) { + console.warn( + "Using option immediately with every does not affect the job's schedule. Job will run immediately anyway.", + ); + } + // Check if we reached the limit of the repeatable job's iterations const iterationCount = repeatOpts.count ? repeatOpts.count + 1 : 1; if ( @@ -75,8 +81,6 @@ export class JobScheduler extends QueueBase { return; } - const prevMillis = opts.prevMillis || 0; - // Check if we have a start date for the repeatable job const { startDate, immediately, ...filteredRepeatOpts } = repeatOpts; if (startDate) { @@ -84,15 +88,25 @@ export class JobScheduler extends QueueBase { now = startMillis > now ? startMillis : now; } + const prevMillis = opts.prevMillis || 0; + now = prevMillis < now ? now : prevMillis; + let nextMillis: number; + let newOffset = offset; + if (every) { - nextMillis = prevMillis + every; + const nextSlot = Math.floor(now / every) * every + every; + if (prevMillis || offset) { + nextMillis = nextSlot + (offset || 0); + } else { + nextMillis = now; + newOffset = every - (nextSlot - now); + } if (nextMillis < now) { nextMillis = now; } } else if (pattern) { - now = prevMillis < now ? now : prevMillis; nextMillis = await this.repeatStrategy(now, repeatOpts, jobName); } @@ -149,7 +163,7 @@ export class JobScheduler extends QueueBase { jobSchedulerId, { ...opts, - repeat: filteredRepeatOpts, + repeat: { ...filteredRepeatOpts, offset: newOffset }, telemetry, }, jobData, diff --git a/src/interfaces/repeat-options.ts b/src/interfaces/repeat-options.ts index fe71d27d12..65c5da3d4d 100644 --- a/src/interfaces/repeat-options.ts +++ b/src/interfaces/repeat-options.ts @@ -32,7 +32,7 @@ export interface RepeatOptions extends Omit { /** * Repeated job should start right now - * ( work only with every settings) + * ( work only with cron settings) */ immediately?: boolean; @@ -42,16 +42,15 @@ export interface RepeatOptions extends Omit { count?: number; /** - * Internal property to store the previous time the job was executed. - */ - prevMillis?: number; + * Offset in milliseconds to affect the next iteration time + * + * */ + offset?: number; /** - * Internal property to store the offset to apply to the next iteration. - * - * @deprecated + * Internal property to store the previous time the job was executed. */ - offset?: number; + prevMillis?: number; /** * Internal property to store de job id diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index 427cb99cee..a3288ddee8 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -191,6 +191,9 @@ describe('Job Scheduler', function () { describe('when job schedulers have same id and different every pattern', function () { it('should create only one job scheduler', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + await Promise.all([ queue.upsertJobScheduler('test-scheduler1', { every: 1000 }), queue.upsertJobScheduler('test-scheduler1', { every: 2000 }), @@ -244,6 +247,9 @@ describe('Job Scheduler', function () { }); it('should create job schedulers with different cron patterns', async function () { + const date = new Date('2017-02-07T15:24:00.000Z'); + this.clock.setSystemTime(date); + const crons = [ '10 * * * * *', '2 10 * * * *', @@ -254,11 +260,11 @@ describe('Job Scheduler', function () { await Promise.all([ queue.upsertJobScheduler('first', { pattern: crons[0], - endDate: 12345, + endDate: Date.now() + 12345, }), queue.upsertJobScheduler('second', { pattern: crons[1], - endDate: 610000, + endDate: Date.now() + 6100000, }), queue.upsertJobScheduler('third', { pattern: crons[2], @@ -273,9 +279,13 @@ describe('Job Scheduler', function () { tz: 'Europa/Copenhaguen', }), ]); + const count = await repeat.getRepeatableCount(); expect(count).to.be.eql(5); + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.eql(5); + const jobs = await repeat.getRepeatableJobs(0, -1, true); expect(jobs) @@ -288,25 +298,25 @@ describe('Job Scheduler', function () { tz: 'Europa/Copenhaguen', pattern: null, every: '5000', - next: 5000, + next: 1486481040000, }) .and.to.deep.include({ key: 'first', name: 'first', - endDate: 12345, + endDate: Date.now() + 12345, tz: null, pattern: '10 * * * * *', every: null, - next: 10000, + next: 1486481050000, }) .and.to.deep.include({ key: 'second', name: 'second', - endDate: 610000, + endDate: Date.now() + 6100000, tz: null, pattern: '2 10 * * * *', every: null, - next: 602000, + next: 1486483802000, }) .and.to.deep.include({ key: 'fourth', @@ -315,7 +325,7 @@ describe('Job Scheduler', function () { tz: 'Africa/Accra', pattern: '2 * * 4 * *', every: null, - next: 259202000, + next: 1488585602000, }) .and.to.deep.include({ key: 'third', @@ -324,7 +334,7 @@ describe('Job Scheduler', function () { tz: 'Africa/Abidjan', pattern: '1 * * 5 * *', every: null, - next: 345601000, + next: 1488672001000, }); }); @@ -773,54 +783,177 @@ describe('Job Scheduler', function () { }); }); - it('should repeat every 2 seconds and start immediately', async function () { - const date = new Date('2017-02-07 9:24:00'); - this.clock.setSystemTime(date); - const nextTick = 2 * ONE_SECOND; + describe("when using 'every' option is on same millis as iteration time", function () { + it('should repeat every 2 seconds and start immediately', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const nextTick = 2 * ONE_SECOND; - const worker = new Worker( - queueName, - async () => { - this.clock.tick(nextTick); - }, - { connection, prefix }, - ); + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { connection, prefix }, + ); - let prev: Job; - let counter = 0; + let prev: Job; + let counter = 0; - const completing = new Promise((resolve, reject) => { - worker.on('completed', async job => { - try { - if (prev && counter === 1) { - expect(prev.timestamp).to.be.lte(job.timestamp); - expect(job.timestamp - prev.timestamp).to.be.lte(1); - } else if (prev) { - expect(prev.timestamp).to.be.lt(job.timestamp); - expect(job.timestamp - prev.timestamp).to.be.gte(2000); + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + try { + if (prev && counter === 1) { + expect(prev.timestamp).to.be.lte(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.lte(1); + } else if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.eq(2000); + } + prev = job; + counter++; + if (counter === 5) { + resolve(); + } + } catch (err) { + reject(err); } - prev = job; - counter++; - if (counter === 5) { - resolve(); + }); + }); + + await queue.upsertJobScheduler( + 'repeat', + { + every: 2000, + }, + { data: { foo: 'bar' } }, + ); + + const delayedCountBefore = await queue.getDelayedCount(); + expect(delayedCountBefore).to.be.eq(1); + + await completing; + + const waitingCount = await queue.getWaitingCount(); + expect(waitingCount).to.be.eq(0); + + const delayedCountAfter = await queue.getDelayedCount(); + expect(delayedCountAfter).to.be.eq(1); + + await worker.close(); + }); + }); + + describe("when using 'every' and time is one millisecond before iteration time", function () { + it('should repeat every 2 seconds and start immediately', async function () { + const startTimeMillis = new Date('2017-02-07 9:24:00').getTime(); + + const date = new Date(startTimeMillis - 1); + this.clock.setSystemTime(date); + const nextTick = 2 * ONE_SECOND; + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { connection, prefix }, + ); + + let prev: Job; + let counter = 0; + + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + try { + if (prev && counter === 1) { + expect(prev.timestamp).to.be.lte(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.lte(1); + } else if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.eq(2000); + } + + prev = job; + counter++; + if (counter === 5) { + resolve(); + } + } catch (err) { + reject(err); } - } catch (err) { - reject(err); - } + }); }); + + await queue.upsertJobScheduler( + 'repeat', + { + every: 2000, + }, + { data: { foo: 'bar' } }, + ); + + await completing; + + await worker.close(); }); + }); - await queue.upsertJobScheduler( - 'repeat', - { - every: 2000, - }, - { data: { foo: 'bar' } }, - ); + describe("when using 'every' and time is one millisecond after iteration time", function () { + it('should repeat every 2 seconds and start immediately', async function () { + const startTimeMillis = new Date('2017-02-07 9:24:00').getTime() + 1; - await completing; + const date = new Date(startTimeMillis); + this.clock.setSystemTime(date); + const nextTick = 2 * ONE_SECOND; - await worker.close(); + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { connection, prefix }, + ); + + let prev: Job; + let counter = 0; + + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + try { + if (prev && counter === 1) { + expect(prev.timestamp).to.be.lte(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.lte(1); + } else if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.eq(2000); + } + + prev = job; + counter++; + if (counter === 5) { + resolve(); + } + } catch (err) { + reject(err); + } + }); + }); + + await queue.upsertJobScheduler( + 'repeat', + { + every: 2000, + }, + { data: { foo: 'bar' } }, + ); + + //this.clock.tick(1000); + + await completing; + + await worker.close(); + }); }); it('should start immediately even after removing the job scheduler and adding it again', async function () { @@ -850,7 +983,6 @@ describe('Job Scheduler', function () { 'repeat', { every: 2000, - immediately: true, }, { data: { foo: 'bar' } }, ); @@ -884,7 +1016,6 @@ describe('Job Scheduler', function () { 'repeat', { every: 2000, - immediately: true, }, { data: { foo: 'bar' } }, ); @@ -1196,12 +1327,15 @@ describe('Job Scheduler', function () { }); }); - await queue.upsertJobScheduler('repeat', { + const job = await queue.upsertJobScheduler('repeat', { pattern: '0 1 * * *', endDate: new Date('2017-05-10 13:13:00'), tz: 'Europe/Athens', utc: true, }); + + expect(job).to.be.ok; + this.clock.tick(nextTick + delay); worker.run(); @@ -1495,6 +1629,9 @@ describe('Job Scheduler', function () { }); it('should not create a new delayed job if the failed job is retried with retryJobs', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const repeatOpts = { every: 579, }; @@ -1543,6 +1680,9 @@ describe('Job Scheduler', function () { }); it('should not create a new delayed job if the failed job is retried with Job.retry()', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const repeatOpts = { every: 477, }; @@ -1884,6 +2024,9 @@ describe('Job Scheduler', function () { }).timeout(8000); it('should not allow to remove a delayed job if it belongs to a repeatable job', async function () { + const date = new Date('2019-07-13 1:58:23'); + this.clock.setSystemTime(date); + const repeat = { every: 1000, }; @@ -1902,6 +2045,9 @@ describe('Job Scheduler', function () { }); it('should not remove delayed jobs if they belong to a repeatable job when using drain', async function () { + const date = new Date('2014-09-03 5:32:12'); + this.clock.setSystemTime(date); + await queue.upsertJobScheduler('myTestJob', { every: 5000 }); await queue.add('test', { foo: 'bar' }, { delay: 1000 }); @@ -1919,6 +2065,9 @@ describe('Job Scheduler', function () { }); it('should not remove delayed jobs if they belong to a repeatable job when using clean', async function () { + const date = new Date('2012-08-05 2:32:12'); + this.clock.setSystemTime(date); + await queue.upsertJobScheduler('myTestJob', { every: 5000 }); await queue.add('test', { foo: 'bar' }, { delay: 1000 }); @@ -1936,6 +2085,9 @@ describe('Job Scheduler', function () { }); it("should keep one delayed job if updating a repeatable job's every option", async function () { + const date = new Date('2022-01-08 7:22:21'); + this.clock.setSystemTime(date); + await queue.upsertJobScheduler('myTestJob', { every: 5000 }); await queue.upsertJobScheduler('myTestJob', { every: 4000 }); await queue.upsertJobScheduler('myTestJob', { every: 5000 }); @@ -2175,6 +2327,9 @@ describe('Job Scheduler', function () { }); it("should return a valid job with the job's options and data passed as the job template", async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const repeatOpts = { every: 1000, }; @@ -2224,6 +2379,9 @@ describe('Job Scheduler', function () { }); it('should have the right count value', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + await queue.upsertJobScheduler('test', { every: 1000 }); this.clock.tick(ONE_SECOND + 100);