Skip to content

Commit

Permalink
refacto: use qos 1
Browse files Browse the repository at this point in the history
  • Loading branch information
kpetremann committed Jun 25, 2023
1 parent 29cfc1c commit 38d2135
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
2 changes: 0 additions & 2 deletions cmd/dht2mqtt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,10 @@ func main() {
log.Info().Msg("connected to sensor")

// Connect to MQTT server
log.Info().Msgf("connecting to MQTT server")
publisher := mqtt.NewPublisher(cfg.MQTTUrl, cfg.MQTTTopicRoot, cfg.MQTTUsername, cfg.MQTTPassword)
if err := publisher.Connect(); err != nil {
log.Fatal().Err(err).Msg("publisher init error")
}
log.Info().Msgf("connected to MQTT server")

// Watch for metrics
ch := make(chan sensor.Payload, 10)
Expand Down
26 changes: 22 additions & 4 deletions internal/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mqtt
import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/rs/zerolog/log"
Expand All @@ -16,7 +17,7 @@ const mqttTimeout = 5

type Publisher struct {
client mqtt.Client
mqttUrl string // example: "tcp://10.2.0.166:1883"
mqttUrl string
username string
password string
topicRoot string
Expand All @@ -35,6 +36,14 @@ func NewPublisher(mqttUrl, topicRoot, username, password string) Publisher {
}
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
log.Info().Msg("connected to MQTT")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
log.Error().Err(err).Msg("connecttion to MQTT lost")
}

// Connect connects to the MQTT server;
//
// The Publisher must have been instantiated first using NewPublisher.
Expand All @@ -47,10 +56,12 @@ func (p *Publisher) Connect() error {
opts.SetPassword(p.password)
}
opts.SetAutoReconnect(true)
opts.SetOnConnectHandler(connectHandler)
opts.SetConnectionLostHandler(connectLostHandler)

p.client = mqtt.NewClient(opts)

if t := p.client.Connect(); t.Wait() && t.Error() != nil {
if t := p.client.Connect(); t.WaitTimeout(10*time.Second) && t.Error() != nil {
log.Fatal().Err(t.Error()).Send()
}

Expand All @@ -73,10 +84,17 @@ func (p *Publisher) Publish(sensorName string, payload interface{}) error {
}

log.Debug().Msgf("publishing new MQTT message: '%s %s'", p.topicRoot+sensorName, data)
t := p.client.Publish(p.topicRoot+sensorName, 0, false, data)
if ok := t.Wait(); !ok {
t := p.client.Publish(p.topicRoot+sensorName, 1, false, data)
ack := t.WaitTimeout(10 * time.Second)

if !ack {
return errors.New("MQTT server did not confirm receiving the message")
}
if t.Error() != nil {
return fmt.Errorf("MQTT server publish error: %w", t.Error())
}

log.Debug().Msgf("published: '%s %s'", p.topicRoot+sensorName, data)

return nil
}

0 comments on commit 38d2135

Please sign in to comment.