Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't crash when passed a null client id #2

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "vendor/faye"]
path = vendor/faye
url = https://github.com/zwily/faye.git
11 changes: 11 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
language: node_js

node_js:
- "0.10"
- "0.11"

services:
- redis-server

before_script:
- make
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
prepare:
git submodule update --init --recursive
cd vendor/faye && npm install
cd vendor/faye && ./node_modules/.bin/wake
npm install
20 changes: 18 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# faye-redis-sharded
# faye-redis-sharded [![Build Status](https://travis-ci.org/zwily/faye-redis-sharded-node.svg?branch=master)](https://travis-ci.org/zwily/faye-redis-sharded-node)

This plugin provides a Redis-based backend for the [Faye](http://faye.jcoglan.com)
messaging server. It allows a single Faye service to be distributed across many
Expand Down Expand Up @@ -30,7 +30,16 @@ var bayeux = new faye.NodeAdapter({
timeout: 25,
engine: {
type: redis,
hosts: ['redis-server-1:6397','redis-server-1:6380', 'redis-server-2:6379']
shards: [{
host: 'redis-server-1',
port: 6397
}, {
host: 'redis-server-1',
port: 6380
}, {
host: 'redis-server-2',
port: 6379
}]
// more options
}
});
Expand Down Expand Up @@ -117,6 +126,13 @@ one server.
4. Ability to provide a custom shard manager so users can add their own implementations.
5. Ability to perform resharding of keys when hosts change

## Running Tests

```bash
$ make
$ npm test
```

## License

(The MIT License)
Expand Down
197 changes: 107 additions & 90 deletions lib/faye-redis-sharded.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
var redis = require('redis'),
ShardManager = require('./sharding/shard-manager');
async = require('async'),
_ = require('lodash'),
ShardManager = require('./sharding/shard-manager');

var Engine = function (server, options) {
var self = this;
Expand Down Expand Up @@ -30,22 +32,31 @@ var Engine = function (server, options) {
var shardsList = [];
shards.forEach(function (shard) {
var client,
subscriber;
subscriber,
shardName = shard.shardName || (shard.host + ':' + shard.port),
closeChannel = self._ns + '/notifications/' + shardName + '/close';

client = connectRedis(shard, redisOptions, onRedisError);
if (isPrimary) {
subscriber = connectRedis(shard, redisOptions, onRedisError);
subscriber.on('message', function (topic, message) {
self._server.debug('Got message for ?', message);
self.emptyQueue(message);
if (topic === closeChannel) {
self._server.debug('Got close for ?', message);
self._server.trigger('close', message);
} else {
self._server.debug('Got message for ?', message);
self.emptyQueue(message);
}
});

subscriber.subscribe(closeChannel);
}

var shardName = shard.shardName || (shard.host + ':' + shard.port);
var newShard = {
redis:client,
subscriber:subscriber,
shardName:shardName
redis: client,
subscriber: subscriber,
shardName: shardName,
closeChannel: closeChannel
};

shardsList.push({ shardName:shardName, shard:newShard});
Expand Down Expand Up @@ -125,6 +136,9 @@ Engine.prototype = {
}
}

this._server.unbind('connection:open');
this._server.unbind('connection:close');

clearInterval(this._gc);

this._shardManagers.forEach(function (shardManager) {
Expand All @@ -137,23 +151,31 @@ Engine.prototype = {
this._getShard(clientId).redis.zadd(this._ns + '/clients', 0, clientId, function (error, added) {
if (added === 0) return self.createClient(callback, context);
self._server.debug('Created new client ?', clientId);
self.ping(clientId);
self._server.trigger('handshake', clientId);
callback.call(context, clientId);
});
},

clientExists:function (clientId, callback, context) {
if (!clientId) {
callback.call(context, false);
return;
}

var cutoff = new Date().getTime() - (1000 * 1.6 * this._server.timeout);

var redis = this._getShard(clientId).redis;
redis.zscore(this._ns + '/clients', clientId, function (error, score) {
callback.call(context, score !== null);
callback.call(context, parseInt(score, 10) > cutoff);
});
},

destroyClient:function (clientId, callback, context) {
var self = this,
shard = this._getShard(clientId),
redis = shard.redis,
subscriber = shard.subscriber;
shard = this._getShard(clientId),
redis = shard.redis,
subscriber = shard.subscriber;

subscriber.unsubscribe(self._ns + '/' + clientId + '/notify');
redis.smembers(this._ns + '/clients/' + clientId + '/channels', function (err, channels) {
Expand All @@ -162,39 +184,34 @@ Engine.prototype = {
return;
}

var n = channels.length, i = 0;
if (i === n) return self._afterSubscriptionsRemoved(clientId, callback, context);

var unsubscribeError = null;
channels.forEach(function (channel) {
self.unsubscribe(clientId, channel, function (err) {
unsubscribeError = unsubscribeError || err;
i += 1;
if (i === n) {
if (unsubscribeError) {
if (callback) callback.call(context);
} else {
self._afterSubscriptionsRemoved(clientId, callback, context);
}
// unsubscribe from all channels...
async.parallel(channels.map(function(channel) {
return function(done) {
self.unsubscribe(clientId, channel, done);
};
}), function(err) {
// ... and then clear out the client, and trigger a close event.
if (err) {
if (callback) callback.call(context);
return;
}

var multi = redis.multi();

multi.del(self._ns + '/clients/' + clientId + '/messages');
multi.zrem(self._ns + '/clients', clientId);
multi.publish(shard.closeChannel, clientId);

multi.exec(function(err, results) {
if (err) {
if (callback) callback.call(context);
return;
}
});
});
});
},

_afterSubscriptionsRemoved:function (clientId, callback, context) {
var self = this,
redis = this._getShard(clientId).redis;
redis.del(this._ns + '/clients/' + clientId + '/messages', function (err) {
if (err) {
if (callback) callback.call(context);
return;
}

redis.zrem(self._ns + '/clients', clientId, function () {
self._server.debug('Destroyed client ?', clientId);
self._server.trigger('disconnect', clientId);
if (callback) callback.call(context);
self._server.debug('Destroyed client ?', clientId);
self._server.trigger('disconnect', clientId);
if (callback) callback.call(context);
});
});
});
},
Expand Down Expand Up @@ -252,53 +269,53 @@ Engine.prototype = {
shardMap[shardName].channels.push(channel);
});

Object.keys(shardMap).forEach(function (shardName) {
var map = shardMap[shardName],
shard = map.shard,
channels = map.channels;
// We need to query all shards to get the clients subscribed to the given channels,
// and then dedupe those clientIds because each client should only get exactly
// *one* copy of the message.

self._publish(message, channels, shard, shardManager);
function getClientsForChannels(message, channels, channelShard, done) {
var keys = channels.map(function (c) {
return self._ns + '/channels' + c;
});

channelShard.redis.sunion.apply(channelShard.redis, keys.concat(done));
}

async.parallel(_.map(shardMap, function(shardInfo) {
// build up a list of clientIds to deliver the message to across shards...
return function(done) {
getClientsForChannels(message, shardInfo.channels, shardInfo.shard, done);
};
}), function(err, results) {
if (err) {
self._server.error('Error getting list of clients to publish to: ?', err.message);
return;
};

var jsonMessage = JSON.stringify(message);

// ... then uniquify it and deliver.
_.uniq(_.flatten(results)).forEach(function(clientId) {
var shard = self._getShard(clientId, shardManager),
redis = shard.redis;
self._server.debug('Queueing for client ?: ?', clientId, message);
redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage, function (err, result) {
redis.publish(self._ns + '/' + clientId + '/notify', clientId);
self._server.debug('Published for client ? - ? - to server ?', clientId, message, shard.shardName);
});
});
});
}

// publish to all shard managers
function broadcast(message, channels) {
self._shardManagers.forEach(function (shardManager) {
publish(message, channels, shardManager);
})
}

broadcast(message, channels);
self._shardManagers.forEach(function (shardManager) {
publish(message, channels, shardManager);
})

this._server.debug('Publishing message ?', message);
this._server.trigger('publish', message.clientId, message.channel, message.data);
},

_publish:function (message, channels, channelShard, shardManager) {
var self = this,
jsonMessage = JSON.stringify(message),
keys = channels.map(function (c) {
return self._ns + '/channels' + c;
});

var notify = function (error, clients) {
if (error) return;

clients.forEach(function (clientId) {
var shard = self._getShard(clientId, shardManager),
redis = shard.redis;
self._server.debug('Queueing for client ?: ?', clientId, message);
redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage, function (err, result) {
redis.publish(self._ns + '/' + clientId + '/notify', clientId);
self._server.debug('Published for client ? - ? - to server ?', clientId, message, shard.shardName);
});
});
};

keys.push(notify);
channelShard.redis.sunion.apply(channelShard.redis, keys); // get all clients subscribed to the channels on that shard, then call notify on them
},

emptyQueue:function (clientId) {
if (!this._server.hasConnection(clientId)) {
this._server.debug('Does not have connection for: ?', clientId);
Expand All @@ -322,7 +339,11 @@ Engine.prototype = {
});

multi.del(key);
multi.exec();
multi.exec(function(err) {
if (err) {
self._server.error('redis error from exec: ?', err.message);
}
});
},

gc:function () {
Expand All @@ -338,15 +359,11 @@ Engine.prototype = {
shard.redis.zrangebyscore(this._ns + '/clients', 0, cutoff, function (error, clients) {
if (error) return releaseLock();

var i = 0, n = clients.length;
if (i === n) return releaseLock();

clients.forEach(function (clientId) {
this.destroyClient(clientId, function () {
i += 1;
if (i === n) releaseLock();
}, this);
}, self);
async.parallel(clients.map(function(clientId) {
return function(done) {
self.destroyClient(clientId, done);
};
}), releaseLock);
});
}, self);
});
Expand Down
Loading