diff --git a/channel_api.js b/channel_api.js index 42e70429..7cb6c3fe 100644 --- a/channel_api.js +++ b/channel_api.js @@ -1,14 +1,14 @@ var raw_connect = require('./lib/connect').connect; var ChannelModel = require('./lib/channel_model').ChannelModel; -var defer = require('when').defer; +var Promise = require('bluebird'); function connect(url, connOptions) { - var opened = defer(); - raw_connect(url, connOptions, function(err, conn) { - if (err === null) opened.resolve(new ChannelModel(conn)); - else opened.reject(err); + return new Promise(function (resolve, reject) { + raw_connect(url, connOptions, function(err, conn) { + if (err === null) resolve(new ChannelModel(conn)); + else reject(err); + }); }); - return opened.promise; }; module.exports.connect = connect; diff --git a/lib/callback_model.js b/lib/callback_model.js index 1edd5317..26c72404 100644 --- a/lib/callback_model.js +++ b/lib/callback_model.js @@ -5,7 +5,7 @@ 'use strict'; var defs = require('./defs'); -var when = require('when'), defer = when.defer; +var Promise = require('bluebird'); var inherits = require('util').inherits; var EventEmitter = require('events').EventEmitter; var BaseChannel = require('./channel').BaseChannel; @@ -321,15 +321,15 @@ ConfirmChannel.prototype.waitForConfirms = function(k) { unconfirmed.forEach(function(val, index) { if (val === null); // already confirmed else { - var confirmed = defer(); - unconfirmed[index] = function(err) { - if (val) val(err); - if (err === null) confirmed.resolve(); - else confirmed.reject(err); - }; - await.push(confirmed.promise); + await.push(new Promise(function (resolve, reject) { + unconfirmed[index] = function(err) { + if (val) val(err); + if (err === null) resolve(); + else reject(err); + }; + })); } }); - return when.all(await).then(function() { k(); }, + return Promise.all(await).then(function() { k(); }, function(err) { k(err); }); }; diff --git a/lib/channel_model.js b/lib/channel_model.js index e4b204aa..c3406079 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -5,7 +5,7 @@ 'use strict'; var defs = require('./defs'); -var when = require('when'), defer = when.defer; +var Promise = require('bluebird'); var inherits = require('util').inherits; var EventEmitter = require('events').EventEmitter; var BaseChannel = require('./channel').BaseChannel; @@ -28,9 +28,10 @@ module.exports.ChannelModel = ChannelModel; var CM = ChannelModel.prototype; CM.close = function() { - var closed = defer(); - this.connection.close(closed.resolve); - return closed.promise; + var self = this; + return new Promise(function (resolve, reject) { + self.connection.close(resolve); + }); }; // Channels @@ -55,17 +56,18 @@ var C = Channel.prototype; // response's fields; this is intended to be suitable for implementing // API procedures. C.rpc = function(method, fields, expect) { - var reply = defer(); - this._rpc(method, fields, expect, function(err, f) { - if (err !== null) reply.reject(err); - else reply.resolve(f.fields); + var self = this; + return new Promise(function (resolve, reject) { + self._rpc(method, fields, expect, function(err, f) { + if (err !== null) reject(err); + else resolve(f.fields); + }); }); - return reply.promise; }; // Do the remarkably simple channel open handshake C.open = function() { - return when.try(this.allocate.bind(this)).then( + return Promise.try(this.allocate.bind(this)).then( function(ch) { return ch.rpc(defs.ChannelOpen, {outOfBand: ""}, defs.ChannelOpenOk); @@ -73,10 +75,10 @@ C.open = function() { }; C.close = function() { - var closed = defer(); - this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS, - closed.resolve) - return closed.promise; + var self = this; + return new Promise(function (resolve, reject) { + self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS, resolve); + }); }; // === Public API, declaring queues and stuff === @@ -166,58 +168,58 @@ C.consume = function(queue, callback, options) { // NB we want the callback to be run synchronously, so that we've // registered the consumerTag before any messages can arrive. var fields = Args.consume(queue, options); - var reply = defer(); - this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, - function(err, ok) { - if (err === null) { - self.registerConsumer(ok.fields.consumerTag, - callback); - reply.resolve(ok.fields); - } - else reply.reject(err); - }); - return reply.promise; + return new Promise(function (resolve, reject) { + self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, + function(err, ok) { + if (err === null) { + self.registerConsumer(ok.fields.consumerTag, + callback); + resolve(ok.fields); + } + else reject(err); + }); + }); }; C.cancel = function(consumerTag) { var self = this; - var reply = defer(); - this._rpc(defs.BasicCancel, Args.cancel(consumerTag), - defs.BasicCancelOk, - function(err, ok) { - if (err === null) { - self.unregisterConsumer(consumerTag); - reply.resolve(ok.fields); - } - else reply.reject(err); - }); - return reply.promise; + return new Promise(function (resolve, reject) { + self._rpc(defs.BasicCancel, Args.cancel(consumerTag), + defs.BasicCancelOk, + function(err, ok) { + if (err === null) { + self.unregisterConsumer(consumerTag); + resolve(ok.fields); + } + else reject(err); + }); + }); }; C.get = function(queue, options) { - var reply = defer(); var self = this; var fields = Args.get(queue, options); - this.sendOrEnqueue(defs.BasicGet, fields, function(err, f) { - if (err === null) { - if (f.id === defs.BasicGetEmpty) { - reply.resolve(false); - } - else if (f.id === defs.BasicGetOk) { - var fields = f.fields; - self.handleMessage = acceptMessage(function(m) { - m.fields = fields; - reply.resolve(m); - }); + return new Promise(function (resolve, reject) { + self.sendOrEnqueue(defs.BasicGet, fields, function(err, f) { + if (err === null) { + if (f.id === defs.BasicGetEmpty) { + resolve(false); + } + else if (f.id === defs.BasicGetOk) { + var fields = f.fields; + self.handleMessage = acceptMessage(function(m) { + m.fields = fields; + resolve(m); + }); + } + else { + reject(new Error("Unexpected response to BasicGet: " + + inspect(f))); + } } - else { - reply.reject(new Error("Unexpected response to BasicGet: " + - inspect(f))); - } - } - else reply.reject(err); + else reject(err); + }); }); - return reply.promise; }; C.ack = function(message, allUpTo) { @@ -309,14 +311,14 @@ CC.waitForConfirms = function() { unconfirmed.forEach(function(val, index) { if (val === null); // already confirmed else { - var confirmed = defer(); - unconfirmed[index] = function(err) { - if (val) val(err); - if (err === null) confirmed.resolve(); - else confirmed.reject(err); - }; - await.push(confirmed.promise); + await.push(new Promise(function (resolve, reject) { + unconfirmed[index] = function(err) { + if (val) val(err); + if (err === null) resolve(); + else reject(err); + }; + })); } }); - return when.all(await); + return Promise.all(await); }; diff --git a/package.json b/package.json index dca2dde4..6123ebf6 100644 --- a/package.json +++ b/package.json @@ -13,9 +13,9 @@ }, "dependencies": { "bitsyntax": "~0.0.4", + "bluebird": "^2.9.26", "buffer-more-ints": "0.0.2", - "readable-stream": "1.x >=1.1.9", - "when": "~3.6.2" + "readable-stream": "1.x >=1.1.9" }, "devDependencies": { "mocha": "~1", diff --git a/test/util.js b/test/util.js index 8f0f9a7d..10588773 100644 --- a/test/util.js +++ b/test/util.js @@ -5,7 +5,6 @@ var Connection = require('../lib/connection').Connection; var PassThrough = require('stream').PassThrough || require('readable-stream/passthrough'); -var defer = require('when').defer; var defs = require('../lib/defs'); var assert = require('assert'); @@ -78,24 +77,24 @@ function runServer(socket, run) { function await(method) { return function() { - var d = defer(); - if (method) { - frames.step(function(e, f) { - if (e !== null) return d.reject(e); - if (f.id === method) - d.resolve(f); - else - d.reject(new Error("Expected method: " + method + + return new Promise(function (resolve, reject) { + if (method) { + frames.step(function(e, f) { + if (e !== null) return reject(e); + if (f.id === method) + resolve(f); + else + reject(new Error("Expected method: " + method + ", got " + f.id)); - }); - } - else { - frames.step(function(e, f) { - if (e !== null) return d.reject(e); - else d.resolve(f); - }); - } - return d.promise; + }); + } + else { + frames.step(function(e, f) { + if (e !== null) return reject(e); + else resolve(f); + }); + } + }); }; } run(send, await);