-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqtt_publisher.go
123 lines (103 loc) · 2.91 KB
/
mqtt_publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package buttonoff
import (
"context"
"net/url"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const (
mqttClientID = "buttonoff"
mqttPublishTimeout = time.Second * 5
mqttConnectTimeout = time.Second * 30
mqttMaxReconnectInterval = time.Minute * 5
mqttDisconnectAllowanceMS = 150
)
var (
MQTTPublishTimeoutErr = errors.Errorf("MQTT Publish timeout after %s", mqttPublishTimeout)
MQTTConnectTimeoutErr = errors.Errorf("MQTT Connect timeout after %s", mqttConnectTimeout)
MQTTDisconnectedErr = errors.New("Disconnected from MQTT Broker")
)
type mqttPublisher interface {
Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token
IsConnected() bool
Disconnect(waitMS uint)
}
type MQTTPublisher struct {
log logrus.FieldLogger
mqtt mqttPublisher
}
func NewMQTTPublisher(conf MQTTConfig) (*MQTTPublisher, error) {
logger := appLogger.WithField("comp", "mqtt-publisher")
options, err := clientOptionsFromConfig(conf)
if err != nil {
return nil, err
}
client := mqtt.NewClient(options)
connectToken := client.Connect()
logger.Debug("Connecting to MQTT broker")
complete := connectToken.WaitTimeout(mqttConnectTimeout)
if !complete {
return nil, MQTTConnectTimeoutErr
}
connectErr := connectToken.Error()
if connectErr != nil {
return nil, connectErr
}
logger.Debug("Connected to MQTT broker")
pub := &MQTTPublisher{
log: logger,
mqtt: client,
}
return pub, nil
}
func (mp *MQTTPublisher) Run(ctx context.Context) error {
select {
case <-ctx.Done():
mp.log.Info("Shutting down")
return mp.shutdown()
}
}
func (mp *MQTTPublisher) Close() error {
return mp.shutdown()
}
func (mp *MQTTPublisher) shutdown() error {
if mp.mqtt.IsConnected() {
mp.log.Debug("Disconnecting broker connection")
mp.mqtt.Disconnect(mqttDisconnectAllowanceMS)
mp.log.Debug("Disconnect request completed")
} else {
mp.log.Warn("Cannot disconnect, already disconnected")
}
return nil
}
func (mp *MQTTPublisher) Publish(msg Message) error {
if !mp.mqtt.IsConnected() {
return MQTTDisconnectedErr
}
token := mp.mqtt.Publish(msg.Topic, 0, false, msg.Payload)
complete := token.WaitTimeout(mqttPublishTimeout)
if !complete {
return MQTTPublishTimeoutErr
}
publishErr := token.Error()
if publishErr != nil {
mp.log.Error(errors.Wrapf(publishErr, "could not publish message to %s", msg.Topic))
}
return publishErr
}
func clientOptionsFromConfig(conf MQTTConfig) (*mqtt.ClientOptions, error) {
opts := mqtt.NewClientOptions().
SetAutoReconnect(true).
SetClientID(mqttClientID).
SetMaxReconnectInterval(mqttMaxReconnectInterval)
opts.SetUsername(conf.Username)
opts.SetPassword(conf.Password)
_, parseErr := url.Parse(conf.BrokerAddr)
if parseErr != nil {
return nil, errors.Wrap(parseErr, "BrokerAddr must be in the form tcp://127.0.0.1:1883")
}
opts.AddBroker(conf.BrokerAddr)
return opts, nil
}