diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 6505d666d..b9fd579d9 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -19,10 +19,9 @@ npm test This will run both `browser` and `node` tests. - ### Running specific tests -For example, you can run `node -r esbuild-register --test test/pingTimer.ts` +For example, you can run `node -r esbuild-register --test test/keepaliveManager.ts` ### Browser diff --git a/example.ts b/example.ts index 363b979f7..499f74107 100644 --- a/example.ts +++ b/example.ts @@ -1,8 +1,8 @@ import mqtt from './src/index' -const client = mqtt.connect('mqtts://test.mosquitto.org', { - keepalive: 10, - port: 8883, +const client = mqtt.connect('mqtt://broker.hivemq.com', { + keepalive: 3, + port: 1883, reconnectPeriod: 15000, rejectUnauthorized: false, }) diff --git a/src/lib/KeepaliveManager.ts b/src/lib/KeepaliveManager.ts new file mode 100644 index 000000000..14daf1eb3 --- /dev/null +++ b/src/lib/KeepaliveManager.ts @@ -0,0 +1,107 @@ +import type MqttClient from './client' +import getTimer, { type Timer } from './get-timer' +import type { TimerVariant } from './shared' + +export default class KeepaliveManager { + private _keepalive: number + + private timerId: number + + private timer: Timer + + private destroyed = false + + private counter: number + + private client: MqttClient + + private _keepaliveTimeoutTimestamp: number + + private _intervalEvery: number + + /** Timestamp of next keepalive timeout */ + get keepaliveTimeoutTimestamp() { + return this._keepaliveTimeoutTimestamp + } + + /** Milliseconds of the actual interval */ + get intervalEvery() { + return this._intervalEvery + } + + get keepalive() { + return this._keepalive + } + + constructor(client: MqttClient, variant: TimerVariant) { + this.client = client + this.timer = getTimer(variant) + this.setKeepalive(client.options.keepalive) + } + + private clear() { + if (this.timerId) { + this.timer.clear(this.timerId) + this.timerId = null + } + } + + /** Change the keepalive */ + setKeepalive(value: number) { + // keepalive is in seconds + value *= 1000 + + if ( + // eslint-disable-next-line no-restricted-globals + isNaN(value) || + value <= 0 || + value > 2147483647 + ) { + throw new Error( + `Keepalive value must be an integer between 0 and 2147483647. Provided value is ${value}`, + ) + } + + this._keepalive = value + + this.reschedule() + + this.client['log'](`KeepaliveManager: set keepalive to ${value}ms`) + } + + destroy() { + this.clear() + this.destroyed = true + } + + reschedule() { + if (this.destroyed) { + return + } + + this.clear() + this.counter = 0 + + // https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Figure_3.5_Keep + const keepAliveTimeout = Math.ceil(this._keepalive * 1.5) + + this._keepaliveTimeoutTimestamp = Date.now() + keepAliveTimeout + this._intervalEvery = Math.ceil(this._keepalive / 2) + + this.timerId = this.timer.set(() => { + // this should never happen, but just in case + if (this.destroyed) { + return + } + + this.counter += 1 + + // after keepalive seconds, send a pingreq + if (this.counter === 2) { + this.client.sendPing() + } else if (this.counter > 2) { + this.client.onKeepaliveTimeout() + } + }, this._intervalEvery) + } +} diff --git a/src/lib/PingTimer.ts b/src/lib/PingTimer.ts deleted file mode 100644 index be18f45d9..000000000 --- a/src/lib/PingTimer.ts +++ /dev/null @@ -1,56 +0,0 @@ -import getTimer, { type Timer } from './get-timer' -import type { TimerVariant } from './shared' - -export default class PingTimer { - private keepalive: number - - private timerId: number - - private timer: Timer - - private checkPing: () => void - - private destroyed = false - - constructor( - keepalive: number, - checkPing: () => void, - variant: TimerVariant, - ) { - this.keepalive = keepalive * 1000 - this.checkPing = checkPing - this.timer = getTimer(variant) - this.reschedule() - } - - private clear() { - if (this.timerId) { - this.timer.clear(this.timerId) - this.timerId = null - } - } - - destroy() { - this.clear() - this.destroyed = true - } - - reschedule() { - if (this.destroyed) { - return - } - - this.clear() - this.timerId = this.timer.set(() => { - // this should never happen, but just in case - if (this.destroyed) { - return - } - - this.checkPing() - // this must be called after `checkPing` otherwise in case `destroy` - // is called in `checkPing` the timer would be rescheduled anyway - this.reschedule() - }, this.keepalive) - } -} diff --git a/src/lib/client.ts b/src/lib/client.ts index 153b4a5c5..192b1c1ed 100644 --- a/src/lib/client.ts +++ b/src/lib/client.ts @@ -39,7 +39,7 @@ import { } from './shared' import TopicAliasSend from './topic-alias-send' import { TypedEventEmitter } from './TypedEmitter' -import PingTimer from './PingTimer' +import KeepaliveManager from './KeepaliveManager' import isBrowser, { isWebWorker } from './is-browser' const setImmediate = @@ -433,10 +433,7 @@ export default class MqttClient extends TypedEventEmitter void - /** Timestamp of last received control packet */ - public pingResp: number - - public pingTimer: PingTimer + public keepaliveManager: KeepaliveManager /** * The connection to the Broker. In browsers env this also have `socket` property @@ -572,8 +569,8 @@ export default class MqttClient extends TypedEventEmitter { - this._checkPing() - }, + if (!this.keepaliveManager && this.options.keepalive) { + this.keepaliveManager = new KeepaliveManager( + this, this.options.timerVariant, ) - this.pingResp = Date.now() } } - private _destroyPingTimer() { - if (this.pingTimer) { - this.log('_destroyPingTimer :: destroying ping timer') - this.pingTimer.destroy() - this.pingTimer = null + private _destroyKeepaliveManager() { + if (this.keepaliveManager) { + this.log('_destroyKeepaliveManager :: destroying keepalive manager') + this.keepaliveManager.destroy() + this.keepaliveManager = null } } /** - - * _shiftPingInterval - reschedule the ping interval - * - * @api private + * Reschedule the ping interval */ - private _shiftPingInterval() { + public reschedulePing() { if ( - this.pingTimer && + this.keepaliveManager && this.options.keepalive && this.options.reschedulePings ) { @@ -2115,34 +2103,20 @@ export default class MqttClient extends TypedEventEmitter setTimeout(func, time), - clear: (timerId) => clearTimeout(timerId), + set: (func, time) => setInterval(func, time), + clear: (timerId) => clearInterval(timerId), } const getTimer = (variant: TimerVariant): Timer => { diff --git a/src/lib/handlers/connack.ts b/src/lib/handlers/connack.ts index a723d3d0d..f5020c94b 100644 --- a/src/lib/handlers/connack.ts +++ b/src/lib/handlers/connack.ts @@ -29,8 +29,8 @@ const handleConnack: PacketHandler = (client, packet: IConnackPacket) => { } if (packet.properties.serverKeepAlive && options.keepalive) { options.keepalive = packet.properties.serverKeepAlive - client['_shiftPingInterval']() } + if (packet.properties.maximumPacketSize) { if (!options.properties) { options.properties = {} diff --git a/src/lib/handlers/index.ts b/src/lib/handlers/index.ts index 71f2008f5..b2bfe3fac 100644 --- a/src/lib/handlers/index.ts +++ b/src/lib/handlers/index.ts @@ -22,19 +22,12 @@ const handle: PacketHandler = (client, packet, done) => { return client } - // keep track of last time we received a packet (for keepalive mechanism) - client.pingResp = Date.now() - - // do not shift on pingresp otherwise we would skip the pingreq sending - if (!['pingresp', 'publish'].includes(packet.cmd)) { - client['_shiftPingInterval']() - } - client.log('_handlePacket :: emitting packetreceive') client.emit('packetreceive', packet) switch (packet.cmd) { case 'publish': + // DO NOT SHIFT PING HERE, this would lead to https://github.com/mqttjs/MQTT.js/issues/1861 handlePublish(client, packet, done) break case 'puback': @@ -42,22 +35,28 @@ const handle: PacketHandler = (client, packet, done) => { case 'pubcomp': case 'suback': case 'unsuback': + client.reschedulePing() handleAck(client, packet) done() break case 'pubrel': + client.reschedulePing() handlePubrel(client, packet, done) break case 'connack': + // no need to reschedule ping here as keepalive manager is created after successll connect + // (when onConnect is called at the end of handleConnack) handleConnack(client, packet) done() break case 'auth': + client.reschedulePing() handleAuth(client, packet) done() break case 'pingresp': - // this will be checked in _checkPing client method every keepalive interval + client.log('_handlePacket :: received pingresp') + client.reschedulePing() done() break case 'disconnect': diff --git a/src/mqtt.ts b/src/mqtt.ts index 60bca03a7..e3a79abeb 100644 --- a/src/mqtt.ts +++ b/src/mqtt.ts @@ -9,7 +9,7 @@ import DefaultMessageIdProvider from './lib/default-message-id-provider' import UniqueMessageIdProvider from './lib/unique-message-id-provider' import Store, { IStore } from './lib/store' import connect, { connectAsync } from './lib/connect' -import PingTimer from './lib/PingTimer' +import KeepaliveManager from './lib/KeepaliveManager' export const Client = MqttClient export { @@ -20,7 +20,7 @@ export { DefaultMessageIdProvider, UniqueMessageIdProvider, IStore, - PingTimer, + KeepaliveManager, } export * from './lib/client' export * from './lib/shared' diff --git a/test/abstract_client.ts b/test/abstract_client.ts index 0cbc52c55..33fdc1f97 100644 --- a/test/abstract_client.ts +++ b/test/abstract_client.ts @@ -88,17 +88,17 @@ export default function abstractTest(server, config, ports) { }) }) - it('should stop ping timer if stream closes', function _test(t, done) { + it('should destroy keepalive manager if stream closes', function _test(t, done) { const client = connect() client.once('close', () => { - assert.notExists(client.pingTimer) + assert.notExists(client.keepaliveManager) client.end(true, (err) => done(err)) }) client.once('connect', () => { - assert.exists(client.pingTimer) + assert.exists(client.keepaliveManager) client.stream.end() }) @@ -210,13 +210,13 @@ export default function abstractTest(server, config, ports) { }) }) - it('should stop ping timer after end called', function _test(t, done) { + it('should destroy keepalive manager after end called', function _test(t, done) { const client = connect() client.once('connect', () => { - assert.exists(client.pingTimer) + assert.exists(client.keepaliveManager) client.end((err) => { - assert.notExists(client.pingTimer) + assert.notExists(client.keepaliveManager) done(err) }) }) @@ -1975,12 +1975,11 @@ export default function abstractTest(server, config, ports) { clock.restore() }) - it('should checkPing at keepalive interval', function _test(t, done) { - const interval = 3 - const client = connect({ keepalive: interval }) + it('should send ping at keepalive interval', function _test(t, done) { + const interval = 3000 + const client = connect({ keepalive: interval / 1000 }) - const spy = sinon.spy() - client['_checkPing'] = spy + const spy = sinon.spy(client, 'sendPing') client.on('error', (err) => { client.end(true, () => { @@ -1988,17 +1987,26 @@ export default function abstractTest(server, config, ports) { }) }) - client.once('connect', () => { - clock.tick(interval * 1000) - assert.strictEqual(spy.callCount, 1) + let pingReceived = 0 - clock.tick(interval * 1000) - assert.strictEqual(spy.callCount, 2) + client.on('packetreceive', (packet) => { + if (packet.cmd === 'pingresp') { + process.nextTick(() => { + pingReceived++ + assert.strictEqual(spy.callCount, pingReceived) - clock.tick(interval * 1000) - assert.strictEqual(spy.callCount, 3) + if (pingReceived === 3) { + client.end(true, done) + } else { + clock.tick(interval) + } + }) + clock.tick(1) + } + }) - client.end(true, done) + client.once('connect', () => { + clock.tick(interval) }) }) @@ -2049,8 +2057,10 @@ export default function abstractTest(server, config, ports) { }) }) - const checkPing = (reschedulePings: boolean) => { - it(`should checkPing if publishing at a higher rate than keepalive and reschedulePings===${reschedulePings}`, function _test(t, done) { + const reschedulePing = (reschedulePings: boolean) => { + it(`should ${ + !reschedulePings ? 'not ' : '' + }reschedule pings if publishing at a higher rate than keepalive and reschedulePings===${reschedulePings}`, function _test(t, done) { const intervalMs = 3000 const client = connect({ keepalive: intervalMs / 1000, @@ -2066,22 +2076,28 @@ export default function abstractTest(server, config, ports) { client.on('packetreceive', (packet) => { if (packet.cmd === 'puback') { - clock.tick(intervalMs) + process.nextTick(() => { + clock.tick(intervalMs) - received++ + received++ - if (reschedulePings) { - assert.strictEqual( - spyReschedule.callCount, - received, - ) - } else { - assert.strictEqual(spyReschedule.callCount, 0) - } + if (received === 2) { + if (reschedulePings) { + assert.strictEqual( + spyReschedule.callCount, + received, + ) + } else { + assert.strictEqual( + spyReschedule.callCount, + 0, + ) + } + client.end(true, done) + } + }) - if (received === 2) { - client.end(true, done) - } + clock.tick(1) } }) @@ -2102,23 +2118,23 @@ export default function abstractTest(server, config, ports) { }) } - checkPing(true) - checkPing(false) + reschedulePing(true) + reschedulePing(false) }) describe('pinging', () => { - it('should set a ping timer', function _test(t, done) { + it('should setup keepalive manager', function _test(t, done) { const client = connect({ keepalive: 3 }) client.once('connect', () => { - assert.exists(client.pingTimer) + assert.exists(client.keepaliveManager) client.end(true, done) }) }) - it('should not set a ping timer keepalive=0', function _test(t, done) { + it('should not setup keepalive manager if keepalive=0', function _test(t, done) { const client = connect({ keepalive: 0 }) client.on('connect', () => { - assert.notExists(client.pingTimer) + assert.notExists(client.keepaliveManager) client.end(true, done) }) }) @@ -2144,12 +2160,9 @@ export default function abstractTest(server, config, ports) { reconnectPeriod: 5000, } - let client = connect() + let client = connect(options) client.once('connect', () => { - // when using fake timers Date.now() counts from 0: https://sinonjs.org/releases/latest/fake-timers/ - client.pingResp = -options.keepalive * 1000 - client.once('error', (err) => { assert.equal(err.message, 'Keepalive timeout') client.once('connect', () => { @@ -2164,7 +2177,10 @@ export default function abstractTest(server, config, ports) { clock.tick(client.options.reconnectPeriod) }) - clock.tick(client.options.keepalive * 1000) + const timeoutTimestamp = + client.keepaliveManager.keepaliveTimeoutTimestamp + + clock.tick(timeoutTimestamp - Date.now()) }) }, ) @@ -2190,7 +2206,10 @@ export default function abstractTest(server, config, ports) { client.once('connect', () => { setImmediate(() => { // make keepalive check trigger - clock.tick(client.options.keepalive * 1000) + const timeoutTimestamp = + client.keepaliveManager.keepaliveTimeoutTimestamp + + clock.tick(timeoutTimestamp - Date.now()) }) client.on('packetsend', (packet) => { @@ -2206,31 +2225,6 @@ export default function abstractTest(server, config, ports) { }) }, ) - - it('should defer the next ping when sending a control packet', function _test(t, done) { - const client = connect({ keepalive: 1 }) - - client.once('connect', () => { - const spy = sinon.spy() - client['_checkPing'] = spy - - client.publish('foo', 'bar') - setTimeout(() => { - assert.strictEqual(spy.callCount, 0) - client.publish('foo', 'bar') - - setTimeout(() => { - assert.strictEqual(spy.callCount, 0) - client.publish('foo', 'bar') - - setTimeout(() => { - assert.strictEqual(spy.callCount, 0) - done() - }, 75) - }, 75) - }, 75) - }) - }) }) describe('subscribing', () => { diff --git a/test/keepaliveManager.ts b/test/keepaliveManager.ts new file mode 100644 index 000000000..60084f139 --- /dev/null +++ b/test/keepaliveManager.ts @@ -0,0 +1,108 @@ +import { afterEach, beforeEach, describe, it } from 'node:test' +import KeepaliveManager from '../src/lib/KeepaliveManager' +import { assert } from 'chai' +import { useFakeTimers, spy, mock } from 'sinon' +import { MqttClient } from 'src' + +function mockedClient(keepalive: number) { + return { + options: { + keepalive, + }, + onKeepaliveTimeout: () => {}, + sendPing: () => {}, + log: () => {}, + } as unknown as MqttClient +} + +describe('KeepaliveManager', () => { + let clock: sinon.SinonFakeTimers + beforeEach(() => { + clock = useFakeTimers() + }) + + afterEach(() => { + clock.restore() + }) + + it('should schedule and destroy', () => { + const keepalive = 10 // seconds + const client = mockedClient(keepalive) + const manager = new KeepaliveManager(client, 'auto') + + const spySendPing = spy(client, 'sendPing') + const spyTimeout = spy(client, 'onKeepaliveTimeout') + + const checksEvery = manager.intervalEvery + + assert.ok(manager['timerId'], 'timer should be created automatically') + + clock.tick(checksEvery) + assert.equal( + spySendPing.callCount, + 0, + 'should not send ping before keepalive seconds', + ) + + clock.tick(checksEvery) + assert.equal(spySendPing.callCount, 1, 'should send ping automatically') + assert.equal(spyTimeout.callCount, 0, 'should not trigger timeout') + + clock.tick(checksEvery) + assert.equal( + spyTimeout.callCount, + 1, + 'should trigger keepalive timeout after 1.5*keepalive seconds', + ) + + manager.destroy() + assert.ok( + !manager['timerId'], + 'timer should not exists after destroy()', + ) + + assert.ok( + manager['destroyed'], + 'timer should have `destroyed` set to true after destroy()', + ) + }) + + it('should reschedule', () => { + const keepalive = 10 // seconds + const manager = new KeepaliveManager(mockedClient(keepalive), 'auto') + + const checksEvery = manager.intervalEvery + + clock.tick(checksEvery) + assert.equal( + manager['counter'], + 1, + 'should increese counter on every check', + ) + manager.reschedule() + assert.equal( + manager['counter'], + 0, + 'should reset counter after reschedule', + ) + }) + + it('should validate keepalive', () => { + const manager = new KeepaliveManager(mockedClient(1), 'auto') + + assert.throw( + () => manager.setKeepalive(-1), + 'Keepalive value must be an integer between 0 and 2147483647. Provided value is -1', + ) + + assert.throw( + () => manager.setKeepalive(2147483648), + 'Keepalive value must be an integer between 0 and 2147483647. Provided value is 2147483648', + ) + + manager.setKeepalive(10) + + assert.equal(manager.keepalive, 10000) + assert.equal(manager.intervalEvery, 5000) + }) +}) diff --git a/test/pingTimer.ts b/test/pingTimer.ts deleted file mode 100644 index 7adcbc54d..000000000 --- a/test/pingTimer.ts +++ /dev/null @@ -1,65 +0,0 @@ -import { afterEach, beforeEach, describe, it } from 'node:test' -import PingTimer from '../src/lib/PingTimer' -import { assert } from 'chai' -import { useFakeTimers, spy } from 'sinon' - -describe('PingTimer', () => { - let clock: sinon.SinonFakeTimers - beforeEach(() => { - clock = useFakeTimers() - }) - - afterEach(() => { - clock.restore() - }) - - it('should schedule and destroy', () => { - const keepalive = 10 // seconds - const cb = spy() - const pingTimer = new PingTimer(keepalive, cb, 'auto') - - assert.ok(pingTimer['timerId'], 'timer should be created automatically') - - clock.tick(keepalive * 1000 + 1) - assert.equal( - cb.callCount, - 1, - 'should trigger the callback after keepalive seconds', - ) - clock.tick(keepalive * 1000 + 1) - assert.equal(cb.callCount, 2, 'should reschedule automatically') - pingTimer.destroy() - assert.ok( - !pingTimer['timerId'], - 'timer should not exists after destroy()', - ) - - assert.ok( - pingTimer['destroyed'], - 'timer should have `destroyed` set to true after destroy()', - ) - }) - - it('should not re-schedule if timer has been cleared in check ping', () => { - const keepalive = 10 // seconds - const cb = spy() - const pingTimer = new PingTimer( - keepalive, - () => { - pingTimer.destroy() - cb() - }, - 'auto', - ) - - clock.tick(keepalive * 1000 + 1) - assert.equal( - cb.callCount, - 1, - 'should trigger the callback after keepalive seconds', - ) - clock.tick(keepalive * 1000 + 1) - assert.equal(cb.callCount, 1, 'should not re-schedule') - assert.ok(!pingTimer['timerId'], 'timer should not exists') - }) -})