Skip to content

Commit

Permalink
fix decode issue, ping packet bug fix
Browse files Browse the repository at this point in the history
test implement redis persist
  • Loading branch information
jeffreystoke committed Jan 3, 2018
1 parent ab6ed1c commit 8a21f38
Show file tree
Hide file tree
Showing 28 changed files with 606 additions and 147 deletions.
31 changes: 25 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,38 @@

[![Build Status](https://travis-ci.org/goiiot/libmqtt.svg)](https://travis-ci.org/goiiot/libmqtt) [![GoDoc](https://godoc.org/github.com/goiiot/libmqtt?status.svg)](https://godoc.org/github.com/goiiot/libmqtt) [![GoReportCard](https://goreportcard.com/badge/goiiot/libmqtt)](https://goreportcard.com/report/github.com/goiiot/libmqtt)

Modern MQTT 3.1.1 client lib in pure Go, for `Go`, `C` and `Java`
Modern MQTT 3.1.1 client lib in pure Go, for `Go`, `C/C++`, `Java` and `Python`

## Contents

- [Features](#features)
- [Extensions](#extensions)
- [Usage](#usage)
- [Topic Routing](#topic-routing)
- [Session Persist](#session-persist)
- [Benchmark](#benchmark)
- [RoadMap](#roadmap)
- [LICENSE](#license)

## Features

1. Full functional MQTT 3.1.1 client (file persist state is now under work)
1. Full functional MQTT 3.1.1 client
1. HTTP server like API
1. High performance and less memory footprint (see [Benchmark](#benchmark))
1. Customizable TopicRouter (see [Topic Routing](#topic-routing))
1. [C lib](./c/), [Java lib](./java/), [Python lib (TODO)](./python/), [Command line client](./cmd/) support
1. [C/C++ lib](./c/), [Java lib](./java/), [Python lib - TODO](./python/), [Command line client](./cmd/) support
1. Idiomatic Go, reactive stream

## Extensions

Helpful extensions for libmqtt (see [extension](./extension/))

## Usage

This project can be used as

- A [Go lib](#as-a-go-lib)
- A [C lib](#as-a-c-lib)
- A [C/C++ lib](#as-a-c-lib)
- A [Java lib](#as-a-java-lib)
- A [Python lib](#as-a-python-lib) (TODO)
- A [Command line client](#as-a-command-line-client)
Expand Down Expand Up @@ -149,7 +155,7 @@ client.UnSubscribe("foo", "bar")
client.Destroy(true)
```

### As a C lib
### As a C/C++ lib

Please refer to [c - README.md](./c/README.md)

Expand All @@ -175,12 +181,25 @@ Routing topics is one of the most important thing when it comes to business logi
If you would like to apply other routing strategy to the client, you can provide this option when creating the client

```go
client, err := NewClient(
client, err := libmqtt.NewClient(
// ...
// for example, use `RegexRouter`
libmqtt.WithRouter(libmqtt.NewRegexRouter()),
// ...
)
```

## Session Persist

Per MQTT Specification, session state should be persisted and be recovered when next time connected to server without clean session flag set, currently we provide persist method as following:

1. NonePersist - no persist
1. MemPersist - in memory persist
1. FilePersist - use file as persist method (TODO)
1. RedisPersist - use redis as persist method (Available inside [github.com/goiiot/libmqtt/extension](./extension/) package)

We don't recommend `FilePersist`, use `RedisPersist` instead

## Benchmark

The procedure of the benchmark is as following:
Expand Down
2 changes: 2 additions & 0 deletions benchmark/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
bench:
go test -v -bench . -benchmem
10 changes: 4 additions & 6 deletions benchmark/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ import (
//gmq "github.com/yosssi/gmq/mqtt/client"

const (
keepalive = 10
keepalive = 3600
server = "localhost:1883"
username = "foo"
password = "bar"
topic = "/foo"
qos = 0

pubCount = 50000
pubCount = 100000
)

var (
Expand All @@ -55,7 +55,6 @@ func BenchmarkLibmqttClient(b *testing.B) {

b.ResetTimer()
if client, err = lib.NewClient(
//lib.WithLog(lib.Verbose),
lib.WithServer(server),
lib.WithKeepalive(keepalive, 1.2),
lib.WithIdentity(username, password),
Expand All @@ -68,7 +67,7 @@ func BenchmarkLibmqttClient(b *testing.B) {

client.HandleSub(func(topics []*lib.Topic, err error) {
go func() {
for i := 0; i < pubCount; i++ {
for i := 0; i < b.N; i++ {
client.Publish(&lib.PublishPacket{
TopicName: topic,
Qos: qos,
Expand All @@ -88,7 +87,6 @@ func BenchmarkLibmqttClient(b *testing.B) {
if err != nil {
b.FailNow()
}

client.Destroy(true)
})

Expand Down Expand Up @@ -181,7 +179,7 @@ func BenchmarkPahoClient(b *testing.B) {
b.FailNow()
}

for i := 0; i < pubCount; i++ {
for i := 0; i < b.N; i++ {
t = client.Publish(topic, 0, false, topicMsg)
if !t.Wait() {
b.FailNow()
Expand Down
4 changes: 2 additions & 2 deletions c/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# libmqtt C lib
# libmqtt C/C++ lib

A MQTT C client library exported from libmqtt
A MQTT C/C++ client library exported from libmqtt

__NOTE__: This library is still under work, some bugs can happen

Expand Down
16 changes: 16 additions & 0 deletions c/example/example.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright GoIIoT (https://github.com/goiiot)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand Down
File renamed without changes.
59 changes: 33 additions & 26 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,26 @@ func WithKeepalive(keepalive uint16, factor float64) Option {
}

// WithBackoffStrategy will set reconnect backoff strategy
func WithBackoffStrategy(bf *BackoffOption) Option {
func WithBackoffStrategy(firstDelay, maxDelay time.Duration, factor float64) Option {
return func(c *client) error {
if bf != nil {
if bf.FirstDelay < time.Millisecond {
bf.FirstDelay = time.Millisecond
}
if bf.MaxDelay < bf.FirstDelay {
bf.MaxDelay = bf.FirstDelay
}
if bf.Factor < 1 {
bf.Factor = 1
}
c.options.backoffOption = bf
if firstDelay < time.Millisecond {
firstDelay = time.Millisecond
}

if maxDelay < firstDelay {
maxDelay = firstDelay
}

if factor < 1 {
factor = 1
}

c.options.backoffOption = &BackoffOption{
FirstDelay: firstDelay,
MaxDelay: maxDelay,
Factor: factor,
}

return nil
}
}
Expand Down Expand Up @@ -300,10 +306,10 @@ type client struct {
msgC chan *message // error channel
sendC chan Packet // Pub channel for sending publish packet to server
recvC chan *PublishPacket // Pub recv channel for receiving
idGen *idGenerator // sorted in use packetId []uint16
router TopicRouter // topic router
persist PersistMethod // persist method
workers *sync.WaitGroup // workers
idGen *idGenerator // Packet id generator
router TopicRouter // Topic router
persist PersistMethod // Persist method
workers *sync.WaitGroup // Workers (connections)

// success/error handlers
pH PubHandler
Expand Down Expand Up @@ -749,7 +755,9 @@ func (c *connImpl) logic() {
func (c *connImpl) keepalive() {
lg.d("NET start keepalive")

t := time.NewTicker(c.parent.options.keepalive)
t := time.NewTicker(c.parent.options.keepalive * 3 / 4)
timeout := time.Duration(float64(c.parent.options.keepalive) * c.parent.options.keepaliveFactor)
timeoutTimer := time.NewTimer(timeout)
defer t.Stop()

for range t.C {
Expand All @@ -760,9 +768,9 @@ func (c *connImpl) keepalive() {
if !more {
return
}
case <-time.After(c.parent.options.keepalive * time.Duration(c.parent.options.keepaliveFactor)):
timeoutTimer.Reset(timeout)
case <-timeoutTimer.C:
lg.i("NET keepalive timeout")
// ping timeout
t.Stop()
c.conn.Close()
return
Expand Down Expand Up @@ -814,21 +822,21 @@ func (c *connImpl) handleClientSend() {

// handle mqtt logic control packet send
func (c *connImpl) handleLogicSend() {
for pkt := range c.logicSendC {
pkt.Bytes(c.sendBuf)
for logicPkt := range c.logicSendC {
logicPkt.Bytes(c.sendBuf)
if _, err := c.sendBuf.WriteTo(c.conn); err != nil {
// DO NOT NOTIFY net err HERE
// ALWAYS DETECT net err in receive
break
}

switch pkt.Type() {
switch logicPkt.Type() {
case CtrlPubRel:
c.parent.persist.Store(sendKey(pkt.(*PubRelPacket).PacketID), pkt)
c.parent.persist.Store(sendKey(logicPkt.(*PubRelPacket).PacketID), logicPkt)
case CtrlPubAck:
c.parent.persist.Delete(sendKey(pkt.(*PubAckPacket).PacketID))
c.parent.persist.Delete(sendKey(logicPkt.(*PubAckPacket).PacketID))
case CtrlPubComp:
c.parent.persist.Delete(sendKey(pkt.(*PubCompPacket).PacketID))
c.parent.persist.Delete(sendKey(logicPkt.(*PubCompPacket).PacketID))
case CtrlDisConn:
// disconnect to server
lg.i("disconnect to server")
Expand All @@ -851,7 +859,6 @@ func (c *connImpl) handleRecv() {
break
}

// pass packets
if pkt == PingRespPacket {
lg.d("NET received keepalive message")
c.keepaliveC <- 1
Expand Down
File renamed without changes.
Loading

0 comments on commit 8a21f38

Please sign in to comment.