Skip to content

Commit

Permalink
test(job-scheduler): add tests covering job retries and stalls
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Nov 27, 2024
1 parent 36b9973 commit 53911c2
Show file tree
Hide file tree
Showing 3 changed files with 255 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,6 @@ will never work with more accuracy than 1ms. */
}

clearTimeout(this.extendLocksTimer);
//clearTimeout(this.stalledCheckTimer);
this.stalledCheckStopper?.();

this.closed = true;
Expand Down Expand Up @@ -1249,6 +1248,7 @@ will never work with more accuracy than 1ms. */
this.emit('stalled', jobId, 'active');
});

// Todo: check if there any listeners on failed event
const jobPromises: Promise<Job<DataType, ResultType, NameType>>[] = [];
for (let i = 0; i < failed.length; i++) {
jobPromises.push(
Expand Down
241 changes: 241 additions & 0 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,247 @@ describe('Job Scheduler', function () {
});
});

describe('when repeatable job fails', async function () {
it('should continue repeating', async function () {
const repeatOpts = {
pattern: '0 * 1 * *',
};

const worker = new Worker(
queueName,
async () => {
throw new Error('failed');
},
{
connection,
prefix,
},
);

const failing = new Promise<void>(resolve => {
worker.on('failed', () => {
resolve();
});
});

const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts);
const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);

await repeatableJob!.promote();
await failing;

const failedCount = await queue.getFailedCount();
expect(failedCount).to.be.equal(1);

const delayedCount2 = await queue.getDelayedCount();
expect(delayedCount2).to.be.equal(1);

const configs = await repeat.getRepeatableJobs(0, -1, true);

expect(delayedCount).to.be.equal(1);

const count = await queue.count();

expect(count).to.be.equal(1);
expect(configs).to.have.length(1);
await worker.close();
});

it('should not create a new delayed job if the failed job is retried with retryJobs', async function () {
const repeatOpts = {
every: 579,
};

const worker = new Worker(
queueName,
async () => {
this.clock.tick(177);
throw new Error('failed');
},
{
connection,
prefix,
},
);

const failing = new Promise<void>(resolve => {
worker.on('failed', async () => {
resolve();
});
});

const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts);
const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);

await repeatableJob!.promote();
await failing;

const failedCount = await queue.getFailedCount();
expect(failedCount).to.be.equal(1);

// Retry the failed job
this.clock.tick(1143);
await queue.retryJobs({ state: 'failed' });
const failedCountAfterRetry = await queue.getFailedCount();
expect(failedCountAfterRetry).to.be.equal(0);

const delayedCount2 = await queue.getDelayedCount();
expect(delayedCount2).to.be.equal(1);
});

it('should not create a new delayed job if the failed job is retried with Job.retry()', async function () {
const repeatOpts = {
every: 477,
};

const worker = new Worker(
queueName,
async () => {
this.clock.tick(177);
throw new Error('failed');
},
{
connection,
prefix,
},
);

const failing = new Promise<void>(resolve => {
worker.on('failed', async () => {
resolve();
});
});

const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts);
const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);

await repeatableJob!.promote();

this.clock.tick(177);

await failing;

this.clock.tick(177);

const failedJobs = await queue.getFailed();
expect(failedJobs.length).to.be.equal(1);

// Retry the failed job
const failedJob = await queue.getJob(failedJobs[0].id);
await failedJob!.retry();
const failedCountAfterRetry = await queue.getFailedCount();
expect(failedCountAfterRetry).to.be.equal(0);

const delayedCount2 = await queue.getDelayedCount();
expect(delayedCount2).to.be.equal(1);
});

it('should not create a new delayed job if the failed job is stalled and moved back to wait', async function () {
// Note, this test is expected to throw an exception like this:
// "Error: Missing lock for job repeat:test:1486455840000. moveToFinished"
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);

const repeatOpts = {
every: 2000,
};

const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts);
expect(repeatableJob).to.be.ok;

const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);

await repeatableJob!.promote();

let resolveCompleting: () => void;
const complettingJob = new Promise<void>(resolve => {
resolveCompleting = resolve;
});

let worker: Worker;
const processing = new Promise<void>(resolve => {
worker = new Worker(
queueName,
async () => {
resolve();
return complettingJob;
},
{
connection,
prefix,
skipLockRenewal: true,
skipStalledCheck: true,
},
);
});

await processing;

// force remove the lock
const client = await queue.client;
const lockKey = `bull:${queueName}:${repeatableJob!.id}:lock`;
await client.del(lockKey);

const stalledCheckerKey = `bull:${queueName}:stalled-check`;
await client.del(stalledCheckerKey);

const scripts = (<any>worker!).scripts;
let [failed, stalled] = await scripts.moveStalledJobsToWait();

await client.del(stalledCheckerKey);

[failed, stalled] = await scripts.moveStalledJobsToWait();

const waitingJobs = await queue.getWaiting();
expect(waitingJobs.length).to.be.equal(1);

await this.clock.tick(500);

resolveCompleting!();
await worker!.close();

await this.clock.tick(500);

const delayedCount2 = await queue.getDelayedCount();
expect(delayedCount2).to.be.equal(1);

let completedJobs = await queue.getCompleted();
expect(completedJobs.length).to.be.equal(0);

const processing2 = new Promise<void>(resolve => {
worker = new Worker(
queueName,
async () => {
resolve();
},
{
connection,
prefix,
skipLockRenewal: true,
skipStalledCheck: true,
},
);
});

await processing2;

await worker!.close();

completedJobs = await queue.getCompleted();
expect(completedJobs.length).to.be.equal(1);

const waitingJobs2 = await queue.getWaiting();
expect(waitingJobs2.length).to.be.equal(0);

const delayedCount3 = await queue.getDelayedCount();
expect(delayedCount3).to.be.equal(1);
});
});

it('should keep only one delayed job if adding a new repeatable job with the same id', async function () {
const date = new Date('2017-02-07 9:24:00');
const key = 'mykey';
Expand Down
13 changes: 13 additions & 0 deletions tests/test_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ describe('queues', function () {
await connection.quit();
});

describe('use generics', function () {
it('should be able to use generics', async function () {
const queue = new Queue<{ foo: string; bar: number }>('test', {
connection,
});
const job = await queue.add('test', { foo: 'bar', bar: 1 });
const job2 = await queue.getJob(job.id!);
expect(job2?.data.foo).to.be.eql('bar');
expect(job2?.data.bar).to.be.eql(1);
await queue.close();
});
});

it('should return the queue version', async () => {
const queue = new Queue(queueName, { connection });
const version = await queue.getVersion();
Expand Down

0 comments on commit 53911c2

Please sign in to comment.