Skip to content

Commit

Permalink
Merge pull request #165 from prey/fix-long-polling-with-proxy
Browse files Browse the repository at this point in the history
Fix long polling with proxy
  • Loading branch information
mauricioschneider committed Nov 2, 2015
2 parents 8e9d244 + 58d37c0 commit 810d1dd
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 70 deletions.
4 changes: 3 additions & 1 deletion lib/agent/plugins/control-panel/api/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ var is_temporary_error = function(err, resp) {

var send = function(attempt, method, path, data, options, cb) {
if (!defaults.client) {
return cb(new Error('No HTTP client set!'));
var err = new Error('No HTTP client set!')
if (cb) return cb(err);
return err;
}

// opts are used for the current request, while options are
Expand Down
169 changes: 116 additions & 53 deletions lib/agent/plugins/control-panel/long-polling/index.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
var api = require('./../api'),
getter = api.devices.get,
var needle = require('needle'),
keys = require('../api/keys'),
errors = require('../api/errors'),
logger = require('../../../common').logger.prefix('long-polling'),
https = require('https'),
Emitter = require('events').EventEmitter;

var hooks,
logger,
loaded;
config;

var emitter,
connection_status,
connected,
current_request;
current_request,
loaded;

var request = function(re_schedule) {
if (!connected || current_request) {
Expand All @@ -18,73 +21,134 @@ var request = function(re_schedule) {
return;
}

logger.debug('Fetching instructions...');
https.globalAgent.options.secureProtocol = 'TLSv1_method';

try {
current_request = getter.commands();
} catch(e) {
// At this point, we are trying to get commands without
// a DEVICE_KEY in settings
return propagateError(e);
}
var proxy = config.get('try_proxy'),
protocol = config.get('protocol'),
host = config.get('host'),
device_key = keys.get().device,
api_key = keys.get().api;

current_request.on('data', function(data) {
var len = data.length;
if (!keys.get().device) return propagate_error(errors.get('NO_DEVICE_KEY'));

if (len && len > 0) {
logger.info('Got ' + len + ' commands.');
process_commands(data);
}
});
var base = protocol + '://' + host,
url = base + '/api/v2/devices/' + device_key + '.json';

current_request.request.on('response', function(resp) {
if (resp.statusCode !== 200)
propagateError('Invalid response received with status code ' + resp.statusCode);
});
var options = {
protocol: protocol,
host: host,
username: api_key,
password: 'x',
timeout: 1000 * 120
};

current_request.on('end', function() {
logger.debug("Request ended");
checkForReschedule();
});
if (proxy) {
options.proxy = proxy;
logger.debug('Setting up proxy');
}

current_request.on('error', function(err) {
propagateError(err);
checkForReschedule();
});
logger.debug('Fetching instructions...');
current_request = needle.get(url, options);
attach_listeners(current_request);

function attach_listeners(request) {

request.on('data', function(data) {
var len = data.length;

if (len && len > 0) {
logger.info('Got ' + len + ' commands.');
process_commands(data);
}
});

request.on('end', function() {
var msg = options.proxy ? 'with proxy' : '';
logger.debug("Request " + msg + " ended.");
});

request.request.on('response', function(res) {
if (res.statusCode !== 200) {
propagate_error('Invalid response received with status code ' + res.statusCode);
clear_current_request();
} else {
check_for_reschedule();
}
});

// This is most likely a connection error.
// If try_proxy is set, let's retry with no proxy.
request.request.on('error', function(err) {
clear_current_request();
if (options.proxy) {
logger.debug('Trying to reconnect without proxy');
delete options.proxy;
current_request = needle.get(url, options);
attach_listeners(current_request);
} else {
setTimeout(check_for_reschedule, 3000);
}
});

request.request.on('abort', function() {
logger.debug("Request aborted");
});

request.on('error', function(err) {
propagate_error(err);
});
}

function process_commands(arr) {
if (arr.forEach) {
arr.forEach(function(el) {
var cmd = el.target ? el : parse_cmd(el);
if (cmd) emitter.emit('command', cmd);
});
} else {
propagateError('Invalid command object');
}
if (arr.forEach) {
arr.forEach(function(el) {
var cmd = el.target ? el : parse_cmd(el);
if (cmd) emitter.emit('command', cmd);
});
} else {
propagate_error('Invalid command object');
}
}

function parse_cmd(str) {
try {
return JSON.parse(str);
} catch(e) {
if (hooks)
propagateError('Invalid command: ' + str);
propagate_error('Invalid command: ' + str);
}
}

function propagateError(message) {
hooks.trigger('error', new Error(message));
function propagate_error(message) {
hooks.trigger('error', new Error(message));
logger.debug(message);
}
}

function checkForReschedule() {
function check_for_reschedule() {
current_request = null;
if (re_schedule) {
request(true);
}
}
if (re_schedule) {
logger.debug("Re-scheduling request");
request(true);
}
}

function stop_schedule() {
re_schedule = false;
clear_current_request()
}
};

function clear_current_request() {
if (current_request) {
logger.debug("Clearing current request");
if (current_request.request) {
logger.debug("Aborting current request");
current_request.request.abort();
}
current_request = null;
}
}

function load_hooks() {
hooks.on('connected', function() {
connected = true;
Expand All @@ -107,8 +171,7 @@ var unload = function() {
hooks.remove('disconnected', request);

if (current_request) {
logger.debug("Aborting current request");
current_request.request.abort();
clear_current_request();
}

if (emitter) {
Expand All @@ -128,8 +191,8 @@ exports.load = function(cb) {
return cb(null, emitter);

var common = this;
config = common.config;
hooks = common.hooks;
logger = common.logger;

// Hooks take care of beginning the request iteration
// when connected and stopping it when disconnected
Expand All @@ -142,5 +205,5 @@ exports.load = function(cb) {
exports.unload = function(cb) {
if (!hooks) return; // not loaded yet.
unload();
cb && cb(true);
cb && cb(true);
};
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
var assert = require("assert"),
sinon = require("sinon"),
should = require("should"),
var assert = require('assert'),
sinon = require('sinon'),
should = require('should'),
needle = require('needle'),
api = require('../../api'),
request = require('../../api/request'),
getter = api.devices.get,
Expand Down Expand Up @@ -60,7 +61,7 @@ function server_down(cb) {
describe('long-polling', function() {

before(function() {
spy = sinon.spy(getter, 'commands');
spy = sinon.spy(needle, 'get');
keys_stub = sinon.stub(keys, 'get', function() {
return {device: prey_cfg.device_key, api: prey_cfg.api_key}
});
Expand Down
49 changes: 40 additions & 9 deletions lib/agent/providers/network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
// use memoize on them, unlike the function from the hardware provider.

var network = require('network'),
https = require('https'),
os_name = process.platform.replace('darwin', 'mac').replace('win32', 'windows'),
os_functions = require('./' + os_name),
exp = module.exports;
exp = module.exports,
common = require('../../common'),
logger = common.logger.prefix('network'),
config = common.config,
needle = require('needle');

/////////////////////////////////////////////////////////////////
// getters
Expand Down Expand Up @@ -121,14 +124,42 @@ exp.get_open_access_points_list = function(callback) {
};

exp.get_connection_status = function(cb) {
https.get({protocol: 'https:', hostname: 'control.preyproject.com/healthz'}, return_status)
.on('error', function(e) {
cb('disconnected');
});
var proxy = config.get('try_proxy'),
protocol = config.get('control-panel').protocol,
host = config.get('control-panel').host,
opts = {};

if (proxy) opts.proxy = proxy;

logger.debug('Getting connection status');
connect(opts);

function connect(opts) {
needle.get(protocol + '://' + host, opts, return_status)
}

function return_status(err, res) {
if (err) {
if (opts.proxy) {
logger.debug('Getting connection status without proxy');
delete opts.proxy;
return connect(opts);
} else {
disconnected();
}
}

if (res && res.statusCode) {
if (res.statusCode === 200) {
return cb('connected');
} else {
disconnected();
}
}

function return_status(res) {
if (res.statusCode) {
cb('connected');
function disconnected() {
logger.debug('Device cannot connect to host');
return cb('disconnected');
}
}
};
12 changes: 9 additions & 3 deletions lib/agent/triggers/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@ var join = require('path').join,

var emitter,
status, // either connected or disconnected
checking = false;
checking = false,
interval_checker;

var check_status = function() {
if (checking) return;

checking = true;
network.get_connection_status(function(new_status) {
// logger.info('Checked status. Result: ' + new_status)

if (status != new_status) {
// emitter.emit(new_status);
// trigger directly the event instead of emitting it to the actions manager
hooks.trigger(new_status);
}
Expand All @@ -29,12 +28,19 @@ var check_status = function() {
exports.start = function(opts, cb) {
check_status();
hooks.on('network_state_changed', check_status);
// Connection Heartbeat
// todo @lemavri Dinamically change interval from
// 15-60 seconds if status remains. Reset otherwise
interval_checker = setInterval(function() {
check_status();
}, 15000);
emitter = new Emitter();
cb(null, emitter)
}

exports.stop = function(cb) {
hooks.remove('network_state_changed', check_status);
clearInterval(interval_checker);
if (emitter) {
emitter.removeAllListeners();
emitter = null;
Expand Down

0 comments on commit 810d1dd

Please sign in to comment.