Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
When publishing message, pass qos, clientId and messageId to ascoltat…
Browse files Browse the repository at this point in the history
…ore.

When forwarding message, pass on qos and messageId to client.
When subscribing, pass qos to asoltatore.
Close ascoltatore when server is closed.
Tests for passing on qos and messageId.
  • Loading branch information
davedoesdev committed May 22, 2013
1 parent 70afaf3 commit 8e377ac
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 30 deletions.
70 changes: 42 additions & 28 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,17 @@ Server.prototype.close = function(callback) {
that.closeConn(that.clients[id], cb);
};
}), function() {
that.once("closed", callback);
try {
that.server.close(function() {
debug("closed");
that.emit("closed");
});
} catch (exception) {
callback(exception);
}
that.ascoltatore.close(function () {
that.once("closed", callback);
try {
that.server.close(function() {
debug("closed");
that.emit("closed");
});
} catch (exception) {
callback(exception);
}
});
});
};

Expand Down Expand Up @@ -219,15 +221,16 @@ Server.prototype.serve = function(client) {
}
};

var forward = function(topic, payload, retry) {
var forward = function(topic, payload, options) {
debug("delivering message on " + topic + " to " + client.id);

var qos = client.subscriptions[topic];
var qos = Math.min((options && options.qos) || 0,
client.subscriptions[topic]);
var packet = {
topic: topic,
payload: payload,
qos: qos,
messageId: client.nextId++
messageId: (options && options.messageId) || client.nextId++
};

actualSend(packet, 0);
Expand Down Expand Up @@ -327,11 +330,15 @@ Server.prototype.serve = function(client) {
return;
}

that.ascoltatore.subscribe(s.topic.replace("#", "*"), forward, function() {
debug("subscribed " + client.id + " to " + s.topic);
client.subscriptions[s.topic] = s.qos;
cb();
});
that.ascoltatore.subscribe(
s.topic.replace("#", "*"),
{ qos: s.qos },
forward,
function() {
debug("subscribed " + client.id + " to " + s.topic);
client.subscriptions[s.topic] = s.qos;
cb();
});
});
};
}), function(err) {
Expand All @@ -353,17 +360,21 @@ Server.prototype.serve = function(client) {
return;
}

that.ascoltatore.publish(packet.topic, packet.payload, function() {
debug("client " + client.id + " published packet to topic " + packet.topic);

if (packet.qos === 1) {
client.puback({
messageId: packet.messageId
});
}
that.ascoltatore.publish(
packet.topic,
packet.payload,
{ qos: packet.qos, clientId: client.id, messageId: packet.messageId },
function() {
debug("client " + client.id + " published packet to topic " + packet.topic);

if (packet.qos === 1) {
client.puback({
messageId: packet.messageId
});
}

that.emit("published", packet, client);
});
that.emit("published", packet, client);
});
});
});

Expand All @@ -386,7 +397,10 @@ Server.prototype.serve = function(client) {

if (client.will) {
debug("delivering last will for client " + client.id + " to topic " + client.will.topic);
that.ascoltatore.publish(client.will.topic, client.will.payload);
that.ascoltatore.publish(
client.will.topic,
client.will.payload,
{ qos: client.will.qos, clientId: client.id });
}

unsubAndClose();
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"dependencies": {
"mqtt": "~0.2.6",
"async": "~0.2.4",
"ascoltatori": "~0.4.1",
"ascoltatori": "~0.5.0",
"debug": "~0.7.2",
"commander": "~1.1.1",
"minimatch": "~0.2.11"
Expand Down
60 changes: 59 additions & 1 deletion test/server_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ describe("mosca.Server", function() {
client.on("suback", function(packet) {
client.publish({
topic: "hello",
qos: 0,
qos: 1,
messageId: 24
});
});
Expand All @@ -709,6 +709,64 @@ describe("mosca.Server", function() {
});
});

it("should receive published messageId", function(done) {
buildAndConnect(done, function(client) {

client.once("publish", function(packet) {
expect(packet.messageId).to.be.equal(24);
client.disconnect();
});

client.on("suback", function(packet) {
client.publish({
topic: "hello",
qos: 1,
messageId: 24
});
});

var subscriptions = [{
topic: "hello",
qos: 1
}
];

client.subscribe({
subscriptions: subscriptions,
messageId: 42
});
});
});

it("should receive all messages at QoS 0 if a subscription is done with QoS 0", function(done) {
buildAndConnect(done, function(client) {

client.once("publish", function(packet) {
expect(packet.qos).to.be.equal(0);
client.disconnect();
});

client.on("suback", function(packet) {
client.publish({
topic: "hello",
qos: 1,
messageId: 24
});
});

var subscriptions = [{
topic: "hello",
qos: 0
}
];

client.subscribe({
subscriptions: subscriptions,
messageId: 42
});
});
});

it("should support will message", function(done) {

async.waterfall([
Expand Down

0 comments on commit 8e377ac

Please sign in to comment.