diff --git a/lib/queues-cache.ts b/lib/queues-cache.ts index 570a637..12f8fa1 100644 --- a/lib/queues-cache.ts +++ b/lib/queues-cache.ts @@ -1,9 +1,9 @@ import * as Bull from "bull"; import { RedisOptions } from "ioredis"; import { keyBy } from "lodash"; +import * as Redis from "ioredis"; const chalk = require("chalk"); -const Redis = require("ioredis"); let queuesCache: { [index: string]: Bull.Queue } = null; @@ -67,6 +67,35 @@ const queueNameRegExp = new RegExp("(.*):(.*):id"); async function getConnectionQueues( redisOpts: RedisOptions ): Promise { + const queues = await execRedisCommand(redisOpts, async (client) => { + const keys: string[] = await client.keys("*:*:id"); + const queues = keys.map(function (key) { + var match = queueNameRegExp.exec(key); + if (match) { + return { + prefix: match[1], + name: match[2], + }; + } + }); + return queues; + }); + + return queues; +} + +export async function getRedisInfo(redisOpts: RedisOptions) { + const info = await execRedisCommand(redisOpts, async (client) => { + return client.info(); + }); + + return info; +} + +async function execRedisCommand( + redisOpts: RedisOptions, + cb: (client: Redis.Redis) => any +) { const redisClient = new Redis(redisOpts); redisClient.on("error", (err: Error) => { @@ -89,18 +118,9 @@ async function getConnectionQueues( ); }); - const keys: string[] = await redisClient.keys("*:*:id"); - const queues = keys.map(function (key) { - var match = queueNameRegExp.exec(key); - if (match) { - return { - prefix: match[1], - name: match[2], - }; - } - }); + const result = await cb(redisClient); await redisClient.quit(); - return queues; + return result; } diff --git a/lib/socket.ts b/lib/socket.ts index dff0103..bfa5ce8 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -2,7 +2,7 @@ import * as Bull from "bull"; import { RedisOptions } from "ioredis"; import { pick } from "lodash"; import * as url from "url"; -import { getCache, updateQueuesCache } from "./queues-cache"; +import { getCache, updateQueuesCache, getRedisInfo } from "./queues-cache"; import { WebSocketClient } from "./ws-autoreconnect"; const chalk = require("chalk"); @@ -232,31 +232,41 @@ module.exports = ( async function respondConnectionCommand(connection: Connection, msg: any) { const data = msg.data; - const queues = await updateQueuesCache(redisOpts); switch (data.cmd) { case "getConnection": - console.log( - `${chalk.yellow("WebSocket: ")} ${chalk.green( - "sending connections: " - )} ${chalk.blueBright(name)} ${ - team ? chalk.green(" for team ") + chalk.blueBright(team) : "" - }` - ); + { + const queues = await updateQueuesCache(redisOpts); - respond(msg.id, { - queues, - connection: name, - team, - }); + console.log( + `${chalk.yellow("WebSocket: ")} ${chalk.green( + "sending connections: " + )} ${chalk.blueBright(name)} ${ + team ? chalk.green(" for team ") + chalk.blueBright(team) : "" + }` + ); + + respond(msg.id, { + queues, + connection: name, + team, + }); + } break; case "getQueues": - console.log( - chalk.yellow("WebSocket:") + chalk.blueBright(" sending queues "), - queues - ); + { + const queues = await updateQueuesCache(redisOpts); - respond(msg.id, queues); + console.log( + chalk.yellow("WebSocket:") + chalk.blueBright(" sending queues "), + queues + ); + respond(msg.id, queues); + } + break; + case "getInfo": + const info = await getRedisInfo(redisOpts); + respond(msg.id, info); break; } }