From 8e377acf594e68ad6f6fad4885ca9ffe7de724fc Mon Sep 17 00:00:00 2001 From: David Halls Date: Wed, 22 May 2013 22:58:41 +0100 Subject: [PATCH] When publishing message, pass qos, clientId and messageId to ascoltatore. When forwarding message, pass on qos and messageId to client. When subscribing, pass qos to asoltatore. Close ascoltatore when server is closed. Tests for passing on qos and messageId. --- lib/server.js | 70 +++++++++++++++++++++++++++------------------ package.json | 2 +- test/server_spec.js | 60 +++++++++++++++++++++++++++++++++++++- 3 files changed, 102 insertions(+), 30 deletions(-) diff --git a/lib/server.js b/lib/server.js index 00b6899..2e03c42 100644 --- a/lib/server.js +++ b/lib/server.js @@ -155,15 +155,17 @@ Server.prototype.close = function(callback) { that.closeConn(that.clients[id], cb); }; }), function() { - that.once("closed", callback); - try { - that.server.close(function() { - debug("closed"); - that.emit("closed"); - }); - } catch (exception) { - callback(exception); - } + that.ascoltatore.close(function () { + that.once("closed", callback); + try { + that.server.close(function() { + debug("closed"); + that.emit("closed"); + }); + } catch (exception) { + callback(exception); + } + }); }); }; @@ -219,15 +221,16 @@ Server.prototype.serve = function(client) { } }; - var forward = function(topic, payload, retry) { + var forward = function(topic, payload, options) { debug("delivering message on " + topic + " to " + client.id); - var qos = client.subscriptions[topic]; + var qos = Math.min((options && options.qos) || 0, + client.subscriptions[topic]); var packet = { topic: topic, payload: payload, qos: qos, - messageId: client.nextId++ + messageId: (options && options.messageId) || client.nextId++ }; actualSend(packet, 0); @@ -327,11 +330,15 @@ Server.prototype.serve = function(client) { return; } - that.ascoltatore.subscribe(s.topic.replace("#", "*"), forward, function() { - debug("subscribed " + client.id + " to " + s.topic); - client.subscriptions[s.topic] = s.qos; - cb(); - }); + that.ascoltatore.subscribe( + s.topic.replace("#", "*"), + { qos: s.qos }, + forward, + function() { + debug("subscribed " + client.id + " to " + s.topic); + client.subscriptions[s.topic] = s.qos; + cb(); + }); }); }; }), function(err) { @@ -353,17 +360,21 @@ Server.prototype.serve = function(client) { return; } - that.ascoltatore.publish(packet.topic, packet.payload, function() { - debug("client " + client.id + " published packet to topic " + packet.topic); - - if (packet.qos === 1) { - client.puback({ - messageId: packet.messageId - }); - } + that.ascoltatore.publish( + packet.topic, + packet.payload, + { qos: packet.qos, clientId: client.id, messageId: packet.messageId }, + function() { + debug("client " + client.id + " published packet to topic " + packet.topic); + + if (packet.qos === 1) { + client.puback({ + messageId: packet.messageId + }); + } - that.emit("published", packet, client); - }); + that.emit("published", packet, client); + }); }); }); @@ -386,7 +397,10 @@ Server.prototype.serve = function(client) { if (client.will) { debug("delivering last will for client " + client.id + " to topic " + client.will.topic); - that.ascoltatore.publish(client.will.topic, client.will.payload); + that.ascoltatore.publish( + client.will.topic, + client.will.payload, + { qos: client.will.qos, clientId: client.id }); } unsubAndClose(); diff --git a/package.json b/package.json index 8558326..a353cf7 100644 --- a/package.json +++ b/package.json @@ -46,7 +46,7 @@ "dependencies": { "mqtt": "~0.2.6", "async": "~0.2.4", - "ascoltatori": "~0.4.1", + "ascoltatori": "~0.5.0", "debug": "~0.7.2", "commander": "~1.1.1", "minimatch": "~0.2.11" diff --git a/test/server_spec.js b/test/server_spec.js index aee166b..9886c64 100644 --- a/test/server_spec.js +++ b/test/server_spec.js @@ -691,7 +691,7 @@ describe("mosca.Server", function() { client.on("suback", function(packet) { client.publish({ topic: "hello", - qos: 0, + qos: 1, messageId: 24 }); }); @@ -709,6 +709,64 @@ describe("mosca.Server", function() { }); }); + it("should receive published messageId", function(done) { + buildAndConnect(done, function(client) { + + client.once("publish", function(packet) { + expect(packet.messageId).to.be.equal(24); + client.disconnect(); + }); + + client.on("suback", function(packet) { + client.publish({ + topic: "hello", + qos: 1, + messageId: 24 + }); + }); + + var subscriptions = [{ + topic: "hello", + qos: 1 + } + ]; + + client.subscribe({ + subscriptions: subscriptions, + messageId: 42 + }); + }); + }); + + it("should receive all messages at QoS 0 if a subscription is done with QoS 0", function(done) { + buildAndConnect(done, function(client) { + + client.once("publish", function(packet) { + expect(packet.qos).to.be.equal(0); + client.disconnect(); + }); + + client.on("suback", function(packet) { + client.publish({ + topic: "hello", + qos: 1, + messageId: 24 + }); + }); + + var subscriptions = [{ + topic: "hello", + qos: 0 + } + ]; + + client.subscribe({ + subscriptions: subscriptions, + messageId: 42 + }); + }); + }); + it("should support will message", function(done) { async.waterfall([