Skip to content

Commit

Permalink
fix mqtt reconnect subscription issue
Browse files Browse the repository at this point in the history
Signed-off-by: Jeeva Kandasamy <[email protected]>
  • Loading branch information
jkandasa committed May 27, 2022
1 parent be788a0 commit 42c11a7
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 21 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* `raw` - sends the serial,ethernet messages to mqtt as is
* `serial` to `MQTT`
* `ethernet` to `MQTT`
* `http` to `MQTT`

## Download
### Container images
Expand Down
40 changes: 21 additions & 19 deletions plugin/device/mqtt/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *model.Message)
if err != nil {
return nil, err
}
zap.L().Debug("mqtt config", zap.String("id", ID), zap.Any("config", cfg))
zap.L().Debug("mqtt config", zap.Any("adapterName", ID), zap.Any("config", cfg))

// endpoint
endpoint := &Endpoint{
Expand Down Expand Up @@ -94,11 +94,7 @@ func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *model.Message)
// adding client
endpoint.Client = c

err = endpoint.Subscribe(cfg.Subscribe)
if err != nil {
zap.L().Error("error on subscribe a topic", zap.String("topic", cfg.Subscribe), zap.Error(err))
}
zap.L().Debug("mqtt client connected successfully", zap.String("timeTaken", time.Since(start).String()), zap.Any("clientConfig", cfg))
zap.L().Debug("mqtt client connected successfully", zap.Any("adapterName", ID), zap.String("timeTaken", time.Since(start).String()), zap.Any("clientConfig", cfg))
return endpoint, nil
}

Expand All @@ -108,6 +104,12 @@ func (ep *Endpoint) Name() string {

func (ep *Endpoint) onConnectionHandler(c paho.Client) {
zap.L().Debug("mqtt connection success", zap.Any("adapterName", ep.ID))

err := ep.Subscribe(ep.Config.Subscribe)
if err != nil {
zap.L().Error("error on subscribe topics", zap.Any("adapterName", ep.ID), zap.String("topics", ep.Config.Subscribe), zap.Error(err))
}

ep.statusFunc(&model.State{
Status: model.StatusUP,
Message: "",
Expand All @@ -116,7 +118,7 @@ func (ep *Endpoint) onConnectionHandler(c paho.Client) {
}

func (ep *Endpoint) onConnectionLostHandler(c paho.Client, err error) {
zap.L().Error("mqtt connection lost", zap.Any("id", ep.ID), zap.Error(err))
zap.L().Error("mqtt connection lost", zap.Any("adapterName", ep.ID), zap.Error(err))
// Report connection lost
if err != nil {
ep.statusFunc(&model.State{
Expand All @@ -132,7 +134,7 @@ func (ep *Endpoint) Write(message *model.Message) error {
if message == nil {
return nil
}
zap.L().Debug("about to send a message", zap.String("message", message.ToString()))
zap.L().Debug("about to send a message", zap.Any("adapterName", ep.ID), zap.String("message", message.ToString()))
topic := message.Others.GetString(model.KeyMqttTopic)
qos := byte(ep.Config.QoS)

Expand Down Expand Up @@ -171,18 +173,18 @@ func (ep *Endpoint) getCallBack() func(paho.Client, paho.Message) {
}

// Subscribe a topic
func (ep *Endpoint) Subscribe(topic string) error {
if topic == "" {
func (ep *Endpoint) Subscribe(topicsStr string) error {
if topicsStr == "" {
return nil
}
token := ep.Client.Subscribe(topic, 0, ep.getCallBack())
token.WaitTimeout(3 * time.Second)
if token.Error() != nil {
ep.statusFunc(&model.State{
Status: model.StatusError,
Message: token.Error().Error(),
Since: time.Now(),
})
topics := strings.Split(topicsStr, ",")
for _, topic := range topics {
topic = strings.TrimSpace(topic)
token := ep.Client.Subscribe(topic, 0, ep.getCallBack())
token.WaitTimeout(3 * time.Second)
if token.Error() != nil {
return token.Error()
}
}
return token.Error()
return nil
}
4 changes: 2 additions & 2 deletions plugin/device/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package plugin
import (
"fmt"

"github.com/mycontroller-org/2mqtt/pkg/types"
model "github.com/mycontroller-org/2mqtt/pkg/types"
deviceType "github.com/mycontroller-org/2mqtt/plugin/device/types"
"github.com/mycontroller-org/server/v2/pkg/types/cmap"
"go.uber.org/zap"
Expand All @@ -25,7 +25,7 @@ func Register(name string, fn CreatorFn) {

func Create(name, ID string, config cmap.CustomMap, rxFunc func(msg *model.Message), statusFunc func(state *model.State)) (p deviceType.Plugin, err error) {
if fn, ok := creators[name]; ok {
p, err = fn("", config, rxFunc, statusFunc)
p, err = fn(ID, config, rxFunc, statusFunc)
} else {
err = fmt.Errorf("device plugin [%s] is not registered", name)
}
Expand Down

0 comments on commit 42c11a7

Please sign in to comment.