Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: taskforcesh/bullmq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v5.19.1
Choose a base ref
...
head repository: taskforcesh/bullmq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v5.20.0
Choose a head ref
  • 2 commits
  • 6 files changed
  • 2 contributors

Commits on Oct 13, 2024

  1. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature.
    Copy the full SHA
    3a4781b View commit details
  2. chore(release): 5.20.0 [skip ci]

    # [5.20.0](v5.19.1...v5.20.0) (2024-10-13)
    
    ### Features
    
    * **queue:** add queue version support ([#2822](#2822)) ([3a4781b](3a4781b))
    semantic-release-bot committed Oct 13, 2024
    Copy the full SHA
    073baf6 View commit details
Showing with 104 additions and 52 deletions.
  1. +7 −0 docs/gitbook/changelog.md
  2. +1 −1 package.json
  3. +14 −0 src/classes/queue.ts
  4. +50 −51 src/classes/scripts.ts
  5. +24 −0 src/utils.ts
  6. +8 −0 tests/test_queue.ts
7 changes: 7 additions & 0 deletions docs/gitbook/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# [5.20.0](https://github.com/taskforcesh/bullmq/compare/v5.19.1...v5.20.0) (2024-10-13)


### Features

* **queue:** add queue version support ([#2822](https://github.com/taskforcesh/bullmq/issues/2822)) ([3a4781b](https://github.com/taskforcesh/bullmq/commit/3a4781bf7cadf04f6a324871654eed8f01cdadae))

## [5.19.1](https://github.com/taskforcesh/bullmq/compare/v5.19.0...v5.19.1) (2024-10-12)


2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bullmq",
"version": "5.19.1",
"version": "5.20.0",
"description": "Queue for messages and jobs based on Redis",
"homepage": "https://bullmq.io/",
"main": "./dist/cjs/index.js",
14 changes: 14 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ import { QueueGetters } from './queue-getters';
import { Repeat } from './repeat';
import { RedisConnection } from './redis-connection';
import { JobScheduler } from './job-scheduler';
import { readPackageJson } from '../utils';

export interface ObliterateOpts {
/**
@@ -168,11 +169,24 @@ export class Queue<
}

get metaValues(): Record<string, string | number> {
const { name, version } = readPackageJson();

return {
'opts.maxLenEvents': this.opts?.streams?.events?.maxLen ?? 10000,
version: `${name}:${version}`,
};
}

/**
* Get library version.
*
* @returns the content of the meta.library field.
*/
async getVersion(): Promise<string> {
const client = await this.client;
return await client.hget(this.keys.meta, 'version');
}

get repeat(): Promise<Repeat> {
return new Promise<Repeat>(async resolve => {
if (!this._repeat) {
101 changes: 50 additions & 51 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
@@ -398,11 +398,14 @@ export class Scripts {
return (<any>client).removeJobScheduler(keys.concat(args));
}

protected removeArgs(jobId: string, removeChildren: boolean): (string | number)[] {
protected removeArgs(
jobId: string,
removeChildren: boolean,
): (string | number)[] {
const keys: (string | number)[] = ['', 'meta'].map(name =>
this.queue.toKey(name),
);

const args = [jobId, removeChildren ? 1 : 0];

return keys.concat(args);
@@ -411,13 +414,9 @@ export class Scripts {
async remove(jobId: string, removeChildren: boolean): Promise<number> {
const client = await this.queue.client;

const args = this.removeArgs(
jobId, removeChildren
);
const args = this.removeArgs(jobId, removeChildren);

const result = await (<any>client).removeJob(
args,
);
const result = await (<any>client).removeJob(args);

if (result < 0) {
throw this.finishedErrors({
@@ -607,49 +606,6 @@ export class Scripts {
}
}

finishedErrors({
code,
jobId,
parentKey,
command,
state,
}: {
code: number;
jobId?: string;
parentKey?: string;
command: string;
state?: string;
}): Error {
switch (code) {
case ErrorCode.JobNotExist:
return new Error(`Missing key for job ${jobId}. ${command}`);
case ErrorCode.JobLockNotExist:
return new Error(`Missing lock for job ${jobId}. ${command}`);
case ErrorCode.JobNotInState:
return new Error(
`Job ${jobId} is not in the ${state} state. ${command}`,
);
case ErrorCode.JobPendingDependencies:
return new Error(`Job ${jobId} has pending dependencies. ${command}`);
case ErrorCode.ParentJobNotExist:
return new Error(`Missing key for parent job ${parentKey}. ${command}`);
case ErrorCode.JobLockMismatch:
return new Error(
`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`,
);
case ErrorCode.ParentJobCannotBeReplaced:
return new Error(
`The parent job ${parentKey} cannot be replaced. ${command}`,
);
case ErrorCode.JobBelongsToJobScheduler:
return new Error(
`Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`,
);
default:
return new Error(`Unknown code ${code} error for ${jobId}. ${command}`);
}
}

private drainArgs(delayed: boolean): (string | number)[] {
const queueKeys = this.queue.keys;

@@ -1463,6 +1419,49 @@ export class Scripts {
};
}
}

finishedErrors({
code,
jobId,
parentKey,
command,
state,
}: {
code: number;
jobId?: string;
parentKey?: string;
command: string;
state?: string;
}): Error {
switch (code) {
case ErrorCode.JobNotExist:
return new Error(`Missing key for job ${jobId}. ${command}`);
case ErrorCode.JobLockNotExist:
return new Error(`Missing lock for job ${jobId}. ${command}`);
case ErrorCode.JobNotInState:
return new Error(
`Job ${jobId} is not in the ${state} state. ${command}`,
);
case ErrorCode.JobPendingDependencies:
return new Error(`Job ${jobId} has pending dependencies. ${command}`);
case ErrorCode.ParentJobNotExist:
return new Error(`Missing key for parent job ${parentKey}. ${command}`);
case ErrorCode.JobLockMismatch:
return new Error(
`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`,
);
case ErrorCode.ParentJobCannotBeReplaced:
return new Error(
`The parent job ${parentKey} cannot be replaced. ${command}`,
);
case ErrorCode.JobBelongsToJobScheduler:
return new Error(
`Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`,
);
default:
return new Error(`Unknown code ${code} error for ${jobId}. ${command}`);
}
}
}

export function raw2NextJobData(raw: any[]) {
24 changes: 24 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -10,6 +10,9 @@ import { ChildMessage, RedisClient } from './interfaces';
import { EventEmitter } from 'events';
import * as semver from 'semver';

import { join } from 'path';
import { readFileSync } from 'fs';

export const errorObject: { [index: string]: any } = { value: null };

export function tryCatch(
@@ -261,3 +264,24 @@ export const toString = (value: any): string => {
};

export const QUEUE_EVENT_SUFFIX = ':qe';

export const readPackageJson: () => { name: string; version: string } = () => {
const packageJsonPossiblePaths = [
join(__dirname, '../package.json'),
join(__dirname, '../../package.json'),
join(__dirname, '../../../package.json'),
];

for (const path of packageJsonPossiblePaths) {
try {
return JSON.parse(readFileSync(path, 'utf-8'));
} catch (err) {
if ((<any>err).code === 'ENOENT') {
continue;
}
console.log(err);
}
}

return { name: 'bullmq', version: '0.0.0' };
};
8 changes: 8 additions & 0 deletions tests/test_queue.ts
Original file line number Diff line number Diff line change
@@ -36,6 +36,14 @@ describe('queues', function () {
await connection.quit();
});

it('should return the queue version', async () => {
const queue = new Queue(queueName, { connection });
const version = await queue.getVersion();
const { version: pkgJsonVersion, name } = require('../package.json');
expect(version).to.be.equal(`${name}:${pkgJsonVersion}`);
return queue.close();
});

describe('.add', () => {
describe('when jobId is provided as integer', () => {
it('throws error', async function () {