Skip to content

Commit

Permalink
fix: keepalive issues (#1855)
Browse files Browse the repository at this point in the history
* fix: keepalive issues

* fix: typo

* fix: add missing shift

* fix: avoid race conditions

* fix: improve tests
  • Loading branch information
robertsLando authored Apr 30, 2024
1 parent b207984 commit 4f242f4
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 65 deletions.
36 changes: 19 additions & 17 deletions example.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
import mqtt from '.'
import mqtt from './src/index'

const client = mqtt.connect('mqtt://test.mosquitto.org', {
const client = mqtt.connect('mqtts://test.mosquitto.org', {
keepalive: 10,
port: 8883,
reconnectPeriod: 15000,
rejectUnauthorized: false,
})

const testTopic = 'presence'
const randomNumber = Math.floor(Math.random() * 1000)

const testTopic = `presence_${randomNumber.toString()}`

function publish() {
client.publish(
testTopic,
`Hello mqtt ${new Date().toISOString()}`,
(err2) => {
if (!err2) {
console.log('message published')
} else {
console.error(err2)
}
},
)
const msg = `Hello mqtt ${new Date().toISOString()}`
client.publish(testTopic, msg, { qos: 1 }, (err2) => {
if (!err2) {
console.log('message published')
} else {
console.error(err2)
}
})
}

client.subscribe(testTopic, (err) => {
Expand All @@ -31,11 +32,12 @@ client.subscribe(testTopic, (err) => {

client.on('message', (topic, message) => {
console.log('received message "%s" from topic "%s"', message, topic)
setTimeout(() => {
publish()
}, 2000)
})

setInterval(() => {
publish()
}, 2000)

client.on('error', (err) => {
console.error(err)
})
Expand Down
58 changes: 35 additions & 23 deletions src/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,6 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac

public messageIdProvider: IMessageIdProvider

public pingResp: boolean

public outgoing: Record<
number,
{ volatile: boolean; cb: (err: Error, packet?: Packet) => void }
Expand All @@ -435,6 +433,9 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac

public noop: (error?: any) => void

/** Timestamp of last received control packet */
public pingResp: number

public pingTimer: PingTimer

/**
Expand Down Expand Up @@ -659,11 +660,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
this.log('close :: clearing connackTimer')
clearTimeout(this.connackTimer)

this.log('close :: destroy ping timer')
if (this.pingTimer) {
this.pingTimer.destroy()
this.pingTimer = null
}
this._destroyPingTimer()

if (this.topicAliasRecv) {
this.topicAliasRecv.clear()
Expand Down Expand Up @@ -722,6 +719,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
public connect() {
const writable = new Writable()
const parser = mqttPacket.parser(this.options)

let completeParse = null
const packets = []

Expand Down Expand Up @@ -1782,11 +1780,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
this._setupReconnect()
}

if (this.pingTimer) {
this.log('_cleanUp :: destroy pingTimer')
this.pingTimer.destroy()
this.pingTimer = null
}
this._destroyPingTimer()

if (done && !this.connected) {
this.log(
Expand Down Expand Up @@ -1924,9 +1918,6 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac

this.emit('packetsend', packet)

// When writing a packet, reschedule the ping timer
this._shiftPingInterval()

this.log('_writePacket :: writing to stream')
const result = mqttPacket.writeToStream(
packet,
Expand Down Expand Up @@ -2084,18 +2075,27 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
)

if (!this.pingTimer && this.options.keepalive) {
this.pingResp = true
this.pingTimer = new PingTimer(
this.options.keepalive,
() => {
this._checkPing()
},
this.options.timerVariant,
)
this.pingResp = Date.now()
}
}

private _destroyPingTimer() {
if (this.pingTimer) {
this.log('_destroyPingTimer :: destroying ping timer')
this.pingTimer.destroy()
this.pingTimer = null
}
}

/**
* _shiftPingInterval - reschedule the ping interval
*
* @api private
Expand All @@ -2106,23 +2106,30 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
this.options.keepalive &&
this.options.reschedulePings
) {
this.pingTimer.reschedule()
this._reschedulePing()
}
}

/**
* Mostly needed for test purposes
*/
private _reschedulePing() {
this.log('_reschedulePing :: rescheduling ping')
this.pingTimer.reschedule()
}

/**
* _checkPing - check if a pingresp has come back, and ping the server again
*
* @api private
*/
private _checkPing() {
this.log('_checkPing :: checking ping...')
if (this.pingResp) {
this.log(
'_checkPing :: ping response received. Clearing flag and sending `pingreq`',
)
this.pingResp = false
this._sendPacket({ cmd: 'pingreq' })
// give 100ms offset to avoid ping timeout when receiving fast responses
const timeSincePing = Date.now() - this.pingResp - 100
if (timeSincePing <= this.options.keepalive * 1000) {
this.log('_checkPing :: ping response received in time')
this._sendPing()
} else {
// do a forced cleanup since socket will be in bad shape
this.emit('error', new Error('Keepalive timeout'))
Expand All @@ -2131,6 +2138,11 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
}
}

private _sendPing() {
this.log('_sendPing :: sending pingreq')
this._sendPacket({ cmd: 'pingreq' })
}

/**
* _resubscribe
* @api private
Expand Down
10 changes: 9 additions & 1 deletion src/lib/handlers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ const handle: PacketHandler = (client, packet, done) => {
})
return client
}

// keep track of last time we received a packet (for keepalive mechanism)
client.pingResp = Date.now()

// do not shift on pingresp otherwise we would skip the pingreq sending
if (packet.cmd !== 'pingresp') {
client['_shiftPingInterval']()
}

client.log('_handlePacket :: emitting packetreceive')
client.emit('packetreceive', packet)

Expand Down Expand Up @@ -49,7 +58,6 @@ const handle: PacketHandler = (client, packet, done) => {
break
case 'pingresp':
// this will be checked in _checkPing client method every keepalive interval
client.pingResp = true
done()
break
case 'disconnect':
Expand Down
94 changes: 70 additions & 24 deletions test/abstract_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Testing dependencies
*/
import { assert } from 'chai'
import sinon from 'sinon'
import sinon, { SinonSpy } from 'sinon'
import fs from 'fs'
import levelStore from 'mqtt-level-store'
import Store from '../src/lib/store'
Expand Down Expand Up @@ -93,11 +93,13 @@ export default function abstractTest(server, config, ports) {

client.once('close', () => {
assert.notExists(client.pingTimer)

client.end(true, (err) => done(err))
})

client.once('connect', () => {
assert.exists(client.pingTimer)

client.stream.end()
})
})
Expand Down Expand Up @@ -1980,6 +1982,12 @@ export default function abstractTest(server, config, ports) {
const spy = sinon.spy()
client['_checkPing'] = spy

client.on('error', (err) => {
client.end(true, () => {
done(err)
})
})

client.once('connect', () => {
clock.tick(interval * 1000)
assert.strictEqual(spy.callCount, 1)
Expand All @@ -1994,7 +2002,7 @@ export default function abstractTest(server, config, ports) {
})
})

it('should not checkPing if publishing at a higher rate than keepalive', function _test(t, done) {
it('should not shift ping on publish', function _test(t, done) {
const intervalMs = 3000
const client = connect({ keepalive: intervalMs / 1000 })

Expand All @@ -2003,35 +2011,70 @@ export default function abstractTest(server, config, ports) {

client.once('connect', () => {
client.publish('foo', 'bar')
clock.tick(intervalMs - 1)
clock.tick(intervalMs)
client.publish('foo', 'bar')
clock.tick(2)
clock.tick(intervalMs)

assert.strictEqual(spy.callCount, 0)
assert.strictEqual(spy.callCount, 2)
client.end(true, done)
})
})

it('should checkPing if publishing at a higher rate than keepalive and reschedulePings===false', function _test(t, done) {
const intervalMs = 3000
const client = connect({
keepalive: intervalMs / 1000,
reschedulePings: false,
})
const checkPing = (reschedulePings: boolean) => {
it(`should checkPing if publishing at a higher rate than keepalive and reschedulePings===${reschedulePings}`, function _test(t, done) {
const intervalMs = 3000
const client = connect({
keepalive: intervalMs / 1000,
reschedulePings,
})

const spy = sinon.spy()
client['_checkPing'] = spy
const spyReschedule = sinon.spy(
client,
'_reschedulePing' as any,
)

client.once('connect', () => {
client.publish('foo', 'bar')
clock.tick(intervalMs - 1)
client.publish('foo', 'bar')
clock.tick(2)
let received = 0

assert.strictEqual(spy.callCount, 1)
client.end(true, done)
client.on('packetreceive', (packet) => {
if (packet.cmd === 'puback') {
clock.tick(intervalMs)

received++

if (reschedulePings) {
assert.strictEqual(
spyReschedule.callCount,
received,
)
} else {
assert.strictEqual(spyReschedule.callCount, 0)
}

if (received === 2) {
client.end(true, done)
}
}
})

server.once('client', (serverClient) => {
serverClient.on('publish', () => {
// needed to trigger the setImmediate inside server publish listener and send suback
clock.tick(1)
})
})

client.once('connect', () => {
// reset call count (it's called also on connack)
spyReschedule.resetHistory()
// use qos1 so the puback is received (to reschedule ping)
client.publish('foo', 'bar', { qos: 1 })
client.publish('foo', 'bar', { qos: 1 })
})
})
})
}

checkPing(true)
checkPing(false)
})

describe('pinging', () => {
Expand Down Expand Up @@ -2067,13 +2110,16 @@ export default function abstractTest(server, config, ports) {
}
})

let client = connect({
const options: IClientOptions = {
keepalive: 60,
reconnectPeriod: 5000,
})
}

let client = connect()

client.once('connect', () => {
client.pingResp = false
// when using fake timers Date.now() counts from 0: https://sinonjs.org/releases/latest/fake-timers/
client.pingResp = -options.keepalive * 1000

client.once('error', (err) => {
assert.equal(err.message, 'Keepalive timeout')
Expand Down

0 comments on commit 4f242f4

Please sign in to comment.