From a59b57340400e5ae189510a864b16dc4f0c6bd44 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 10 Oct 2018 14:04:45 -0700 Subject: [PATCH 1/3] Fix race condition in mqtt_consumer connection --- plugins/inputs/mqtt_consumer/mqtt_consumer.go | 103 +++++++---------- .../mqtt_consumer/mqtt_consumer_test.go | 106 +++--------------- 2 files changed, 56 insertions(+), 153 deletions(-) diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 5853ad939a2c6..af5054f062a9e 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -4,7 +4,6 @@ import ( "fmt" "log" "strings" - "sync" "time" "github.com/influxdata/telegraf" @@ -19,6 +18,14 @@ import ( // 30 Seconds is the default used by paho.mqtt.golang var defaultConnectionTimeout = internal.Duration{Duration: 30 * time.Second} +type ConnectionState int + +const ( + Disconnected ConnectionState = iota + Connecting + Connected +) + type MQTTConsumer struct { Servers []string Topics []string @@ -36,16 +43,9 @@ type MQTTConsumer struct { ClientID string `toml:"client_id"` tls.ClientConfig - sync.Mutex client mqtt.Client - // channel of all incoming raw mqtt messages - in chan mqtt.Message - done chan struct{} - - // keep the accumulator internally: - acc telegraf.Accumulator - - connected bool + acc telegraf.Accumulator + state ConnectionState } var sampleConfig = ` @@ -110,9 +110,7 @@ func (m *MQTTConsumer) SetParser(parser parsers.Parser) { } func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { - m.Lock() - defer m.Unlock() - m.connected = false + m.state = Disconnected if m.PersistentSession && m.ClientID == "" { return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" + @@ -134,9 +132,7 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { } m.client = mqtt.NewClient(opts) - m.in = make(chan mqtt.Message, 1000) - m.done = make(chan struct{}) - + m.state = Connecting m.connect() return nil @@ -145,80 +141,65 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { func (m *MQTTConsumer) connect() error { if token := m.client.Connect(); token.Wait() && token.Error() != nil { err := token.Error() - log.Printf("D! MQTT Consumer, connection error - %v", err) - + log.Printf("E! MQTT Consumer, connection error - %v", err) + m.state = Disconnected return err } - go m.receiver() + log.Printf("I! MQTT Client Connected") - return nil -} + m.state = Connected -func (m *MQTTConsumer) onConnect(c mqtt.Client) { - log.Printf("I! MQTT Client Connected") - if !m.PersistentSession || !m.connected { + if !m.PersistentSession { topics := make(map[string]byte) for _, topic := range m.Topics { topics[topic] = byte(m.QoS) } - subscribeToken := c.SubscribeMultiple(topics, m.recvMessage) + subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage) subscribeToken.Wait() if subscribeToken.Error() != nil { m.acc.AddError(fmt.Errorf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s", strings.Join(m.Topics[:], ","), subscribeToken.Error())) } - m.connected = true } - return + + return nil } func (m *MQTTConsumer) onConnectionLost(c mqtt.Client, err error) { - m.acc.AddError(fmt.Errorf("E! MQTT Connection lost\nerror: %s\nMQTT Client will try to reconnect", err.Error())) + m.state = Disconnected + m.acc.AddError(fmt.Errorf("E! MQTT Connection lost: %v\n", err)) return } -// receiver() reads all incoming messages from the consumer, and parses them into -// influxdb metric points. -func (m *MQTTConsumer) receiver() { - for { - select { - case <-m.done: - return - case msg := <-m.in: - topic := msg.Topic() - metrics, err := m.parser.Parse(msg.Payload()) - if err != nil { - m.acc.AddError(fmt.Errorf("E! MQTT Parse Error\nmessage: %s\nerror: %s", - string(msg.Payload()), err.Error())) - } - - for _, metric := range metrics { - tags := metric.Tags() - tags["topic"] = topic - m.acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time()) - } - } +func (m *MQTTConsumer) recvMessage(c mqtt.Client, msg mqtt.Message) { + topic := msg.Topic() + metrics, err := m.parser.Parse(msg.Payload()) + if err != nil { + m.acc.AddError(fmt.Errorf("E! MQTT Parse Error\nmessage: %s\nerror: %s", + string(msg.Payload()), err.Error())) } -} -func (m *MQTTConsumer) recvMessage(_ mqtt.Client, msg mqtt.Message) { - m.in <- msg + for _, metric := range metrics { + tags := metric.Tags() + tags["topic"] = topic + m.acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time()) + } } func (m *MQTTConsumer) Stop() { - m.Lock() - defer m.Unlock() - - if m.connected { - close(m.done) + if m.state == Connected { + log.Printf("I! MQTT Client Disconnecting") m.client.Disconnect(200) - m.connected = false + log.Printf("I! MQTT Client Disconnected") + m.state = Disconnected } } func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error { - if !m.connected { + if m.state == Disconnected { + m.state = Connecting + log.Printf("I! MQTT Client Reconnecting") m.connect() } @@ -271,10 +252,9 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { opts.AddBroker(server) } - opts.SetAutoReconnect(true) + opts.SetAutoReconnect(false) opts.SetKeepAlive(time.Second * 60) opts.SetCleanSession(!m.PersistentSession) - opts.SetOnConnectHandler(m.onConnect) opts.SetConnectionLostHandler(m.onConnectionLost) return opts, nil @@ -284,6 +264,7 @@ func init() { inputs.Add("mqtt_consumer", func() telegraf.Input { return &MQTTConsumer{ ConnectionTimeout: defaultConnectionTimeout, + state: Disconnected, } }) } diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index a2e5deaa8a94a..c04bd18a73012 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -12,24 +12,17 @@ import ( ) const ( - testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n" - testMsgNeg = "cpu_load_short,host=server01 value=-23422.0 1422568543702900257\n" - testMsgGraphite = "cpu.load.short.graphite 23422 1454780029" - testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n" - invalidMsg = "cpu_load_short,host=server01 1422568543702900257\n" + testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n" + invalidMsg = "cpu_load_short,host=server01 1422568543702900257\n" ) -func newTestMQTTConsumer() (*MQTTConsumer, chan mqtt.Message) { - in := make(chan mqtt.Message, 100) +func newTestMQTTConsumer() *MQTTConsumer { n := &MQTTConsumer{ - Topics: []string{"telegraf"}, - Servers: []string{"localhost:1883"}, - in: in, - done: make(chan struct{}), - connected: true, + Topics: []string{"telegraf"}, + Servers: []string{"localhost:1883"}, } - return n, in + return n } // Test that default client has random ID @@ -79,31 +72,12 @@ func TestPersistentClientIDFail(t *testing.T) { } func TestRunParser(t *testing.T) { - n, in := newTestMQTTConsumer() + n := newTestMQTTConsumer() acc := testutil.Accumulator{} n.acc = &acc - defer close(n.done) - n.parser, _ = parsers.NewInfluxParser() - go n.receiver() - in <- mqttMsg(testMsgNeg) - acc.Wait(1) - - if a := acc.NFields(); a != 1 { - t.Errorf("got %v, expected %v", a, 1) - } -} -func TestRunParserNegativeNumber(t *testing.T) { - n, in := newTestMQTTConsumer() - acc := testutil.Accumulator{} - n.acc = &acc - defer close(n.done) - - n.parser, _ = parsers.NewInfluxParser() - go n.receiver() - in <- mqttMsg(testMsg) - acc.Wait(1) + n.recvMessage(nil, mqttMsg(testMsg)) if a := acc.NFields(); a != 1 { t.Errorf("got %v, expected %v", a, 1) @@ -112,84 +86,32 @@ func TestRunParserNegativeNumber(t *testing.T) { // Test that the parser ignores invalid messages func TestRunParserInvalidMsg(t *testing.T) { - n, in := newTestMQTTConsumer() + n := newTestMQTTConsumer() acc := testutil.Accumulator{} n.acc = &acc - defer close(n.done) - n.parser, _ = parsers.NewInfluxParser() - go n.receiver() - in <- mqttMsg(invalidMsg) - acc.WaitError(1) + + n.recvMessage(nil, mqttMsg(invalidMsg)) if a := acc.NFields(); a != 0 { t.Errorf("got %v, expected %v", a, 0) } - assert.Contains(t, acc.Errors[0].Error(), "MQTT Parse Error") + assert.Len(t, acc.Errors, 1) } // Test that the parser parses line format messages into metrics func TestRunParserAndGather(t *testing.T) { - n, in := newTestMQTTConsumer() + n := newTestMQTTConsumer() acc := testutil.Accumulator{} n.acc = &acc - - defer close(n.done) - n.parser, _ = parsers.NewInfluxParser() - go n.receiver() - in <- mqttMsg(testMsg) - acc.Wait(1) - n.Gather(&acc) + n.recvMessage(nil, mqttMsg(testMsg)) acc.AssertContainsFields(t, "cpu_load_short", map[string]interface{}{"value": float64(23422)}) } -// Test that the parser parses graphite format messages into metrics -func TestRunParserAndGatherGraphite(t *testing.T) { - n, in := newTestMQTTConsumer() - acc := testutil.Accumulator{} - n.acc = &acc - defer close(n.done) - - n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) - go n.receiver() - in <- mqttMsg(testMsgGraphite) - - n.Gather(&acc) - acc.Wait(1) - - acc.AssertContainsFields(t, "cpu_load_short_graphite", - map[string]interface{}{"value": float64(23422)}) -} - -// Test that the parser parses json format messages into metrics -func TestRunParserAndGatherJSON(t *testing.T) { - n, in := newTestMQTTConsumer() - acc := testutil.Accumulator{} - n.acc = &acc - defer close(n.done) - - n.parser, _ = parsers.NewParser(&parsers.Config{ - DataFormat: "json", - MetricName: "nats_json_test", - }) - go n.receiver() - in <- mqttMsg(testMsgJSON) - - n.Gather(&acc) - - acc.Wait(1) - - acc.AssertContainsFields(t, "nats_json_test", - map[string]interface{}{ - "a": float64(5), - "b_c": float64(6), - }) -} - func mqttMsg(val string) mqtt.Message { return &message{ topic: "telegraf/unit_test", From dcf4fb4ba53da4be36ad97858d6bc00396297205 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 10 Oct 2018 14:19:43 -0700 Subject: [PATCH 2/3] Update mqtt_consumer log message style --- plugins/inputs/mqtt_consumer/mqtt_consumer.go | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index af5054f062a9e..87f2a26086ae2 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -1,6 +1,7 @@ package mqtt_consumer import ( + "errors" "fmt" "log" "strings" @@ -113,17 +114,16 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { m.state = Disconnected if m.PersistentSession && m.ClientID == "" { - return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" + - " = true, you MUST also set client_id") + return errors.New("persistent_session requires client_id") } m.acc = acc if m.QoS > 2 || m.QoS < 0 { - return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS) + return fmt.Errorf("qos value must be 0, 1, or 2: %d", m.QoS) } if m.ConnectionTimeout.Duration < 1*time.Second { - return fmt.Errorf("MQTT Consumer, invalid connection_timeout value: %s", m.ConnectionTimeout.Duration) + return fmt.Errorf("connection_timeout must be greater than 1s: %s", m.ConnectionTimeout.Duration) } opts, err := m.createOpts() @@ -141,13 +141,11 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { func (m *MQTTConsumer) connect() error { if token := m.client.Connect(); token.Wait() && token.Error() != nil { err := token.Error() - log.Printf("E! MQTT Consumer, connection error - %v", err) m.state = Disconnected return err } - log.Printf("I! MQTT Client Connected") - + log.Printf("I! [inputs.mqtt_consumer]: connected %v", m.Servers) m.state = Connected if !m.PersistentSession { @@ -158,7 +156,7 @@ func (m *MQTTConsumer) connect() error { subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage) subscribeToken.Wait() if subscribeToken.Error() != nil { - m.acc.AddError(fmt.Errorf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s", + m.acc.AddError(fmt.Errorf("subscription error: topics: %s: %v", strings.Join(m.Topics[:], ","), subscribeToken.Error())) } } @@ -167,8 +165,9 @@ func (m *MQTTConsumer) connect() error { } func (m *MQTTConsumer) onConnectionLost(c mqtt.Client, err error) { + m.acc.AddError(fmt.Errorf("connection lost: %v", err)) + log.Printf("D! [inputs.mqtt_consumer]: disconnected %v", m.Servers) m.state = Disconnected - m.acc.AddError(fmt.Errorf("E! MQTT Connection lost: %v\n", err)) return } @@ -176,8 +175,7 @@ func (m *MQTTConsumer) recvMessage(c mqtt.Client, msg mqtt.Message) { topic := msg.Topic() metrics, err := m.parser.Parse(msg.Payload()) if err != nil { - m.acc.AddError(fmt.Errorf("E! MQTT Parse Error\nmessage: %s\nerror: %s", - string(msg.Payload()), err.Error())) + m.acc.AddError(err) } for _, metric := range metrics { @@ -189,9 +187,9 @@ func (m *MQTTConsumer) recvMessage(c mqtt.Client, msg mqtt.Message) { func (m *MQTTConsumer) Stop() { if m.state == Connected { - log.Printf("I! MQTT Client Disconnecting") + log.Printf("D! [inputs.mqtt_consumer]: disconnecting %v", m.Servers) m.client.Disconnect(200) - log.Printf("I! MQTT Client Disconnected") + log.Printf("D! [inputs.mqtt_consumer]: disconnected %v", m.Servers) m.state = Disconnected } } @@ -199,7 +197,7 @@ func (m *MQTTConsumer) Stop() { func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error { if m.state == Disconnected { m.state = Connecting - log.Printf("I! MQTT Client Reconnecting") + log.Printf("D! [inputs.mqtt_consumer]: connecting %v", m.Servers) m.connect() } @@ -242,7 +240,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { for _, server := range m.Servers { // Preserve support for host:port style servers; deprecated in Telegraf 1.4.4 if !strings.Contains(server, "://") { - log.Printf("W! mqtt_consumer server %q should be updated to use `scheme://host:port` format", server) + log.Printf("W! [inputs.mqtt_consumer] server %q should be updated to use `scheme://host:port` format", server) if tlsCfg == nil { server = "tcp://" + server } else { From 2cc071d4a26c8c36cb355d72fd52e37006097047 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Thu, 11 Oct 2018 01:40:23 -0700 Subject: [PATCH 3/3] Subscribe to topics on first connection with persistent session --- plugins/inputs/mqtt_consumer/mqtt_consumer.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 87f2a26086ae2..0a253b8d82ba6 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -44,9 +44,10 @@ type MQTTConsumer struct { ClientID string `toml:"client_id"` tls.ClientConfig - client mqtt.Client - acc telegraf.Accumulator - state ConnectionState + client mqtt.Client + acc telegraf.Accumulator + state ConnectionState + subscribed bool } var sampleConfig = ` @@ -148,7 +149,11 @@ func (m *MQTTConsumer) connect() error { log.Printf("I! [inputs.mqtt_consumer]: connected %v", m.Servers) m.state = Connected - if !m.PersistentSession { + // Only subscribe on first connection when using persistent sessions. On + // subsequent connections the subscriptions should be stored in the + // session, but the proper way to do this is to check the connection + // response to ensure a session was found. + if !m.PersistentSession || !m.subscribed { topics := make(map[string]byte) for _, topic := range m.Topics { topics[topic] = byte(m.QoS) @@ -159,6 +164,7 @@ func (m *MQTTConsumer) connect() error { m.acc.AddError(fmt.Errorf("subscription error: topics: %s: %v", strings.Join(m.Topics[:], ","), subscribeToken.Error())) } + m.subscribed = true } return nil