From 7928d5a11ebca2bccbc0b0c15eb8776bd184f9f6 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Tue, 19 Feb 2019 11:46:16 +0100 Subject: [PATCH] feat: add support for empty queues --- lib/socket.ts | 80 +++++++++++++++++++-------------------------------- 1 file changed, 29 insertions(+), 51 deletions(-) diff --git a/lib/socket.ts b/lib/socket.ts index 5379339..d81420e 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -193,12 +193,7 @@ module.exports = ( start = start || 0; end = end || -1; return (queue)[method](start, end).then(function(jobs: Bull.Job[]) { - ws.send( - JSON.stringify({ - id: messageId, - data: jobs - }) - ); + respond(messageId, jobs); }); } @@ -221,11 +216,7 @@ module.exports = ( break; } - ws.send( - JSON.stringify({ - id: msg.id - }) - ); + respond(msg.id); } async function respondQueueCommand(queue: Bull.Queue, msg: any) { @@ -233,21 +224,11 @@ module.exports = ( switch (data.cmd) { case "getJob": const job = await queue.getJob(data.jobId); - ws.send( - JSON.stringify({ - id: msg.id, - data: job - }) - ); + respond(msg.id, job); break; case "getJobCounts": const jobCounts = await queue.getJobCounts(); - ws.send( - JSON.stringify({ - id: msg.id, - data: jobCounts - }) - ); + respond(msg.id, jobCounts); break; case "getWaiting": case "getActive": @@ -267,23 +248,20 @@ module.exports = ( case "getRepeatableCount": case "getWorkersCount": const count = await (queue)[data.cmd](); - ws.send( - JSON.stringify({ - id: msg.id, - data: count - }) - ); + respond(msg.id, count); break; case "removeRepeatableByKey": - console.log('Whatsa?', data) await (queue).removeRepeatableByKey(data.key); - - ws.send( - JSON.stringify({ - id: msg.id - }) - ); + respond(msg.id); break; + case "empty": + await queue.empty(); + respond(msg.id); + break; + default: + console.error( + `Missing command ${data.cmd}. Too old version of taskforce-connector?` + ); } } @@ -297,15 +275,11 @@ module.exports = ( chalk.green("sending connection: ") + chalk.blue(name) ); - ws.send( - JSON.stringify({ - id: msg.id, - data: { - queues: queues, - connection: name - } - }) - ); + + respond(msg.id, { + queues, + connection: name + }); break; case "getQueues": await updateQueueCache(queues); @@ -314,13 +288,17 @@ module.exports = ( queues ); - ws.send( - JSON.stringify({ - id: msg.id, - data: queues - }) - ); + respond(msg.id, queues); + break; } } + + function respond(id: string, data: any = {}) { + const response = JSON.stringify({ + id, + data + }); + ws.send(response); + } };