Skip to content

Commit

Permalink
pass fuzzy test
Browse files Browse the repository at this point in the history
add benchmark (against paho mqtt lib)
update copyright
  • Loading branch information
jeffreystoke committed Dec 30, 2017
1 parent 66132c4 commit 010a909
Show file tree
Hide file tree
Showing 24 changed files with 624 additions and 103 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
/.idea
/.vscode
/.vscode

/fuzz-test

libmqtt-fuzz.zip
13 changes: 10 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: test lib client clean
.PHONY: test lib client clean fuzz-test

test:
go test -v -run=. -bench=. -benchmem
Expand All @@ -9,6 +9,13 @@ lib:
client:
$(MAKE) -C cmd build

clean:
clean: fuzz-clean
$(MAKE) -C c clean
$(MAKE) -C cmd clean
$(MAKE) -C cmd clean

fuzz-test:
go-fuzz-build github.com/goiiot/libmqtt
go-fuzz -bin=./libmqtt-fuzz.zip -workdir=fuzz-test

fuzz-clean:
rm -rf fuzz-test libmqtt-fuzz.zip
44 changes: 37 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,29 @@

[![Build Status](https://travis-ci.org/goiiot/libmqtt.svg)](https://travis-ci.org/goiiot/libmqtt) [![GoDoc](https://godoc.org/github.com/goiiot/libmqtt?status.svg)](https://godoc.org/github.com/goiiot/libmqtt) [![GoReportCard](https://goreportcard.com/badge/goiiot/libmqtt)](https://goreportcard.com/report/github.com/goiiot/libmqtt)

MQTT 3.1.1 client lib with pure Go
MQTT 3.1.1 client lib in pure Go.

## Contents

- [Features](#features)
- [Usage](#usage)
- [Topic Routing](#topic-routing)
- [Benchmark](#benchmark)
- [RoadMap](#roadmap)

## Features

1. A full functional MQTT 3.1.1 client. (currently only with in memory session state storage)
1. HTTP like API.
1. Full functional MQTT 3.1.1 client. (file persist state is now under work)
1. HTTP server like API.
1. High performance and less memory footprint. (see [Benchmark](#benchmark))
1. Customizable TopicRouter. (see [Topic Routing](#topic-routing))
1. Command line app support (see [cmd](./cmd/))
1. C lib support (see [c](./c/))
1. More efficient, idiomatic Go (maybe not quite idiomatic for now)
1. Command line app support. (see [cmd](./cmd/))
1. C lib support. (see [c](./c/))
1. Idiomatic Go.

## Prerequisite

1. Go 1.9+ (with `GOPATH` configured)

## Usage

Expand Down Expand Up @@ -162,10 +169,33 @@ client, err := NewClient(
)
```

## Benchmark

The procedure of the benchmark is as following:

1. Create the client
1. Connect to server
1. Subscribe to topic `foo`
1. Publish to topic `foo`
1. Unsubsecibe when received all published message (with `foo` topic)
1. Destroy client (a sudden disconnect without disconnect packet)

The benchmark result listed below was taken on a Macbook Pro 13' (Early 2015, macOS 10.13.2), statistics inside which is the value of ten times average

|Bench Name|Pub Count|ns/op|B/op|allocs/op|Transfer Time|Total Time|
|---|---|---|---|---|---|---|
|BenchmarkPahoClient-4|10000|199632|1399|31|0.230s|2.021s|
|BenchmarkLibmqttClient-4|10000|144407|331|9|0.124s|1.467s|
|BenchmarkPahoClient-4|50000|205884|1395|31|1.170s|10.316s|
|BenchmarkLibmqttClient-4|50000|161640|328|9|0.717s|8.105s|

You can make the benchmark using source code from [benchmark](./benchmark/)

Notice: benchmark on libmqtt sometimes can be a infinite loop, we are now trying to solve that.

## RoadMap

1. File persist storage of session status. (High priority)
1. Full tested multiple connections in one client. (High priority)
1. More efficient processing. (Medium priority)
1. Add compatibility with mqtt 5.0 . (Medium priority)
1. Export to Java (JNI), Python (CPython), Objective-C... (Low priority)
5 changes: 5 additions & 0 deletions benchmark/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Benchmark for MQTT Golang client libs

## Recommended Environment

- MQTT Broker - emqttd (Docker)
232 changes: 232 additions & 0 deletions benchmark/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* Copyright GoIIoT (https://github.com/goiiot)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package benchmark

import (
"bytes"
"net/url"
"sync"
"sync/atomic"
"testing"

pah "github.com/eclipse/paho.mqtt.golang"
lib "github.com/goiiot/libmqtt"
)

//smqM "github.com/surgemq/message"
//smq "github.com/surgemq/surgemq/service"
//gmq "github.com/yosssi/gmq/mqtt/client"

const (
keepalive = 10
server = "localhost:1883"
username = "foo"
password = "bar"
topic = "/foo"
qos = 0

pubCount = 50000
)

var (
topicMsg = []byte("bar")
)

func BenchmarkLibmqttClient(b *testing.B) {
b.N = pubCount
b.ReportAllocs()
var count uint32
var client lib.Client
var err error

b.ResetTimer()
if client, err = lib.NewClient(
//lib.WithLog(lib.Verbose),
lib.WithServer(server),
lib.WithKeepalive(keepalive, 1.2),
lib.WithIdentity(username, password),
lib.WithRecvBuf(100),
lib.WithSendBuf(100),
lib.WithCleanSession(true)); err != nil {
b.Log(err)
b.FailNow()
}

client.HandleSub(func(topics []*lib.Topic, err error) {
go func() {
for i := 0; i < pubCount; i++ {
client.Publish(&lib.PublishPacket{
TopicName: topic,
Qos: qos,
Payload: topicMsg,
})
}
}()
})

client.HandlePub(func(topic string, err error) {
if err != nil {
b.FailNow()
}
})

client.HandleUnSub(func(topics []string, err error) {
if err != nil {
b.FailNow()
}

client.Destroy(true)
})

client.Handle(topic, func(t string, q lib.QosLevel, msg []byte) {
if topic != t || q != qos || bytes.Compare(msg, topicMsg) != 0 {
b.FailNow()
}

atomic.AddUint32(&count, 1)
if atomic.LoadUint32(&count) == pubCount {
client.UnSubscribe(topic)
}
})

client.Connect(func(server string, code lib.ConnAckCode, err error) {
if err != nil {
b.Log(err)
b.FailNow()
} else if code != lib.ConnAccepted {
b.FailNow()
}

client.Subscribe(&lib.Topic{Name: topic, Qos: qos})
})

client.Wait()
}

func BenchmarkPahoClient(b *testing.B) {
b.N = pubCount
b.ReportAllocs()
var count uint32

wg := &sync.WaitGroup{}
wg.Add(1)
b.ResetTimer()
serverURL, err := url.Parse("tcp://" + server)
if err != nil {
b.FailNow()
}

client := pah.NewClient(&pah.ClientOptions{
Servers: []*url.URL{serverURL},
KeepAlive: keepalive,
Username: username,
Password: password,
CleanSession: true,
ProtocolVersion: 4,
Store: pah.NewMemoryStore(),
})

t := client.Connect()
if !t.Wait() {
b.FailNow()
}

if err := t.Error(); err != nil {
b.Log(err)
b.FailNow()
}

t = client.Subscribe(topic, 0, func(c pah.Client, message pah.Message) {
if topic != message.Topic() ||
bytes.Compare(topicMsg, message.Payload()) != 0 ||
qos != message.Qos() {
b.FailNow()
}

atomic.AddUint32(&count, 1)
if atomic.LoadUint32(&count) == pubCount {
t := c.Unsubscribe(topic)
if !t.Wait() {
b.FailNow()
}
if err := t.Error(); err != nil {
b.Log(err)
b.FailNow()
}

c.Disconnect(0)
wg.Done()
}
})

if !t.Wait() {
b.FailNow()
}
if err := t.Error(); err != nil {
b.Log(err)
b.FailNow()
}

for i := 0; i < pubCount; i++ {
t = client.Publish(topic, 0, false, topicMsg)
if !t.Wait() {
b.FailNow()
}

if err := t.Error(); err != nil {
b.Log(err)
b.FailNow()
}
}
wg.Wait()
}

//func BenchmarkGmqClient(b *testing.B) {
// client := gmq.New(&gmq.Options{})
// client.Connect(&gmq.ConnectOptions{
// Network: "tcp",
// Address: server,
// UserName: []byte(username),
// Password: []byte(password),
// KeepAlive: keepalive,
// })
// subHandler := func(topicName, message []byte) {
//
// }
// if err := client.Subscribe(&gmq.SubscribeOptions{
// SubReqs: []*gmq.SubReq{
// {
// TopicFilter: []byte(topic),
// QoS: qos,
// Handler: subHandler,
// },
// },
// }); err != nil {
// b.Log(err)
// b.FailNow()
// }
//
//}

//func BenchmarkSurgeClient(b *testing.B) {
// client := &smq.Client{KeepAlive: 10}
// err := client.Connect(server, &smqM.ConnectMessage{})
// if err != nil {
// b.Log(err)
// b.FailNow()
// }
//}
16 changes: 16 additions & 0 deletions c/callbacks.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright GoIIoT (https://github.com/goiiot)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

// #cgo CFLAGS: -I include
Expand Down
4 changes: 2 additions & 2 deletions c/libmqtt.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// +build cgo lib

/*
* Copyright GoIIoT (https://github.com/goiiot)
*
Expand All @@ -16,6 +14,8 @@
* limitations under the License.
*/

// +build cgo lib

package main

// #cgo CFLAGS: -I include
Expand Down
Loading

0 comments on commit 010a909

Please sign in to comment.