Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to bluebird #158

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions channel_api.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
var raw_connect = require('./lib/connect').connect;
var ChannelModel = require('./lib/channel_model').ChannelModel;
var defer = require('when').defer;
var Promise = require('bluebird');

function connect(url, connOptions) {
var opened = defer();
raw_connect(url, connOptions, function(err, conn) {
if (err === null) opened.resolve(new ChannelModel(conn));
else opened.reject(err);
return new Promise(function (resolve, reject) {
raw_connect(url, connOptions, function(err, conn) {
if (err === null) resolve(new ChannelModel(conn));
else reject(err);
});
});
return opened.promise;
};

module.exports.connect = connect;
Expand Down
18 changes: 9 additions & 9 deletions lib/callback_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
'use strict';

var defs = require('./defs');
var when = require('when'), defer = when.defer;
var Promise = require('bluebird');
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var BaseChannel = require('./channel').BaseChannel;
Expand Down Expand Up @@ -321,15 +321,15 @@ ConfirmChannel.prototype.waitForConfirms = function(k) {
unconfirmed.forEach(function(val, index) {
if (val === null); // already confirmed
else {
var confirmed = defer();
unconfirmed[index] = function(err) {
if (val) val(err);
if (err === null) confirmed.resolve();
else confirmed.reject(err);
};
await.push(confirmed.promise);
await.push(new Promise(function (resolve, reject) {
unconfirmed[index] = function(err) {
if (val) val(err);
if (err === null) resolve();
else reject(err);
};
}));
}
});
return when.all(await).then(function() { k(); },
return Promise.all(await).then(function() { k(); },
function(err) { k(err); });
};
128 changes: 65 additions & 63 deletions lib/channel_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
'use strict';

var defs = require('./defs');
var when = require('when'), defer = when.defer;
var Promise = require('bluebird');
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var BaseChannel = require('./channel').BaseChannel;
Expand All @@ -28,9 +28,10 @@ module.exports.ChannelModel = ChannelModel;
var CM = ChannelModel.prototype;

CM.close = function() {
var closed = defer();
this.connection.close(closed.resolve);
return closed.promise;
var self = this;
return new Promise(function (resolve, reject) {
self.connection.close(resolve);
});
};

// Channels
Expand All @@ -55,28 +56,29 @@ var C = Channel.prototype;
// response's fields; this is intended to be suitable for implementing
// API procedures.
C.rpc = function(method, fields, expect) {
var reply = defer();
this._rpc(method, fields, expect, function(err, f) {
if (err !== null) reply.reject(err);
else reply.resolve(f.fields);
var self = this;
return new Promise(function (resolve, reject) {
self._rpc(method, fields, expect, function(err, f) {
if (err !== null) reject(err);
else resolve(f.fields);
});
});
return reply.promise;
};

// Do the remarkably simple channel open handshake
C.open = function() {
return when.try(this.allocate.bind(this)).then(
return Promise.try(this.allocate.bind(this)).then(
function(ch) {
return ch.rpc(defs.ChannelOpen, {outOfBand: ""},
defs.ChannelOpenOk);
});
};

C.close = function() {
var closed = defer();
this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
closed.resolve)
return closed.promise;
var self = this;
return new Promise(function (resolve, reject) {
self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS, resolve);
});
};

// === Public API, declaring queues and stuff ===
Expand Down Expand Up @@ -166,58 +168,58 @@ C.consume = function(queue, callback, options) {
// NB we want the callback to be run synchronously, so that we've
// registered the consumerTag before any messages can arrive.
var fields = Args.consume(queue, options);
var reply = defer();
this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk,
function(err, ok) {
if (err === null) {
self.registerConsumer(ok.fields.consumerTag,
callback);
reply.resolve(ok.fields);
}
else reply.reject(err);
});
return reply.promise;
return new Promise(function (resolve, reject) {
self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk,
function(err, ok) {
if (err === null) {
self.registerConsumer(ok.fields.consumerTag,
callback);
resolve(ok.fields);
}
else reject(err);
});
});
};

C.cancel = function(consumerTag) {
var self = this;
var reply = defer();
this._rpc(defs.BasicCancel, Args.cancel(consumerTag),
defs.BasicCancelOk,
function(err, ok) {
if (err === null) {
self.unregisterConsumer(consumerTag);
reply.resolve(ok.fields);
}
else reply.reject(err);
});
return reply.promise;
return new Promise(function (resolve, reject) {
self._rpc(defs.BasicCancel, Args.cancel(consumerTag),
defs.BasicCancelOk,
function(err, ok) {
if (err === null) {
self.unregisterConsumer(consumerTag);
resolve(ok.fields);
}
else reject(err);
});
});
};

C.get = function(queue, options) {
var reply = defer();
var self = this;
var fields = Args.get(queue, options);
this.sendOrEnqueue(defs.BasicGet, fields, function(err, f) {
if (err === null) {
if (f.id === defs.BasicGetEmpty) {
reply.resolve(false);
}
else if (f.id === defs.BasicGetOk) {
var fields = f.fields;
self.handleMessage = acceptMessage(function(m) {
m.fields = fields;
reply.resolve(m);
});
return new Promise(function (resolve, reject) {
self.sendOrEnqueue(defs.BasicGet, fields, function(err, f) {
if (err === null) {
if (f.id === defs.BasicGetEmpty) {
resolve(false);
}
else if (f.id === defs.BasicGetOk) {
var fields = f.fields;
self.handleMessage = acceptMessage(function(m) {
m.fields = fields;
resolve(m);
});
}
else {
reject(new Error("Unexpected response to BasicGet: " +
inspect(f)));
}
}
else {
reply.reject(new Error("Unexpected response to BasicGet: " +
inspect(f)));
}
}
else reply.reject(err);
else reject(err);
});
});
return reply.promise;
};

C.ack = function(message, allUpTo) {
Expand Down Expand Up @@ -309,14 +311,14 @@ CC.waitForConfirms = function() {
unconfirmed.forEach(function(val, index) {
if (val === null); // already confirmed
else {
var confirmed = defer();
unconfirmed[index] = function(err) {
if (val) val(err);
if (err === null) confirmed.resolve();
else confirmed.reject(err);
};
await.push(confirmed.promise);
await.push(new Promise(function (resolve, reject) {
unconfirmed[index] = function(err) {
if (val) val(err);
if (err === null) resolve();
else reject(err);
};
}));
}
});
return when.all(await);
return Promise.all(await);
};
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
},
"dependencies": {
"bitsyntax": "~0.0.4",
"bluebird": "^2.9.26",
"buffer-more-ints": "0.0.2",
"readable-stream": "1.x >=1.1.9",
"when": "~3.6.2"
"readable-stream": "1.x >=1.1.9"
},
"devDependencies": {
"mocha": "~1",
Expand Down
35 changes: 17 additions & 18 deletions test/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ var Connection = require('../lib/connection').Connection;
var PassThrough =
require('stream').PassThrough ||
require('readable-stream/passthrough');
var defer = require('when').defer;
var defs = require('../lib/defs');
var assert = require('assert');

Expand Down Expand Up @@ -78,24 +77,24 @@ function runServer(socket, run) {

function await(method) {
return function() {
var d = defer();
if (method) {
frames.step(function(e, f) {
if (e !== null) return d.reject(e);
if (f.id === method)
d.resolve(f);
else
d.reject(new Error("Expected method: " + method +
return new Promise(function (resolve, reject) {
if (method) {
frames.step(function(e, f) {
if (e !== null) return reject(e);
if (f.id === method)
resolve(f);
else
reject(new Error("Expected method: " + method +
", got " + f.id));
});
}
else {
frames.step(function(e, f) {
if (e !== null) return d.reject(e);
else d.resolve(f);
});
}
return d.promise;
});
}
else {
frames.step(function(e, f) {
if (e !== null) return reject(e);
else resolve(f);
});
}
});
};
}
run(send, await);
Expand Down