Skip to content

Commit

Permalink
Merge pull request #765 from prey/verify-websocket-arrival-message
Browse files Browse the repository at this point in the history
add logic for send ack to ws ack
  • Loading branch information
SoraKenji authored May 22, 2023
2 parents 68afe63 + a75ffc9 commit 16c00a7
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 5 deletions.
28 changes: 28 additions & 0 deletions lib/agent/ack.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
const storage = require('./utils/storage');
const ackType = 'ack';

const existKeyAckInJson = (json) => {
//eslint-disable-next-line no-prototype-builtins
if (json.hasOwnProperty('ack_id')) {
return true;
}
return false;
};
const existKeyIdInJson = (json) => {
//eslint-disable-next-line no-prototype-builtins
if (json.hasOwnProperty('id')) {
return true;
}
return false;
};

exports.processAck = (json, cb) => {
if (!existKeyAckInJson(json)) {
return cb(new Error('there is no key ack_id in the json'));
}
return cb(null, {
ack_id: json.ack_id,
type: ackType,
id: existKeyIdInJson(json) ? json.id : ''
});
};
2 changes: 0 additions & 2 deletions lib/agent/actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ var join = require('path').join,
var actions = {},
running = {};

var actions_path = __dirname + '/actions';

var load_action = function(type, name) {
var file = join(__dirname, type + 's', name);
try {
Expand Down
109 changes: 106 additions & 3 deletions lib/agent/plugins/control-panel/websockets/index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable indent */
const WebSocket = require('ws');
const HttpsProxyAgent = require('https-proxy-agent');
const { v4: uuidv4 } = require('uuid');
Expand All @@ -9,6 +10,8 @@ const fileretrieval = require('../../../actions/fileretrieval');
const triggers = require('../../../actions/triggers');
const storage = require('../../../utils/storage');
const errors = require('../api/errors');
const ack = require('../../../ack');

const common = require('../../../common');

const logger = common.logger.prefix('websockets');
Expand All @@ -28,6 +31,7 @@ let emitter;
let setAliveTimeInterval = null;
let setIntervalWSStatus = null;
let notifyActionInterval = null;
let notifyAckInterval = null;
let getStatusInterval = null;
let idTimeoutToCancel;
let websocketConnected = false;
Expand All @@ -36,9 +40,12 @@ let countNotConnectionProxy = 0;

const startupTimeout = 5000;
const heartbeatTimeout = 120000 + 1000;
const retriesMax = 10;
const retriesMaxAck = 4;

exports.re_schedule = true;
exports.responses_queue = [];
exports.responsesAck = [];

const propagateError = (message) => {
hooks.trigger('error', new Error(message));
Expand Down Expand Up @@ -72,6 +79,96 @@ const retryQueuedResponses = () => {
});
};

const retryAckResponses = () => {
if (exports.responsesAck.length === 0) return;

exports.responsesAck.forEach((respoAck) => {
exports.notifyAck(
respoAck.ack_id,
respoAck.type,
respoAck.id,
respoAck.send,
respoAck.retries,
);
});
};

const setValuesToJsonInAckArray = (ackId) => {
const index = exports.responsesAck.findIndex((x) => x.ack_id === ackId);
if (index >= 0) {
exports.responsesAck[index].sent = true;
}
};

const setValueRetriesToJsonInAckArray = (ackId) => {
const index = exports.responsesAck.findIndex((x) => x.ack_id === ackId);
if (index >= 0) {
exports.responsesAck[index].retries += 1;
}
};

exports.sendAckToServer = (sendToWs) => {
try {
if (!ws || !ws.readyState || ws.readyState !== 1) return;
const toSend = { ack_id: sendToWs.ack_id, type: sendToWs.type };
ws.send(JSON.stringify(toSend));
setValuesToJsonInAckArray(sendToWs.ack_id);
} catch (error) {
logger.error('error to send ack:', JSON.stringify(error));
}
};

const removeAckFromArray = (ackId) => {
exports.responsesAck = exports.responsesAck.filter((x) => x.ack_id !== ackId);
};

exports.notifyAck = (ackId, type, id, sent, retries = 0) => {
if (retries >= retriesMaxAck) {
removeAckFromArray(ackId);
return;
}
if (id && id !== '') {
const ackResponseSent = exports.responsesAck.filter((x) => x.id === id
&& x.sent === true);
if (ackResponseSent.length === 0) {
const ackResponse = exports.responsesAck.filter((x) => x.id === id
&& x.sent === false);
if (ackResponse.length > 0) {
ackResponse[0].retries += 1;
exports.sendAckToServer(ackResponse[0]);
}
} else {
setValueRetriesToJsonInAckArray(ackId);
}
} else {
const ackResponse = exports.responsesAck.filter((x) => x.ack_id === ackId
&& x.sent === false);
if (ackResponse.length > 0) {
ackResponse[0].retries += 1;
exports.sendAckToServer(ackResponse[0]);
} else {
setValueRetriesToJsonInAckArray(ackId);
}
}
};

const processAcks = (arr) => {
if (arr.forEach) {
arr.forEach((el) => {
ack.processAck(el, (err, sendToWs) => {
if (err) return;
exports.responsesAck.push({
ack_id: sendToWs.ack_id,
type: sendToWs.type,
id: sendToWs.id,
sent: false,
retries: 0,
});
});
});
}
};

const processCommands = (arr) => {
if (arr.forEach) {
arr.forEach((el) => {
Expand All @@ -90,6 +187,7 @@ const updateStoredConnection = (newStoredTime) => {
};

const clearAndResetIntervals = (aliveTimeReset = false) => {
if (notifyAckInterval) clearInterval(notifyAckInterval);
if (notifyActionInterval) clearInterval(notifyActionInterval);
if (getStatusInterval) clearInterval(getStatusInterval);
if (setIntervalWSStatus) clearInterval(setIntervalWSStatus);
Expand Down Expand Up @@ -171,6 +269,7 @@ const loadServer = () => {
exports.startWebsocket = () => {
clearAndResetIntervals();
notifyActionInterval = setInterval(retryQueuedResponses, 5000); // <-revisar el tiempo
notifyAckInterval = setInterval(retryAckResponses, 4 * 1000);
getStatusInterval = setInterval(getStatusByInterval, 5 * 60 * 1000);
setIntervalWSStatus = setInterval(exports.heartbeat, 20000);
const proxy = config.get('control-panel.try_proxy');
Expand Down Expand Up @@ -212,12 +311,15 @@ exports.startWebsocket = () => {
ws.on('open', () => {
pingInterval = setInterval(() => { ws.ping(); }, 60000);
exports.notify_status(status);

storage.do('all', { type: 'responses' }, (errs, actions) => {
if (!actions || typeof actions === 'undefined') return;

if (actions.length === 0 || errs) return;

if (Array.isArray(actions)) {
exports.responses_queue = actions.map((element) => ({
reply_id: `${element.action_id}`, // id de la acción
reply_id: `${element.action_id}`,
type: 'response',
out: element.out,
error: element.error,
Expand All @@ -229,7 +331,7 @@ exports.startWebsocket = () => {
}));
} else {
exports.responses_queue.push({
reply_id: `${actions.action_id}`, // id de la acción
reply_id: `${actions.action_id}`,
type: 'response',
out: actions.out,
error: actions.error,
Expand Down Expand Up @@ -264,6 +366,7 @@ exports.startWebsocket = () => {
const len = parsedData.length;
if (len && len > 0) {
processCommands(parsedData);
processAcks(parsedData);
}
return 0;
}
Expand Down Expand Up @@ -293,7 +396,7 @@ exports.startWebsocket = () => {

exports.notify_action = (status, id, action, opts, err, out, time, respId, retries = 0) => {
if (!id || id === 'report' || action === 'triggers' || action === 'geofencing') return;
if (retries >= 10) {
if (retries >= retriesMax) {
storage.do('del', { type: 'responses', id: respId });
exports.responses_queue = exports.responses_queue.filter((x) => x.id !== respId);
return;
Expand Down
54 changes: 54 additions & 0 deletions test/lib/agent/ack.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
const {
describe, it, before, after,
} = require('mocha');
const should = require('should');
const sinon = require('sinon');
const ack = require('../../../lib/agent/ack');
const websocket = require('../../../lib/agent/plugins/control-panel/websockets');

describe('validation ack acknowledge', () => {
describe('validation if json is valid', () => {
it('there is no key ack_id in the json', (done) => {
ack.processAck({}, (err) => {
should.exist(err);
err.message.should.containEql('there is no key ack_id in the json');
done();
});
});
});
describe('validation if json is valid', () => {
it('process ack and register', (done) => {
ack.processAck({
ack_id: '3', type: 'ack', id: '1234',
}, (err, registeredJson) => {
should.not.exist(err);
JSON.stringify(registeredJson).should.equal(JSON.stringify({
ack_id: '3', type: 'ack', id: '1234',
}));
done();
});
});
});
describe('send to server ack', () => {
let websocketStub = null;
websocket.responsesAck.push({
ack_id: '3',
type: 'ack',
id: '1',
sent: false,
retries: 0,
});
before(() => {
websocketStub = sinon.stub(websocket, 'sendAckToServer').callsFake(() => '');
});

after(() => {
websocketStub.restore();
});

it('notify ack', (done) => {
websocket.notifyAck('3,', 'ack', false, 0);
done();
});
});
});

0 comments on commit 16c00a7

Please sign in to comment.