Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: switch to native Promise #624

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
language: node_js
node_js:
- "0.8"
- "0.10"
- "0.12"
- "iojs-v1"
- "iojs-v2"
Expand Down
11 changes: 8 additions & 3 deletions channel_api.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
var raw_connect = require('./lib/connect').connect;
var ChannelModel = require('./lib/channel_model').ChannelModel;
var Promise = require('bluebird');

function connect(url, connOptions) {
return Promise.fromCallback(function(cb) {
return raw_connect(url, connOptions, cb);
return new Promise(function (resolve, reject) {
raw_connect(url, connOptions, function (err, result) {
if (err) {
reject(err);
} else {
resolve(result);
}
})
})
.then(function(conn) {
return new ChannelModel(conn);
Expand Down
3 changes: 1 addition & 2 deletions examples/tutorials/receive_logs_direct.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/usr/bin/env node

var amqp = require('amqplib');
var all = require('bluebird').all;
var basename = require('path').basename;

var severities = process.argv.slice(2);
Expand All @@ -24,7 +23,7 @@ amqp.connect('amqp://localhost').then(function(conn) {

ok = ok.then(function(qok) {
var queue = qok.queue;
return all(severities.map(function(sev) {
return Promise.all(severities.map(function(sev) {
ch.bindQueue(queue, ex, sev);
})).then(function() { return queue; });
});
Expand Down
3 changes: 1 addition & 2 deletions examples/tutorials/receive_logs_topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

var amqp = require('amqplib');
var basename = require('path').basename;
var all = require('bluebird').all;

var keys = process.argv.slice(2);
if (keys.length < 1) {
Expand All @@ -23,7 +22,7 @@ amqp.connect('amqp://localhost').then(function(conn) {

ok = ok.then(function(qok) {
var queue = qok.queue;
return all(keys.map(function(rk) {
return Promise.all(keys.map(function(rk) {
ch.bindQueue(queue, ex, rk);
})).then(function() { return queue; });
});
Expand Down
1 change: 0 additions & 1 deletion examples/tutorials/rpc_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

var amqp = require('amqplib');
var basename = require('path').basename;
var Promise = require('bluebird');
var uuid = require('node-uuid');

// I've departed from the form of the original RPC tutorial, which
Expand Down
1 change: 0 additions & 1 deletion lib/callback_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
'use strict';

var defs = require('./defs');
var Promise = require('bluebird');
var inherits = require('util').inherits;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR but i just notices inherits, NodeJS docs mention discourage use of inherits and that class extends should be used instead.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll open PR for this

var EventEmitter = require('events').EventEmitter;
var BaseChannel = require('./channel').BaseChannel;
Expand Down
78 changes: 66 additions & 12 deletions lib/channel_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
'use strict';

var defs = require('./defs');
var Promise = require('bluebird');
var Bluebird = require('bluebird');
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var BaseChannel = require('./channel').BaseChannel;
Expand All @@ -29,7 +29,17 @@ module.exports.ChannelModel = ChannelModel;
var CM = ChannelModel.prototype;

CM.close = function() {
return Promise.fromCallback(this.connection.close.bind(this.connection));
var close = this.connection.close.bind(this.connection);

return new Promise(function (resolve, reject) {
close(function (err, result) {
if (err) {
reject(err);
} else {
resolve(result);
}
});
});
};

// Channels
Expand All @@ -55,17 +65,31 @@ var C = Channel.prototype;
// API procedures.
C.rpc = function(method, fields, expect) {
var self = this;
return Promise.fromCallback(function(cb) {
return self._rpc(method, fields, expect, cb);
})
.then(function(f) {

return new Promise(function (resolve, reject) {
self._rpc(method, fields, expect, function (err, result) {
if (err) {
reject(err);
} else {
resolve(result);
}
});
}).then(function (f) {
return f.fields;
});
};

// Do the remarkably simple channel open handshake
C.open = function() {
return Promise.try(this.allocate.bind(this)).then(
var allocate = this.allocate.bind(this);

return new Promise(function (resolve, reject) {
try {
resolve(allocate());
} catch (e) {
reject(e);
}
}).then(
function(ch) {
return ch.rpc(defs.ChannelOpen, {outOfBand: ""},
defs.ChannelOpenOk);
Expand All @@ -74,10 +98,16 @@ C.open = function() {

C.close = function() {
var self = this;
return Promise.fromCallback(function(cb) {

return Bluebird.fromCallback(function(cb) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I misunderstanding something, or promisify could be used here instead of Bluebird?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After dropping old Node version — it could

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only Node 10+ is supported in master, so should be fine to use it now.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced with util.promisify

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw .travis.yml in main still builds library for old Node.js versions

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think it was just missed in 267dd4e. Probably Travis needs to be adjusted to only test for Node 10+ as well.

return self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
cb);
});
// return new Promise(function(resolve, reject) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@squaremo could you help with those three places (lines 106, 203 and 243)

If I replace Bluebird.fromCallback with commented code it seems that channel.accept is called after this promise resolved. So the behavior changes and I have no idea why. I'm sure you have more context with this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be commented out?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need some help with this.

If I replace Bluebird.fromCallback with commented code it seems that channel.accept is called after this promise resolved. So the behavior changes and I have no idea why. I'm sure you have more context with this.

// return self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
// (err, result) => err ? reject(err) : resolve(result)
// );
// });
};

// === Public API, declaring queues and stuff ===
Expand Down Expand Up @@ -167,9 +197,18 @@ C.consume = function(queue, callback, options) {
// NB we want the callback to be run synchronously, so that we've
// registered the consumerTag before any messages can arrive.
var fields = Args.consume(queue, options);
return Promise.fromCallback(function(cb) {
return Bluebird.fromCallback(function(cb) {
self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb);
})
// return new Promise(function(resolve, reject) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be commented out?

// self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, function (err, result) {
// if (err) {
// reject(err);
// } else {
// resolve(result);
// }
// });
// })
.then(function(ok) {
self.registerConsumer(ok.fields.consumerTag, callback);
return ok.fields;
Expand All @@ -178,10 +217,16 @@ C.consume = function(queue, callback, options) {

C.cancel = function(consumerTag) {
var self = this;
return Promise.fromCallback(function(cb) {
return new Promise(function(resolve, reject) {
self._rpc(defs.BasicCancel, Args.cancel(consumerTag),
defs.BasicCancelOk,
cb);
function (err, result) {
if (err) {
reject(err);
} else {
resolve(result);
}
});
})
.then(function(ok) {
self.unregisterConsumer(consumerTag);
Expand All @@ -192,9 +237,18 @@ C.cancel = function(consumerTag) {
C.get = function(queue, options) {
var self = this;
var fields = Args.get(queue, options);
return Promise.fromCallback(function(cb) {
return Bluebird.fromCallback(function(cb) {
return self.sendOrEnqueue(defs.BasicGet, fields, cb);
})
// return new Promise(function(resolve, reject) {
// return self.sendOrEnqueue(defs.BasicGet, fields, function (err, result) {
// if (err) {
// reject(err);
// } else {
// resolve(result);
// }
// });
// })
.then(function(f) {
if (f.id === defs.BasicGetEmpty) {
return false;
Expand Down
21 changes: 14 additions & 7 deletions test/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
'use strict';

var assert = require('assert');
var Promise = require('bluebird');
var Channel = require('../lib/channel').Channel;
var Connection = require('../lib/connection').Connection;
var util = require('./util');
Expand Down Expand Up @@ -77,11 +76,19 @@ var DELIVER_FIELDS = {
};

function open(ch) {
return Promise.try(function() {
ch.allocate();
return Promise.fromCallback(function(cb) {
ch._rpc(defs.ChannelOpen, {outOfBand: ''}, defs.ChannelOpenOk, cb);
});
return new Promise(function (resolve, reject) {
try {
ch.allocate();
ch._rpc(defs.ChannelOpen, {outOfBand: ''}, defs.ChannelOpenOk, function (err, done) {
if (err) {
reject(err);
} else {
resolve(done);
}
});
} catch (e) {
reject(e);
}
});
}

Expand Down Expand Up @@ -286,7 +293,7 @@ test("RPC on closed channel", channelTest(
failureCb(resolve, reject));
});

Promise.join(close, fail1, fail2)
Promise.all([close, fail1, fail2])
.then(succeed(done))
.catch(fail(done));
},
Expand Down
Loading