From e205803cdd6da9ca713e32cf7fc411122b9ab0c1 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sat, 12 Oct 2024 12:20:09 +0200 Subject: [PATCH 1/9] feat(queue): add queue version support --- src/classes/queue.ts | 14 +++++++ src/classes/scripts.ts | 91 ++++++++++++------------------------------ src/utils.ts | 66 ++++++++++++++++++++++++++++++ tests/test_queue.ts | 8 ++++ 4 files changed, 114 insertions(+), 65 deletions(-) diff --git a/src/classes/queue.ts b/src/classes/queue.ts index e1dde0044f..76b68a1303 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -13,6 +13,7 @@ import { QueueGetters } from './queue-getters'; import { Repeat } from './repeat'; import { RedisConnection } from './redis-connection'; import { JobScheduler } from './job-scheduler'; +import { readPackageJson } from '../utils'; export interface ObliterateOpts { /** @@ -168,11 +169,24 @@ export class Queue< } get metaValues(): Record { + const { name, version } = readPackageJson(); + return { 'opts.maxLenEvents': this.opts?.streams?.events?.maxLen ?? 10000, + version: `${name}:${version}`, }; } + /** + * Get library version. + * + * @returns the content of the meta.library field. + */ + async getVersion(): Promise { + const client = await this.client; + return await client.hget(this.keys.meta, 'version'); + } + get repeat(): Promise { return new Promise(async resolve => { if (!this._repeat) { diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 285ec36efa..6f007b90b6 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -34,7 +34,12 @@ import { RedisJobOptions, } from '../types'; import { ErrorCode } from '../enums'; -import { array2obj, getParentKey, isRedisVersionLowerThan } from '../utils'; +import { + array2obj, + finishedErrors, + getParentKey, + isRedisVersionLowerThan, +} from '../utils'; import { ChainableCommander } from 'ioredis'; export type JobData = [JobJsonRaw | number, string?]; @@ -220,7 +225,7 @@ export class Scripts { } if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, parentKey: parentOpts.parentKey, command: 'addJob', @@ -398,11 +403,14 @@ export class Scripts { return (client).removeJobScheduler(keys.concat(args)); } - protected removeArgs(jobId: string, removeChildren: boolean): (string | number)[] { + protected removeArgs( + jobId: string, + removeChildren: boolean, + ): (string | number)[] { const keys: (string | number)[] = ['', 'meta'].map(name => this.queue.toKey(name), ); - + const args = [jobId, removeChildren ? 1 : 0]; return keys.concat(args); @@ -411,16 +419,12 @@ export class Scripts { async remove(jobId: string, removeChildren: boolean): Promise { const client = await this.queue.client; - const args = this.removeArgs( - jobId, removeChildren - ); + const args = this.removeArgs(jobId, removeChildren); - const result = await (client).removeJob( - args, - ); + const result = await (client).removeJob(args); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'removeJob', @@ -459,7 +463,7 @@ export class Scripts { const result = await (client).updateData(keys.concat([dataJson])); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId: job.id, command: 'updateData', @@ -485,7 +489,7 @@ export class Scripts { ); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'updateProgress', @@ -510,7 +514,7 @@ export class Scripts { ); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'addLog', @@ -594,7 +598,7 @@ export class Scripts { const result = await (client).moveToFinished(args); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'moveToFinished', @@ -607,49 +611,6 @@ export class Scripts { } } - finishedErrors({ - code, - jobId, - parentKey, - command, - state, - }: { - code: number; - jobId?: string; - parentKey?: string; - command: string; - state?: string; - }): Error { - switch (code) { - case ErrorCode.JobNotExist: - return new Error(`Missing key for job ${jobId}. ${command}`); - case ErrorCode.JobLockNotExist: - return new Error(`Missing lock for job ${jobId}. ${command}`); - case ErrorCode.JobNotInState: - return new Error( - `Job ${jobId} is not in the ${state} state. ${command}`, - ); - case ErrorCode.JobPendingDependencies: - return new Error(`Job ${jobId} has pending dependencies. ${command}`); - case ErrorCode.ParentJobNotExist: - return new Error(`Missing key for parent job ${parentKey}. ${command}`); - case ErrorCode.JobLockMismatch: - return new Error( - `Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`, - ); - case ErrorCode.ParentJobCannotBeReplaced: - return new Error( - `The parent job ${parentKey} cannot be replaced. ${command}`, - ); - case ErrorCode.JobBelongsToJobScheduler: - return new Error( - `Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`, - ); - default: - return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); - } - } - private drainArgs(delayed: boolean): (string | number)[] { const queueKeys = this.queue.keys; @@ -701,7 +662,7 @@ export class Scripts { case 1: return false; default: - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, parentKey, @@ -865,7 +826,7 @@ export class Scripts { const args = this.changeDelayArgs(jobId, delay); const result = await (client).changeDelay(args); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'changeDelay', @@ -902,7 +863,7 @@ export class Scripts { const args = this.changePriorityArgs(jobId, priority, lifo); const result = await (client).changePriority(args); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'changePriority', @@ -1020,7 +981,7 @@ export class Scripts { const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts); const result = await (client).moveToDelayed(args); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'moveToDelayed', @@ -1056,7 +1017,7 @@ export class Scripts { case 1: return false; default: - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'moveToWaitingChildren', @@ -1215,7 +1176,7 @@ export class Scripts { case 1: return; default: - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId: job.id, command: 'reprocessJob', @@ -1279,7 +1240,7 @@ export class Scripts { const code = await (client).promote(keys.concat(args)); if (code < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code, jobId, command: 'promote', diff --git a/src/utils.ts b/src/utils.ts index cceecb438c..75e57c519f 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -10,6 +10,10 @@ import { ChildMessage, RedisClient } from './interfaces'; import { EventEmitter } from 'events'; import * as semver from 'semver'; +import { ErrorCode } from './enums'; +import { join } from 'path'; +import { readFileSync } from 'fs'; + export const errorObject: { [index: string]: any } = { value: null }; export function tryCatch( @@ -247,3 +251,65 @@ export const toString = (value: any): string => { }; export const QUEUE_EVENT_SUFFIX = ':qe'; + +export const finishedErrors = ({ + code, + jobId, + parentKey, + command, + state, +}: { + code: number; + jobId?: string; + parentKey?: string; + command: string; + state?: string; +}): Error => { + switch (code) { + case ErrorCode.JobNotExist: + return new Error(`Missing key for job ${jobId}. ${command}`); + case ErrorCode.JobLockNotExist: + return new Error(`Missing lock for job ${jobId}. ${command}`); + case ErrorCode.JobNotInState: + return new Error(`Job ${jobId} is not in the ${state} state. ${command}`); + case ErrorCode.JobPendingDependencies: + return new Error(`Job ${jobId} has pending dependencies. ${command}`); + case ErrorCode.ParentJobNotExist: + return new Error(`Missing key for parent job ${parentKey}. ${command}`); + case ErrorCode.JobLockMismatch: + return new Error( + `Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`, + ); + case ErrorCode.ParentJobCannotBeReplaced: + return new Error( + `The parent job ${parentKey} cannot be replaced. ${command}`, + ); + case ErrorCode.JobBelongsToJobScheduler: + return new Error( + `Job ${jobId} belongs to a job scheduler and cannot be removed directly`, + ); + default: + return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); + } +}; + +export const readPackageJson: () => { name: string; version: string } = () => { + const packageJsonPossiblePaths = [ + join(__dirname, '../package.json'), + join(__dirname, '../../package.json'), + join(__dirname, '../../../package.json'), + ]; + + for (const path of packageJsonPossiblePaths) { + try { + return JSON.parse(readFileSync(path, 'utf-8')); + } catch (err) { + if ((err).code === 'ENOENT') { + continue; + } + console.log(err); + } + } + + return { name: 'bullmq', version: '0.0.0' }; +}; diff --git a/tests/test_queue.ts b/tests/test_queue.ts index 75cff55d5d..435ca89ced 100644 --- a/tests/test_queue.ts +++ b/tests/test_queue.ts @@ -36,6 +36,14 @@ describe('queues', function () { await connection.quit(); }); + it('should return the queue version', async () => { + const queue = new Queue(queueName, { connection }); + const version = await queue.getVersion(); + const { version: pkgJsonVersion, name } = require('../package.json'); + expect(version).to.be.equal(`${name}:${pkgJsonVersion}`); + return queue.close(); + }); + describe('.add', () => { describe('when jobId is provided as integer', () => { it('throws error', async function () { From f0cbc7c74ef365bd70aca689e0c404f814bad833 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sat, 12 Oct 2024 12:22:52 +0200 Subject: [PATCH 2/9] chore: fix compile error --- src/classes/job.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/classes/job.ts b/src/classes/job.ts index 7cdfc9d842..740b06f81d 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -29,6 +29,7 @@ import { lengthInUtf8Bytes, parseObjectValues, tryCatch, + finishedErrors, } from '../utils'; import { Backoffs } from './backoffs'; import { Scripts, raw2NextJobData } from './scripts'; @@ -747,7 +748,7 @@ export class Job< const result = results[results.length - 1][1] as number; if (result < 0) { - throw this.scripts.finishedErrors({ + throw finishedErrors({ code: result, jobId: this.id, command, From deaa75d8009c5ead22ab6298a71e5ec27133ae5e Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sat, 12 Oct 2024 12:48:55 +0200 Subject: [PATCH 3/9] chore: fix test error --- src/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils.ts b/src/utils.ts index 75e57c519f..67a1711a6a 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -286,7 +286,7 @@ export const finishedErrors = ({ ); case ErrorCode.JobBelongsToJobScheduler: return new Error( - `Job ${jobId} belongs to a job scheduler and cannot be removed directly`, + `Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`, ); default: return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); From 72d489638f53bf771f8265621724c918df75aa22 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sat, 12 Oct 2024 23:14:23 +0200 Subject: [PATCH 4/9] chore: move finishedErrors back to the scripts class --- src/classes/job.ts | 3 +- src/classes/scripts.ts | 76 +++++++++++++++++++++++++++++++----------- src/utils.ts | 41 ----------------------- 3 files changed, 58 insertions(+), 62 deletions(-) diff --git a/src/classes/job.ts b/src/classes/job.ts index 740b06f81d..7cdfc9d842 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -29,7 +29,6 @@ import { lengthInUtf8Bytes, parseObjectValues, tryCatch, - finishedErrors, } from '../utils'; import { Backoffs } from './backoffs'; import { Scripts, raw2NextJobData } from './scripts'; @@ -748,7 +747,7 @@ export class Job< const result = results[results.length - 1][1] as number; if (result < 0) { - throw finishedErrors({ + throw this.scripts.finishedErrors({ code: result, jobId: this.id, command, diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 6f007b90b6..cd0dcdfdaa 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -34,12 +34,7 @@ import { RedisJobOptions, } from '../types'; import { ErrorCode } from '../enums'; -import { - array2obj, - finishedErrors, - getParentKey, - isRedisVersionLowerThan, -} from '../utils'; +import { array2obj, getParentKey, isRedisVersionLowerThan } from '../utils'; import { ChainableCommander } from 'ioredis'; export type JobData = [JobJsonRaw | number, string?]; @@ -225,7 +220,7 @@ export class Scripts { } if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, parentKey: parentOpts.parentKey, command: 'addJob', @@ -424,7 +419,7 @@ export class Scripts { const result = await (client).removeJob(args); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'removeJob', @@ -463,7 +458,7 @@ export class Scripts { const result = await (client).updateData(keys.concat([dataJson])); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId: job.id, command: 'updateData', @@ -489,7 +484,7 @@ export class Scripts { ); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'updateProgress', @@ -514,7 +509,7 @@ export class Scripts { ); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'addLog', @@ -598,7 +593,7 @@ export class Scripts { const result = await (client).moveToFinished(args); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'moveToFinished', @@ -662,7 +657,7 @@ export class Scripts { case 1: return false; default: - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, parentKey, @@ -826,7 +821,7 @@ export class Scripts { const args = this.changeDelayArgs(jobId, delay); const result = await (client).changeDelay(args); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'changeDelay', @@ -863,7 +858,7 @@ export class Scripts { const args = this.changePriorityArgs(jobId, priority, lifo); const result = await (client).changePriority(args); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'changePriority', @@ -981,7 +976,7 @@ export class Scripts { const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts); const result = await (client).moveToDelayed(args); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'moveToDelayed', @@ -1017,7 +1012,7 @@ export class Scripts { case 1: return false; default: - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'moveToWaitingChildren', @@ -1176,7 +1171,7 @@ export class Scripts { case 1: return; default: - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId: job.id, command: 'reprocessJob', @@ -1240,7 +1235,7 @@ export class Scripts { const code = await (client).promote(keys.concat(args)); if (code < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code, jobId, command: 'promote', @@ -1424,6 +1419,49 @@ export class Scripts { }; } } + + finishedErrors({ + code, + jobId, + parentKey, + command, + state, + }: { + code: number; + jobId?: string; + parentKey?: string; + command: string; + state?: string; + }): Error { + switch (code) { + case ErrorCode.JobNotExist: + return new Error(`Missing key for job ${jobId}. ${command}`); + case ErrorCode.JobLockNotExist: + return new Error(`Missing lock for job ${jobId}. ${command}`); + case ErrorCode.JobNotInState: + return new Error( + `Job ${jobId} is not in the ${state} state. ${command}`, + ); + case ErrorCode.JobPendingDependencies: + return new Error(`Job ${jobId} has pending dependencies. ${command}`); + case ErrorCode.ParentJobNotExist: + return new Error(`Missing key for parent job ${parentKey}. ${command}`); + case ErrorCode.JobLockMismatch: + return new Error( + `Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`, + ); + case ErrorCode.ParentJobCannotBeReplaced: + return new Error( + `The parent job ${parentKey} cannot be replaced. ${command}`, + ); + case ErrorCode.JobBelongsToJobScheduler: + return new Error( + `Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`, + ); + default: + return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); + } + } } export function raw2NextJobData(raw: any[]) { diff --git a/src/utils.ts b/src/utils.ts index 67a1711a6a..785d602974 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -252,47 +252,6 @@ export const toString = (value: any): string => { export const QUEUE_EVENT_SUFFIX = ':qe'; -export const finishedErrors = ({ - code, - jobId, - parentKey, - command, - state, -}: { - code: number; - jobId?: string; - parentKey?: string; - command: string; - state?: string; -}): Error => { - switch (code) { - case ErrorCode.JobNotExist: - return new Error(`Missing key for job ${jobId}. ${command}`); - case ErrorCode.JobLockNotExist: - return new Error(`Missing lock for job ${jobId}. ${command}`); - case ErrorCode.JobNotInState: - return new Error(`Job ${jobId} is not in the ${state} state. ${command}`); - case ErrorCode.JobPendingDependencies: - return new Error(`Job ${jobId} has pending dependencies. ${command}`); - case ErrorCode.ParentJobNotExist: - return new Error(`Missing key for parent job ${parentKey}. ${command}`); - case ErrorCode.JobLockMismatch: - return new Error( - `Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`, - ); - case ErrorCode.ParentJobCannotBeReplaced: - return new Error( - `The parent job ${parentKey} cannot be replaced. ${command}`, - ); - case ErrorCode.JobBelongsToJobScheduler: - return new Error( - `Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`, - ); - default: - return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); - } -}; - export const readPackageJson: () => { name: string; version: string } = () => { const packageJsonPossiblePaths = [ join(__dirname, '../package.json'), From e1d55711ab0ce756b413256d7eb23a7daa0e523f Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sat, 12 Oct 2024 12:20:09 +0200 Subject: [PATCH 5/9] feat(queue): add queue version support --- src/classes/queue.ts | 14 +++++++ src/classes/scripts.ts | 91 ++++++++++++------------------------------ src/utils.ts | 66 ++++++++++++++++++++++++++++++ tests/test_queue.ts | 8 ++++ 4 files changed, 114 insertions(+), 65 deletions(-) diff --git a/src/classes/queue.ts b/src/classes/queue.ts index e1dde0044f..76b68a1303 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -13,6 +13,7 @@ import { QueueGetters } from './queue-getters'; import { Repeat } from './repeat'; import { RedisConnection } from './redis-connection'; import { JobScheduler } from './job-scheduler'; +import { readPackageJson } from '../utils'; export interface ObliterateOpts { /** @@ -168,11 +169,24 @@ export class Queue< } get metaValues(): Record { + const { name, version } = readPackageJson(); + return { 'opts.maxLenEvents': this.opts?.streams?.events?.maxLen ?? 10000, + version: `${name}:${version}`, }; } + /** + * Get library version. + * + * @returns the content of the meta.library field. + */ + async getVersion(): Promise { + const client = await this.client; + return await client.hget(this.keys.meta, 'version'); + } + get repeat(): Promise { return new Promise(async resolve => { if (!this._repeat) { diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 285ec36efa..6f007b90b6 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -34,7 +34,12 @@ import { RedisJobOptions, } from '../types'; import { ErrorCode } from '../enums'; -import { array2obj, getParentKey, isRedisVersionLowerThan } from '../utils'; +import { + array2obj, + finishedErrors, + getParentKey, + isRedisVersionLowerThan, +} from '../utils'; import { ChainableCommander } from 'ioredis'; export type JobData = [JobJsonRaw | number, string?]; @@ -220,7 +225,7 @@ export class Scripts { } if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, parentKey: parentOpts.parentKey, command: 'addJob', @@ -398,11 +403,14 @@ export class Scripts { return (client).removeJobScheduler(keys.concat(args)); } - protected removeArgs(jobId: string, removeChildren: boolean): (string | number)[] { + protected removeArgs( + jobId: string, + removeChildren: boolean, + ): (string | number)[] { const keys: (string | number)[] = ['', 'meta'].map(name => this.queue.toKey(name), ); - + const args = [jobId, removeChildren ? 1 : 0]; return keys.concat(args); @@ -411,16 +419,12 @@ export class Scripts { async remove(jobId: string, removeChildren: boolean): Promise { const client = await this.queue.client; - const args = this.removeArgs( - jobId, removeChildren - ); + const args = this.removeArgs(jobId, removeChildren); - const result = await (client).removeJob( - args, - ); + const result = await (client).removeJob(args); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'removeJob', @@ -459,7 +463,7 @@ export class Scripts { const result = await (client).updateData(keys.concat([dataJson])); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId: job.id, command: 'updateData', @@ -485,7 +489,7 @@ export class Scripts { ); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'updateProgress', @@ -510,7 +514,7 @@ export class Scripts { ); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'addLog', @@ -594,7 +598,7 @@ export class Scripts { const result = await (client).moveToFinished(args); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'moveToFinished', @@ -607,49 +611,6 @@ export class Scripts { } } - finishedErrors({ - code, - jobId, - parentKey, - command, - state, - }: { - code: number; - jobId?: string; - parentKey?: string; - command: string; - state?: string; - }): Error { - switch (code) { - case ErrorCode.JobNotExist: - return new Error(`Missing key for job ${jobId}. ${command}`); - case ErrorCode.JobLockNotExist: - return new Error(`Missing lock for job ${jobId}. ${command}`); - case ErrorCode.JobNotInState: - return new Error( - `Job ${jobId} is not in the ${state} state. ${command}`, - ); - case ErrorCode.JobPendingDependencies: - return new Error(`Job ${jobId} has pending dependencies. ${command}`); - case ErrorCode.ParentJobNotExist: - return new Error(`Missing key for parent job ${parentKey}. ${command}`); - case ErrorCode.JobLockMismatch: - return new Error( - `Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`, - ); - case ErrorCode.ParentJobCannotBeReplaced: - return new Error( - `The parent job ${parentKey} cannot be replaced. ${command}`, - ); - case ErrorCode.JobBelongsToJobScheduler: - return new Error( - `Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`, - ); - default: - return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); - } - } - private drainArgs(delayed: boolean): (string | number)[] { const queueKeys = this.queue.keys; @@ -701,7 +662,7 @@ export class Scripts { case 1: return false; default: - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, parentKey, @@ -865,7 +826,7 @@ export class Scripts { const args = this.changeDelayArgs(jobId, delay); const result = await (client).changeDelay(args); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'changeDelay', @@ -902,7 +863,7 @@ export class Scripts { const args = this.changePriorityArgs(jobId, priority, lifo); const result = await (client).changePriority(args); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'changePriority', @@ -1020,7 +981,7 @@ export class Scripts { const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts); const result = await (client).moveToDelayed(args); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'moveToDelayed', @@ -1056,7 +1017,7 @@ export class Scripts { case 1: return false; default: - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'moveToWaitingChildren', @@ -1215,7 +1176,7 @@ export class Scripts { case 1: return; default: - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId: job.id, command: 'reprocessJob', @@ -1279,7 +1240,7 @@ export class Scripts { const code = await (client).promote(keys.concat(args)); if (code < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code, jobId, command: 'promote', diff --git a/src/utils.ts b/src/utils.ts index d6d5bb37d2..6d97b824a9 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -10,6 +10,10 @@ import { ChildMessage, RedisClient } from './interfaces'; import { EventEmitter } from 'events'; import * as semver from 'semver'; +import { ErrorCode } from './enums'; +import { join } from 'path'; +import { readFileSync } from 'fs'; + export const errorObject: { [index: string]: any } = { value: null }; export function tryCatch( @@ -261,3 +265,65 @@ export const toString = (value: any): string => { }; export const QUEUE_EVENT_SUFFIX = ':qe'; + +export const finishedErrors = ({ + code, + jobId, + parentKey, + command, + state, +}: { + code: number; + jobId?: string; + parentKey?: string; + command: string; + state?: string; +}): Error => { + switch (code) { + case ErrorCode.JobNotExist: + return new Error(`Missing key for job ${jobId}. ${command}`); + case ErrorCode.JobLockNotExist: + return new Error(`Missing lock for job ${jobId}. ${command}`); + case ErrorCode.JobNotInState: + return new Error(`Job ${jobId} is not in the ${state} state. ${command}`); + case ErrorCode.JobPendingDependencies: + return new Error(`Job ${jobId} has pending dependencies. ${command}`); + case ErrorCode.ParentJobNotExist: + return new Error(`Missing key for parent job ${parentKey}. ${command}`); + case ErrorCode.JobLockMismatch: + return new Error( + `Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`, + ); + case ErrorCode.ParentJobCannotBeReplaced: + return new Error( + `The parent job ${parentKey} cannot be replaced. ${command}`, + ); + case ErrorCode.JobBelongsToJobScheduler: + return new Error( + `Job ${jobId} belongs to a job scheduler and cannot be removed directly`, + ); + default: + return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); + } +}; + +export const readPackageJson: () => { name: string; version: string } = () => { + const packageJsonPossiblePaths = [ + join(__dirname, '../package.json'), + join(__dirname, '../../package.json'), + join(__dirname, '../../../package.json'), + ]; + + for (const path of packageJsonPossiblePaths) { + try { + return JSON.parse(readFileSync(path, 'utf-8')); + } catch (err) { + if ((err).code === 'ENOENT') { + continue; + } + console.log(err); + } + } + + return { name: 'bullmq', version: '0.0.0' }; +}; diff --git a/tests/test_queue.ts b/tests/test_queue.ts index 75cff55d5d..435ca89ced 100644 --- a/tests/test_queue.ts +++ b/tests/test_queue.ts @@ -36,6 +36,14 @@ describe('queues', function () { await connection.quit(); }); + it('should return the queue version', async () => { + const queue = new Queue(queueName, { connection }); + const version = await queue.getVersion(); + const { version: pkgJsonVersion, name } = require('../package.json'); + expect(version).to.be.equal(`${name}:${pkgJsonVersion}`); + return queue.close(); + }); + describe('.add', () => { describe('when jobId is provided as integer', () => { it('throws error', async function () { From 5f90a26b2ac37b3236528738c0f970952d38281b Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sat, 12 Oct 2024 12:22:52 +0200 Subject: [PATCH 6/9] chore: fix compile error --- src/classes/job.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/classes/job.ts b/src/classes/job.ts index 7cdfc9d842..740b06f81d 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -29,6 +29,7 @@ import { lengthInUtf8Bytes, parseObjectValues, tryCatch, + finishedErrors, } from '../utils'; import { Backoffs } from './backoffs'; import { Scripts, raw2NextJobData } from './scripts'; @@ -747,7 +748,7 @@ export class Job< const result = results[results.length - 1][1] as number; if (result < 0) { - throw this.scripts.finishedErrors({ + throw finishedErrors({ code: result, jobId: this.id, command, From 1419eb9e6b564f6fb2c6ead6e737db455771bb2b Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sat, 12 Oct 2024 12:48:55 +0200 Subject: [PATCH 7/9] chore: fix test error --- src/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils.ts b/src/utils.ts index 6d97b824a9..bb7caf53da 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -300,7 +300,7 @@ export const finishedErrors = ({ ); case ErrorCode.JobBelongsToJobScheduler: return new Error( - `Job ${jobId} belongs to a job scheduler and cannot be removed directly`, + `Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`, ); default: return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); From a4b32c9ed7758bff5439a112cf6245a8643ecaa5 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sat, 12 Oct 2024 23:14:23 +0200 Subject: [PATCH 8/9] chore: move finishedErrors back to the scripts class --- src/classes/job.ts | 3 +- src/classes/scripts.ts | 76 +++++++++++++++++++++++++++++++----------- src/utils.ts | 41 ----------------------- 3 files changed, 58 insertions(+), 62 deletions(-) diff --git a/src/classes/job.ts b/src/classes/job.ts index 740b06f81d..7cdfc9d842 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -29,7 +29,6 @@ import { lengthInUtf8Bytes, parseObjectValues, tryCatch, - finishedErrors, } from '../utils'; import { Backoffs } from './backoffs'; import { Scripts, raw2NextJobData } from './scripts'; @@ -748,7 +747,7 @@ export class Job< const result = results[results.length - 1][1] as number; if (result < 0) { - throw finishedErrors({ + throw this.scripts.finishedErrors({ code: result, jobId: this.id, command, diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 6f007b90b6..cd0dcdfdaa 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -34,12 +34,7 @@ import { RedisJobOptions, } from '../types'; import { ErrorCode } from '../enums'; -import { - array2obj, - finishedErrors, - getParentKey, - isRedisVersionLowerThan, -} from '../utils'; +import { array2obj, getParentKey, isRedisVersionLowerThan } from '../utils'; import { ChainableCommander } from 'ioredis'; export type JobData = [JobJsonRaw | number, string?]; @@ -225,7 +220,7 @@ export class Scripts { } if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, parentKey: parentOpts.parentKey, command: 'addJob', @@ -424,7 +419,7 @@ export class Scripts { const result = await (client).removeJob(args); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'removeJob', @@ -463,7 +458,7 @@ export class Scripts { const result = await (client).updateData(keys.concat([dataJson])); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId: job.id, command: 'updateData', @@ -489,7 +484,7 @@ export class Scripts { ); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'updateProgress', @@ -514,7 +509,7 @@ export class Scripts { ); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'addLog', @@ -598,7 +593,7 @@ export class Scripts { const result = await (client).moveToFinished(args); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'moveToFinished', @@ -662,7 +657,7 @@ export class Scripts { case 1: return false; default: - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, parentKey, @@ -826,7 +821,7 @@ export class Scripts { const args = this.changeDelayArgs(jobId, delay); const result = await (client).changeDelay(args); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'changeDelay', @@ -863,7 +858,7 @@ export class Scripts { const args = this.changePriorityArgs(jobId, priority, lifo); const result = await (client).changePriority(args); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'changePriority', @@ -981,7 +976,7 @@ export class Scripts { const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts); const result = await (client).moveToDelayed(args); if (result < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'moveToDelayed', @@ -1017,7 +1012,7 @@ export class Scripts { case 1: return false; default: - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId, command: 'moveToWaitingChildren', @@ -1176,7 +1171,7 @@ export class Scripts { case 1: return; default: - throw finishedErrors({ + throw this.finishedErrors({ code: result, jobId: job.id, command: 'reprocessJob', @@ -1240,7 +1235,7 @@ export class Scripts { const code = await (client).promote(keys.concat(args)); if (code < 0) { - throw finishedErrors({ + throw this.finishedErrors({ code, jobId, command: 'promote', @@ -1424,6 +1419,49 @@ export class Scripts { }; } } + + finishedErrors({ + code, + jobId, + parentKey, + command, + state, + }: { + code: number; + jobId?: string; + parentKey?: string; + command: string; + state?: string; + }): Error { + switch (code) { + case ErrorCode.JobNotExist: + return new Error(`Missing key for job ${jobId}. ${command}`); + case ErrorCode.JobLockNotExist: + return new Error(`Missing lock for job ${jobId}. ${command}`); + case ErrorCode.JobNotInState: + return new Error( + `Job ${jobId} is not in the ${state} state. ${command}`, + ); + case ErrorCode.JobPendingDependencies: + return new Error(`Job ${jobId} has pending dependencies. ${command}`); + case ErrorCode.ParentJobNotExist: + return new Error(`Missing key for parent job ${parentKey}. ${command}`); + case ErrorCode.JobLockMismatch: + return new Error( + `Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`, + ); + case ErrorCode.ParentJobCannotBeReplaced: + return new Error( + `The parent job ${parentKey} cannot be replaced. ${command}`, + ); + case ErrorCode.JobBelongsToJobScheduler: + return new Error( + `Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`, + ); + default: + return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); + } + } } export function raw2NextJobData(raw: any[]) { diff --git a/src/utils.ts b/src/utils.ts index bb7caf53da..bf8a009e08 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -266,47 +266,6 @@ export const toString = (value: any): string => { export const QUEUE_EVENT_SUFFIX = ':qe'; -export const finishedErrors = ({ - code, - jobId, - parentKey, - command, - state, -}: { - code: number; - jobId?: string; - parentKey?: string; - command: string; - state?: string; -}): Error => { - switch (code) { - case ErrorCode.JobNotExist: - return new Error(`Missing key for job ${jobId}. ${command}`); - case ErrorCode.JobLockNotExist: - return new Error(`Missing lock for job ${jobId}. ${command}`); - case ErrorCode.JobNotInState: - return new Error(`Job ${jobId} is not in the ${state} state. ${command}`); - case ErrorCode.JobPendingDependencies: - return new Error(`Job ${jobId} has pending dependencies. ${command}`); - case ErrorCode.ParentJobNotExist: - return new Error(`Missing key for parent job ${parentKey}. ${command}`); - case ErrorCode.JobLockMismatch: - return new Error( - `Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`, - ); - case ErrorCode.ParentJobCannotBeReplaced: - return new Error( - `The parent job ${parentKey} cannot be replaced. ${command}`, - ); - case ErrorCode.JobBelongsToJobScheduler: - return new Error( - `Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`, - ); - default: - return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); - } -}; - export const readPackageJson: () => { name: string; version: string } = () => { const packageJsonPossiblePaths = [ join(__dirname, '../package.json'), From d8d9f3d0bd0b9141edab76225f9074804316954c Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sun, 13 Oct 2024 11:41:16 +0200 Subject: [PATCH 9/9] chore: remove unused dependency --- src/utils.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/utils.ts b/src/utils.ts index 785d602974..2495e0f52a 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -10,7 +10,6 @@ import { ChildMessage, RedisClient } from './interfaces'; import { EventEmitter } from 'events'; import * as semver from 'semver'; -import { ErrorCode } from './enums'; import { join } from 'path'; import { readFileSync } from 'fs';