Skip to content

Commit

Permalink
feat: allow user to disable pre-generated write cache (#1151)
Browse files Browse the repository at this point in the history
Co-authored-by: Daniel Lando <[email protected]>
Co-authored-by: Yoshi Nagasaki <[email protected]>
  • Loading branch information
3 people authored Jun 29, 2023
1 parent 8c77eec commit 0d11888
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 4 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ the `connect` event. Typically a `net.Socket`.
* `password`: the password required by your broker, if any
* `incomingStore`: a [Store](#store) for the incoming packets
* `outgoingStore`: a [Store](#store) for the outgoing packets
* `writeCache` : `true`, set to false to disable pre-generated write array for sending messages to the mqtt-stream and instead allocate buffers on-the-fly.
WARNING: This can affect performance.
* `queueQoSZero`: if connection is broken, queue outgoing QoS zero messages (default `true`)
* `customHandleAcks`: MQTT 5 feature of custom handling puback and pubrec packets. Its callback:
```js
Expand Down
8 changes: 7 additions & 1 deletion lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ const defaultConnectOptions = {
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
clean: true,
resubscribe: true
resubscribe: true,
writeCache: true
}

const socketErrors = [
Expand Down Expand Up @@ -275,6 +276,11 @@ function MqttClient (streamBuilder, options) {

this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) }

// Disable pre-generated write cache if requested. Will allocate buffers on-the-fly instead. WARNING: This can affect write performance
if (!this.options.writeCache) {
mqttPacket.writeToStream.cacheNumbers = false
}

this.streamBuilder = streamBuilder

this.messageIdProvider = (typeof this.options.messageIdProvider === 'undefined') ? new DefaultMessageIdProvider() : this.options.messageIdProvider
Expand Down
19 changes: 16 additions & 3 deletions test/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ describe('MqttClient', function () {
done()
}
})

it('should disable number cache if specified in options', function (done) {
try {
assert.isTrue(mqttPacket.writeToStream.cacheNumbers)
client = mqtt.MqttClient(function () {
throw Error('break')
}, { writeCache: false })
client.end()
} catch (err) {
assert.isFalse(mqttPacket.writeToStream.cacheNumbers)
done()
}
})
})

describe('message ids', function () {
Expand Down Expand Up @@ -83,7 +96,7 @@ describe('MqttClient', function () {
const max = 1000
let count = 0
const duplex = new Duplex({
read: function (n) {},
read: function (n) { },
write: function (chunk, enc, cb) {
parser.parse(chunk)
cb() // nothing to do
Expand Down Expand Up @@ -300,7 +313,7 @@ describe('MqttClient', function () {
})

const server2 = new MqttServer(function (serverClient) {
serverClient.on('error', function () {})
serverClient.on('error', function () { })
debug('setting serverClient connect callback')
serverClient.on('connect', function (packet) {
if (packet.clientId === 'invalid') {
Expand Down Expand Up @@ -397,7 +410,7 @@ describe('MqttClient', function () {

const server2 = net.createServer(function (stream) {
const serverClient = new Connection(stream)
serverClient.on('error', function () {})
serverClient.on('error', function () { })
serverClient.on('connect', function (packet) {
if (packet.clientId === 'invalid') {
serverClient.connack({ returnCode: 2 })
Expand Down

0 comments on commit 0d11888

Please sign in to comment.