diff --git a/src/classes/worker.ts b/src/classes/worker.ts index acc6f1d533..f9a2ce34ce 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -1042,7 +1042,6 @@ will never work with more accuracy than 1ms. */ } clearTimeout(this.extendLocksTimer); - //clearTimeout(this.stalledCheckTimer); this.stalledCheckStopper?.(); this.closed = true; @@ -1249,6 +1248,7 @@ will never work with more accuracy than 1ms. */ this.emit('stalled', jobId, 'active'); }); + // Todo: check if there any listeners on failed event const jobPromises: Promise>[] = []; for (let i = 0; i < failed.length; i++) { jobPromises.push( diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index f0f8ad5e4d..b676e711f4 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -1403,6 +1403,247 @@ describe('Job Scheduler', function () { }); }); + describe('when repeatable job fails', async function () { + it('should continue repeating', async function () { + const repeatOpts = { + pattern: '0 * 1 * *', + }; + + const worker = new Worker( + queueName, + async () => { + throw new Error('failed'); + }, + { + connection, + prefix, + }, + ); + + const failing = new Promise(resolve => { + worker.on('failed', () => { + resolve(); + }); + }); + + const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts); + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.equal(1); + + await repeatableJob!.promote(); + await failing; + + const failedCount = await queue.getFailedCount(); + expect(failedCount).to.be.equal(1); + + const delayedCount2 = await queue.getDelayedCount(); + expect(delayedCount2).to.be.equal(1); + + const configs = await repeat.getRepeatableJobs(0, -1, true); + + expect(delayedCount).to.be.equal(1); + + const count = await queue.count(); + + expect(count).to.be.equal(1); + expect(configs).to.have.length(1); + await worker.close(); + }); + + it('should not create a new delayed job if the failed job is retried with retryJobs', async function () { + const repeatOpts = { + every: 579, + }; + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(177); + throw new Error('failed'); + }, + { + connection, + prefix, + }, + ); + + const failing = new Promise(resolve => { + worker.on('failed', async () => { + resolve(); + }); + }); + + const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts); + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.equal(1); + + await repeatableJob!.promote(); + await failing; + + const failedCount = await queue.getFailedCount(); + expect(failedCount).to.be.equal(1); + + // Retry the failed job + this.clock.tick(1143); + await queue.retryJobs({ state: 'failed' }); + const failedCountAfterRetry = await queue.getFailedCount(); + expect(failedCountAfterRetry).to.be.equal(0); + + const delayedCount2 = await queue.getDelayedCount(); + expect(delayedCount2).to.be.equal(1); + }); + + it('should not create a new delayed job if the failed job is retried with Job.retry()', async function () { + const repeatOpts = { + every: 477, + }; + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(177); + throw new Error('failed'); + }, + { + connection, + prefix, + }, + ); + + const failing = new Promise(resolve => { + worker.on('failed', async () => { + resolve(); + }); + }); + + const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts); + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.equal(1); + + await repeatableJob!.promote(); + + this.clock.tick(177); + + await failing; + + this.clock.tick(177); + + const failedJobs = await queue.getFailed(); + expect(failedJobs.length).to.be.equal(1); + + // Retry the failed job + const failedJob = await queue.getJob(failedJobs[0].id); + await failedJob!.retry(); + const failedCountAfterRetry = await queue.getFailedCount(); + expect(failedCountAfterRetry).to.be.equal(0); + + const delayedCount2 = await queue.getDelayedCount(); + expect(delayedCount2).to.be.equal(1); + }); + + it('should not create a new delayed job if the failed job is stalled and moved back to wait', async function () { + // Note, this test is expected to throw an exception like this: + // "Error: Missing lock for job repeat:test:1486455840000. moveToFinished" + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + + const repeatOpts = { + every: 2000, + }; + + const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts); + expect(repeatableJob).to.be.ok; + + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.equal(1); + + await repeatableJob!.promote(); + + let resolveCompleting: () => void; + const complettingJob = new Promise(resolve => { + resolveCompleting = resolve; + }); + + let worker: Worker; + const processing = new Promise(resolve => { + worker = new Worker( + queueName, + async () => { + resolve(); + return complettingJob; + }, + { + connection, + prefix, + skipLockRenewal: true, + skipStalledCheck: true, + }, + ); + }); + + await processing; + + // force remove the lock + const client = await queue.client; + const lockKey = `bull:${queueName}:${repeatableJob!.id}:lock`; + await client.del(lockKey); + + const stalledCheckerKey = `bull:${queueName}:stalled-check`; + await client.del(stalledCheckerKey); + + const scripts = (worker!).scripts; + let [failed, stalled] = await scripts.moveStalledJobsToWait(); + + await client.del(stalledCheckerKey); + + [failed, stalled] = await scripts.moveStalledJobsToWait(); + + const waitingJobs = await queue.getWaiting(); + expect(waitingJobs.length).to.be.equal(1); + + await this.clock.tick(500); + + resolveCompleting!(); + await worker!.close(); + + await this.clock.tick(500); + + const delayedCount2 = await queue.getDelayedCount(); + expect(delayedCount2).to.be.equal(1); + + let completedJobs = await queue.getCompleted(); + expect(completedJobs.length).to.be.equal(0); + + const processing2 = new Promise(resolve => { + worker = new Worker( + queueName, + async () => { + resolve(); + }, + { + connection, + prefix, + skipLockRenewal: true, + skipStalledCheck: true, + }, + ); + }); + + await processing2; + + await worker!.close(); + + completedJobs = await queue.getCompleted(); + expect(completedJobs.length).to.be.equal(1); + + const waitingJobs2 = await queue.getWaiting(); + expect(waitingJobs2.length).to.be.equal(0); + + const delayedCount3 = await queue.getDelayedCount(); + expect(delayedCount3).to.be.equal(1); + }); + }); + it('should keep only one delayed job if adding a new repeatable job with the same id', async function () { const date = new Date('2017-02-07 9:24:00'); const key = 'mykey'; diff --git a/tests/test_queue.ts b/tests/test_queue.ts index 80bd41a92d..8b5ca6c3e7 100644 --- a/tests/test_queue.ts +++ b/tests/test_queue.ts @@ -37,6 +37,19 @@ describe('queues', function () { await connection.quit(); }); + describe('use generics', function () { + it('should be able to use generics', async function () { + const queue = new Queue<{ foo: string; bar: number }>('test', { + connection, + }); + const job = await queue.add('test', { foo: 'bar', bar: 1 }); + const job2 = await queue.getJob(job.id!); + expect(job2?.data.foo).to.be.eql('bar'); + expect(job2?.data.bar).to.be.eql(1); + await queue.close(); + }); + }); + it('should return the queue version', async () => { const queue = new Queue(queueName, { connection }); const version = await queue.getVersion();