Skip to content

Commit

Permalink
fix: use different connections for bull and bullmq
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Aug 8, 2023
1 parent 8db1360 commit a7ebf24
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 18 deletions.
23 changes: 12 additions & 11 deletions lib/queue-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const maxCount = 50000;
const maxTime = 30000;

// We keep a redis client that we can reuse for all the queues.
let redisClient: Redis | Cluster;
let redisClients: Record<"bull" | "bullmq", Redis | Cluster> = {} as any;

export interface FoundQueue {
prefix: string;
Expand Down Expand Up @@ -97,30 +97,31 @@ export async function getRedisInfo(

export function getRedisClient(
redisOpts: RedisOptions,
type: "bull" | "bullmq",
clusterNodes?: string[]
) {
if (!redisClient) {
if (!redisClients[type]) {
if (clusterNodes && clusterNodes.length) {
redisClient = new Redis.Cluster(clusterNodes, redisOpts);
redisClients[type] = new Redis.Cluster(clusterNodes, redisOpts);
} else {
redisClient = new Redis(redisOpts);
redisClients[type] = new Redis(redisOpts);
}

redisClient.on("error", (err: Error) => {
redisClients[type].on("error", (err: Error) => {
console.log(
`${chalk.yellow("Redis:")} ${chalk.red("redis connection error")} ${
err.message
}`
);
});

redisClient.on("connect", () => {
redisClients[type].on("connect", () => {
console.log(
`${chalk.yellow("Redis:")} ${chalk.green("connected to redis server")}`
);
});

redisClient.on("end", () => {
redisClients[type].on("end", () => {
console.log(
`${chalk.yellow("Redis:")} ${chalk.blueBright(
"disconnected from redis server"
Expand All @@ -129,15 +130,15 @@ export function getRedisClient(
});
}

return redisClient;
return redisClients[type];
}

export async function execRedisCommand(
redisOpts: RedisOptions,
cb: (client: Redis | Cluster) => any,
clusterNodes?: string[]
) {
const redisClient = getRedisClient(redisOpts, clusterNodes);
const redisClient = getRedisClient(redisOpts, "bull", clusterNodes);

const result = await cb(redisClient);

Expand All @@ -158,7 +159,7 @@ export function createQueue(
const createClient = function (type: "client" /*, redisOpts */) {
switch (type) {
case "client":
return getRedisClient(redisOpts, nodes);
return getRedisClient(redisOpts, "bull", nodes);
default:
throw new Error(`Unexpected connection type: ${type}`);
}
Expand All @@ -176,7 +177,7 @@ export function createQueue(
case "bullmq":
return {
queue: new Queue(foundQueue.name, {
connection: getRedisClient(redisOpts, nodes),
connection: getRedisClient(redisOpts, "bullmq", nodes),
prefix: foundQueue.prefix,
}),
responders: BullMQResponders,
Expand Down
3 changes: 2 additions & 1 deletion lib/responders/bullmq-responders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ async function respondQueueCommand(
respond(ws, msg.id);
break;
case "add":
await queue.add(...(data.args as [string, object, object]));
const [name, jobData, opts] = data.args as [string, object, object];
await queue.add(name, jobData, opts);
respond(ws, msg.id);
break;
case "empty":
Expand Down
18 changes: 12 additions & 6 deletions lib/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,18 @@ export const Socket = (
{
const queues = await updateQueuesCache(redisOpts, opts);

console.log(
`${chalk.yellow("WebSocket:")} ${chalk.blueBright(
" sending queues "
)}`,
queues
);
for (const queue of queues) {
const { name, prefix, type } = queue;
console.log(
`${chalk.yellow("WebSocket:")} ${chalk.blueBright(
"Sending queue:"
)} ${chalk.green(name)} ${chalk.blueBright(
"type:"
)} ${chalk.green(type)} ${chalk.blueBright(
"prefix:"
)} ${chalk.green(prefix)}`
);
}

respond(msg.id, queues);
}
Expand Down

0 comments on commit a7ebf24

Please sign in to comment.