diff --git a/index.js b/index.js index ab63002..188c3b6 100755 --- a/index.js +++ b/index.js @@ -47,8 +47,8 @@ program .parse(process.argv); console.info( - chalk.blue( - "Taskforce Connector v" + pkg.version + " - (c) 2017-2019 Taskforce.sh Inc." + chalk.blueBright( + "Taskforce Connector v" + pkg.version + " - (c) 2017-2020 Taskforce.sh Inc." ) ); diff --git a/lib/queues-cache.ts b/lib/queues-cache.ts new file mode 100644 index 0000000..570a637 --- /dev/null +++ b/lib/queues-cache.ts @@ -0,0 +1,106 @@ +import * as Bull from "bull"; +import { RedisOptions } from "ioredis"; +import { keyBy } from "lodash"; + +const chalk = require("chalk"); +const Redis = require("ioredis"); + +let queuesCache: { [index: string]: Bull.Queue } = null; + +export const getCache = () => { + return queuesCache; +}; + +export interface FoundQueue { + prefix: string; + name: string; +} + +export async function updateQueuesCache(redisOpts: RedisOptions) { + const newQueues = await getConnectionQueues(redisOpts); + + queuesCache = queuesCache || {}; + + const oldQueues = Object.keys(queuesCache); + const newQueuesObject = keyBy(newQueues, "name"); + + const toAdd = []; + const toRemove = []; + + for (let i = 0; i < newQueues.length; i++) { + const newQueue = newQueues[i]; + const oldQueue = queuesCache[newQueue.name]; + + if (!oldQueue) { + toAdd.push(newQueue); + } + } + + for (let i = 0; i < oldQueues.length; i++) { + const oldQueue = oldQueues[i]; + const newQueue = newQueuesObject[oldQueue]; + + if (!newQueue) { + toRemove.push(queuesCache[oldQueue]); + } + } + + await Promise.all( + toRemove.map(function (queue: Bull.Queue) { + var closing = queue.close(); + delete queuesCache[queue.name]; + return closing; + }) + ); + + toAdd.forEach(function (queue: FoundQueue) { + queuesCache[queue.name] = new Bull(queue.name, { + prefix: queue.prefix, + redis: redisOpts, + }); + }); + + return newQueues; +} + +const queueNameRegExp = new RegExp("(.*):(.*):id"); +async function getConnectionQueues( + redisOpts: RedisOptions +): Promise { + const redisClient = new Redis(redisOpts); + + redisClient.on("error", (err: Error) => { + console.log( + chalk.yellow("Redis:") + chalk.red(" redis connection error "), + err.message + ); + }); + + redisClient.on("connect", () => { + console.log( + chalk.yellow("Redis:") + chalk.green(" connected to redis server") + ); + }); + + redisClient.on("end", () => { + console.log( + chalk.yellow("Redis:") + + chalk.blueBright(" disconnected from redis server") + ); + }); + + const keys: string[] = await redisClient.keys("*:*:id"); + const queues = keys.map(function (key) { + var match = queueNameRegExp.exec(key); + if (match) { + return { + prefix: match[1], + name: match[2], + }; + } + }); + + await redisClient.quit(); + + return queues; +} diff --git a/lib/socket.ts b/lib/socket.ts index 968c4a8..df06b96 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -1,11 +1,10 @@ -import { WebSocketClient } from "./ws-autoreconnect"; import * as Bull from "bull"; - +import { RedisOptions } from "ioredis"; +import { pick } from "lodash"; import * as url from "url"; -import { RedisOptions, Redis } from "ioredis"; +import { getCache, updateQueuesCache } from "./queues-cache"; +import { WebSocketClient } from "./ws-autoreconnect"; -const Redis = require("ioredis"); -const _ = require("lodash"); const chalk = require("chalk"); interface Connection { @@ -30,27 +29,25 @@ module.exports = ( ws.open(server, { headers: { Authorization: "Bearer " + token, - Taskforce: "connector" - } + Taskforce: "connector", + }, }); console.log( chalk.yellow("WebSocket:") + - chalk.blue(" opening connection to ") + + chalk.blueBright(" opening connection to ") + chalk.gray("Taskforce.sh") ); - const queues: { [index: string]: Bull.Queue } = {}; - ws.onopen = function open() { console.log( chalk.yellow("WebSocket:") + - chalk.blue(" opened connection to ") + + chalk.blueBright(" opened connection to ") + chalk.gray("Taskforce.sh") ); }; - ws.onerror = function(err) { + ws.onerror = function (err) { var msg; if (err.message === "Unexpected server response: 401") { msg = @@ -62,7 +59,10 @@ module.exports = ( }; ws.onmessage = async function incoming(input: string) { - console.log(chalk.yellow("WebSocket:") + chalk.blue(" received"), input); + console.log( + chalk.yellow("WebSocket:") + chalk.blueBright(" received"), + input + ); if (input === "authorized") { console.log( chalk.yellow("WebSocket: ") + @@ -72,12 +72,12 @@ module.exports = ( // // Send this connection. // - const queues = await getConnectionQueues(connection); + const queues = await updateQueuesCache(redisOpts); console.log( `${chalk.yellow("WebSocket: ")} ${chalk.green( "sending connection: " - )} ${chalk.blue(name)} ${ - team ? chalk.green(" for team ") + chalk.blue(team) : "" + )} ${chalk.blueBright(name)} ${ + team ? chalk.green(" for team ") + chalk.blueBright(team) : "" }` ); ws.send( @@ -86,120 +86,56 @@ module.exports = ( cmd: "update", queues, connection: name, - team + team, }) ); - return; - } - const msg = JSON.parse(input); - - if (!msg.data) { - console.error( - chalk.red("WebSocket:") + chalk.blue(" missing message data "), - msg - ); - return; - } - - const data = msg.data; - const res = data.res; + } else { + const msg = JSON.parse(input); - switch (res) { - case "connections": - respondConnectionCommand(connection, msg); - break; - case "queues": - case "jobs": - var queue = queues[data.queueName]; + if (!msg.data) { + console.error( + chalk.red("WebSocket:") + chalk.blueBright(" missing message data "), + msg + ); + return; + } - if (!queue) { - ws.send( - JSON.stringify({ - id: msg.id, - err: "Queue not found" - }) - ); - } else { - switch (res) { - case "queues": - respondQueueCommand(queue, msg); - break; - case "jobs": - respondJobCommand(queue, msg); - break; + const { res, queueName } = msg.data; + + switch (res) { + case "connections": + respondConnectionCommand(connection, msg); + break; + case "queues": + case "jobs": + const cache = getCache(); + if (!cache) { + await updateQueuesCache(redisOpts); } - } - break; + var queue = cache[queueName]; + + if (!queue) { + ws.send( + JSON.stringify({ + id: msg.id, + err: "Queue not found", + }) + ); + } else { + switch (res) { + case "queues": + respondQueueCommand(queue, msg); + break; + case "jobs": + respondJobCommand(queue, msg); + break; + } + } + break; + } } }; - interface FoundQueue { - prefix: string; - name: string; - } - - const queueNameRegExp = new RegExp("(.*):(.*):id"); - async function getConnectionQueues( - connection: Connection - ): Promise { - const redisClient = new Redis(redisOpts); - - redisClient.on("error", (err: Error) => { - console.log( - chalk.yellow("Redis:") + chalk.red(" redis connection error "), - err.message - ); - }); - - redisClient.on("connect", () => { - console.log( - chalk.yellow("Redis:") + chalk.green(" connected to redis server") - ); - }); - - redisClient.on("end", () => { - console.log( - chalk.yellow("Redis:") + chalk.blue(" disconnected from redis server") - ); - }); - - const keys: string[] = await redisClient.keys("*:*:id"); - const queues = keys.map(function(key) { - var match = queueNameRegExp.exec(key); - if (match) { - return { - prefix: match[1], - name: match[2] - }; - } - }); - - await redisClient.quit(); - - return queues; - } - - async function updateQueueCache(newQueues: FoundQueue[]) { - const oldQueues = Object.keys(queues); - const toRemove = _.difference(oldQueues, newQueues); - const toAdd = _.difference(newQueues, oldQueues); - - await Promise.all( - toRemove.map(function(queueName: string) { - var closing = queues[queueName].close(); - delete queues[queueName]; - return closing; - }) - ); - - toAdd.forEach(function(queue: FoundQueue) { - queues[queue.name] = new Bull(queue.name, { - prefix: queue.prefix, - redis: redisOpts - }); - }); - } - function paginate( queue: Bull.Queue, messageId: string, @@ -209,7 +145,7 @@ module.exports = ( ) { start = start || 0; end = end || -1; - return (queue)[method](start, end).then(function(jobs: Bull.Job[]) { + return (queue)[method](start, end).then(function (jobs: Bull.Job[]) { respond(messageId, jobs); }); } @@ -293,27 +229,26 @@ module.exports = ( async function respondConnectionCommand(connection: Connection, msg: any) { const data = msg.data; - const queues = await getConnectionQueues(connection); + const queues = await updateQueuesCache(redisOpts); switch (data.cmd) { case "getConnection": console.log( `${chalk.yellow("WebSocket: ")} ${chalk.green( "sending connections: " - )} ${chalk.blue(name)} ${ - team ? chalk.green(" for team ") + chalk.blue(team) : "" + )} ${chalk.blueBright(name)} ${ + team ? chalk.green(" for team ") + chalk.blueBright(team) : "" }` ); respond(msg.id, { queues, connection: name, - team + team, }); break; case "getQueues": - await updateQueueCache(queues); console.log( - chalk.yellow("WebSocket:") + chalk.blue(" sending queues "), + chalk.yellow("WebSocket:") + chalk.blueBright(" sending queues "), queues ); @@ -326,7 +261,7 @@ module.exports = ( function respond(id: string, data: any = {}) { const response = JSON.stringify({ id, - data + data, }); ws.send(response); } @@ -334,14 +269,14 @@ module.exports = ( function redisOptsFromConnection(connection: Connection): RedisOptions { let opts: RedisOptions = { - ..._.pick(connection, ["port", "host", "family", "password", "db", "tls"]) + ...pick(connection, ["port", "host", "family", "password", "db", "tls"]), }; if (connection.uri) { opts = { ...opts, ...redisOptsFromUrl(connection.uri) }; } - opts.retryStrategy = function(times: number) { + opts.retryStrategy = function (times: number) { times = times % 8; const delay = Math.round(Math.pow(2, times + 8)); console.log(chalk.yellow("Redis: ") + `Reconnecting in ${delay} ms`); diff --git a/package.json b/package.json index d395fcc..9f8da02 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,7 @@ "devDependencies": { "@types/bull": "^3.3.20", "@types/chalk": "^2.2.0", + "@types/lodash": "^4.14.149", "@types/ws": "^6.0.1", "typescript": "^3.0.3" }, diff --git a/yarn.lock b/yarn.lock index e50dd48..a50e65f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -35,6 +35,11 @@ dependencies: "@types/node" "*" +"@types/lodash@^4.14.149": + version "4.14.149" + resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.14.149.tgz#1342d63d948c6062838fbf961012f74d4e638440" + integrity sha512-ijGqzZt/b7BfzcK9vTrS6MFljQRPn5BFWOx8oE0GYxribu6uV+aA9zZuXI1zc/etK9E8nrgdoF2+LgUw7+9tJQ== + "@types/node@*": version "12.11.1" resolved "https://registry.yarnpkg.com/@types/node/-/node-12.11.1.tgz#1fd7b821f798b7fa29f667a1be8f3442bb8922a3"