Skip to content

Commit

Permalink
Merge pull request #12 from tinque/master
Browse files Browse the repository at this point in the history
Add AMQP Adapter
  • Loading branch information
ekryski committed Apr 21, 2016
2 parents c4bb742 + 13ea43d commit ebda566
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 2 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ Additionally you can pass the original sync options:
- __db__ - The Redis connection string (e.g. `redis://localhost:6379`) or database object
- __connect__ - A callback for when the Redis connection has been established

### Use with AMQP

- __uri__ - The AMQP connection string (e.g. `amqp://guest:guest@localhost:5672`)
- __prefix__ - A prefix that will be applied to all queues, exchanges and messages created by sync
- __amqpConnectionOptions__ - AMQP connection options

## Caveats

When listening to service events with this, all events are going to get propagated to all clients. This means, that your event listeners should not perform any actions that change the global state (e.g. write something into the database) because every client will perform the same action.
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"src": "src"
},
"dependencies": {
"amqplib": "^0.4.1",
"debug": "^2.1.3",
"mubsub": "^1.0.4",
"redis": "^2.4.2"
Expand Down
132 changes: 132 additions & 0 deletions src/amqp.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
var amqplib = require('amqplib/callback_api');
var debug = require('debug')('feathers-sync');

var channel;
var conf ={};
var listSubscriber = [];

var exchangeName = 'feathers-sync';

var onmessage = function(ev,data){
if(listSubscriber[ev] && Array.isArray(listSubscriber[ev])){
listSubscriber[ev].forEach(function(cb){
cb(data);
});
}

};

var connection = function(cb){

amqplib.connect(conf.uri, conf.amqpConnectionOptions, function (err, conn){
if(err){
debug('Error while connecting to AMQP: ', err.toString());
}else{
channel = conn.createChannel();

channel.assertExchange(exchangeName, 'fanout', {durable: false});
channel.assertQueue('', {exclusive: true}, function(err, q) {
if(err){
debug('Error while assertQueue to AMQP: ', err.toString());
}else{
channel.bindQueue(q.queue, exchangeName, '');
if(typeof cb === 'function'){
cb(channel);
}
channel.consume(q.queue, function(msg) {
var temp = JSON.parse(msg.content.toString());
onmessage(temp.ev,temp.data);
}, {noAck: true});
}
});
}
});
};

var subscribe = function(ev,cb){
if(listSubscriber[ev]){
listSubscriber[ev].push(cb);
}else{
listSubscriber[ev] = [cb];
}
};

var publish = function(ev,data){
var msg = {};
msg.ev = ev;
msg.data = data;
//amqp name
if(channel){
channel.publish(exchangeName, '', new Buffer(JSON.stringify(msg)));
}else{
connection(function(channel){
channel.publish(exchangeName, '', new Buffer(JSON.stringify(msg)));
});
}

};

module.exports = function(config) {

debug('setting up AMQP uri %s', config.uri);
conf.uri = config.uri;
if(!config.prefix){
conf.prefix = '';
}else{
conf.prefix = config.prefix;
exchangeName = conf.prefix+'-feathers-sync';
}

conf.amqpConnectionOptions = {
//heartbeat: 30
};

if(config.amqpConnectionOptions){
conf.amqpConnectionOptions = config.amqpConnectionOptions;
}



connection();

return function() {
var oldSetup = this.setup;

this.setup = function() {
var result = oldSetup.apply(this, arguments);
var services = this.services;
Object.keys(services).forEach(function(path) {
var service = services[path];
service._serviceEvents.forEach(function(event) {
var ev = path + ' ' + event;
debug('subscribing to handler %s', ev);
subscribe(ev, function(data) {
debug('got event, calling old emit %s', ev);
service._emit.call(service, event, data);
});
});
});
return result;
};

this.providers.push(function(path, service) {
if(typeof service.emit !== 'function' || typeof service.on !== 'function') {
return;
}

// Store the old emit method
service._emit = service.emit;

// Override an emit that publishes to the hub
service.mixin({
emit: function(ev, data) {
var event = path + ' ' + ev;
debug('emitting event to channel %s', event);
return publish(event, data);
}
});
});


};
};
12 changes: 10 additions & 2 deletions src/sync.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
var debug = require('debug')('feathers-sync');
var mongo = require('./mongodb');
var redis = require('./redis');
var amqp = require('./amqp');

module.exports = function(config) {

var proto = '';
config = config || { db: 'mongodb://localhost:27017/sync' };

var proto = config.db.split('://')[0];
if(config.db){
proto = config.db.split('://')[0];
}else if(config.uri){
proto = config.uri.split('://')[0];
}

if(['mongodb', 'redis'].indexOf(proto) === -1){
if(['mongodb', 'redis','amqp'].indexOf(proto) === -1){
return debug('Adapter not found %s', proto);
}

Expand All @@ -18,6 +24,8 @@ module.exports = function(config) {
return mongo(config);
} else if(proto === 'redis') {
return redis(config);
}else if(proto === 'amqp'){
return amqp(config);
} else {
return;
}
Expand Down

0 comments on commit ebda566

Please sign in to comment.