Skip to content

Commit

Permalink
refactor(Endpoint)
Browse files Browse the repository at this point in the history
  • Loading branch information
FGRibreau committed Mar 29, 2015
1 parent ffaca98 commit 44ba1e6
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 197 deletions.
343 changes: 175 additions & 168 deletions lib/Endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,204 +4,211 @@ var _ = require('lodash');
var fs = require('fs');
var backoff = require('backoff');
var EventEmitter = require('events').EventEmitter;
var jsonPackage = JSON.parse(fs.readFileSync(__dirname + '/../package.json'));

/**
* Endpoint (redsmin)
* @param {Function} fnWrite `callback(data)` where to write datas from the endpoint
* @param {String} key Connection key
* @param {Object} opts Optional parameters
*/
function Endpoint(fnWrite, key, opts) {
_.extend(this, EventEmitter);
_.bindAll(this);
opts = opts || {};

if (!fnWrite || typeof fnWrite !== 'function') {
throw new Error("Endpoint `fnWrite` parameter is not defined or is not a function");
}

if (!key) {
throw new Error("Endpoint `key` parameter is not defined");
}
var assert = require('assert');

this.uri = null;
this.key = key;
this.hostname = null;
this.port = null;
this.auth = opts.auth;
this.handshaken = false;
this.connected = false;
this.socket = null;
this.fnWrite = fnWrite;

this.handshakenBackoff = backoff.fibonacci({
initialDelay: opts.initialTimeout || 500,
maxDelay: opts.maxTimeout || 10000
});

this.handshakenBackoff.on('backoff', this.reconnect);

this.reconnectBackoff = backoff.fibonacci({
initialDelay: opts.initialTimeout || 500,
maxDelay: opts.maxTimeout || 10000
});
module.exports = function (log, jsonPackage, tls, process) {
assert(_.isObject(log));
assert(_.isPlainObject(jsonPackage));
assert(_.isObject(tls));
assert(_.isFunction(tls.connect));
assert(_.isObject(process));

this.reconnectBackoff.on('backoff', this.reconnect);
}

Endpoint.tls = require('tls');
Endpoint.log = log;
Endpoint.process = process;
/**
* Endpoint (redsmin)
* @param {Function} fnWrite `callback(data)` where to write datas from the endpoint
* @param {String} key Connection key
* @param {Object} opts Optional parameters
*/
function Endpoint(fnWrite, key, opts) {
assert(_.isFunction(fnWrite));
assert(_.isString(key));

_.extend(Endpoint.prototype, EventEmitter.prototype, {
_.extend(this, EventEmitter);
_.bindAll(this);
opts = opts || {};

connect: function (port, hostname) {
if (!port) {
throw new Error("connect(port, hostname) port must be defined");
if (!fnWrite || typeof fnWrite !== 'function') {
throw new Error("Endpoint `fnWrite` parameter is not defined or is not a function");
}
if (!hostname) {
throw new Error("connect(port, hostname) hostname must be defined");
}
this.port = port; // Standard TLS port for "IMAP"
this.hostname = hostname;
this.uri = this.hostname + ':' + this.port;
this._connect();
},

_connect: function () {
if (this.socket) {
this.socket.removeAllListeners();
this.socket.destroy();

if (!key) {
throw new Error("Endpoint `key` parameter is not defined");
}

Endpoint.log.info("[Endpoint] Connecting to " + this.uri + "...");
this.uri = null;
this.key = key;
this.hostname = null;
this.port = null;
this.auth = opts.auth;
this.handshaken = false;
this.connected = false;
this.socket = null;
this.fnWrite = fnWrite;

this.handshakenBackoff = backoff.fibonacci({
initialDelay: opts.initialTimeout || 500,
maxDelay: opts.maxTimeout || 10000
});

this.socket = Endpoint.tls.connect(this.port, this.hostname, this.onConnected);
this.socket.on('data', this.onData);
this.socket.on('close', this.onClose);
this.socket.on('error', this.onError);
this.handshakenBackoff.on('backoff', this.reconnect);

this.socket.setTimeout(0, function () {
console.log('timeout');
this.reconnectBackoff = backoff.fibonacci({
initialDelay: opts.initialTimeout || 500,
maxDelay: opts.maxTimeout || 10000
});

this.socket.setNoDelay(true);
this.socket.setKeepAlive(true, 30);
},
this.reconnectBackoff.on('backoff', this.reconnect);
}

reconnect: function ( /*number, delay*/ ) {
if (this.connected) {
// If, between the .backoff() call and the call to reconnect
// we are already back online, don't do anything else
return this.reconnectBackoff.reset();
}
_.extend(Endpoint.prototype, EventEmitter.prototype, {

Endpoint.log.info("[Endpoint] Reconnecting...");
this._connect();
},
connect: function (port, hostname) {
if (!port) {
throw new Error("connect(port, hostname) port must be defined");
}
if (!hostname) {
throw new Error("connect(port, hostname) hostname must be defined");
}
this.port = port; // Standard TLS port for "IMAP"
this.hostname = hostname;
this.uri = this.hostname + ':' + this.port;
this._connect();
},

_connect: function () {
if (this.socket) {
this.socket.removeAllListeners();
this.socket.destroy();
}

onConnected: function () {
Endpoint.log.debug("[Endpoint] Connected");
log.info("[Endpoint] Connecting to " + this.uri + "...");

this.connected = true;
this.reconnectBackoff.reset();
this.socket = tls.connect(this.port, this.hostname, this.onConnected);
this.socket.on('data', this.onData);
this.socket.on('close', this.onClose);
this.socket.on('error', this.onError);

if (!this.handshaken) {
this._sendHandshake();
}
this.socket.setTimeout(0, function () {
log.error('timeout');
});

this.emit('connect');
},
this.socket.setNoDelay(true);
this.socket.setKeepAlive(true, 30);
},

_sendHandshake: function () {
if (this.handshaken) {
return this.handshakenBackoff.reset();
}
Endpoint.log.debug("[Endpoint] Handshaking...");
this.socket.write(JSON.stringify({
version: jsonPackage.version,
key: this.key,
auth: this.auth
}));
},

onData: function (data) {
if (!this.handshaken) {
data = (data || '').toString();
var handshake = data;

if (data.indexOf('*') === -1) {
data = null;
} else { // in case of multiple messages after the handshake
var idx = handshake.indexOf('*');
handshake = data.substr(0, idx);
data = data.substr(idx);
reconnect: function ( /*number, delay*/ ) {
if (this.connected) {
// If, between the .backoff() call and the call to reconnect
// we are already back online, don't do anything else
return this.reconnectBackoff.reset();
}

try {
var json = JSON.parse(handshake);
log.info("[Endpoint] Reconnecting...");
this._connect();
},

if (json && json.error) {
Endpoint.log.error('[Endpoint] Handshake failed: ' + json.error);
Endpoint.log.error('Edit configuration file with `redsmin set_key`, see http://bit.ly/YAIeAM');
Endpoint.log.error('Exiting...');
Endpoint.process.exit(1);
return;
}
onConnected: function () {
log.debug("[Endpoint] Connected");

this.connected = true;
this.reconnectBackoff.reset();

} catch (err) {
Endpoint.log.error('[Endpoint] Bad handshake response:' + handshake);
Endpoint.log.error(err);
this.handshakenBackoff.reset();
return this.handshakenBackoff.backoff();
if (!this.handshaken) {
this._sendHandshake();
}

Endpoint.log.debug('[Endpoint] Handshake succeeded');
this.handshaken = true;
this.emit('connect');
},

if (!data) {
return;
_sendHandshake: function () {
if (this.handshaken) {
return this.handshakenBackoff.reset();
}
}
log.debug("[Endpoint] Handshaking...");
this.socket.write(JSON.stringify({
version: jsonPackage.version,
key: this.key,
auth: this.auth
}));
},

onData: function (data) {
if (!this.handshaken) {
data = (data || '').toString();
var handshake = data;

if (data.indexOf('*') === -1) {
data = null;
} else { // in case of multiple messages after the handshake
var idx = handshake.indexOf('*');
handshake = data.substr(0, idx);
data = data.substr(idx);
}

this.fnWrite(data);
},
try {
var json = JSON.parse(handshake);

/**
* Forward data from redis to the endpoint
*/
write: function (data) {
this.socket.write(data);
},
if (json && json.error) {
log.error('[Endpoint] Handshake failed: ' + json.error);
log.error('Edit configuration file with `redsmin set_key`, see http://bit.ly/YAIeAM');
log.error('Exiting...');
process.exit(1);
return;
}

/**
* If the connection to redsmin just closed, try to reconnect
* @param {Error} err
*/
onClose: function (sourceWasAnError) {
Endpoint.log.error("[Endpoint] Connection closed " + (sourceWasAnError ? 'because of an error' : ''));

this.connected = false;
this.handshaken = false;
this.handshakenBackoff.reset();
try {
this.reconnectBackoff.backoff();
} catch (no_error) {}

this.emit('close', sourceWasAnError);
},

onError: function (err) {
Endpoint.log.error('[Endpoint] Error ' + (err ? err.message : ''));
this.socket.destroy(); // End the socket
try {
this.onClose(err ? err.message : '');
} catch (err) {
console.error('err::onError', err);
} catch (err) {
log.error('[Endpoint] Bad handshake response:' + handshake);
log.error(err);
this.handshakenBackoff.reset();
return this.handshakenBackoff.backoff();
}

log.debug('[Endpoint] Handshake succeeded');
this.handshaken = true;

if (!data) {
return;
}
}

this.fnWrite(data);
},

/**
* Forward data from redis to the endpoint
*/
write: function (data) {
this.socket.write(data);
},

/**
* If the connection to redsmin just closed, try to reconnect
* @param {Error} err
*/
onClose: function (sourceWasAnError) {
log.error("[Endpoint] Connection closed " + (sourceWasAnError ? 'because of an error' : ''));

this.connected = false;
this.handshaken = false;
this.handshakenBackoff.reset();
try {
this.reconnectBackoff.backoff();
} catch (no_error) {}

this.emit('close', sourceWasAnError);
},

onError: function (err) {
log.error('[Endpoint] Error ' + (err ? err.message : ''));
this.socket.destroy(); // End the socket
try {
this.onClose(err ? err.message : '');
} catch (err) {
log.error('err::onError', err);
}
}
}
});
});

module.exports = Endpoint;
return Endpoint;
};
Loading

0 comments on commit 44ba1e6

Please sign in to comment.