diff --git a/lib/client.js b/lib/client.js index b4005cad6..40f7e54d5 100644 --- a/lib/client.js +++ b/lib/client.js @@ -94,6 +94,9 @@ function MqttClient (streamBuilder, options) { // map of subscribed topics to support reconnection this._resubscribeTopics = {} + // map of a subscribe messageId and a topic + this.messageIdToTopic = {} + // Ping timer, setup in _setupPingTimer this.pingTimer = null // Is the client connected? @@ -492,11 +495,14 @@ MqttClient.prototype.subscribe = function () { // subscriptions to resubscribe to in case of disconnect if (this.options.resubscribe) { + var topics = [] subs.forEach(function (sub) { if (that.options.reconnectPeriod > 0) { that._resubscribeTopics[sub.topic] = sub.qos + topics.push(sub.topic) } }) + that.messageIdToTopic[packet.messageId] = topics } this.outgoing[packet.messageId] = function (err, packet) { @@ -939,6 +945,15 @@ MqttClient.prototype._handleAck = function (packet) { break case 'suback': delete this.outgoing[mid] + if (packet.granted.length === 1 && (packet.granted[0] & 0x80) !== 0) { + // suback with Failure status + var topics = this.messageIdToTopic[mid] + if (topics) { + topics.forEach(function (topic) { + delete that._resubscribeTopics[topic] + }) + } + } cb(null, packet) break case 'unsuback': diff --git a/test/abstract_client.js b/test/abstract_client.js index 8b1220abc..444a8fc9c 100644 --- a/test/abstract_client.js +++ b/test/abstract_client.js @@ -7,6 +7,8 @@ var should = require('should') var sinon = require('sinon') var mqtt = require('../') var xtend = require('xtend') +var Server = require('./server') +var port = 9876 module.exports = function (server, config) { function connect (opts) { @@ -1739,6 +1741,57 @@ module.exports = function (server, config) { }) }) + it('should not resubscribe when reconnecting if suback is error', function (done) { + var tryReconnect = true + var reconnectEvent = false + var server2 = new Server(function (c) { + c.on('connect', function (packet) { + c.connack({returnCode: 0}) + }) + c.on('subscribe', function (packet) { + c.suback({ + messageId: packet.messageId, + granted: packet.subscriptions.map(function (e) { + return e.qos | 0x80 + }) + }) + c.pubrel({ messageId: Math.floor(Math.random() * 9000) + 1000 }) + }) + }) + + server2.listen(port + 49, function () { + var client = mqtt.connect({ + port: port + 49, + host: 'localhost', + reconnectPeriod: 100 + }) + + client.on('reconnect', function () { + reconnectEvent = true + }) + + client.on('connect', function () { + if (tryReconnect) { + client.subscribe('hello', function () { + client.stream.end() + + server.once('client', function (serverClient) { + serverClient.on('subscribe', function () { + should.fail() + }) + }) + }) + tryReconnect = false + } else { + reconnectEvent.should.equal(true) + should(Object.keys(client._resubscribeTopics).length).be.equal(0) + server2.close() + done() + } + }) + }) + }) + context('with alternate server client', function () { var cachedClientListeners