diff --git a/lib/agent/ack.js b/lib/agent/ack.js new file mode 100644 index 000000000..2fc77c503 --- /dev/null +++ b/lib/agent/ack.js @@ -0,0 +1,28 @@ +const storage = require('./utils/storage'); +const ackType = 'ack'; + +const existKeyAckInJson = (json) => { + //eslint-disable-next-line no-prototype-builtins + if (json.hasOwnProperty('ack_id')) { + return true; + } + return false; +}; +const existKeyIdInJson = (json) => { + //eslint-disable-next-line no-prototype-builtins + if (json.hasOwnProperty('id')) { + return true; + } + return false; +}; + +exports.processAck = (json, cb) => { + if (!existKeyAckInJson(json)) { + return cb(new Error('there is no key ack_id in the json')); + } + return cb(null, { + ack_id: json.ack_id, + type: ackType, + id: existKeyIdInJson(json) ? json.id : '' + }); +}; diff --git a/lib/agent/actions.js b/lib/agent/actions.js index 1e66e6e7a..cd149f84e 100644 --- a/lib/agent/actions.js +++ b/lib/agent/actions.js @@ -5,8 +5,6 @@ var join = require('path').join, var actions = {}, running = {}; -var actions_path = __dirname + '/actions'; - var load_action = function(type, name) { var file = join(__dirname, type + 's', name); try { diff --git a/lib/agent/plugins/control-panel/websockets/index.js b/lib/agent/plugins/control-panel/websockets/index.js index a1d9e35bf..a05b5159c 100644 --- a/lib/agent/plugins/control-panel/websockets/index.js +++ b/lib/agent/plugins/control-panel/websockets/index.js @@ -1,3 +1,4 @@ +/* eslint-disable indent */ const WebSocket = require('ws'); const HttpsProxyAgent = require('https-proxy-agent'); const { v4: uuidv4 } = require('uuid'); @@ -9,6 +10,8 @@ const fileretrieval = require('../../../actions/fileretrieval'); const triggers = require('../../../actions/triggers'); const storage = require('../../../utils/storage'); const errors = require('../api/errors'); +const ack = require('../../../ack'); + const common = require('../../../common'); const logger = common.logger.prefix('websockets'); @@ -28,6 +31,7 @@ let emitter; let setAliveTimeInterval = null; let setIntervalWSStatus = null; let notifyActionInterval = null; +let notifyAckInterval = null; let getStatusInterval = null; let idTimeoutToCancel; let websocketConnected = false; @@ -36,9 +40,12 @@ let countNotConnectionProxy = 0; const startupTimeout = 5000; const heartbeatTimeout = 120000 + 1000; +const retriesMax = 10; +const retriesMaxAck = 4; exports.re_schedule = true; exports.responses_queue = []; +exports.responsesAck = []; const propagateError = (message) => { hooks.trigger('error', new Error(message)); @@ -72,6 +79,96 @@ const retryQueuedResponses = () => { }); }; +const retryAckResponses = () => { + if (exports.responsesAck.length === 0) return; + + exports.responsesAck.forEach((respoAck) => { + exports.notifyAck( + respoAck.ack_id, + respoAck.type, + respoAck.id, + respoAck.send, + respoAck.retries, + ); + }); +}; + +const setValuesToJsonInAckArray = (ackId) => { + const index = exports.responsesAck.findIndex((x) => x.ack_id === ackId); + if (index >= 0) { + exports.responsesAck[index].sent = true; + } +}; + +const setValueRetriesToJsonInAckArray = (ackId) => { + const index = exports.responsesAck.findIndex((x) => x.ack_id === ackId); + if (index >= 0) { + exports.responsesAck[index].retries += 1; + } +}; + +exports.sendAckToServer = (sendToWs) => { + try { + if (!ws || !ws.readyState || ws.readyState !== 1) return; + const toSend = { ack_id: sendToWs.ack_id, type: sendToWs.type }; + ws.send(JSON.stringify(toSend)); + setValuesToJsonInAckArray(sendToWs.ack_id); + } catch (error) { + logger.error('error to send ack:', JSON.stringify(error)); + } +}; + +const removeAckFromArray = (ackId) => { + exports.responsesAck = exports.responsesAck.filter((x) => x.ack_id !== ackId); +}; + +exports.notifyAck = (ackId, type, id, sent, retries = 0) => { + if (retries >= retriesMaxAck) { + removeAckFromArray(ackId); + return; + } + if (id && id !== '') { + const ackResponseSent = exports.responsesAck.filter((x) => x.id === id + && x.sent === true); + if (ackResponseSent.length === 0) { + const ackResponse = exports.responsesAck.filter((x) => x.id === id + && x.sent === false); + if (ackResponse.length > 0) { + ackResponse[0].retries += 1; + exports.sendAckToServer(ackResponse[0]); + } + } else { + setValueRetriesToJsonInAckArray(ackId); + } + } else { + const ackResponse = exports.responsesAck.filter((x) => x.ack_id === ackId + && x.sent === false); + if (ackResponse.length > 0) { + ackResponse[0].retries += 1; + exports.sendAckToServer(ackResponse[0]); + } else { + setValueRetriesToJsonInAckArray(ackId); + } + } +}; + +const processAcks = (arr) => { + if (arr.forEach) { + arr.forEach((el) => { + ack.processAck(el, (err, sendToWs) => { + if (err) return; + exports.responsesAck.push({ + ack_id: sendToWs.ack_id, + type: sendToWs.type, + id: sendToWs.id, + sent: false, + retries: 0, + }); + }); + }); + } +}; + const processCommands = (arr) => { if (arr.forEach) { arr.forEach((el) => { @@ -90,6 +187,7 @@ const updateStoredConnection = (newStoredTime) => { }; const clearAndResetIntervals = (aliveTimeReset = false) => { + if (notifyAckInterval) clearInterval(notifyAckInterval); if (notifyActionInterval) clearInterval(notifyActionInterval); if (getStatusInterval) clearInterval(getStatusInterval); if (setIntervalWSStatus) clearInterval(setIntervalWSStatus); @@ -171,6 +269,7 @@ const loadServer = () => { exports.startWebsocket = () => { clearAndResetIntervals(); notifyActionInterval = setInterval(retryQueuedResponses, 5000); // <-revisar el tiempo + notifyAckInterval = setInterval(retryAckResponses, 4 * 1000); getStatusInterval = setInterval(getStatusByInterval, 5 * 60 * 1000); setIntervalWSStatus = setInterval(exports.heartbeat, 20000); const proxy = config.get('control-panel.try_proxy'); @@ -212,12 +311,15 @@ exports.startWebsocket = () => { ws.on('open', () => { pingInterval = setInterval(() => { ws.ping(); }, 60000); exports.notify_status(status); + storage.do('all', { type: 'responses' }, (errs, actions) => { if (!actions || typeof actions === 'undefined') return; + if (actions.length === 0 || errs) return; + if (Array.isArray(actions)) { exports.responses_queue = actions.map((element) => ({ - reply_id: `${element.action_id}`, // id de la acción + reply_id: `${element.action_id}`, type: 'response', out: element.out, error: element.error, @@ -229,7 +331,7 @@ exports.startWebsocket = () => { })); } else { exports.responses_queue.push({ - reply_id: `${actions.action_id}`, // id de la acción + reply_id: `${actions.action_id}`, type: 'response', out: actions.out, error: actions.error, @@ -264,6 +366,7 @@ exports.startWebsocket = () => { const len = parsedData.length; if (len && len > 0) { processCommands(parsedData); + processAcks(parsedData); } return 0; } @@ -293,7 +396,7 @@ exports.startWebsocket = () => { exports.notify_action = (status, id, action, opts, err, out, time, respId, retries = 0) => { if (!id || id === 'report' || action === 'triggers' || action === 'geofencing') return; - if (retries >= 10) { + if (retries >= retriesMax) { storage.do('del', { type: 'responses', id: respId }); exports.responses_queue = exports.responses_queue.filter((x) => x.id !== respId); return; diff --git a/test/lib/agent/ack.js b/test/lib/agent/ack.js new file mode 100644 index 000000000..e3376cdb5 --- /dev/null +++ b/test/lib/agent/ack.js @@ -0,0 +1,54 @@ +const { + describe, it, before, after, +} = require('mocha'); +const should = require('should'); +const sinon = require('sinon'); +const ack = require('../../../lib/agent/ack'); +const websocket = require('../../../lib/agent/plugins/control-panel/websockets'); + +describe('validation ack acknowledge', () => { + describe('validation if json is valid', () => { + it('there is no key ack_id in the json', (done) => { + ack.processAck({}, (err) => { + should.exist(err); + err.message.should.containEql('there is no key ack_id in the json'); + done(); + }); + }); + }); + describe('validation if json is valid', () => { + it('process ack and register', (done) => { + ack.processAck({ + ack_id: '3', type: 'ack', id: '1234', + }, (err, registeredJson) => { + should.not.exist(err); + JSON.stringify(registeredJson).should.equal(JSON.stringify({ + ack_id: '3', type: 'ack', id: '1234', + })); + done(); + }); + }); + }); + describe('send to server ack', () => { + let websocketStub = null; + websocket.responsesAck.push({ + ack_id: '3', + type: 'ack', + id: '1', + sent: false, + retries: 0, + }); + before(() => { + websocketStub = sinon.stub(websocket, 'sendAckToServer').callsFake(() => ''); + }); + + after(() => { + websocketStub.restore(); + }); + + it('notify ack', (done) => { + websocket.notifyAck('3,', 'ack', false, 0); + done(); + }); + }); +});