Skip to content

Commit

Permalink
feat(migrations): Add option migration for inputs.mqtt_consumer (#14233)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Nov 2, 2023
1 parent bbc5b16 commit 5b6b9cb
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 0 deletions.
5 changes: 5 additions & 0 deletions migrations/all/inputs_mqtt_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !custom || (migrations && (inputs || inputs.mqtt_consumer))

package all

import _ "github.com/influxdata/telegraf/migrations/inputs_mqtt_consumer" // register migration
43 changes: 43 additions & 0 deletions migrations/inputs_mqtt_consumer/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package inputs_mqtt_consumer

import (
"github.com/influxdata/toml"
"github.com/influxdata/toml/ast"

"github.com/influxdata/telegraf/migrations"
)

// Migration function
func migrate(tbl *ast.Table) ([]byte, string, error) {
// Decode the old data structure
var plugin map[string]interface{}
if err := toml.UnmarshalTable(tbl, &plugin); err != nil {
return nil, "", err
}

// Check for deprecated option(s) and migrate them
var applied bool
if _, found := plugin["metric_buffer"]; found {
applied = true

// Remove the ignored setting
delete(plugin, "metric_buffer")
}

// No options migrated so we can exit early
if !applied {
return nil, "", migrations.ErrNotApplicable
}

// Create the corresponding plugin configurations
cfg := migrations.CreateTOMLStruct("inputs", "mqtt_consumer")
cfg.Add("inputs", "mqtt_consumer", plugin)

output, err := toml.Marshal(cfg)
return output, "", err
}

// Register the migration function for the plugin type
func init() {
migrations.AddPluginOptionMigration("inputs.mqtt_consumer", migrate)
}
161 changes: 161 additions & 0 deletions migrations/inputs_mqtt_consumer/migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package inputs_mqtt_consumer_test

import (
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/config"
_ "github.com/influxdata/telegraf/migrations/inputs_mqtt_consumer" // register migration
_ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer" // register plugin
_ "github.com/influxdata/telegraf/plugins/parsers/all" // register parsers
)

func TestNoMigration(t *testing.T) {
defaultCfg := []byte(`
# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
## Broker URLs for the MQTT server or cluster. To connect to multiple
## clusters or standalone servers, use a separate plugin instance.
## example: servers = ["tcp://localhost:1883"]
## servers = ["ssl://localhost:1883"]
## servers = ["ws://localhost:1883"]
servers = ["tcp://127.0.0.1:1883"]
## Topics that will be subscribed to.
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]
## The message topic will be stored in a tag specified by this value. If set
## to the empty string no topic tag will be created.
# topic_tag = "topic"
## QoS policy for messages
## 0 = at most once
## 1 = at least once
## 2 = exactly once
##
## When using a QoS of 1 or 2, you should enable persistent_session to allow
## resuming unacknowledged messages.
# qos = 0
## Connection timeout for initial connection in seconds
# connection_timeout = "30s"
## Max undelivered messages
## This plugin uses tracking metrics, which ensure messages are read to
## outputs before acknowledging them to the original broker to ensure data
## is not lost. This option sets the maximum messages to read from the
## broker that have not been written by an output.
##
## This value needs to be picked with awareness of the agent's
## metric_batch_size value as well. Setting max undelivered messages too high
## can result in a constant stream of data batches to the output. While
## setting it too low may never flush the broker's messages.
# max_undelivered_messages = 1000
## Persistent session disables clearing of the client session on connection.
## In order for this option to work you must also set client_id to identify
## the client. To receive messages that arrived while the client is offline,
## also set the qos option to 1 or 2 and don't forget to also set the QoS when
## publishing.
# persistent_session = false
## If unset, a random client ID will be generated.
# client_id = ""
## Username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Client trace messages
## When set to true, and debug mode enabled in the agent settings, the MQTT
## client's messages are included in telegraf logs. These messages are very
## noisey, but essential for debugging issues.
# client_trace = false
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
# [[inputs.mqtt_consumer.topic_parsing]]
# topic = ""
# measurement = ""
# tags = ""
# fields = ""
## Value supported is int, float, unit
# [[inputs.mqtt_consumer.topic.types]]
# key = type
`)

// Migrate and check that nothing changed
output, n, err := config.ApplyMigrations(defaultCfg)
require.NoError(t, err)
require.NotEmpty(t, output)
require.Zero(t, n)
require.Equal(t, string(defaultCfg), string(output))
}

func TestCases(t *testing.T) {
// Get all directories in testdata
folders, err := os.ReadDir("testcases")
require.NoError(t, err)

for _, f := range folders {
// Only handle folders
if !f.IsDir() {
continue
}

t.Run(f.Name(), func(t *testing.T) {
testcasePath := filepath.Join("testcases", f.Name())
inputFile := filepath.Join(testcasePath, "telegraf.conf")
expectedFile := filepath.Join(testcasePath, "expected.conf")

// Read the expected output
expected := config.NewConfig()
require.NoError(t, expected.LoadConfig(expectedFile))
require.NotEmpty(t, expected.Inputs)

// Read the input data
input, remote, err := config.LoadConfigFile(inputFile)
require.NoError(t, err)
require.False(t, remote)
require.NotEmpty(t, input)

// Migrate
output, n, err := config.ApplyMigrations(input)
require.NoError(t, err)
require.NotEmpty(t, output)
require.GreaterOrEqual(t, n, uint64(1))
actual := config.NewConfig()
require.NoError(t, actual.LoadConfigData(output))

// Test the output
require.Len(t, actual.Inputs, len(expected.Inputs))
actualIDs := make([]string, 0, len(expected.Inputs))
expectedIDs := make([]string, 0, len(expected.Inputs))
for i := range actual.Inputs {
actualIDs = append(actualIDs, actual.Inputs[i].ID())
expectedIDs = append(expectedIDs, expected.Inputs[i].ID())
}
require.ElementsMatch(t, expectedIDs, actualIDs, string(output))
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[[inputs.mqtt_consumer]]
data_format = "influx"
servers = ["tcp://127.0.0.1:1883"]
topics = ["telegraf/host01/cpu", "telegraf/+/mem", "sensors/#"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
servers = ["tcp://127.0.0.1:1883"]
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]
metric_buffer = 1024
data_format = "influx"
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[[inputs.mqtt_consumer]]
data_format = "xpath_json"
servers = ["tcp://127.0.0.1:1883"]
topics = ["telegraf/host01/cpu", "telegraf/+/mem", "sensors/#"]
xpath_native_types = true

[[inputs.mqtt_consumer.xpath]]
field_selection = "/fields/*"
metric_name = "/name"
tag_selection = "/tags/*"
timestamp = "/timestamp"
timestamp_format = "unix_ms"
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
servers = ["tcp://127.0.0.1:1883"]
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]
metric_buffer = 1024

data_format = "xpath_json"
xpath_native_types = true

# Configuration matching the first (ENERGY) message
[[inputs.mqtt_consumer.xpath]]
metric_name = "/name"
timestamp = "/timestamp"
timestamp_format = "unix_ms"
field_selection = "/fields/*"
tag_selection = "/tags/*"

0 comments on commit 5b6b9cb

Please sign in to comment.