From 18025a1e62977fd6282e9618ed9a08032ae55406 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Mon, 7 Oct 2024 11:15:00 -0500 Subject: [PATCH 1/3] feat(job): expose priority value --- src/classes/job.ts | 18 ++++++++++++++++-- src/interfaces/base-job-options.ts | 3 ++- tests/test_job.ts | 28 +++++++++++++++++----------- 3 files changed, 35 insertions(+), 14 deletions(-) diff --git a/src/classes/job.ts b/src/classes/job.ts index d6e84ed605..fb606d9eaa 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -95,7 +95,15 @@ export class Job< * An amount of milliseconds to wait until this job can be processed. * @defaultValue 0 */ - delay: number; + delay = 0; + + /** + * Ranges from 0 (highest priority) to 2 097 152 (lowest priority). Note that + * using priorities has a slight impact on performance, + * so do not use it if not required. + * @defaultValue 0 + */ + priority = 0; /** * Timestamp when the job was created (unless overridden with job options). @@ -201,6 +209,8 @@ export class Job< this.delay = this.opts.delay; + this.priority = this.opts.priority || 0; + this.repeatJobKey = repeatJobKey; this.timestamp = opts.timestamp ? opts.timestamp : Date.now(); @@ -214,7 +224,9 @@ export class Job< : undefined; this.debounceId = opts.debounce ? opts.debounce.id : undefined; - this.deduplicationId = opts.deduplication ? opts.deduplication.id : this.debounceId; + this.deduplicationId = opts.deduplication + ? opts.deduplication.id + : this.debounceId; this.toKey = queue.toKey.bind(queue); this.setScripts(); @@ -840,6 +852,7 @@ export class Job< /** * Change job priority. * + * @param opts - options containing priority and lifo values. * @returns void */ async changePriority(opts: { @@ -847,6 +860,7 @@ export class Job< lifo?: boolean; }): Promise { await this.scripts.changePriority(this.id, opts.priority, opts.lifo); + this.priority = opts.priority || 0; } /** diff --git a/src/interfaces/base-job-options.ts b/src/interfaces/base-job-options.ts index 2d630f8c3e..7d5d7d4c2a 100644 --- a/src/interfaces/base-job-options.ts +++ b/src/interfaces/base-job-options.ts @@ -8,9 +8,10 @@ export interface DefaultJobOptions { timestamp?: number; /** - * Ranges from 1 (highest priority) to 2 097 152 (lowest priority). Note that + * Ranges from 0 (highest priority) to 2 097 152 (lowest priority). Note that * using priorities has a slight impact on performance, * so do not use it if not required. + * @defaultValue 0 */ priority?: number; diff --git a/tests/test_job.ts b/tests/test_job.ts index 7bfb5c6531..0393fe006e 100644 --- a/tests/test_job.ts +++ b/tests/test_job.ts @@ -1053,11 +1053,15 @@ describe('Job', function () { { priority: 16 }, ); + expect(job.priority).to.be.eql(16); + await job.changePriority({ priority: 0, lifo: true, }); + expect(job.priority).to.be.eql(0); + const worker = new Worker( queueName, async () => { @@ -1367,9 +1371,11 @@ describe('Job', function () { const job = await queue.add( 'test', { foo: 'bar' }, - { repeat: { - pattern: '0 0 7 * * *' - }, }, + { + repeat: { + pattern: '0 0 7 * * *', + }, + }, ); const isDelayed = await job.isDelayed(); expect(isDelayed).to.be.equal(true); @@ -1377,7 +1383,7 @@ describe('Job', function () { expect(job.delay).to.be.equal(0); const worker = new Worker(queueName, null, { connection, prefix }); - + const currentJob1 = (await worker.getNextJob('token')) as Job; expect(currentJob1).to.not.be.undefined; @@ -1405,7 +1411,7 @@ describe('Job', function () { ); const isDelayed = await job.isDelayed(); expect(isDelayed).to.be.equal(true); - + await queue.add( 'test', { foo: 'bar' }, @@ -1417,25 +1423,25 @@ describe('Job', function () { ); const delayedCount = await queue.getDelayedCount(); expect(delayedCount).to.be.equal(1); - + await job.promote(); expect(job.delay).to.be.equal(0); - + const worker = new Worker(queueName, null, { connection, prefix }); const currentJob1 = (await worker.getNextJob('token')) as Job; expect(currentJob1).to.not.be.undefined; - + await currentJob1.moveToCompleted('succeeded', 'token', true); const completedCount = await queue.getCompletedCount(); const delayedCountAfterPromote = await queue.getDelayedCount(); expect(completedCount).to.be.equal(1); expect(delayedCountAfterPromote).to.be.equal(1); - + const completedCountAfterRestart = await queue.getCompletedCount(); const delayedCountAfterRestart = await queue.getDelayedCount(); expect(completedCountAfterRestart).to.be.equal(1); expect(delayedCountAfterRestart).to.be.equal(1); - + await queue.add( 'test', { foo: 'bar' }, @@ -1451,7 +1457,7 @@ describe('Job', function () { expect(completedCountAfterReAddition).to.be.equal(1); expect(delayedCountAfterReAddition).to.be.equal(1); }); - }); + }); }); describe('when queue is paused', () => { From a3677d5b71656725ddd1402acdf9df0bd2314fe7 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 8 Oct 2024 19:33:35 -0500 Subject: [PATCH 2/3] chore: fix finishedErrors reference for pro version --- src/classes/job.ts | 3 +- src/classes/scripts.ts | 76 ++++++++++++++++++------ src/commands/includes/deduplicateJob.lua | 1 - src/utils.ts | 42 ------------- tests/test_job_scheduler.ts | 22 ++----- tests/test_repeat.ts | 22 ++----- 6 files changed, 66 insertions(+), 100 deletions(-) diff --git a/src/classes/job.ts b/src/classes/job.ts index fb606d9eaa..3ae9447161 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -29,7 +29,6 @@ import { lengthInUtf8Bytes, parseObjectValues, tryCatch, - finishedErrors, } from '../utils'; import { Backoffs } from './backoffs'; import { Scripts, raw2NextJobData } from './scripts'; @@ -749,7 +748,7 @@ export class Job< const result = results[results.length - 1][1] as number; if (result < 0) { - throw finishedErrors({ + throw this.scripts.finishedErrors({ code: result, jobId: this.id, command, diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 4cbaa099a7..d5df5b0ab3 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -34,12 +34,7 @@ import { RedisJobOptions, } from '../types'; import { ErrorCode } from '../enums'; -import { - array2obj, - finishedErrors, - getParentKey, - isRedisVersionLowerThan, -} from '../utils'; +import { array2obj, getParentKey, isRedisVersionLowerThan } from '../utils'; import { ChainableCommander } from 'ioredis'; export type JobData = [JobJsonRaw | number, string?]; @@ -225,7 +220,7 @@ export class Scripts { } if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, parentKey: parentOpts.parentKey, command: 'addJob', @@ -414,7 +409,7 @@ export class Scripts { ); if (result == ErrorCode.JobBelongsToJobScheduler) { - throw finishedErrors({ + throw this.finishedErrors({ code: ErrorCode.JobBelongsToJobScheduler, jobId, command: 'remove', @@ -453,7 +448,7 @@ export class Scripts { const result = await (client).updateData(keys.concat([dataJson])); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId: job.id, command: 'updateData', @@ -479,7 +474,7 @@ export class Scripts { ); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'updateProgress', @@ -504,7 +499,7 @@ export class Scripts { ); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'addLog', @@ -588,7 +583,7 @@ export class Scripts { const result = await (client).moveToFinished(args); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'moveToFinished', @@ -601,6 +596,49 @@ export class Scripts { } } + finishedErrors = ({ + code, + jobId, + parentKey, + command, + state, + }: { + code: number; + jobId?: string; + parentKey?: string; + command: string; + state?: string; + }): Error => { + switch (code) { + case ErrorCode.JobNotExist: + return new Error(`Missing key for job ${jobId}. ${command}`); + case ErrorCode.JobLockNotExist: + return new Error(`Missing lock for job ${jobId}. ${command}`); + case ErrorCode.JobNotInState: + return new Error( + `Job ${jobId} is not in the ${state} state. ${command}`, + ); + case ErrorCode.JobPendingDependencies: + return new Error(`Job ${jobId} has pending dependencies. ${command}`); + case ErrorCode.ParentJobNotExist: + return new Error(`Missing key for parent job ${parentKey}. ${command}`); + case ErrorCode.JobLockMismatch: + return new Error( + `Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`, + ); + case ErrorCode.ParentJobCannotBeReplaced: + return new Error( + `The parent job ${parentKey} cannot be replaced. ${command}`, + ); + case ErrorCode.JobBelongsToJobScheduler: + return new Error( + `Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`, + ); + default: + return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); + } + }; + private drainArgs(delayed: boolean): (string | number)[] { const queueKeys = this.queue.keys; @@ -652,7 +690,7 @@ export class Scripts { case 1: return false; default: - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, parentKey, @@ -816,7 +854,7 @@ export class Scripts { const args = this.changeDelayArgs(jobId, delay); const result = await (client).changeDelay(args); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'changeDelay', @@ -853,7 +891,7 @@ export class Scripts { const args = this.changePriorityArgs(jobId, priority, lifo); const result = await (client).changePriority(args); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'changePriority', @@ -971,7 +1009,7 @@ export class Scripts { const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts); const result = await (client).moveToDelayed(args); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'moveToDelayed', @@ -1007,7 +1045,7 @@ export class Scripts { case 1: return false; default: - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'moveToWaitingChildren', @@ -1166,7 +1204,7 @@ export class Scripts { case 1: return; default: - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId: job.id, command: 'reprocessJob', @@ -1230,7 +1268,7 @@ export class Scripts { const code = await (client).promote(keys.concat(args)); if (code < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code, jobId, command: 'promote', diff --git a/src/commands/includes/deduplicateJob.lua b/src/commands/includes/deduplicateJob.lua index 07d785437a..ff873e599c 100644 --- a/src/commands/includes/deduplicateJob.lua +++ b/src/commands/includes/deduplicateJob.lua @@ -22,4 +22,3 @@ local function deduplicateJob(prefixKey, deduplicationOpts, jobId, deduplication end end end - \ No newline at end of file diff --git a/src/utils.ts b/src/utils.ts index 835fffe3a2..cceecb438c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -9,7 +9,6 @@ import { CONNECTION_CLOSED_ERROR_MSG } from 'ioredis/built/utils'; import { ChildMessage, RedisClient } from './interfaces'; import { EventEmitter } from 'events'; import * as semver from 'semver'; -import { ErrorCode } from './enums'; export const errorObject: { [index: string]: any } = { value: null }; @@ -248,44 +247,3 @@ export const toString = (value: any): string => { }; export const QUEUE_EVENT_SUFFIX = ':qe'; - -export const finishedErrors = ({ - code, - jobId, - parentKey, - command, - state, -}: { - code: number; - jobId?: string; - parentKey?: string; - command: string; - state?: string; -}): Error => { - switch (code) { - case ErrorCode.JobNotExist: - return new Error(`Missing key for job ${jobId}. ${command}`); - case ErrorCode.JobLockNotExist: - return new Error(`Missing lock for job ${jobId}. ${command}`); - case ErrorCode.JobNotInState: - return new Error(`Job ${jobId} is not in the ${state} state. ${command}`); - case ErrorCode.JobPendingDependencies: - return new Error(`Job ${jobId} has pending dependencies. ${command}`); - case ErrorCode.ParentJobNotExist: - return new Error(`Missing key for parent job ${parentKey}. ${command}`); - case ErrorCode.JobLockMismatch: - return new Error( - `Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`, - ); - case ErrorCode.ParentJobCannotBeReplaced: - return new Error( - `The parent job ${parentKey} cannot be replaced. ${command}`, - ); - case ErrorCode.JobBelongsToJobScheduler: - return new Error( - `Job ${jobId} belongs to a job scheduler and cannot be removed directly`, - ); - default: - return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); - } -}; diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index f22f80f253..6d63006a51 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -14,8 +14,7 @@ import { Worker, } from '../src/classes'; import { JobsOptions } from '../src/types'; -import { removeAllQueueData, delay, finishedErrors } from '../src/utils'; -import { ErrorCode } from '../src/enums'; +import { removeAllQueueData } from '../src/utils'; const moment = require('moment'); @@ -1363,22 +1362,9 @@ describe('Job Scheduler', function () { // Try to remove the delayed job const job = delayed[0]; - try { - await job.remove(); - const delayed = await queue.getDelayed(); - console.log({ delayed }); - expect.fail( - 'Should not be able to remove a delayed job that belongs to a repeatable job', - ); - } catch (err) { - const expectedErrMessage = finishedErrors({ - code: ErrorCode.JobBelongsToJobScheduler, - jobId: job.id, - command: 'remove', - }).message; - - expect(err.message).to.be.eql(expectedErrMessage); - } + await expect(job.remove()).to.be.rejectedWith( + `Job ${job.id} belongs to a job scheduler and cannot be removed directly. remove`, + ); }); it('should not remove delayed jobs if they belong to a repeatable job when using drain', async function () { diff --git a/tests/test_repeat.ts b/tests/test_repeat.ts index 6d2f846ee4..ed6ee6e301 100644 --- a/tests/test_repeat.ts +++ b/tests/test_repeat.ts @@ -13,14 +13,13 @@ import { Worker, } from '../src/classes'; import { JobsOptions } from '../src/types'; -import { removeAllQueueData, delay, finishedErrors } from '../src/utils'; +import { removeAllQueueData } from '../src/utils'; import { createRepeatableJobKey, extractRepeatableJobChecksumFromRedisKey, getRepeatableJobKeyPrefix, getRepeatJobIdCheckum, } from './utils/repeat_utils'; -import { ErrorCode } from '../src/enums'; const moment = require('moment'); @@ -1780,22 +1779,9 @@ describe('repeat', function () { // Try to remove the delayed job const job = delayed[0]; - try { - await job.remove(); - const delayed = await queue.getDelayed(); - console.log({ delayed }); - expect.fail( - 'Should not be able to remove a delayed job that belongs to a repeatable job', - ); - } catch (err) { - const expectedErrMessage = finishedErrors({ - code: ErrorCode.JobBelongsToJobScheduler, - jobId: job.id, - command: 'remove', - }).message; - - expect(err.message).to.be.eql(expectedErrMessage); - } + await expect(job.remove()).to.be.rejectedWith( + `Job ${job.id} belongs to a job scheduler and cannot be removed directly. remove`, + ); }); it('should not repeat more than 5 times', async function () { From 4fd7af471a95a05a43bd4d519de0b5b9356e48ea Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 8 Oct 2024 19:43:33 -0500 Subject: [PATCH 3/3] test: fix flaky test --- tests/test_job_scheduler.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index 8064828641..2c62777230 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -1103,7 +1103,7 @@ describe('Job Scheduler', function () { try { if (prev) { expect(prev.timestamp).to.be.lt(job.timestamp); - const diff = moment(job.timestamp).diff( + const diff = moment(job.processedOn!).diff( moment(prev.timestamp), 'months', true,