Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: keepalive issues #1855

Merged
merged 5 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions example.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
import mqtt from '.'

const client = mqtt.connect('mqtt://test.mosquitto.org', {
keepalive: 10,
const client = mqtt.connect('mqtts://test.mosquitto.org', {
keepalive: 60,
port: 8883,
reconnectPeriod: 15000,
rejectUnauthorized: false,
})

const testTopic = 'presence'

function publish() {
client.publish(
testTopic,
`Hello mqtt ${new Date().toISOString()}`,
(err2) => {
if (!err2) {
console.log('message published')
} else {
console.error(err2)
}
},
)
const msg = `Hello mqtt ${new Date().toISOString()}`
client.publish(testTopic, msg, { qos: 1 }, (err2) => {
if (!err2) {
console.log('message published')
} else {
console.error(err2)
}
})
}

client.subscribe(testTopic, (err) => {
Expand All @@ -31,11 +30,12 @@ client.subscribe(testTopic, (err) => {

client.on('message', (topic, message) => {
console.log('received message "%s" from topic "%s"', message, topic)
setTimeout(() => {
publish()
}, 2000)
})

setInterval(() => {
publish()
}, 2000)

client.on('error', (err) => {
console.error(err)
})
Expand Down
48 changes: 26 additions & 22 deletions src/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,6 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac

public messageIdProvider: IMessageIdProvider

public pingResp: boolean

public outgoing: Record<
number,
{ volatile: boolean; cb: (err: Error, packet?: Packet) => void }
Expand All @@ -435,6 +433,9 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac

public noop: (error?: any) => void

/** Timestamp of last received control packet */
public pingResp: number

public pingTimer: PingTimer

/**
Expand Down Expand Up @@ -659,11 +660,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
this.log('close :: clearing connackTimer')
clearTimeout(this.connackTimer)

this.log('close :: destroy ping timer')
if (this.pingTimer) {
this.pingTimer.destroy()
this.pingTimer = null
}
this._destroyPingTimer()

if (this.topicAliasRecv) {
this.topicAliasRecv.clear()
Expand Down Expand Up @@ -722,6 +719,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
public connect() {
const writable = new Writable()
const parser = mqttPacket.parser(this.options)

let completeParse = null
const packets = []

Expand Down Expand Up @@ -1782,11 +1780,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
this._setupReconnect()
}

if (this.pingTimer) {
this.log('_cleanUp :: destroy pingTimer')
this.pingTimer.destroy()
this.pingTimer = null
}
this._destroyPingTimer()

if (done && !this.connected) {
this.log(
Expand Down Expand Up @@ -1924,9 +1918,6 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac

this.emit('packetsend', packet)

// When writing a packet, reschedule the ping timer
this._shiftPingInterval()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The root cause of the problem


this.log('_writePacket :: writing to stream')
const result = mqttPacket.writeToStream(
packet,
Expand Down Expand Up @@ -2084,18 +2075,27 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
)

if (!this.pingTimer && this.options.keepalive) {
this.pingResp = true
this.pingTimer = new PingTimer(
this.options.keepalive,
() => {
this._checkPing()
},
this.options.timerVariant,
)
this.pingResp = Date.now()
}
}

private _destroyPingTimer() {
if (this.pingTimer) {
this.log('_destroyPingTimer :: destroying ping timer')
this.pingTimer.destroy()
this.pingTimer = null
}
}

/**

* _shiftPingInterval - reschedule the ping interval
*
* @api private
Expand All @@ -2117,12 +2117,11 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
*/
private _checkPing() {
this.log('_checkPing :: checking ping...')
if (this.pingResp) {
this.log(
'_checkPing :: ping response received. Clearing flag and sending `pingreq`',
)
this.pingResp = false
this._sendPacket({ cmd: 'pingreq' })
// give 100ms offset to avoid ping timeout when receiving fast responses
const timeSincePing = Date.now() - this.pingResp - 100
if (timeSincePing <= this.options.keepalive * 1000) {
this.log('_checkPing :: ping response received in time')
this._sendPing()
} else {
// do a forced cleanup since socket will be in bad shape
this.emit('error', new Error('Keepalive timeout'))
Expand All @@ -2131,6 +2130,11 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
}
}

private _sendPing() {
this.log('_sendPing :: sending pingreq')
this._sendPacket({ cmd: 'pingreq' })
}

/**
* _resubscribe
* @api private
Expand Down
10 changes: 9 additions & 1 deletion src/lib/handlers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,18 @@ const handle: PacketHandler = (client, packet, done) => {
})
return client
}

client.log('_handlePacket :: emitting packetreceive')
client.emit('packetreceive', packet)

// keep track of last time we received a packet (for keepalive mechanism)
client.pingResp = Date.now()

// do not shift on pingresp otherwise we would skip the pingreq sending
if (packet.cmd !== 'pingresp') {
client['_shiftPingInterval']()
}

switch (packet.cmd) {
case 'publish':
handlePublish(client, packet, done)
Expand All @@ -49,7 +58,6 @@ const handle: PacketHandler = (client, packet, done) => {
break
case 'pingresp':
// this will be checked in _checkPing client method every keepalive interval
client.pingResp = true
done()
break
case 'disconnect':
Expand Down
15 changes: 10 additions & 5 deletions test/abstract_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,13 @@ export default function abstractTest(server, config, ports) {

client.once('close', () => {
assert.notExists(client.pingTimer)

client.end(true, (err) => done(err))
})

client.once('connect', () => {
assert.exists(client.pingTimer)

client.stream.end()
})
})
Expand Down Expand Up @@ -1994,7 +1996,7 @@ export default function abstractTest(server, config, ports) {
})
})

it('should not checkPing if publishing at a higher rate than keepalive', function _test(t, done) {
it('should not shift ping on publish', function _test(t, done) {
const intervalMs = 3000
const client = connect({ keepalive: intervalMs / 1000 })

Expand All @@ -2007,7 +2009,7 @@ export default function abstractTest(server, config, ports) {
client.publish('foo', 'bar')
clock.tick(2)

assert.strictEqual(spy.callCount, 0)
assert.strictEqual(spy.callCount, 1)
client.end(true, done)
})
})
Expand Down Expand Up @@ -2067,13 +2069,16 @@ export default function abstractTest(server, config, ports) {
}
})

let client = connect({
const options: IClientOptions = {
keepalive: 60,
reconnectPeriod: 5000,
})
}

let client = connect()

client.once('connect', () => {
client.pingResp = false
// when using fake timers Date.now() counts from 0: https://sinonjs.org/releases/latest/fake-timers/
client.pingResp = -options.keepalive * 1000

client.once('error', (err) => {
assert.equal(err.message, 'Keepalive timeout')
Expand Down
Loading