From 010a909ee495b690a20318f5d50149c59ffba140 Mon Sep 17 00:00:00 2001 From: jeffreystoke Date: Sat, 30 Dec 2017 23:57:34 +0800 Subject: [PATCH] pass fuzzy test add benchmark (against paho mqtt lib) update copyright --- .gitignore | 6 +- Makefile | 13 ++- README.md | 44 ++++++-- benchmark/README.md | 5 + benchmark/client_test.go | 232 +++++++++++++++++++++++++++++++++++++++ c/callbacks.go | 16 +++ c/libmqtt.go | 4 +- client.go | 104 +++++++++--------- cmd/README.md | 2 +- cmd/conn.go | 16 +++ cmd/libmqttc.go | 16 +++ cmd/pub.go | 16 +++ cmd/sub.go | 16 +++ cmd/util.go | 16 +++ decoder.go | 76 +++++++------ decoder_test.go | 20 ++++ fuzz.go | 34 ++++++ libmqtt.go | 1 - log.go | 10 +- msg.go | 16 +++ persist.go | 16 +++ persist_test.go | 16 +++ router.go | 16 +++ router_test.go | 16 +++ 24 files changed, 624 insertions(+), 103 deletions(-) create mode 100644 benchmark/README.md create mode 100644 benchmark/client_test.go create mode 100644 fuzz.go diff --git a/.gitignore b/.gitignore index f66d762..fd20e07 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,6 @@ /.idea -/.vscode \ No newline at end of file +/.vscode + +/fuzz-test + +libmqtt-fuzz.zip \ No newline at end of file diff --git a/Makefile b/Makefile index 72051cc..ffaeaaf 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: test lib client clean +.PHONY: test lib client clean fuzz-test test: go test -v -run=. -bench=. -benchmem @@ -9,6 +9,13 @@ lib: client: $(MAKE) -C cmd build -clean: +clean: fuzz-clean $(MAKE) -C c clean - $(MAKE) -C cmd clean \ No newline at end of file + $(MAKE) -C cmd clean + +fuzz-test: + go-fuzz-build github.com/goiiot/libmqtt + go-fuzz -bin=./libmqtt-fuzz.zip -workdir=fuzz-test + +fuzz-clean: + rm -rf fuzz-test libmqtt-fuzz.zip diff --git a/README.md b/README.md index c8ea9ed..c4b2af3 100644 --- a/README.md +++ b/README.md @@ -2,22 +2,29 @@ [![Build Status](https://travis-ci.org/goiiot/libmqtt.svg)](https://travis-ci.org/goiiot/libmqtt) [![GoDoc](https://godoc.org/github.com/goiiot/libmqtt?status.svg)](https://godoc.org/github.com/goiiot/libmqtt) [![GoReportCard](https://goreportcard.com/badge/goiiot/libmqtt)](https://goreportcard.com/report/github.com/goiiot/libmqtt) -MQTT 3.1.1 client lib with pure Go +MQTT 3.1.1 client lib in pure Go. ## Contents - [Features](#features) - [Usage](#usage) - [Topic Routing](#topic-routing) +- [Benchmark](#benchmark) +- [RoadMap](#roadmap) ## Features -1. A full functional MQTT 3.1.1 client. (currently only with in memory session state storage) -1. HTTP like API. +1. Full functional MQTT 3.1.1 client. (file persist state is now under work) +1. HTTP server like API. +1. High performance and less memory footprint. (see [Benchmark](#benchmark)) 1. Customizable TopicRouter. (see [Topic Routing](#topic-routing)) -1. Command line app support (see [cmd](./cmd/)) -1. C lib support (see [c](./c/)) -1. More efficient, idiomatic Go (maybe not quite idiomatic for now) +1. Command line app support. (see [cmd](./cmd/)) +1. C lib support. (see [c](./c/)) +1. Idiomatic Go. + +## Prerequisite + +1. Go 1.9+ (with `GOPATH` configured) ## Usage @@ -162,10 +169,33 @@ client, err := NewClient( ) ``` +## Benchmark + +The procedure of the benchmark is as following: + +1. Create the client +1. Connect to server +1. Subscribe to topic `foo` +1. Publish to topic `foo` +1. Unsubsecibe when received all published message (with `foo` topic) +1. Destroy client (a sudden disconnect without disconnect packet) + +The benchmark result listed below was taken on a Macbook Pro 13' (Early 2015, macOS 10.13.2), statistics inside which is the value of ten times average + +|Bench Name|Pub Count|ns/op|B/op|allocs/op|Transfer Time|Total Time| +|---|---|---|---|---|---|---| +|BenchmarkPahoClient-4|10000|199632|1399|31|0.230s|2.021s| +|BenchmarkLibmqttClient-4|10000|144407|331|9|0.124s|1.467s| +|BenchmarkPahoClient-4|50000|205884|1395|31|1.170s|10.316s| +|BenchmarkLibmqttClient-4|50000|161640|328|9|0.717s|8.105s| + +You can make the benchmark using source code from [benchmark](./benchmark/) + +Notice: benchmark on libmqtt sometimes can be a infinite loop, we are now trying to solve that. + ## RoadMap 1. File persist storage of session status. (High priority) 1. Full tested multiple connections in one client. (High priority) -1. More efficient processing. (Medium priority) 1. Add compatibility with mqtt 5.0 . (Medium priority) 1. Export to Java (JNI), Python (CPython), Objective-C... (Low priority) diff --git a/benchmark/README.md b/benchmark/README.md new file mode 100644 index 0000000..155e6d2 --- /dev/null +++ b/benchmark/README.md @@ -0,0 +1,5 @@ +# Benchmark for MQTT Golang client libs + +## Recommended Environment + +- MQTT Broker - emqttd (Docker) diff --git a/benchmark/client_test.go b/benchmark/client_test.go new file mode 100644 index 0000000..e12f55f --- /dev/null +++ b/benchmark/client_test.go @@ -0,0 +1,232 @@ +/* + * Copyright GoIIoT (https://github.com/goiiot) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package benchmark + +import ( + "bytes" + "net/url" + "sync" + "sync/atomic" + "testing" + + pah "github.com/eclipse/paho.mqtt.golang" + lib "github.com/goiiot/libmqtt" +) + +//smqM "github.com/surgemq/message" +//smq "github.com/surgemq/surgemq/service" +//gmq "github.com/yosssi/gmq/mqtt/client" + +const ( + keepalive = 10 + server = "localhost:1883" + username = "foo" + password = "bar" + topic = "/foo" + qos = 0 + + pubCount = 50000 +) + +var ( + topicMsg = []byte("bar") +) + +func BenchmarkLibmqttClient(b *testing.B) { + b.N = pubCount + b.ReportAllocs() + var count uint32 + var client lib.Client + var err error + + b.ResetTimer() + if client, err = lib.NewClient( + //lib.WithLog(lib.Verbose), + lib.WithServer(server), + lib.WithKeepalive(keepalive, 1.2), + lib.WithIdentity(username, password), + lib.WithRecvBuf(100), + lib.WithSendBuf(100), + lib.WithCleanSession(true)); err != nil { + b.Log(err) + b.FailNow() + } + + client.HandleSub(func(topics []*lib.Topic, err error) { + go func() { + for i := 0; i < pubCount; i++ { + client.Publish(&lib.PublishPacket{ + TopicName: topic, + Qos: qos, + Payload: topicMsg, + }) + } + }() + }) + + client.HandlePub(func(topic string, err error) { + if err != nil { + b.FailNow() + } + }) + + client.HandleUnSub(func(topics []string, err error) { + if err != nil { + b.FailNow() + } + + client.Destroy(true) + }) + + client.Handle(topic, func(t string, q lib.QosLevel, msg []byte) { + if topic != t || q != qos || bytes.Compare(msg, topicMsg) != 0 { + b.FailNow() + } + + atomic.AddUint32(&count, 1) + if atomic.LoadUint32(&count) == pubCount { + client.UnSubscribe(topic) + } + }) + + client.Connect(func(server string, code lib.ConnAckCode, err error) { + if err != nil { + b.Log(err) + b.FailNow() + } else if code != lib.ConnAccepted { + b.FailNow() + } + + client.Subscribe(&lib.Topic{Name: topic, Qos: qos}) + }) + + client.Wait() +} + +func BenchmarkPahoClient(b *testing.B) { + b.N = pubCount + b.ReportAllocs() + var count uint32 + + wg := &sync.WaitGroup{} + wg.Add(1) + b.ResetTimer() + serverURL, err := url.Parse("tcp://" + server) + if err != nil { + b.FailNow() + } + + client := pah.NewClient(&pah.ClientOptions{ + Servers: []*url.URL{serverURL}, + KeepAlive: keepalive, + Username: username, + Password: password, + CleanSession: true, + ProtocolVersion: 4, + Store: pah.NewMemoryStore(), + }) + + t := client.Connect() + if !t.Wait() { + b.FailNow() + } + + if err := t.Error(); err != nil { + b.Log(err) + b.FailNow() + } + + t = client.Subscribe(topic, 0, func(c pah.Client, message pah.Message) { + if topic != message.Topic() || + bytes.Compare(topicMsg, message.Payload()) != 0 || + qos != message.Qos() { + b.FailNow() + } + + atomic.AddUint32(&count, 1) + if atomic.LoadUint32(&count) == pubCount { + t := c.Unsubscribe(topic) + if !t.Wait() { + b.FailNow() + } + if err := t.Error(); err != nil { + b.Log(err) + b.FailNow() + } + + c.Disconnect(0) + wg.Done() + } + }) + + if !t.Wait() { + b.FailNow() + } + if err := t.Error(); err != nil { + b.Log(err) + b.FailNow() + } + + for i := 0; i < pubCount; i++ { + t = client.Publish(topic, 0, false, topicMsg) + if !t.Wait() { + b.FailNow() + } + + if err := t.Error(); err != nil { + b.Log(err) + b.FailNow() + } + } + wg.Wait() +} + +//func BenchmarkGmqClient(b *testing.B) { +// client := gmq.New(&gmq.Options{}) +// client.Connect(&gmq.ConnectOptions{ +// Network: "tcp", +// Address: server, +// UserName: []byte(username), +// Password: []byte(password), +// KeepAlive: keepalive, +// }) +// subHandler := func(topicName, message []byte) { +// +// } +// if err := client.Subscribe(&gmq.SubscribeOptions{ +// SubReqs: []*gmq.SubReq{ +// { +// TopicFilter: []byte(topic), +// QoS: qos, +// Handler: subHandler, +// }, +// }, +// }); err != nil { +// b.Log(err) +// b.FailNow() +// } +// +//} + +//func BenchmarkSurgeClient(b *testing.B) { +// client := &smq.Client{KeepAlive: 10} +// err := client.Connect(server, &smqM.ConnectMessage{}) +// if err != nil { +// b.Log(err) +// b.FailNow() +// } +//} diff --git a/c/callbacks.go b/c/callbacks.go index 848af7c..0d2d5fa 100644 --- a/c/callbacks.go +++ b/c/callbacks.go @@ -1,3 +1,19 @@ +/* + * Copyright GoIIoT (https://github.com/goiiot) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package main // #cgo CFLAGS: -I include diff --git a/c/libmqtt.go b/c/libmqtt.go index 8b27a79..467f422 100644 --- a/c/libmqtt.go +++ b/c/libmqtt.go @@ -1,5 +1,3 @@ -// +build cgo lib - /* * Copyright GoIIoT (https://github.com/goiiot) * @@ -16,6 +14,8 @@ * limitations under the License. */ +// +build cgo lib + package main // #cgo CFLAGS: -I include diff --git a/client.go b/client.go index 3ffded9..dfe3ce4 100644 --- a/client.go +++ b/client.go @@ -305,7 +305,7 @@ type client struct { workers *sync.WaitGroup // workers persist PersistMethod // persist method - // error handlers + // success/error handlers pH PubHandler sH SubHandler uH UnSubHandler @@ -315,7 +315,7 @@ type client struct { // defaultClient create the client with default options func defaultClient() *client { - c := &client{ + return &client{ options: &clientOptions{ sendChanSize: 128, recvChanSize: 128, @@ -335,7 +335,6 @@ func defaultClient() *client { workers: &sync.WaitGroup{}, persist: NewMemPersist(DefaultPersistStrategy()), } - return c } // Handle subscription message route @@ -356,27 +355,27 @@ func (c *client) Connect(h ConnHandler) { }() go func() { - for e := range c.msgC { - switch e.what { + for m := range c.msgC { + switch m.what { case pubMsg: if c.pH != nil { - c.pH(e.msg, e.err) + c.pH(m.msg, m.err) } case subMsg: if c.sH != nil { - c.sH(e.obj.([]*Topic), e.err) + c.sH(m.obj.([]*Topic), m.err) } case unSubMsg: if c.uH != nil { - c.uH(e.obj.([]string), e.err) + c.uH(m.obj.([]string), m.err) } case netMsg: if c.nH != nil { - c.nH(e.msg, e.err) + c.nH(m.msg, m.err) } case persistMsg: if c.psH != nil { - c.psH(e.err) + c.psH(m.err) } } } @@ -390,23 +389,22 @@ func (c *client) Connect(h ConnHandler) { // Publish message(s) to topic(s), one to one func (c *client) Publish(msg ...*PublishPacket) { - lg.d("CLIENT publish, msg(s) =", msg) for _, m := range msg { - if m.Qos > Qos2 { - m.Qos = Qos2 + if m == nil { + continue } - toSend := &PublishPacket{ - Qos: m.Qos, - IsRetain: m.IsRetain, - TopicName: m.TopicName, - Payload: m.Payload, + p := m + if p.Qos > Qos2 { + p.Qos = Qos2 } - if toSend.Qos != Qos0 { - toSend.PacketID = c.idGen.next(toSend) + if p.Qos != Qos0 { + p.PacketID = c.idGen.next(p) } - c.sendC <- toSend + + c.sendC <- p + lg.d("CLIENT publish, msg =", p) } } @@ -444,7 +442,8 @@ func (c *client) Wait() { // If force is true, then close connection without sending a DisConnPacket func (c *client) Destroy(force bool) { lg.d("CLIENT destroying client with force =", force) - close(c.sendC) + // TODO close all channel properly + // close(c.logicSendC) c.options.bf = nil if force { c.conn.Range(func(k, v interface{}) bool { @@ -523,10 +522,11 @@ func (c *client) connect(server string, h ConnHandler, triedTimes int) { parent: c, name: server, conn: conn, + clientBuf: &bytes.Buffer{}, sendBuf: &bytes.Buffer{}, keepaliveC: make(chan int), - sendC: make(chan Packet), - recvC: make(chan Packet), + logicSendC: make(chan Packet), + netRecvC: make(chan Packet), } if c.options.bf != nil { @@ -541,7 +541,6 @@ func (c *client) connect(server string, h ConnHandler, triedTimes int) { } go connImpl.handleLogicSend() - go connImpl.handleClientSend() go connImpl.handleRecv() connImpl.send(&ConPacket{ @@ -558,7 +557,7 @@ func (c *client) connect(server string, h ConnHandler, triedTimes int) { }) select { - case pkt, more := <-connImpl.recvC: + case pkt, more := <-connImpl.netRecvC: if more { if pkt.Type() == CtrlConnAck { p := pkt.(*ConAckPacket) @@ -587,21 +586,23 @@ func (c *client) connect(server string, h ConnHandler, triedTimes int) { return } + go connImpl.handleClientSend() + lg.i("CLIENT connected server =", server) if h != nil { - go h(server, ConnAccepted, nil) + h(server, ConnAccepted, nil) } - // login success, startLogic mqtt logic + // login success, start mqtt logic c.conn.Store(server, connImpl) - connImpl.startLogic() + connImpl.logic() if c.options.bf != nil { triedTimes++ c.workers.Add(1) - lg.w("CLIENT reconnect to server =", server, "seq =", triedTimes, "delay =", int64(connImpl.reconDelay)) + lg.w("CLIENT reconnect to server =", server, "seq =", triedTimes, "delay =", connImpl.reconDelay) go func() { - <-time.After(connImpl.reconDelay) + time.Sleep(connImpl.reconDelay) c.connect(server, h, triedTimes) }() } @@ -613,22 +614,23 @@ type connImpl struct { parent *client // client which created this connection name string // server addr info conn net.Conn // connection to server - sendBuf *bytes.Buffer // buffer for packet send - sendC chan Packet // logic send channel - recvC chan Packet // received packet from server + sendBuf *bytes.Buffer // buffer for logic packet send + clientBuf *bytes.Buffer // buffer for client packet send + logicSendC chan Packet // logic send channel + netRecvC chan Packet // received packet from server keepaliveC chan int // keepalive packet reconDelay time.Duration // reconnection delay } -// startLogic mqtt logic -func (c *connImpl) startLogic() { - // startLogic keepalive if required +// start mqtt logic +func (c *connImpl) logic() { + // start keepalive if required if c.parent.options.keepalive > 0 { go c.keepalive() } // inspect incoming packet - for pkt := range c.recvC { + for pkt := range c.netRecvC { switch pkt.Type() { case CtrlSubAck: p := pkt.(*SubAckPacket) @@ -790,8 +792,8 @@ func (c *connImpl) close() { // handle client message send func (c *connImpl) handleClientSend() { for pkt := range c.parent.sendC { - pkt.Bytes(c.sendBuf) - if _, err := c.sendBuf.WriteTo(c.conn); err != nil { + pkt.Bytes(c.clientBuf) + if _, err := c.clientBuf.WriteTo(c.conn); err != nil { // DO NOT NOTIFY net err HERE // ALWAYS DETECT net err in receive switch pkt.Type() { @@ -809,7 +811,7 @@ func (c *connImpl) handleClientSend() { case CtrlPublish: pub := pkt.(*PublishPacket) if pub.Qos == Qos0 { - c.parent.msgC <- newPubMsg(pkt.(*PublishPacket).TopicName, nil) + c.parent.msgC <- newPubMsg(pub.TopicName, nil) } else { c.parent.persist.Store(sendKey(pub.PacketID), pkt) } @@ -817,16 +819,16 @@ func (c *connImpl) handleClientSend() { c.parent.persist.Store(sendKey(pkt.(*SubscribePacket).PacketID), pkt) case CtrlUnSub: c.parent.persist.Store(sendKey(pkt.(*UnSubPacket).PacketID), pkt) - case CtrlDisConn: - // disconnect to server - break } } + lg.e("exit recv") + //for pkt := range c.parent.logicSendC { + //} } // handle mqtt logic control packet send func (c *connImpl) handleLogicSend() { - for pkt := range c.sendC { + for pkt := range c.logicSendC { pkt.Bytes(c.sendBuf) if _, err := c.sendBuf.WriteTo(c.conn); err != nil { // DO NOT NOTIFY net err HERE @@ -842,6 +844,8 @@ func (c *connImpl) handleLogicSend() { case CtrlPubComp: c.parent.persist.Delete(sendKey(pkt.(*PubCompPacket).PacketID)) case CtrlDisConn: + // disconnect to server + lg.i("disconnect to server") c.conn.Close() break } @@ -853,8 +857,8 @@ func (c *connImpl) handleRecv() { for { pkt, err := decodeOnePacket(c.conn) if err != nil { - lg.e("NET connection broken", "server =", c.name, "err =", err) - close(c.recvC) + lg.e("NET connection broken, server =", c.name, "err =", err) + close(c.netRecvC) close(c.keepaliveC) // TODO send proper net error to net handler // count.parent.msgC <- newNetMsg(count.name, err) @@ -866,12 +870,12 @@ func (c *connImpl) handleRecv() { lg.d("NET received keepalive message") c.keepaliveC <- 1 } else { - c.recvC <- pkt + c.netRecvC <- pkt } } } -// send internal mqtt logic packet +// send mqtt logic packet func (c *connImpl) send(pkt Packet) { - c.sendC <- pkt + c.logicSendC <- pkt } diff --git a/cmd/README.md b/cmd/README.md index 16b0a49..c616528 100644 --- a/cmd/README.md +++ b/cmd/README.md @@ -8,7 +8,7 @@ A command line MQTT client built on top of libmqtt. ### Prerequisite -1. Go (with `GOPATH` configured) +1. Go 1.9+ (with `GOPATH` configured) ### Steps diff --git a/cmd/conn.go b/cmd/conn.go index 8c6d32b..a7194fc 100644 --- a/cmd/conn.go +++ b/cmd/conn.go @@ -1,3 +1,19 @@ +/* + * Copyright GoIIoT (https://github.com/goiiot) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package main import ( diff --git a/cmd/libmqttc.go b/cmd/libmqttc.go index 2e18eea..04c3ad4 100644 --- a/cmd/libmqttc.go +++ b/cmd/libmqttc.go @@ -1,3 +1,19 @@ +/* + * Copyright GoIIoT (https://github.com/goiiot) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package main import ( diff --git a/cmd/pub.go b/cmd/pub.go index be4385e..9c97c9c 100644 --- a/cmd/pub.go +++ b/cmd/pub.go @@ -1,3 +1,19 @@ +/* + * Copyright GoIIoT (https://github.com/goiiot) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package main import ( diff --git a/cmd/sub.go b/cmd/sub.go index 8285dc8..75c7653 100644 --- a/cmd/sub.go +++ b/cmd/sub.go @@ -1,3 +1,19 @@ +/* + * Copyright GoIIoT (https://github.com/goiiot) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package main import ( diff --git a/cmd/util.go b/cmd/util.go index 3b2b9dd..846422c 100644 --- a/cmd/util.go +++ b/cmd/util.go @@ -1,3 +1,19 @@ +/* + * Copyright GoIIoT (https://github.com/goiiot) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package main import ( diff --git a/decoder.go b/decoder.go index 0c8a7b4..84c345b 100644 --- a/decoder.go +++ b/decoder.go @@ -28,27 +28,25 @@ var ( func decodeOnePacket(reader io.Reader) (pkt Packet, err error) { headerBytes := make([]byte, 1) - _, err = io.ReadFull(reader, headerBytes[:]) - if err != nil { + if _, err = io.ReadFull(reader, headerBytes[:]); err != nil { return } var bytesToRead int - bytesToRead, err = decodeRemainLength(reader) - if err != nil { + if bytesToRead, err = decodeRemainLength(reader); err != nil { return } + if bytesToRead < 2 { + return nil, ErrBadPacket + } + body := make([]byte, bytesToRead) - if bytesToRead > 0 { - var n = 0 - n, err = io.ReadFull(reader, body[:]) - if n < 2 { - err = ErrBadPacket - } - if err != nil { - return - } + n := 0 + if n, err = io.ReadFull(reader, body[:]); err != nil { + return + } else if n < 2 { + return nil, ErrBadPacket } header := headerBytes[0] @@ -56,12 +54,11 @@ func decodeOnePacket(reader io.Reader) (pkt Packet, err error) { switch header >> 4 { case CtrlConn: var protocol string - protocol, next, err = decodeString(body) - if err != nil { + if protocol, next, err = decodeString(body); err != nil { return } - if len(next) < 2 { + if len(next) < 4 { err = ErrBadPacket return } @@ -84,44 +81,50 @@ func decodeOnePacket(reader io.Reader) (pkt Packet, err error) { tmpPkt.WillTopic, next, err = decodeString(next) tmpPkt.WillMessage, next, err = decodeData(next) } + if hasUsername { tmpPkt.Username, next, err = decodeString(next) } + if hasPassword { tmpPkt.Password, _, err = decodeString(next) } - pkt = tmpPkt - case CtrlConnAck: - if len(body) < 2 { - err = ErrBadPacket + + if err != nil { return } + + pkt = tmpPkt + case CtrlConnAck: pkt = &ConAckPacket{ - Present: body[0]&0x01 == 1, + Present: body[0]&0x01 == 0x01, Code: body[1], } case CtrlPublish: var topicName string - topicName, next, err = decodeString(body) - if err != nil { + if topicName, next, err = decodeString(body); err != nil { return } + if len(next) < 2 { err = ErrBadPacket return } - tmpPkg := &PublishPacket{ + + pub := &PublishPacket{ IsDup: header&0x08 == 0x08, Qos: header & 0x06 >> 1, IsRetain: header&0x01 == 1, TopicName: topicName, } - if tmpPkg.Qos > Qos0 { - tmpPkg.PacketID = uint16(next[0])<<8 + uint16(next[1]) + + if pub.Qos > Qos0 { + pub.PacketID = uint16(next[0])<<8 + uint16(next[1]) next = next[2:] } - tmpPkg.Payload = next - pkt = tmpPkg + + pub.Payload = next + pkt = pub case CtrlPubAck: pkt = &PubAckPacket{ PacketID: uint16(body[0])<<8 + uint16(body[1]), @@ -142,14 +145,20 @@ func decodeOnePacket(reader io.Reader) (pkt Packet, err error) { pktTmp := &SubscribePacket{ PacketID: uint16(body[0])<<8 + uint16(body[1]), } - next := body[2:] + + next = body[2:] topics := make([]*Topic, 0) for len(next) > 0 { var name string - name, next, err = decodeString(next) - if err != nil { + if name, next, err = decodeString(next); err != nil { + return + } + + if len(next) < 1 { + err = ErrBadPacket return } + topics = append(topics, &Topic{Name: name, Qos: next[0]}) next = next[1:] } @@ -159,7 +168,8 @@ func decodeOnePacket(reader io.Reader) (pkt Packet, err error) { pktTmp := &SubAckPacket{ PacketID: uint16(body[0])<<8 + uint16(body[1]), } - next := body[2:] + + next = body[2:] codes := make([]SubAckCode, 0) for i := 0; i < len(next); i++ { codes = append(codes, next[i]) @@ -169,7 +179,7 @@ func decodeOnePacket(reader io.Reader) (pkt Packet, err error) { pktTmp := &UnSubPacket{ PacketID: uint16(body[0])<<8 + uint16(body[1]), } - next := body[2:] + next = body[2:] topics := make([]string, 0) for len(next) > 0 { var name string diff --git a/decoder_test.go b/decoder_test.go index b51327b..00ce98c 100644 --- a/decoder_test.go +++ b/decoder_test.go @@ -32,6 +32,10 @@ func TestDecodeRemainLength(t *testing.T) { buffer.Reset() } +func BenchmarkDecodeRemainLength(b *testing.B) { + +} + func TestDecodeOnePacket(t *testing.T) { // MQTT packet should work targetBytes := connWillBytes @@ -98,3 +102,19 @@ func TestDecodeOnePacket(t *testing.T) { // none MQTT packet should fail } + +func BenchmarkDecodeOnePacket(b *testing.B) { + b.StopTimer() + buf := &bytes.Buffer{} + for i := 0; i < b.N; i++ { + buf.Write(connWillBytes) + } + + b.StartTimer() + for i := 0; i < b.N; i++ { + _, err := decodeOnePacket(buf) + if err != nil { + b.Fail() + } + } +} diff --git a/fuzz.go b/fuzz.go new file mode 100644 index 0000000..840db45 --- /dev/null +++ b/fuzz.go @@ -0,0 +1,34 @@ +// +build gofuzz + +/* + * Copyright GoIIoT (https://github.com/goiiot) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package libmqtt + +import ( + "bytes" +) + +func Fuzz(data []byte) int { + pkt, err := decodeOnePacket(bytes.NewReader(data)) + if err != nil { + if pkt != nil { + panic("pkt != nil on error") + } + return 0 + } + return 1 +} diff --git a/libmqtt.go b/libmqtt.go index 27f3bdd..a42f41a 100644 --- a/libmqtt.go +++ b/libmqtt.go @@ -18,7 +18,6 @@ package libmqtt import ( "bytes" - "errors" ) // Packet is MQTT control packet diff --git a/log.go b/log.go index ebf9871..f531137 100644 --- a/log.go +++ b/log.go @@ -65,27 +65,27 @@ func newLogger(l LogLevel) *logger { if l <= Error { lo.error = newStdLogger() - lo.error.SetPrefix("[LIBMQTT] Error: ") + lo.error.SetPrefix("[LIBMQTT] E ") } if l <= Warning { lo.warning = newStdLogger() - lo.warning.SetPrefix("[LIBMQTT] Warning: ") + lo.warning.SetPrefix("[LIBMQTT] W ") } if l <= Info { lo.info = newStdLogger() - lo.info.SetPrefix("[LIBMQTT] Info: ") + lo.info.SetPrefix("[LIBMQTT] I ") } if l <= Debug { lo.debug = newStdLogger() - lo.debug.SetPrefix("[LIBMQTT] Debug: ") + lo.debug.SetPrefix("[LIBMQTT] D ") } if l <= Verbose { lo.verbose = newStdLogger() - lo.verbose.SetPrefix("[LIBMQTT] Verbose: ") + lo.verbose.SetPrefix("[LIBMQTT] V ") } if l <= Silent { diff --git a/msg.go b/msg.go index ae66305..f568cb8 100644 --- a/msg.go +++ b/msg.go @@ -1,3 +1,19 @@ +/* + * Copyright GoIIoT (https://github.com/goiiot) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package libmqtt type msgType uint8 diff --git a/persist.go b/persist.go index e68d7d3..32c5823 100644 --- a/persist.go +++ b/persist.go @@ -1,3 +1,19 @@ +/* + * Copyright GoIIoT (https://github.com/goiiot) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package libmqtt import ( diff --git a/persist_test.go b/persist_test.go index b5f189d..9d4c65c 100644 --- a/persist_test.go +++ b/persist_test.go @@ -1,3 +1,19 @@ +/* + * Copyright GoIIoT (https://github.com/goiiot) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package libmqtt import "testing" diff --git a/router.go b/router.go index 3e959d8..cc99c17 100644 --- a/router.go +++ b/router.go @@ -1,3 +1,19 @@ +/* + * Copyright GoIIoT (https://github.com/goiiot) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package libmqtt import ( diff --git a/router_test.go b/router_test.go index f854078..d37681e 100644 --- a/router_test.go +++ b/router_test.go @@ -1,3 +1,19 @@ +/* + * Copyright GoIIoT (https://github.com/goiiot) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package libmqtt import (