diff --git a/README.md b/README.md index 57f3710..73683d4 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ * `raw` - sends the serial,ethernet messages to mqtt as is * `serial` to `MQTT` * `ethernet` to `MQTT` + * `http` to `MQTT` ## Download ### Container images diff --git a/plugin/device/mqtt/device.go b/plugin/device/mqtt/device.go index 2eaa9c7..09b4c0a 100644 --- a/plugin/device/mqtt/device.go +++ b/plugin/device/mqtt/device.go @@ -57,7 +57,7 @@ func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *model.Message) if err != nil { return nil, err } - zap.L().Debug("mqtt config", zap.String("id", ID), zap.Any("config", cfg)) + zap.L().Debug("mqtt config", zap.Any("adapterName", ID), zap.Any("config", cfg)) // endpoint endpoint := &Endpoint{ @@ -94,11 +94,7 @@ func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *model.Message) // adding client endpoint.Client = c - err = endpoint.Subscribe(cfg.Subscribe) - if err != nil { - zap.L().Error("error on subscribe a topic", zap.String("topic", cfg.Subscribe), zap.Error(err)) - } - zap.L().Debug("mqtt client connected successfully", zap.String("timeTaken", time.Since(start).String()), zap.Any("clientConfig", cfg)) + zap.L().Debug("mqtt client connected successfully", zap.Any("adapterName", ID), zap.String("timeTaken", time.Since(start).String()), zap.Any("clientConfig", cfg)) return endpoint, nil } @@ -108,6 +104,12 @@ func (ep *Endpoint) Name() string { func (ep *Endpoint) onConnectionHandler(c paho.Client) { zap.L().Debug("mqtt connection success", zap.Any("adapterName", ep.ID)) + + err := ep.Subscribe(ep.Config.Subscribe) + if err != nil { + zap.L().Error("error on subscribe topics", zap.Any("adapterName", ep.ID), zap.String("topics", ep.Config.Subscribe), zap.Error(err)) + } + ep.statusFunc(&model.State{ Status: model.StatusUP, Message: "", @@ -116,7 +118,7 @@ func (ep *Endpoint) onConnectionHandler(c paho.Client) { } func (ep *Endpoint) onConnectionLostHandler(c paho.Client, err error) { - zap.L().Error("mqtt connection lost", zap.Any("id", ep.ID), zap.Error(err)) + zap.L().Error("mqtt connection lost", zap.Any("adapterName", ep.ID), zap.Error(err)) // Report connection lost if err != nil { ep.statusFunc(&model.State{ @@ -132,7 +134,7 @@ func (ep *Endpoint) Write(message *model.Message) error { if message == nil { return nil } - zap.L().Debug("about to send a message", zap.String("message", message.ToString())) + zap.L().Debug("about to send a message", zap.Any("adapterName", ep.ID), zap.String("message", message.ToString())) topic := message.Others.GetString(model.KeyMqttTopic) qos := byte(ep.Config.QoS) @@ -171,18 +173,18 @@ func (ep *Endpoint) getCallBack() func(paho.Client, paho.Message) { } // Subscribe a topic -func (ep *Endpoint) Subscribe(topic string) error { - if topic == "" { +func (ep *Endpoint) Subscribe(topicsStr string) error { + if topicsStr == "" { return nil } - token := ep.Client.Subscribe(topic, 0, ep.getCallBack()) - token.WaitTimeout(3 * time.Second) - if token.Error() != nil { - ep.statusFunc(&model.State{ - Status: model.StatusError, - Message: token.Error().Error(), - Since: time.Now(), - }) + topics := strings.Split(topicsStr, ",") + for _, topic := range topics { + topic = strings.TrimSpace(topic) + token := ep.Client.Subscribe(topic, 0, ep.getCallBack()) + token.WaitTimeout(3 * time.Second) + if token.Error() != nil { + return token.Error() + } } - return token.Error() + return nil } diff --git a/plugin/device/plugin.go b/plugin/device/plugin.go index c146992..6d335a0 100644 --- a/plugin/device/plugin.go +++ b/plugin/device/plugin.go @@ -3,7 +3,7 @@ package plugin import ( "fmt" - "github.com/mycontroller-org/2mqtt/pkg/types" + model "github.com/mycontroller-org/2mqtt/pkg/types" deviceType "github.com/mycontroller-org/2mqtt/plugin/device/types" "github.com/mycontroller-org/server/v2/pkg/types/cmap" "go.uber.org/zap" @@ -25,7 +25,7 @@ func Register(name string, fn CreatorFn) { func Create(name, ID string, config cmap.CustomMap, rxFunc func(msg *model.Message), statusFunc func(state *model.State)) (p deviceType.Plugin, err error) { if fn, ok := creators[name]; ok { - p, err = fn("", config, rxFunc, statusFunc) + p, err = fn(ID, config, rxFunc, statusFunc) } else { err = fmt.Errorf("device plugin [%s] is not registered", name) }