From 72e3deb5bc619ca1c9638916b689c7a0f6507ba2 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Fri, 30 Aug 2024 16:05:06 +0200 Subject: [PATCH] fix: prefix redis cache connection cache with checksum --- lib/queue-factory.ts | 58 +++++++++++++++++++++++++++----------------- package.json | 2 +- 2 files changed, 37 insertions(+), 23 deletions(-) diff --git a/lib/queue-factory.ts b/lib/queue-factory.ts index ece4c2b..36470e2 100644 --- a/lib/queue-factory.ts +++ b/lib/queue-factory.ts @@ -14,7 +14,7 @@ const maxCount = 150000; const maxTime = 40000; // We keep a redis client that we can reuse for all the queues. -let redisClients: Record<"bull" | "bullmq", Redis | Cluster> = {} as any; +let redisClients: Record = {} as any; export interface FoundQueue { prefix: string; @@ -41,20 +41,18 @@ const scanForQueues = async (node: Redis | Cluster, startTime: number) => { } while (Date.now() - startTime < maxTime && cursor !== "0"); return keys; -} +}; const getQueueKeys = async (client: Redis | Cluster, queueNames?: string[]) => { - let nodes = "nodes" in client ? client.nodes('master') : [client] + let nodes = "nodes" in client ? client.nodes("master") : [client]; let keys = []; const startTime = Date.now(); const foundQueues = new Set(); for await (const node of nodes) { - // If we have proposed queue names, lets check if they exist (including prefix) // Basically checking if there is a id key for the queue (prefix:name:id) if (queueNames) { - const queueKeys = queueNames.map((queueName) => { // Separate queue name from prefix let [prefix, name] = queueName.split(":"); @@ -82,13 +80,14 @@ const getQueueKeys = async (client: Redis | Cluster, queueNames?: string[]) => { const match = queueNameRegExp.exec(key); console.log( chalk.yellow("Redis:") + - chalk.red(` Queue "${match[1]}:${match[2]}" not found in Redis. Skipping...`) + chalk.red( + ` Queue "${match[1]}:${match[2]}" not found in Redis. Skipping...` + ) ); } } - } else { - keys.push(...await scanForQueues(node, startTime)); + keys.push(...(await scanForQueues(node, startTime))); } } return keys; @@ -152,37 +151,52 @@ export function getRedisClient( type: "bull" | "bullmq", clusterNodes?: string[] ) { - if (!redisClients[type]) { + // Compute checksum for redisOpts + const checksumJson = JSON.stringify(redisOpts); + const checksum = require("crypto") + .createHash("md5") + .update(checksumJson) + .digest("hex"); + + const key = `${type}-${checksum}`; + + if (!redisClients[key]) { if (clusterNodes && clusterNodes.length) { - const { username, password } = redisOptsFromUrl(clusterNodes[0]) - redisClients[type] = new Redis.Cluster(clusterNodes, { + const { username, password } = redisOptsFromUrl(clusterNodes[0]); + redisClients[key] = new Redis.Cluster(clusterNodes, { ...redisOpts, redisOptions: { username, password, - tls: process.env.REDIS_CLUSTER_TLS ? { - cert: Buffer.from(process.env.REDIS_CLUSTER_TLS ?? '', 'base64').toString('ascii') - } : undefined, - } + tls: process.env.REDIS_CLUSTER_TLS + ? { + cert: Buffer.from( + process.env.REDIS_CLUSTER_TLS ?? "", + "base64" + ).toString("ascii"), + } + : undefined, + }, }); } else { - redisClients[type] = new Redis(redisOpts); + redisClients[key] = new Redis(redisOpts); } - redisClients[type].on("error", (err: Error) => { + redisClients[key].on("error", (err: Error) => { console.log( - `${chalk.yellow("Redis:")} ${chalk.red("redis connection error")} ${err.message + `${chalk.yellow("Redis:")} ${chalk.red("redis connection error")} ${ + err.message }` ); }); - redisClients[type].on("connect", () => { + redisClients[key].on("connect", () => { console.log( `${chalk.yellow("Redis:")} ${chalk.green("connected to redis server")}` ); }); - redisClients[type].on("end", () => { + redisClients[key].on("end", () => { console.log( `${chalk.yellow("Redis:")} ${chalk.blueBright( "disconnected from redis server" @@ -191,7 +205,7 @@ export function getRedisClient( }); } - return redisClients[type]; + return redisClients[key]; } export async function execRedisCommand( @@ -255,7 +269,7 @@ export function createQueue( default: console.error( chalk.red(`ERROR:`) + - `Unexpected queue type: ${foundQueue.type} for queue ${foundQueue.name}` + `Unexpected queue type: ${foundQueue.type} for queue ${foundQueue.name}` ); } } diff --git a/package.json b/package.json index 4a44ebc..243a89b 100644 --- a/package.json +++ b/package.json @@ -47,7 +47,7 @@ "build": "tsc", "test": "jest", "start": "node app.js", - "prepare": "yarn build" + "prepare": "npm run build" }, "repository": { "type": "git",