Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Merge pull request #26 from davedoesdev/pass_options_to_backend
Browse files Browse the repository at this point in the history
Pass QoS level to the backing MQTTAscoltatore in the tree scenario.
  • Loading branch information
mcollina committed May 24, 2013
2 parents 70afaf3 + f41c4f8 commit 9fa4c7a
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 29 deletions.
70 changes: 42 additions & 28 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,17 @@ Server.prototype.close = function(callback) {
that.closeConn(that.clients[id], cb);
};
}), function() {
that.once("closed", callback);
try {
that.server.close(function() {
debug("closed");
that.emit("closed");
});
} catch (exception) {
callback(exception);
}
that.ascoltatore.close(function () {
that.once("closed", callback);
try {
that.server.close(function() {
debug("closed");
that.emit("closed");
});
} catch (exception) {
callback(exception);
}
});
});
};

Expand Down Expand Up @@ -219,15 +221,16 @@ Server.prototype.serve = function(client) {
}
};

var forward = function(topic, payload, retry) {
var forward = function(topic, payload, options) {
debug("delivering message on " + topic + " to " + client.id);

var qos = client.subscriptions[topic];
var qos = Math.min((options && options.qos) || 0,
client.subscriptions[topic]);
var packet = {
topic: topic,
payload: payload,
qos: qos,
messageId: client.nextId++
messageId: (options && options.messageId) || client.nextId++
};

actualSend(packet, 0);
Expand Down Expand Up @@ -327,11 +330,15 @@ Server.prototype.serve = function(client) {
return;
}

that.ascoltatore.subscribe(s.topic.replace("#", "*"), forward, function() {
debug("subscribed " + client.id + " to " + s.topic);
client.subscriptions[s.topic] = s.qos;
cb();
});
that.ascoltatore.subscribe(
s.topic.replace("#", "*"),
{ qos: s.qos },
forward,
function() {
debug("subscribed " + client.id + " to " + s.topic);
client.subscriptions[s.topic] = s.qos;
cb();
});
});
};
}), function(err) {
Expand All @@ -353,17 +360,21 @@ Server.prototype.serve = function(client) {
return;
}

that.ascoltatore.publish(packet.topic, packet.payload, function() {
debug("client " + client.id + " published packet to topic " + packet.topic);

if (packet.qos === 1) {
client.puback({
messageId: packet.messageId
});
}
that.ascoltatore.publish(
packet.topic,
packet.payload,
{ qos: packet.qos, clientId: client.id, messageId: packet.messageId },
function() {
debug("client " + client.id + " published packet to topic " + packet.topic);

if (packet.qos === 1) {
client.puback({
messageId: packet.messageId
});
}

that.emit("published", packet, client);
});
that.emit("published", packet, client);
});
});
});

Expand All @@ -386,7 +397,10 @@ Server.prototype.serve = function(client) {

if (client.will) {
debug("delivering last will for client " + client.id + " to topic " + client.will.topic);
that.ascoltatore.publish(client.will.topic, client.will.payload);
that.ascoltatore.publish(
client.will.topic,
client.will.payload,
{ qos: client.will.qos, clientId: client.id });
}

unsubAndClose();
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"dependencies": {
"mqtt": "~0.2.6",
"async": "~0.2.4",
"ascoltatori": "~0.4.1",
"ascoltatori": "~0.5.0",
"debug": "~0.7.2",
"commander": "~1.1.1",
"minimatch": "~0.2.11"
Expand Down
87 changes: 87 additions & 0 deletions test/server_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,93 @@ describe("mosca.Server", function() {
});
});

client.on("suback", function(packet) {
client.publish({
topic: "hello",
qos: 1,
messageId: 24
});
});

var subscriptions = [{
topic: "hello",
qos: 1
}
];

client.subscribe({
subscriptions: subscriptions,
messageId: 42
});
});
});

it("should receive published messageId", function(done) {
buildAndConnect(done, function(client) {

client.once("publish", function(packet) {
expect(packet.messageId).to.be.equal(24);
client.disconnect();
});

client.on("suback", function(packet) {
client.publish({
topic: "hello",
qos: 1,
messageId: 24
});
});

var subscriptions = [{
topic: "hello",
qos: 1
}
];

client.subscribe({
subscriptions: subscriptions,
messageId: 42
});
});
});

it("should receive all messages at QoS 0 if a subscription is done with QoS 0", function(done) {
buildAndConnect(done, function(client) {

client.once("publish", function(packet) {
expect(packet.qos).to.be.equal(0);
client.disconnect();
});

client.on("suback", function(packet) {
client.publish({
topic: "hello",
qos: 1,
messageId: 24
});
});

var subscriptions = [{
topic: "hello",
qos: 0
}
];

client.subscribe({
subscriptions: subscriptions,
messageId: 42
});
});
});

it("should receive at QoS 0 all messages published at QoS 0 even if subscribed at QoS 1", function(done) {
buildAndConnect(done, function(client) {

client.once("publish", function(packet) {
expect(packet.qos).to.be.equal(0);
client.disconnect();
});

client.on("suback", function(packet) {
client.publish({
topic: "hello",
Expand Down

0 comments on commit 9fa4c7a

Please sign in to comment.