Skip to content

Commit

Permalink
fix(resubscribe): message id allocate twice (#1337)
Browse files Browse the repository at this point in the history
* fix: messageeId

* Fix messageId allocate twice on deliver.

resubscribe is out of MQTT spec. It is MQTT.js expansion.
On connect sequence, the following three steps are defined by the MQTT Spec.

1. The client sends CONNECT to the broker with CleanStart:false
2. The broker sends CONNACK to the client with SessionPresent:true if
session exists
3. The client re-sends in-flight PUBLISH and PUBREL messages

resubscribe was processed between the step 2 and step 3.
It's too early. The resubscribe might allocate messageId that is the
same as PUBLISH or PUBREL packet. It is not good.

So I moved resubscribe process to after the step 3.

* Removed invalid fallback code.

* Stored CONNACK packet instead of sessionPresent.

Co-authored-by: Yoseph Maguire <[email protected]>
  • Loading branch information
redboltz and YoDaMa authored Oct 11, 2021
1 parent e3e15c3 commit 7466819
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ function MqttClient (streamBuilder, options) {
var packet = null

if (!entry) {
that._resubscribe()
return
}

Expand All @@ -343,10 +344,7 @@ function MqttClient (streamBuilder, options) {
var send = true
if (packet.messageId && packet.messageId !== 0) {
if (!that.messageIdProvider.register(packet.messageId)) {
packet.messageeId = that.messageIdProvider.allocate()
if (packet.messageId === null) {
send = false
}
send = false
}
}
if (send) {
Expand All @@ -360,7 +358,7 @@ function MqttClient (streamBuilder, options) {
}
)
} else {
debug('messageId: %d has already used.', packet.messageId)
debug('messageId: %d has already used. The message is skipped and removed.', packet.messageId)
deliver()
}
}
Expand Down Expand Up @@ -1674,11 +1672,11 @@ MqttClient.prototype.getLastMessageId = function () {
* _resubscribe
* @api private
*/
MqttClient.prototype._resubscribe = function (connack) {
MqttClient.prototype._resubscribe = function () {
debug('_resubscribe')
var _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics)
if (!this._firstConnection &&
(this.options.clean || (this.options.protocolVersion === 5 && !connack.sessionPresent)) &&
(this.options.clean || (this.options.protocolVersion === 5 && !this.connackPacket.sessionPresent)) &&
_resubscribeTopicsKeys.length > 0) {
if (this.options.resubscribe) {
if (this.options.protocolVersion === 5) {
Expand Down Expand Up @@ -1714,9 +1712,9 @@ MqttClient.prototype._onConnect = function (packet) {

var that = this

this.connackPacket = packet
this.messageIdProvider.clear()
this._setupPingTimer()
this._resubscribe(packet)

this.connected = true

Expand Down

0 comments on commit 7466819

Please sign in to comment.