Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add versioning support #103

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions lib/queue-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ export interface FoundQueue {
prefix: string;
name: string;
type: QueueType;
majorVersion: number;
version?: string;
}

const scanForQueues = async (node: Redis | Cluster, startTime: number) => {
Expand Down Expand Up @@ -117,9 +119,12 @@ export async function getConnectionQueues(
})
.filter((queue) => queue !== undefined)
.map(async function (queue) {
const type = await getQueueType(queue.name, queue.prefix, client);
queue.type = type;
return queue;
const { type, majorVersion, version } = await getQueueType(
queue.name,
queue.prefix,
client
);
return { ...queue, type, majorVersion, version };
})
);
return queues;
Expand Down Expand Up @@ -250,13 +255,35 @@ export function createQueue(

switch (foundQueue.type) {
case "bullmq":
return {
queue: new Queue(foundQueue.name, {
connection: getRedisClient(redisOpts, "bullmq", nodes),
prefix: foundQueue.prefix,
}),
responders: BullMQResponders,
};
const connection = getRedisClient(redisOpts, "bullmq", nodes);
switch (foundQueue.majorVersion) {
case 0:
return {
queue: new Queue(foundQueue.name, {
connection,
prefix: foundQueue.prefix,
}),
responders: BullMQResponders,
};
case 3:
const { createQueue } = require("./queue-factory/bullmqv3-factory");
return createQueue(foundQueue.name, foundQueue.prefix, connection);
case 4:
const {
createQueue: createQueueV4,
} = require("./queue-factory/bullmqv4-factory");
return createQueueV4(foundQueue.name, foundQueue.prefix, connection);
case 5:
const {
createQueue: createQueueV5,
} = require("./queue-factory/bullmqv5-factory");
return createQueueV5(foundQueue.name, foundQueue.prefix, connection);
default:
console.error(
chalk.red(`ERROR:`) +
`Unexpected major version: ${foundQueue.majorVersion} for queue ${foundQueue.name}`
);
}

case "bull":
return {
Expand Down
15 changes: 15 additions & 0 deletions lib/queue-factory/bullmqv3-factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { Queue } from "bullmq-v3";
import { Redis } from "ioredis";
import { BullMQResponders } from "../responders";

export const createQueue = (
name: string,
prefix: string,
connection: Redis,
) => ({
queue: new Queue(name, {
connection,
prefix,
}),
responders: BullMQResponders,
});
15 changes: 15 additions & 0 deletions lib/queue-factory/bullmqv4-factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { Queue } from "bullmq-v4";
import { Redis } from "ioredis";
import { BullMQResponders } from "../responders";

export const createQueue = (
name: string,
prefix: string,
connection: Redis
) => ({
queue: new Queue(name, {
connection,
prefix,
}),
responders: BullMQResponders,
});
15 changes: 15 additions & 0 deletions lib/queue-factory/bullmqv5-factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { Queue } from "bullmq-v5";
import { Redis } from "ioredis";
import { BullMQResponders } from "../responders";

export const createQueue = (
name: string,
prefix: string,
connection: Redis
) => ({
queue: new Queue(name, {
connection,
prefix,
}),
responders: BullMQResponders,
});
4 changes: 3 additions & 1 deletion lib/queues-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ export const getCache = () => {
return queuesCache;
};

export function queueKey(queue: Omit<FoundQueue, "type">) {
export function queueKey(
queue: Omit<FoundQueue, "type" | "majorVersion" | "version">
) {
return `${queue.prefix}:${queue.name}`;
}

Expand Down
2 changes: 1 addition & 1 deletion lib/responders/bull-responders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ function paginate(
start = start || 0;
end = end || -1;
return (<any>queue)
[method](start, end, opts)
[method](start, end, opts)
.then(function (jobs: Bull.Job[]) {
respond(ws, Date.now(), messageId, jobs);
});
Expand Down
18 changes: 16 additions & 2 deletions lib/responders/bullmq-responders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,12 @@ async function respondQueueCommand(
respond(ws, startTime, msg.id, metrics);
break;
case "getDependencies":
const dependencies = await queue.getDependencies(data.parentId, data.type, data.start, data.end);
const dependencies = await queue.getDependencies(
data.parentId,
data.type,
data.start,
data.end
);
respond(ws, startTime, msg.id, dependencies);
break;

Expand All @@ -88,6 +93,7 @@ async function respondQueueCommand(
case "getCompleted":
case "getFailed":
case "getRepeatableJobs":
case "getJobSchedulers":
case "getWorkers":
paginate(ws, queue, msg.id, data.start, data.end, data.cmd, data.opts);
break;
Expand All @@ -96,13 +102,21 @@ async function respondQueueCommand(
const logs = await queue.getJobLogs(data.jobId, data.start, data.end);
respond(ws, startTime, msg.id, logs);

case "getRepeatableCount":
case "getJobSchedulersCount": {
const client = await queue.client;
const jobSchedulersKey = queue.keys.repeat;
const count = await client.zcard(jobSchedulersKey);
respond(ws, startTime, msg.id, count);
break;
}

case "getWaitingChildrenCount":
case "getWaitingCount":
case "getActiveCount":
case "getDelayedCount":
case "getCompletedCount":
case "getFailedCount":
case "getRepeatableCount":
const count = await (<any>queue)[data.cmd]();
respond(ws, startTime, msg.id, count);
break;
Expand Down
66 changes: 37 additions & 29 deletions lib/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,6 @@ export type QueueType = "bull" | "bullmq" | "bullmq-pro";
import { RedisOptions } from "ioredis";
import * as url from "url";

export const getQueueType = async (
queueName: string,
prefix: string,
client: Redis | Cluster
): Promise<QueueType> => {
// Check if queue includes the "meta" key, if so, it is a bullmq or bullmq pro queue type.
const metaKey = `${prefix}:${queueName}:meta`;

// Check if meta key includes the field "pro"
// if so, it is a bullmq-pro queue type.
const hasMeta = await client.exists(metaKey);
if (hasMeta) {
const version = await client.hget(metaKey, "version");
if (version && version.includes("bullmq-pro")) {
return "bullmq-pro"; // Will fail unless a bullmq-pro integration is provided.
}

const maxLenEvents = await client.hget(metaKey, "opts.maxLenEvents");
if (maxLenEvents) {
return "bullmq";
}
}

// otherwise, it is a bull queue type.
return "bull";
};


export function redisOptsFromUrl(urlString: string) {
const redisOpts: RedisOptions = {};
try {
Expand All @@ -49,4 +21,40 @@ export function redisOptsFromUrl(urlString: string) {
throw new Error(e.message);
}
return redisOpts;
}
}

export const getQueueType = async (
queueName: string,
prefix: string,
client: Redis | Cluster
): Promise<{ type: QueueType; majorVersion: number; version?: string }> => {
// Check if queue includes the "meta" key, if so, it is a bullmq queue type.
const metaKey = `${prefix}:${queueName}:meta`;

// check if meta key includes the field "pro"
// if so, it is a bullmq-pro queue type.
const hasMeta = await client.exists(metaKey);
if (hasMeta) {
const longVersion = await client.hget(metaKey, "version");
const version = longVersion ? longVersion.split(":")[1] : "";

if (longVersion) {
const type = longVersion.includes("bullmq-pro") ? "bullmq-pro" : "bullmq";

// Try to get the major version number from the version string (e.g. bullmq:3.20.0)
const majorVersionStr = version?.split(".")[0];
const majorVersion = majorVersionStr ? parseInt(majorVersionStr, 10) : 0;
if (majorVersion >= 3) {
return { type, majorVersion, version };
}
}

const maxLenEvents = await client.hget(metaKey, "opts.maxLenEvents");
if (maxLenEvents) {
return { type: "bullmq", majorVersion: 0, version };
}
}

// otherwise, it is a bull queue type.
return { type: "bull", majorVersion: 0 };
};
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
],
"dependencies": {
"bull": "^4.15.1",
"bullmq": "^5.8.3",
"bullmq": "^5.31.1",
"bullmq-v3": "npm:bullmq@^3.16.2",
"bullmq-v4": "npm:bullmq@^4.18.2",
"bullmq-v5": "npm:bullmq@^5.31.1",
"chalk": "^4.1.0",
"commander": "^12.1.0",
"ioredis": "^5.4.1",
Expand Down
Loading
Loading