Gmqtt provides:
- MQTT broker that fully implements the MQTT protocol V3.1.1.
- Golang MQTT broker package for secondary development.
- MQTT protocol pack/unpack package for implementing MQTT clients or testing.
$ go get -u github.com/DrmagicE/gmqtt
- Provides hook methods to customize broker behaviour (authentication, ACL, etc.). See hooks.go for more details.
- Supports TLS/SSL and WebSocket.
- Enables users to write plugins. See plugin.go and /plugin for more details.
- Provides abilities for extensions to interact with the server. See the Server interface in server.go and example_test.go for more details.
- Provides metrics (using Prometheus). (plugin: prometheus)
- Provides a RESTful API to interact with the server. (plugin: management)
- Retained messages are not persisted when the server exits.
- Clustering is not supported.
$ cd cmd/broker
$ go run main.go
The broker will listen on port 1883 for TCP and 8080 for WebSocket. The broker loads the following plugins:
- management: listens on port 8081 and provides a RESTful API service.
- prometheus: listens on port 8082 and serves as a Prometheus exporter at the /metrics path.
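To check that something on port 1883 speaks MQTT 3.1.1, one option is to send a hand-built CONNECT packet and read back the 4-byte CONNACK. The following is a minimal sketch using only the Go standard library; it does not use Gmqtt's pack/unpack package, and the helper names (encodeRemainingLength, buildConnect, probe) are made up for this illustration.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"time"
)

// encodeRemainingLength encodes n with the MQTT variable-length scheme:
// 7 data bits per byte, high bit set when more bytes follow.
func encodeRemainingLength(n int) []byte {
	var out []byte
	for {
		b := byte(n % 128)
		n /= 128
		if n > 0 {
			b |= 0x80
		}
		out = append(out, b)
		if n == 0 {
			return out
		}
	}
}

// buildConnect returns an MQTT 3.1.1 CONNECT packet with the clean-session
// flag set and no username, password, or will message.
func buildConnect(clientID string, keepAlive uint16) []byte {
	body := []byte{
		0x00, 0x04, 'M', 'Q', 'T', 'T', // protocol name
		0x04,                                  // protocol level (3.1.1)
		0x02,                                  // connect flags: clean session
		byte(keepAlive >> 8), byte(keepAlive), // keep alive, big endian
		byte(len(clientID) >> 8), byte(len(clientID)), // client ID length
	}
	body = append(body, clientID...)
	pkt := append([]byte{0x10}, encodeRemainingLength(len(body))...)
	return append(pkt, body...)
}

// probe sends CONNECT to addr and expects a CONNACK with return code 0.
func probe(addr string) error {
	conn, err := net.DialTimeout("tcp", addr, 3*time.Second)
	if err != nil {
		return err
	}
	defer conn.Close()
	if err := conn.SetDeadline(time.Now().Add(3 * time.Second)); err != nil {
		return err
	}
	if _, err := conn.Write(buildConnect("probe", 60)); err != nil {
		return err
	}
	ack := make([]byte, 4)
	if _, err := io.ReadFull(conn, ack); err != nil {
		return err
	}
	if ack[0] != 0x20 || ack[3] != 0x00 {
		return fmt.Errorf("unexpected CONNACK: % x", ack)
	}
	return nil
}

func main() {
	// Print the packet bytes; call probe("localhost:1883") against a running broker.
	fmt.Printf("% x\n", buildConnect("probe", 60))
}
```

With the broker running, calling probe("localhost:1883") should return nil if the connection is accepted.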
$ docker build -t gmqtt .
$ docker run -p 1883:1883 -p 8081:8081 -p 8082:8082 gmqtt
The features of the built-in MQTT broker are limited. It does not implement features such as authentication or ACL, but it is easy to write your own plugins to extend the broker.
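package main

import (
	"context"
	"log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"syscall"

	"github.com/DrmagicE/gmqtt"
	// Plugin import paths assume the /plugin directory layout of this repository.
	"github.com/DrmagicE/gmqtt/plugin/management"
	"github.com/DrmagicE/gmqtt/plugin/prometheus"
	"go.uber.org/zap"
)

func main() {
	// TCP listener for MQTT clients.
	ln, err := net.Listen("tcp", ":1883")
	if err != nil {
		log.Fatalln(err.Error())
	}
	// WebSocket server for MQTT over WebSocket.
	ws := &gmqtt.WsServer{
		Server: &http.Server{Addr: ":8080"},
		Path:   "/ws",
	}
	// Use zap.NewDevelopment() instead for more verbose, human-readable logs.
	l, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}
	s := gmqtt.NewServer(
		gmqtt.WithTCPListener(ln),
		gmqtt.WithWebsocketServer(ws),
		// Add your plugins
		gmqtt.WithPlugin(management.New(":8081", nil)),
		gmqtt.WithPlugin(prometheus.New(&http.Server{
			Addr: ":8082",
		}, "/metrics")),
		gmqtt.WithLogger(l),
	)
	s.Run()
	// Block until interrupted, then stop the broker gracefully.
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
	<-signalCh
	s.Stop(context.Background())
}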
See /examples for more details.
Gmqtt implements the following hooks:
- OnAccept (Only for tcp/ssl, not for ws/wss)
- OnConnect
- OnConnected
- OnSessionCreated
- OnSessionResumed
- OnSessionTerminated
- OnSubscribe
- OnSubscribed
- OnUnsubscribed
- OnMsgArrived
- OnAcked
- OnMsgDropped
- OnDeliver
- OnClose
- OnStop
See /examples/hook for more details.
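As an illustration of how a connect-time hook can implement authentication, here is a self-contained sketch of the pattern. The types connectHook and hooks and the helpers passwordAuth and handleConnect are hypothetical and do not reproduce Gmqtt's actual signatures (those are defined in hooks.go); only the CONNACK return codes come from the MQTT 3.1.1 specification.

```go
package main

import (
	"crypto/subtle"
	"fmt"
)

// CONNACK return codes defined by MQTT 3.1.1.
const (
	codeAccepted       uint8 = 0x00
	codeBadCredentials uint8 = 0x04
)

// connectHook is a hypothetical hook type, not Gmqtt's OnConnect signature.
type connectHook func(clientID, username, password string) uint8

// hooks is a hypothetical container for optional callbacks.
type hooks struct {
	OnConnect connectHook
}

// passwordAuth returns a hook that checks credentials against a static table.
func passwordAuth(users map[string]string) connectHook {
	return func(_, username, password string) uint8 {
		want, ok := users[username]
		// Constant-time comparison avoids leaking how many bytes matched.
		if !ok || subtle.ConstantTimeCompare([]byte(want), []byte(password)) != 1 {
			return codeBadCredentials
		}
		return codeAccepted
	}
}

// handleConnect shows how a broker might consult the hook before replying
// with CONNACK; a nil hook means every client is accepted.
func handleConnect(h hooks, clientID, username, password string) uint8 {
	if h.OnConnect == nil {
		return codeAccepted
	}
	return h.OnConnect(clientID, username, password)
}

func main() {
	h := hooks{OnConnect: passwordAuth(map[string]string{"alice": "s3cret"})}
	fmt.Println(handleConnect(h, "c1", "alice", "s3cret")) // accepted
	fmt.Println(handleConnect(h, "c2", "alice", "wrong"))  // rejected
}
```

Keeping each hook as an optional function value lets the broker skip callbacks that were never registered.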
Call server.Stop() to stop the broker gracefully. It will:
- Close all open TCP listeners and shut down all open WebSocket servers.
- Close all idle connections.
- Wait for all connections to be closed.
- Trigger OnStop().
$ go test -race . && go test -race ./pkg/packets
$ cd pkg/packets
$ go test -race .
Passes paho.mqtt.testing.
- Support MQTT V3 and V5.
- Support bridge mode and maybe cluster.
Breaking changes may occur when these new features are added.