Skip to content

Commit

Permalink
chore(test): move test along their implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
FGRibreau committed Mar 28, 2015
1 parent 82a3baa commit 79e3731
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 253 deletions.
121 changes: 64 additions & 57 deletions lib/Endpoint.js
Original file line number Diff line number Diff line change
@@ -1,98 +1,97 @@
var env = require('./config').env
, log = require('./log')
, url = require('url')
, _ = require('lodash')
, fs = require('fs')
, Backoff = require('backoff')
, EventEmitter = require('events').EventEmitter
, jsonPackage = JSON.parse(fs.readFileSync(__dirname + '/../package.json'));
'use strict';
var log = require('./log');
var _ = require('lodash');
var fs = require('fs');
var Backoff = require('backoff');
var EventEmitter = require('events').EventEmitter;
var jsonPackage = JSON.parse(fs.readFileSync(__dirname + '/../package.json'));

/**
* Endpoint (redsmin)
* @param {Function} fnWrite `callback(data)` where to write datas from the endpoint
* @param {String} key Connection key
* @param {Object} opts Optional parameters
*/
function Endpoint(fnWrite, key, opts){
function Endpoint(fnWrite, key, opts) {
_.extend(this, EventEmitter);
_.bindAll(this);
opts = opts || {};

if(!fnWrite || typeof fnWrite !== 'function'){
if (!fnWrite || typeof fnWrite !== 'function') {
throw new Error("Endpoint `fnWrite` parameter is not defined or is not a function");
}

if(!key){
if (!key) {
throw new Error("Endpoint `key` parameter is not defined");
}

this.uri = null;
this.key = key;
this.hostname = null;
this.port = null;
this.auth = opts.auth;
this.uri = null;
this.key = key;
this.hostname = null;
this.port = null;
this.auth = opts.auth;
this.handshaken = false;
this.connected = false;
this.socket = null;
this.fnWrite = fnWrite;
this.connected = false;
this.socket = null;
this.fnWrite = fnWrite;

this.handshakenBackoff = new Backoff({
initialTimeout: opts.initialTimeout || 500,
maxTimeout: opts.maxTimeout || 10000
initialTimeout: opts.initialTimeout || 500,
maxTimeout: opts.maxTimeout || 10000
});

this.handshakenBackoff.on('backoff', this.reconnect);

this.reconnectBackoff = new Backoff({
initialTimeout: opts.initialTimeout || 500,
maxTimeout: opts.maxTimeout || 10000
initialTimeout: opts.initialTimeout || 500,
maxTimeout: opts.maxTimeout || 10000
});

this.reconnectBackoff.on('backoff', this.reconnect);
}

Endpoint.tls = require('tls');
Endpoint.log = log;
Endpoint.tls = require('tls');
Endpoint.log = log;
Endpoint.process = process;

_.extend(Endpoint.prototype, EventEmitter.prototype, {

connect:function(port, hostname){
if(!port){
throw new Error("connect(port, hostname) port must be defined")
connect: function (port, hostname) {
if (!port) {
throw new Error("connect(port, hostname) port must be defined");
}
if(!hostname){
throw new Error("connect(port, hostname) hostname must be defined")
if (!hostname) {
throw new Error("connect(port, hostname) hostname must be defined");
}
this.port = port; // Standard TLS port for "IMAP"
this.port = port; // Standard TLS port for "IMAP"
this.hostname = hostname;
this.uri = this.hostname + ':' + this.port;
this.uri = this.hostname + ':' + this.port;
this._connect();
},

_connect:function(){
if(this.socket){
_connect: function () {
if (this.socket) {
this.socket.removeAllListeners();
this.socket.destroy();
}

Endpoint.log.info("[Endpoint] Connecting to "+this.uri+"...");
Endpoint.log.info("[Endpoint] Connecting to " + this.uri + "...");

this.socket = Endpoint.tls.connect(this.port, this.hostname, this.onConnected);
this.socket.on('data', this.onData);
this.socket.on('close', this.onClose);
this.socket.on('error', this.onError);

this.socket.setTimeout(0, function(){
this.socket.setTimeout(0, function () {
console.log('timeout');
});

this.socket.setNoDelay(true);
this.socket.setKeepAlive(true, 30);
},

reconnect:function(number, delay){
if(this.connected){
reconnect: function ( /*number, delay*/ ) {
if (this.connected) {
// If, between the .backoff() call and the call to reconnect
// we are already back online, don't do anything else
return this.reconnectBackoff.reset();
Expand All @@ -102,42 +101,48 @@ _.extend(Endpoint.prototype, EventEmitter.prototype, {
this._connect();
},

onConnected: function(){
onConnected: function () {
Endpoint.log.debug("[Endpoint] Connected");

this.connected = true;
this.reconnectBackoff.reset();

if(!this.handshaken){
if (!this.handshaken) {
this._sendHandshake();
}

this.emit('connect');
},

_sendHandshake: function(){
if(this.handshaken){return this.handshakenBackoff.reset();}
_sendHandshake: function () {
if (this.handshaken) {
return this.handshakenBackoff.reset();
}
Endpoint.log.debug("[Endpoint] Handshaking...");
this.socket.write(JSON.stringify({version:jsonPackage.version, key: this.key, auth: this.auth}));
this.socket.write(JSON.stringify({
version: jsonPackage.version,
key: this.key,
auth: this.auth
}));
},

onData:function(data){
if(!this.handshaken){
onData: function (data) {
if (!this.handshaken) {
data = (data || '').toString();
var handshake = data;

if(data.indexOf('*') === -1){
if (data.indexOf('*') === -1) {
data = null;
} else { // in case of multiple messages after the handshake
var idx = handshake.indexOf('*');
handshake = data.substr(0, idx);
data = data.substr(idx);
}

try{
try {
var json = JSON.parse(handshake);

if(json && json.error){
if (json && json.error) {
Endpoint.log.error('[Endpoint] Handshake failed: ' + json.error);
Endpoint.log.error('Edit configuration file with `redsmin set_key`, see http://bit.ly/YAIeAM');
Endpoint.log.error('Exiting...');
Expand All @@ -146,7 +151,7 @@ _.extend(Endpoint.prototype, EventEmitter.prototype, {
}


} catch(err){
} catch (err) {
Endpoint.log.error('[Endpoint] Bad handshake response:' + handshake);
Endpoint.log.error(err);
this.handshakenBackoff.reset();
Expand All @@ -156,7 +161,9 @@ _.extend(Endpoint.prototype, EventEmitter.prototype, {
Endpoint.log.debug('[Endpoint] Handshake succeeded');
this.handshaken = true;

if(!data){return;}
if (!data) {
return;
}
}

this.fnWrite(data);
Expand All @@ -165,35 +172,35 @@ _.extend(Endpoint.prototype, EventEmitter.prototype, {
/**
* Forward data from redis to the endpoint
*/
write: function(data){
write: function (data) {
this.socket.write(data);
},

/**
* If the connection to redsmin just closed, try to reconnect
* @param {Error} err
*/
onClose: function(sourceWasAnError){
onClose: function (sourceWasAnError) {
Endpoint.log.error("[Endpoint] Connection closed " + (sourceWasAnError ? 'because of an error' : ''));

this.handshakenBackoff.reset();
try{
try {
this.reconnectBackoff.backoff();
} catch(no_error){}
} catch (no_error) {}

this.connected = false;
this.handshaken = false;

this.emit('close', sourceWasAnError);
},

onError: function(err){
onError: function (err) {
console.log('onerror', err);
Endpoint.log.error('[Endpoint] Error ' + (err ? err.message : ''));
this.socket.destroy(); // End the socket
try{
try {
this.onClose(err ? err.message : '');
}catch(err){
} catch (err) {
console.error('err::onError', err);
}
}
Expand Down
Loading

0 comments on commit 79e3731

Please sign in to comment.