Skip to content

Commit

Permalink
feat: add support for specifying queue names
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Jun 10, 2024
1 parent 13279b6 commit 8982572
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 46 deletions.
64 changes: 49 additions & 15 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#! /usr/bin/env node
const program = require("commander");
const { Command, Option } = require("commander");
const { name, version } = require(__dirname + "/package.json");
const chalk = require("chalk");

// Check version
const lastestVersion = require("latest-version");
const semver = require("semver");

const program = new Command();
program
.version(version)

Expand Down Expand Up @@ -60,6 +61,13 @@ program
"comma-separated list of cluster nodes uris to connect to",
process.env.REDIS_NODES ? process.env.REDIS_NODES : undefined
)
.option("--queues <queues>", "comma-separated list of queues to monitor")
.addOption(
new Option(
"--queuesFile <queuesFile>",
"file with queues to monitor"
).conflicts("queues")
)
.parse(process.argv);

console.info(
Expand All @@ -68,6 +76,8 @@ console.info(
)
);

const options = program.opts();

lastestVersion(name).then(function (newestVersion) {
if (semver.gt(newestVersion, version)) {
console.error(
Expand All @@ -78,7 +88,7 @@ lastestVersion(name).then(function (newestVersion) {
)
);
}
if (!program.token) {
if (!options.token) {
console.error(
chalk.red(
`ERROR: A valid token is required, use either TASKFORCE_TOKEN env or pass it with -t (get token at https://taskforce.sh)`
Expand All @@ -87,33 +97,40 @@ lastestVersion(name).then(function (newestVersion) {
process.exit(1);
}

const queueNames = options.queuesFile
? parseQueuesFile(options.queuesFile)
: options.queues
? parseQueues(options.queues)
: undefined;

const connection = {
port: program.port,
host: program.host,
password: program.passwd,
sentinelPassword: program.spasswd,
db: program.database,
uri: program.uri,
tls: program.tls
port: options.port,
host: options.host,
password: options.passwd,
sentinelPassword: options.spasswd,
db: options.database,
uri: options.uri,
tls: options.tls
? {
rejectUnauthorized: false,
requestCert: true,
agent: false,
}
: void 0,
sentinels:
program.sentinels &&
program.sentinels.split(",").map((hostPort) => {
options.sentinels &&
options.sentinels.split(",").map((hostPort) => {
const [host, port] = hostPort.split(":");
return { host, port };
}),
name: program.master,
name: options.master,
};

const { Socket } = require("./dist/socket");
Socket(program.name, program.backend, program.token, connection, {
team: program.team,
nodes: program.nodes ? program.nodes.split(",") : undefined,
Socket(options.name, options.backend, options.token, connection, {
team: options.team,
nodes: options.nodes ? options.nodes.split(",") : undefined,
queueNames,
});
});

Expand All @@ -125,3 +142,20 @@ process.on("uncaughtException", function (err) {
process.on("unhandledRejection", (reason, promise) => {
console.error({ promise, reason }, "Unhandled Rejection at: Promise");
});

function parseQueuesFile(file) {
// Load the queues from the file. The file must be a list of queues separated by new lines
const fs = require("fs");
const path = require("path");
const queuesFile = path.resolve(file);
if (fs.existsSync(queuesFile)) {
return fs.readFileSync(queuesFile, "utf8").split("\n").filter(Boolean);
} else {
console.error(chalk.red(`ERROR: File ${queuesFile} does not exist`));
process.exit(1);
}
}

function parseQueues(queuesString) {
return queuesString.split(",");
}
82 changes: 65 additions & 17 deletions lib/queue-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,39 +22,87 @@ export interface FoundQueue {
type: QueueType;
}

const getQueueKeys = async (client: Redis | Cluster) => {
const scanForQueues = async (node: Redis | Cluster, startTime: number) => {
let cursor = "0";
const keys = [];
do {
const [nextCursor, scannedKeys] = await node.scan(
cursor,
"MATCH",
"*:*:id",
"COUNT",
maxCount,
"TYPE",
"string"
);
cursor = nextCursor;

keys.push(...scannedKeys);
} while (Date.now() - startTime < maxTime && cursor !== "0");

return keys;
}

const getQueueKeys = async (client: Redis | Cluster, queueNames?: string[]) => {
let nodes = "nodes" in client ? client.nodes('master') : [client]
let keys = [];
const startTime = Date.now();
const foundQueues = new Set<string>();

for await (const node of nodes) {
let cursor = "0";
do {
const [nextCursor, scannedKeys] = await node.scan(
cursor,
"MATCH",
"*:*:id",
"COUNT",
maxCount,
"TYPE",
"string"
);
cursor = nextCursor;

keys.push(...scannedKeys);
} while (Date.now() - startTime < maxTime && cursor !== "0");
// If we have proposed queue names, lets check if they exist (including prefix)
// Basically checking if there is a id key for the queue (prefix:name:id)
if (queueNames) {

const queueKeys = queueNames.map((queueName) => {
// Separate queue name from prefix
let [prefix, name] = queueName.split(":");
if (!name) {
name = prefix;
prefix = "bull";
}

// If the queue name includes a prefix use that, otherwise use the default prefix "bull"
return `${prefix}:${name}:id`;
});

for (const key of queueKeys) {
const exists = await node.exists(key);
if (exists) {
foundQueues.add(key);
}
}
keys.push(...foundQueues);

// Warn for missing queues
for (const key of queueKeys) {
if (!foundQueues.has(key)) {
// Extract queue name from key
const match = queueNameRegExp.exec(key);
console.log(
chalk.yellow("Redis:") +
chalk.red(` Queue "${match[1]}:${match[2]}" not found in Redis. Skipping...`)
);
}
}

} else {
keys.push(...await scanForQueues(node, startTime));
}
}
return keys;
};

export async function getConnectionQueues(
redisOpts: RedisOptions,
clusterNodes?: string[]
clusterNodes?: string[],
queueNames?: string[]
): Promise<FoundQueue[]> {
const queues = await execRedisCommand(
redisOpts,
async (client) => {
const keys = await getQueueKeys(client);
const keys = await getQueueKeys(client, queueNames);

const queues = await Promise.all(
keys
Expand Down
5 changes: 3 additions & 2 deletions lib/queues-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ export async function updateQueuesCache(
integrations?: {
[key: string]: Integration;
};
queueNames?: string[];
} = {}
) {
const { nodes, integrations } = opts;
const newQueues = await getConnectionQueues(redisOpts, nodes);
const { nodes, integrations, queueNames } = opts;
const newQueues = await getConnectionQueues(redisOpts, nodes, queueNames);

queuesCache = queuesCache || {};

Expand Down
12 changes: 5 additions & 7 deletions lib/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ export const Socket = (
integrations?: {
[key: string]: Integration;
};
queueNames?: string[]
} = {}
) => {
const { team, nodes, integrations } = opts;
const { team, nodes } = opts;
const ws = new WebSocketClient();
const redisOpts = redisOptsFromConnection(connection);

Expand Down Expand Up @@ -86,10 +87,7 @@ export const Socket = (
//
// Send this connection.
//
const queues = await updateQueuesCache(redisOpts, {
nodes,
integrations,
});
const queues = await updateQueuesCache(redisOpts, opts);
console.log(
`${chalk.yellow("WebSocket:")} ${chalk.green(
"sending connection:"
Expand Down Expand Up @@ -168,7 +166,7 @@ export const Socket = (
switch (data.cmd) {
case "ping":
const pong = await ping(redisOpts, nodes);
respond(msg.id, pong);
respond(msg.id, startTime, pong);
break;
case "getConnection":
{
Expand Down Expand Up @@ -202,7 +200,7 @@ export const Socket = (
break;
case "getInfo":
const info = await getRedisInfo(redisOpts, nodes);
respond(msg.id, info);
respond(msg.id, startTime, info);
break;

case "getQueueType":
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"bull": "^4.12.6",
"bullmq": "^5.7.9",
"chalk": "^4.1.0",
"commander": "^5.1.0",
"commander": "^12.1.0",
"ioredis": "^5.4.1",
"latest-version": "^5.1.0",
"lodash": "^4.17.21",
Expand Down
8 changes: 4 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1741,10 +1741,10 @@ combined-stream@^1.0.6, combined-stream@~1.0.6:
dependencies:
delayed-stream "~1.0.0"

commander@^5.1.0:
version "5.1.0"
resolved "https://registry.npmjs.org/commander/-/commander-5.1.0.tgz"
integrity sha512-P0CysNDQ7rtVw4QIQtm+MRxV66vKFSvlsQvGYXZWR3qFU0jlMKHZZZgw8e+8DSah4UDKMqnknRDQz+xuQXQ/Zg==
commander@^12.1.0:
version "12.1.0"
resolved "https://registry.yarnpkg.com/commander/-/commander-12.1.0.tgz#01423b36f501259fdaac4d0e4d60c96c991585d3"
integrity sha512-Vw8qHK3bZM9y/P10u3Vib8o/DdkvA2OtPtZvD871QKjy74Wj1WSKFILMPRPSdUSx5RFK1arlJzEtA4PkFgnbuA==

common-ancestor-path@^1.0.1:
version "1.0.1"
Expand Down

0 comments on commit 8982572

Please sign in to comment.