Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix persistent session in mqtt_consumer. #6236

Merged
merged 1 commit into from
Aug 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 21 additions & 18 deletions plugins/inputs/mqtt_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@
The [MQTT][mqtt] consumer plugin reads from the specified MQTT topics
and creates metrics using one of the supported [input data formats][].

### Configuration:
### Configuration

```toml
[[inputs.mqtt_consumer]]
## MQTT broker URLs to be used. The format should be scheme://host:port,
## schema can be tcp, ssl, or ws.
servers = ["tcp://localhost:1883"]
servers = ["tcp://127.0.0.1:1883"]

## Topics that will be subscribed to.
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]

## QoS policy for messages
## 0 = at most once
Expand All @@ -18,10 +25,10 @@ and creates metrics using one of the supported [input data formats][].
##
## When using a QoS of 1 or 2, you should enable persistent_session to allow
## resuming unacknowledged messages.
qos = 0
# qos = 0

## Connection timeout for initial connection in seconds
connection_timeout = "30s"
# connection_timeout = "30s"

## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
Expand All @@ -33,21 +40,17 @@ and creates metrics using one of the supported [input data formats][].
## waiting until the next flush_interval.
# max_undelivered_messages = 1000

## Topics to subscribe to
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]
## Persistent session disables clearing of the client session on connection.
## In order for this option to work you must also set client_id to identity
## the client. To receive messages that arrived while the client is offline,
## also set the qos option to 1 or 2 and don't forget to also set the QoS when
## publishing.
# persistent_session = false

# if true, messages that can't be delivered while the subscriber is offline
# will be delivered when it comes back (such as on service restart).
# NOTE: if true, client_id MUST be set
persistent_session = false
# If empty, a random client ID will be generated.
client_id = ""
## If unset, a random client ID will be generated.
# client_id = ""

## username and password to connect MQTT server.
## Username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"

Expand All @@ -65,7 +68,7 @@ and creates metrics using one of the supported [input data formats][].
data_format = "influx"
```

### Tags:
### Metrics

- All measurements are tagged with the incoming topic, ie
`topic=telegraf/host01/cpu`
Expand Down
141 changes: 92 additions & 49 deletions plugins/inputs/mqtt_consumer/mqtt_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ const (
Connected
)

type Client interface {
Connect() mqtt.Token
SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token
AddRoute(topic string, callback mqtt.MessageHandler)
Disconnect(quiesce uint)
}

type ClientFactory func(o *mqtt.ClientOptions) Client

type MQTTConsumer struct {
Servers []string
Topics []string
Expand All @@ -51,12 +60,13 @@ type MQTTConsumer struct {
ClientID string `toml:"client_id"`
tls.ClientConfig

client mqtt.Client
acc telegraf.TrackingAccumulator
state ConnectionState
subscribed bool
sem semaphore
messages map[telegraf.TrackingID]bool
clientFactory ClientFactory
client Client
opts *mqtt.ClientOptions
acc telegraf.TrackingAccumulator
state ConnectionState
sem semaphore
messages map[telegraf.TrackingID]bool

ctx context.Context
cancel context.CancelFunc
Expand All @@ -65,7 +75,14 @@ type MQTTConsumer struct {
var sampleConfig = `
## MQTT broker URLs to be used. The format should be scheme://host:port,
## schema can be tcp, ssl, or ws.
servers = ["tcp://localhost:1883"]
servers = ["tcp://127.0.0.1:1883"]

## Topics that will be subscribed to.
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]

## QoS policy for messages
## 0 = at most once
Expand All @@ -74,10 +91,10 @@ var sampleConfig = `
##
## When using a QoS of 1 or 2, you should enable persistent_session to allow
## resuming unacknowledged messages.
qos = 0
# qos = 0

## Connection timeout for initial connection in seconds
connection_timeout = "30s"
# connection_timeout = "30s"

## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
Expand All @@ -89,21 +106,17 @@ var sampleConfig = `
## waiting until the next flush_interval.
# max_undelivered_messages = 1000

## Topics to subscribe to
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]
## Persistent session disables clearing of the client session on connection.
## In order for this option to work you must also set client_id to identity
## the client. To receive messages that arrived while the client is offline,
## also set the qos option to 1 or 2 and don't forget to also set the QoS when
## publishing.
# persistent_session = false

# if true, messages that can't be delivered while the subscriber is offline
# will be delivered when it comes back (such as on service restart).
# NOTE: if true, client_id MUST be set
persistent_session = false
# If empty, a random client ID will be generated.
client_id = ""
## If unset, a random client ID will be generated.
# client_id = ""

## username and password to connect MQTT server.
## Username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"

Expand Down Expand Up @@ -133,7 +146,7 @@ func (m *MQTTConsumer) SetParser(parser parsers.Parser) {
m.parser = parser
}

func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
func (m *MQTTConsumer) Init() error {
m.state = Disconnected

if m.PersistentSession && m.ClientID == "" {
Expand All @@ -148,23 +161,41 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
return fmt.Errorf("connection_timeout must be greater than 1s: %s", m.ConnectionTimeout.Duration)
}

m.acc = acc.WithTracking(m.MaxUndeliveredMessages)
m.ctx, m.cancel = context.WithCancel(context.Background())

opts, err := m.createOpts()
if err != nil {
return err
}

m.client = mqtt.NewClient(opts)
m.opts = opts

return nil
}

func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
m.state = Disconnected

m.acc = acc.WithTracking(m.MaxUndeliveredMessages)
m.ctx, m.cancel = context.WithCancel(context.Background())

m.client = m.clientFactory(m.opts)

// AddRoute sets up the function for handling messages. These need to be
// added in case we find a persistent session containing subscriptions so we
// know where to dispatch presisted and new messages to. In the alternate
// case that we need to create the subscriptions these will be replaced.
for _, topic := range m.Topics {
m.client.AddRoute(topic, m.recvMessage)
}

m.state = Connecting
m.connect()

return nil
}

func (m *MQTTConsumer) connect() error {
if token := m.client.Connect(); token.Wait() && token.Error() != nil {
token := m.client.Connect()
if token.Wait() && token.Error() != nil {
err := token.Error()
m.state = Disconnected
return err
Expand All @@ -175,22 +206,26 @@ func (m *MQTTConsumer) connect() error {
m.sem = make(semaphore, m.MaxUndeliveredMessages)
m.messages = make(map[telegraf.TrackingID]bool)

// Only subscribe on first connection when using persistent sessions. On
// subsequent connections the subscriptions should be stored in the
// session, but the proper way to do this is to check the connection
// response to ensure a session was found.
if !m.PersistentSession || !m.subscribed {
topics := make(map[string]byte)
for _, topic := range m.Topics {
topics[topic] = byte(m.QoS)
}
subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage)
subscribeToken.Wait()
if subscribeToken.Error() != nil {
m.acc.AddError(fmt.Errorf("subscription error: topics: %s: %v",
strings.Join(m.Topics[:], ","), subscribeToken.Error()))
}
m.subscribed = true
// Presistent sessions should skip subscription if a session is present, as
// the subscriptions are stored by the server.
type sessionPresent interface {
SessionPresent() bool
}
if t, ok := token.(sessionPresent); ok && t.SessionPresent() {
log.Printf("D! [inputs.mqtt_consumer] Session found %v", m.Servers)
return nil
}

topics := make(map[string]byte)
for _, topic := range m.Topics {
topics[topic] = byte(m.QoS)
}

subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage)
subscribeToken.Wait()
if subscribeToken.Error() != nil {
m.acc.AddError(fmt.Errorf("subscription error: topics: %s: %v",
strings.Join(m.Topics[:], ","), subscribeToken.Error()))
}

return nil
Expand Down Expand Up @@ -316,12 +351,20 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
return opts, nil
}

func New(factory ClientFactory) *MQTTConsumer {
return &MQTTConsumer{
Servers: []string{"tcp://127.0.0.1:1883"},
ConnectionTimeout: defaultConnectionTimeout,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
clientFactory: factory,
state: Disconnected,
}
}

func init() {
inputs.Add("mqtt_consumer", func() telegraf.Input {
return &MQTTConsumer{
ConnectionTimeout: defaultConnectionTimeout,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
state: Disconnected,
}
return New(func(o *mqtt.ClientOptions) Client {
return mqtt.NewClient(o)
})
})
}
Loading