Skip to content

Commit

Permalink
fix issue with mqtt concurrent map write (#8562)
Browse files Browse the repository at this point in the history
  • Loading branch information
ssoroka authored Dec 16, 2020
1 parent b858eb9 commit e39208d
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
10 changes: 9 additions & 1 deletion plugins/inputs/mqtt_consumer/mqtt_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/eclipse/paho.mqtt.golang"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/tls"
Expand Down Expand Up @@ -69,6 +70,7 @@ type MQTTConsumer struct {
state ConnectionState
sem semaphore
messages map[telegraf.TrackingID]bool
messagesMutex sync.Mutex
topicTag string

ctx context.Context
Expand Down Expand Up @@ -219,7 +221,9 @@ func (m *MQTTConsumer) connect() error {

m.Log.Infof("Connected %v", m.Servers)
m.state = Connected
m.messagesMutex.Lock()
m.messages = make(map[telegraf.TrackingID]bool)
m.messagesMutex.Unlock()

// Persistent sessions should skip subscription if a session is present, as
// the subscriptions are stored by the server.
Expand Down Expand Up @@ -258,13 +262,15 @@ func (m *MQTTConsumer) recvMessage(c mqtt.Client, msg mqtt.Message) {
select {
case track := <-m.acc.Delivered():
<-m.sem
m.messagesMutex.Lock()
_, ok := m.messages[track.ID()]
if !ok {
// Added by a previous connection
continue
}
// No ack, MQTT does not support durable handling
delete(m.messages, track.ID())
m.messagesMutex.Unlock()
case m.sem <- empty{}:
err := m.onMessage(m.acc, msg)
if err != nil {
Expand All @@ -290,7 +296,9 @@ func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Mess
}

id := acc.AddTrackingMetricGroup(metrics)
m.messagesMutex.Lock()
m.messages[id] = true
m.messagesMutex.Unlock()
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"
"time"

"github.com/eclipse/paho.mqtt.golang"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
Expand Down

0 comments on commit e39208d

Please sign in to comment.