Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(queue): add queue version support #2822

Merged
merged 10 commits into from
Oct 13, 2024
14 changes: 14 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { QueueGetters } from './queue-getters';
import { Repeat } from './repeat';
import { RedisConnection } from './redis-connection';
import { JobScheduler } from './job-scheduler';
import { readPackageJson } from '../utils';

export interface ObliterateOpts {
/**
Expand Down Expand Up @@ -168,11 +169,24 @@ export class Queue<
}

get metaValues(): Record<string, string | number> {
const { name, version } = readPackageJson();

return {
'opts.maxLenEvents': this.opts?.streams?.events?.maxLen ?? 10000,
version: `${name}:${version}`,
};
}

/**
* Get library version.
*
* @returns the content of the meta.library field.
*/
async getVersion(): Promise<string> {
const client = await this.client;
return await client.hget(this.keys.meta, 'version');
}

get repeat(): Promise<Repeat> {
return new Promise<Repeat>(async resolve => {
if (!this._repeat) {
Expand Down
101 changes: 50 additions & 51 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -398,11 +398,14 @@ export class Scripts {
return (<any>client).removeJobScheduler(keys.concat(args));
}

protected removeArgs(jobId: string, removeChildren: boolean): (string | number)[] {
protected removeArgs(
jobId: string,
removeChildren: boolean,
): (string | number)[] {
const keys: (string | number)[] = ['', 'meta'].map(name =>
this.queue.toKey(name),
);

const args = [jobId, removeChildren ? 1 : 0];

return keys.concat(args);
Expand All @@ -411,13 +414,9 @@ export class Scripts {
async remove(jobId: string, removeChildren: boolean): Promise<number> {
const client = await this.queue.client;

const args = this.removeArgs(
jobId, removeChildren
);
const args = this.removeArgs(jobId, removeChildren);

const result = await (<any>client).removeJob(
args,
);
const result = await (<any>client).removeJob(args);

if (result < 0) {
throw this.finishedErrors({
Expand Down Expand Up @@ -607,49 +606,6 @@ export class Scripts {
}
}

finishedErrors({
code,
jobId,
parentKey,
command,
state,
}: {
code: number;
jobId?: string;
parentKey?: string;
command: string;
state?: string;
}): Error {
switch (code) {
case ErrorCode.JobNotExist:
return new Error(`Missing key for job ${jobId}. ${command}`);
case ErrorCode.JobLockNotExist:
return new Error(`Missing lock for job ${jobId}. ${command}`);
case ErrorCode.JobNotInState:
return new Error(
`Job ${jobId} is not in the ${state} state. ${command}`,
);
case ErrorCode.JobPendingDependencies:
return new Error(`Job ${jobId} has pending dependencies. ${command}`);
case ErrorCode.ParentJobNotExist:
return new Error(`Missing key for parent job ${parentKey}. ${command}`);
case ErrorCode.JobLockMismatch:
return new Error(
`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`,
);
case ErrorCode.ParentJobCannotBeReplaced:
return new Error(
`The parent job ${parentKey} cannot be replaced. ${command}`,
);
case ErrorCode.JobBelongsToJobScheduler:
return new Error(
`Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`,
);
default:
return new Error(`Unknown code ${code} error for ${jobId}. ${command}`);
}
}

private drainArgs(delayed: boolean): (string | number)[] {
const queueKeys = this.queue.keys;

Expand Down Expand Up @@ -1463,6 +1419,49 @@ export class Scripts {
};
}
}

finishedErrors({
code,
jobId,
parentKey,
command,
state,
}: {
code: number;
jobId?: string;
parentKey?: string;
command: string;
state?: string;
}): Error {
switch (code) {
case ErrorCode.JobNotExist:
return new Error(`Missing key for job ${jobId}. ${command}`);
case ErrorCode.JobLockNotExist:
return new Error(`Missing lock for job ${jobId}. ${command}`);
case ErrorCode.JobNotInState:
return new Error(
`Job ${jobId} is not in the ${state} state. ${command}`,
);
case ErrorCode.JobPendingDependencies:
return new Error(`Job ${jobId} has pending dependencies. ${command}`);
case ErrorCode.ParentJobNotExist:
return new Error(`Missing key for parent job ${parentKey}. ${command}`);
case ErrorCode.JobLockMismatch:
return new Error(
`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`,
);
case ErrorCode.ParentJobCannotBeReplaced:
return new Error(
`The parent job ${parentKey} cannot be replaced. ${command}`,
);
case ErrorCode.JobBelongsToJobScheduler:
return new Error(
`Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`,
);
default:
return new Error(`Unknown code ${code} error for ${jobId}. ${command}`);
}
}
}

export function raw2NextJobData(raw: any[]) {
Expand Down
24 changes: 24 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import { ChildMessage, RedisClient } from './interfaces';
import { EventEmitter } from 'events';
import * as semver from 'semver';

import { join } from 'path';
import { readFileSync } from 'fs';

export const errorObject: { [index: string]: any } = { value: null };

export function tryCatch(
Expand Down Expand Up @@ -261,3 +264,24 @@ export const toString = (value: any): string => {
};

export const QUEUE_EVENT_SUFFIX = ':qe';

export const readPackageJson: () => { name: string; version: string } = () => {
const packageJsonPossiblePaths = [
join(__dirname, '../package.json'),
join(__dirname, '../../package.json'),
join(__dirname, '../../../package.json'),
];

for (const path of packageJsonPossiblePaths) {
try {
return JSON.parse(readFileSync(path, 'utf-8'));
} catch (err) {
if ((<any>err).code === 'ENOENT') {
continue;
}
console.log(err);
}
}

return { name: 'bullmq', version: '0.0.0' };
};
8 changes: 8 additions & 0 deletions tests/test_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ describe('queues', function () {
await connection.quit();
});

it('should return the queue version', async () => {
const queue = new Queue(queueName, { connection });
const version = await queue.getVersion();
const { version: pkgJsonVersion, name } = require('../package.json');
expect(version).to.be.equal(`${name}:${pkgJsonVersion}`);
return queue.close();
});

describe('.add', () => {
describe('when jobId is provided as integer', () => {
it('throws error', async function () {
Expand Down
Loading