From e39208d60a99a44273c19253e94bd869d3e00402 Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Wed, 16 Dec 2020 15:38:33 -0500 Subject: [PATCH] fix issue with mqtt concurrent map write (#8562) --- plugins/inputs/mqtt_consumer/mqtt_consumer.go | 10 +++++++++- plugins/inputs/mqtt_consumer/mqtt_consumer_test.go | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 26122b8e86b88..73d41a32f0f9e 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -5,9 +5,10 @@ import ( "errors" "fmt" "strings" + "sync" "time" - "github.com/eclipse/paho.mqtt.golang" + mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/tls" @@ -69,6 +70,7 @@ type MQTTConsumer struct { state ConnectionState sem semaphore messages map[telegraf.TrackingID]bool + messagesMutex sync.Mutex topicTag string ctx context.Context @@ -219,7 +221,9 @@ func (m *MQTTConsumer) connect() error { m.Log.Infof("Connected %v", m.Servers) m.state = Connected + m.messagesMutex.Lock() m.messages = make(map[telegraf.TrackingID]bool) + m.messagesMutex.Unlock() // Persistent sessions should skip subscription if a session is present, as // the subscriptions are stored by the server. @@ -258,6 +262,7 @@ func (m *MQTTConsumer) recvMessage(c mqtt.Client, msg mqtt.Message) { select { case track := <-m.acc.Delivered(): <-m.sem + m.messagesMutex.Lock() _, ok := m.messages[track.ID()] if !ok { // Added by a previous connection @@ -265,6 +270,7 @@ func (m *MQTTConsumer) recvMessage(c mqtt.Client, msg mqtt.Message) { } // No ack, MQTT does not support durable handling delete(m.messages, track.ID()) + m.messagesMutex.Unlock() case m.sem <- empty{}: err := m.onMessage(m.acc, msg) if err != nil { @@ -290,7 +296,9 @@ func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Mess } id := acc.AddTrackingMetricGroup(metrics) + m.messagesMutex.Lock() m.messages[id] = true + m.messagesMutex.Unlock() return nil } diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index 4884fc0508107..2d9db2c23872a 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/eclipse/paho.mqtt.golang" + mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil"