From 781e963cca48dcde7063fab13cd4148522480848 Mon Sep 17 00:00:00 2001 From: Bartosz Hernas Date: Fri, 17 Nov 2023 16:15:30 +0200 Subject: [PATCH] fix: correctly discover queues in redis clusters --- README.md | 14 +++++++++++++- index.js | 4 ++-- lib/queue-factory.ts | 44 ++++++++++++++++++++++++++++---------------- lib/socket.ts | 23 +---------------------- lib/utils.ts | 23 +++++++++++++++++++++++ 5 files changed, 67 insertions(+), 41 deletions(-) diff --git a/README.md b/README.md index f29612b..a8a7e43 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,7 @@ Call the tool and get a help on the options: -s, --sentinels [host:port] comma-separated list of sentinel host/port pairs -m, --master [name] name of master node used in sentinel configuration -h, --help output usage information + --nodes [nodes] comma-separated list of cluster nodes uris to connect to (Redis Cluster) ``` Example: @@ -77,7 +78,18 @@ sentinel-password REDIS_SENTINEL_PASSWD uri REDIS_URI sentinels REDIS_SENTINELS (comma separated list of sentinels) master REDIS_MASTER -nodes REDIS_NODES (comma separated list of nodes) +nodes REDIS_NODES (comma separated list of nodes for Redis Cluster) +``` + + +Note for Redis Cluster: You may also need to specify following with environment variables. +```bash +Cluster TLS Certificate REDIS_CLUSTER_TLS +``` + +If your redis cluster still cannot connect due to failing certificate validation, you may need to pass this env to skip cert validation. +```bash +NODE_TLS_REJECT_UNAUTHORIZED="0" ``` ## Secured TLS Connections diff --git a/index.js b/index.js index 684fd94..351e33d 100755 --- a/index.js +++ b/index.js @@ -58,7 +58,7 @@ program .option( "--nodes ", "comma-separated list of cluster nodes uris to connect to", - process.env.REDIS_NODES ? process.env.REDIS_NODES.split(",") : undefined + process.env.REDIS_NODES ? process.env.REDIS_NODES : undefined ) .parse(process.argv); @@ -113,7 +113,7 @@ lastestVersion(name).then(function (newestVersion) { const { Socket } = require("./dist/socket"); Socket(program.name, program.backend, program.token, connection, { team: program.team, - nodes: program.nodes, + nodes: program.nodes ? program.nodes.split(",") : undefined, }); }); diff --git a/lib/queue-factory.ts b/lib/queue-factory.ts index eb9752b..1ecf6a4 100644 --- a/lib/queue-factory.ts +++ b/lib/queue-factory.ts @@ -1,6 +1,6 @@ import { Redis, Cluster, RedisOptions } from "ioredis"; -import { QueueType, getQueueType } from "./utils"; +import { QueueType, getQueueType, redisOptsFromUrl } from "./utils"; import { Queue } from "bullmq"; import * as Bull from "bull"; import { BullMQResponders, BullResponders } from "./responders"; @@ -23,23 +23,25 @@ export interface FoundQueue { } const getQueueKeys = async (client: Redis | Cluster) => { - let keys = [], - cursor = "0"; + let nodes = "nodes" in client ? client.nodes('master'): [client] + let keys = []; const startTime = Date.now(); - do { - const [nextCursor, scannedKeys] = await client.scan( - cursor, - "MATCH", - "*:*:id", - "COUNT", - maxCount - ); - cursor = nextCursor; - - keys.push(...scannedKeys); - } while (Date.now() - startTime < maxTime && cursor !== "0"); + for await (const node of nodes) { + let cursor = "0"; + do { + const [nextCursor, scannedKeys] = await node.scan( + cursor, + "MATCH", + "*:*:id", + "COUNT", + maxCount + ); + cursor = nextCursor; + keys.push(...scannedKeys); + } while (Date.now() - startTime < maxTime && cursor !== "0"); + } return keys; }; @@ -102,7 +104,17 @@ export function getRedisClient( ) { if (!redisClients[type]) { if (clusterNodes && clusterNodes.length) { - redisClients[type] = new Redis.Cluster(clusterNodes, redisOpts); + const { username, password } = redisOptsFromUrl(clusterNodes[0]) + redisClients[type] = new Redis.Cluster(clusterNodes, { + ...redisOpts, + redisOptions: { + username, + password, + tls: process.env.REDIS_CLUSTER_TLS ? { + cert: Buffer.from(process.env.REDIS_CLUSTER_TLS ?? '', 'base64').toString('ascii') + } : undefined, + } + }); } else { redisClients[type] = new Redis(redisOpts); } diff --git a/lib/socket.ts b/lib/socket.ts index 620901b..7ead9f7 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -1,10 +1,9 @@ import { RedisOptions } from "ioredis"; import { pick } from "lodash"; -import * as url from "url"; import { getCache, updateQueuesCache, queueKey } from "./queues-cache"; import { WebSocketClient } from "./ws-autoreconnect"; import { execRedisCommand, getRedisInfo, ping } from "./queue-factory"; -import { getQueueType } from "./utils"; +import { getQueueType, redisOptsFromUrl } from "./utils"; import { Integration } from "./interfaces/integration"; const { version } = require(`${__dirname}/../package.json`); @@ -260,23 +259,3 @@ function redisOptsFromConnection(connection: Connection): RedisOptions { }; return opts; } - -function redisOptsFromUrl(urlString: string) { - const redisOpts: RedisOptions = {}; - try { - const redisUrl = url.parse(urlString); - redisOpts.port = parseInt(redisUrl.port) || 6379; - redisOpts.host = redisUrl.hostname; - redisOpts.db = redisUrl.pathname - ? parseInt(redisUrl.pathname.split("/")[1]) - : 0; - if (redisUrl.auth) { - const username = redisUrl.auth.split(":")[0]; - redisOpts.username = username ? username : undefined; - redisOpts.password = redisUrl.auth.split(":")[1]; - } - } catch (e) { - throw new Error(e.message); - } - return redisOpts; -} diff --git a/lib/utils.ts b/lib/utils.ts index 0da57a0..71fd51e 100644 --- a/lib/utils.ts +++ b/lib/utils.ts @@ -1,5 +1,7 @@ import { Redis, Cluster } from "ioredis"; export type QueueType = "bull" | "bullmq" | "bullmq-pro"; +import { RedisOptions } from "ioredis"; +import * as url from "url"; export const getQueueType = async ( queueName: string, @@ -27,3 +29,24 @@ export const getQueueType = async ( // otherwise, it is a bull queue type. return "bull"; }; + + +export function redisOptsFromUrl(urlString: string) { + const redisOpts: RedisOptions = {}; + try { + const redisUrl = url.parse(urlString); + redisOpts.port = parseInt(redisUrl.port) || 6379; + redisOpts.host = redisUrl.hostname; + redisOpts.db = redisUrl.pathname + ? parseInt(redisUrl.pathname.split("/")[1]) + : 0; + if (redisUrl.auth) { + const username = redisUrl.auth.split(":")[0]; + redisOpts.username = username ? username : undefined; + redisOpts.password = redisUrl.auth.split(":")[1]; + } + } catch (e) { + throw new Error(e.message); + } + return redisOpts; +} \ No newline at end of file