From 1e67fd8949837eb520d38552961dda4aae9d36d3 Mon Sep 17 00:00:00 2001 From: Lance O'Connor Date: Tue, 11 Sep 2018 13:01:08 -0700 Subject: [PATCH] Add Splunk Metrics serializer (#4339) --- docs/DATA_FORMATS_OUTPUT.md | 1 + internal/config/config.go | 13 ++ plugins/serializers/registry.go | 10 + plugins/serializers/splunkmetric/README.md | 139 +++++++++++++ .../serializers/splunkmetric/splunkmetric.go | 126 ++++++++++++ .../splunkmetric/splunkmetric_test.go | 182 ++++++++++++++++++ 6 files changed, 471 insertions(+) create mode 100644 plugins/serializers/splunkmetric/README.md create mode 100644 plugins/serializers/splunkmetric/splunkmetric.go create mode 100644 plugins/serializers/splunkmetric/splunkmetric_test.go diff --git a/docs/DATA_FORMATS_OUTPUT.md b/docs/DATA_FORMATS_OUTPUT.md index f4e41c2541fc7..60902165660d5 100644 --- a/docs/DATA_FORMATS_OUTPUT.md +++ b/docs/DATA_FORMATS_OUTPUT.md @@ -7,6 +7,7 @@ plugins. 1. [InfluxDB Line Protocol](#influx) 1. [JSON](#json) 1. [Graphite](#graphite) +1. [SplunkMetric](../plugins/serializers/splunkmetric/README.md) You will be able to identify the plugins with support by the presence of a `data_format` config option, for example, in the `file` output plugin: diff --git a/internal/config/config.go b/internal/config/config.go index e10a5fc4cb244..2208268d27dbf 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1693,6 +1693,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error } } + if node, ok := tbl.Fields["splunkmetric_hec_routing"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if b, ok := kv.Value.(*ast.Boolean); ok { + var err error + c.HecRouting, err = b.Boolean() + if err != nil { + return nil, err + } + } + } + } + delete(tbl.Fields, "influx_max_line_bytes") delete(tbl.Fields, "influx_sort_fields") delete(tbl.Fields, "influx_uint_support") @@ -1701,6 +1713,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error delete(tbl.Fields, "prefix") delete(tbl.Fields, "template") delete(tbl.Fields, "json_timestamp_units") + delete(tbl.Fields, "splunkmetric_hec_routing") return serializers.NewSerializer(c) } diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index 277d332063ada..b8a0aef07006b 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/telegraf/plugins/serializers/graphite" "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/json" + "github.com/influxdata/telegraf/plugins/serializers/splunkmetric" ) // SerializerOutput is an interface for output plugins that are able to @@ -60,6 +61,9 @@ type Config struct { // Timestamp units to use for JSON formatted output TimestampUnits time.Duration + + // Include HEC routing fields for splunkmetric output + HecRouting bool } // NewSerializer a Serializer interface based on the given config. @@ -73,6 +77,8 @@ func NewSerializer(config *Config) (Serializer, error) { serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport) case "json": serializer, err = NewJsonSerializer(config.TimestampUnits) + case "splunkmetric": + serializer, err = NewSplunkmetricSerializer(config.HecRouting) default: err = fmt.Errorf("Invalid data format: %s", config.DataFormat) } @@ -83,6 +89,10 @@ func NewJsonSerializer(timestampUnits time.Duration) (Serializer, error) { return json.NewSerializer(timestampUnits) } +func NewSplunkmetricSerializer(splunkmetric_hec_routing bool) (Serializer, error) { + return splunkmetric.NewSerializer(splunkmetric_hec_routing) +} + func NewInfluxSerializerConfig(config *Config) (Serializer, error) { var sort influx.FieldSortOrder if config.InfluxSortFields { diff --git a/plugins/serializers/splunkmetric/README.md b/plugins/serializers/splunkmetric/README.md new file mode 100644 index 0000000000000..02d69db66ac22 --- /dev/null +++ b/plugins/serializers/splunkmetric/README.md @@ -0,0 +1,139 @@ +# Splunk Metrics serializer + +This serializer formats and outputs the metric data in a format that can be consumed by a Splunk metrics index. +It can be used to write to a file using the file output, or for sending metrics to a HEC using the standard telegraf HTTP output. + +If you're using the HTTP output, this serializer knows how to batch the metrics so you don't end up with an HTTP POST per metric. + +Th data is output in a format that conforms to the specified Splunk HEC JSON format as found here: +[Send metrics in JSON format](http://dev.splunk.com/view/event-collector/SP-CAAAFDN). + +An example event looks like: +```javascript +{ + "time": 1529708430, + "event": "metric", + "host": "patas-mbp", + "fields": { + "_value": 0.6, + "cpu": "cpu0", + "dc": "mobile", + "metric_name": "cpu.usage_user", + "user": "ronnocol" + } +} +``` +In the above snippet, the following keys are dimensions: +* cpu +* dc +* user + +## Using with the HTTP output + +To send this data to a Splunk HEC, you can use the HTTP output, there are some custom headers that you need to add +to manage the HEC authorization, here's a sample config for an HTTP output: + +```toml +[[outputs.http]] + ## URL is the address to send metrics to + url = "https://localhost:8088/services/collector" + + ## Timeout for HTTP message + # timeout = "5s" + + ## HTTP method, one of: "POST" or "PUT" + # method = "POST" + + ## HTTP Basic Auth credentials + # username = "username" + # password = "pa$$word" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Data format to output. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "splunkmetric" + ## Provides time, index, source overrides for the HEC + splunkmetric_hec_routing = true + + ## Additional HTTP headers + [outputs.http.headers] + # Should be set manually to "application/json" for json data_format + Content-Type = "application/json" + Authorization = "Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" + X-Splunk-Request-Channel = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" +``` + +## Overrides +You can override the default values for the HEC token you are using by adding additional tags to the config file. + +The following aspects of the token can be overriden with tags: +* index +* source + +You can either use `[global_tags]` or using a more advanced configuration as documented [here](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md). + +Such as this example which overrides the index just on the cpu metric: +```toml +[[inputs.cpu]] + percpu = false + totalcpu = true + [inputs.cpu.tags] + index = "cpu_metrics" +``` + +## Using with the File output + +You can use the file output when running telegraf on a machine with a Splunk forwarder. + +A sample event when `hec_routing` is false (or unset) looks like: +```javascript +{ + "_value": 0.6, + "cpu": "cpu0", + "dc": "mobile", + "metric_name": "cpu.usage_user", + "user": "ronnocol", + "time": 1529708430 +} +``` +Data formatted in this manner can be ingested with a simple `props.conf` file that +looks like this: + +```ini +[telegraf] +category = Metrics +description = Telegraf Metrics +pulldown_type = 1 +DATETIME_CONFIG = +NO_BINARY_CHECK = true +SHOULD_LINEMERGE = true +disabled = false +INDEXED_EXTRACTIONS = json +KV_MODE = none +TIMESTAMP_FIELDS = time +TIME_FORMAT = %s.%3N +``` + +An example configuration of a file based output is: + +```toml + # Send telegraf metrics to file(s) +[[outputs.file]] + ## Files to write to, "stdout" is a specially handled file. + files = ["/tmp/metrics.out"] + + ## Data format to output. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "splunkmetric" + hec_routing = false +``` diff --git a/plugins/serializers/splunkmetric/splunkmetric.go b/plugins/serializers/splunkmetric/splunkmetric.go new file mode 100644 index 0000000000000..77de49ee07644 --- /dev/null +++ b/plugins/serializers/splunkmetric/splunkmetric.go @@ -0,0 +1,126 @@ +package splunkmetric + +import ( + "encoding/json" + "fmt" + "log" + + "github.com/influxdata/telegraf" +) + +type serializer struct { + HecRouting bool +} + +func NewSerializer(splunkmetric_hec_routing bool) (*serializer, error) { + s := &serializer{ + HecRouting: splunkmetric_hec_routing, + } + return s, nil +} + +func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) { + + m, err := s.createObject(metric) + if err != nil { + return nil, fmt.Errorf("D! [serializer.splunkmetric] Dropping invalid metric: %s", metric.Name()) + } + + return m, nil +} + +func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { + + var serialized []byte + + for _, metric := range metrics { + m, err := s.createObject(metric) + if err != nil { + return nil, fmt.Errorf("D! [serializer.splunkmetric] Dropping invalid metric: %s", metric.Name()) + } else if m != nil { + serialized = append(serialized, m...) + } + } + + return serialized, nil +} + +func (s *serializer) createObject(metric telegraf.Metric) (metricGroup []byte, err error) { + + /* Splunk supports one metric json object, and does _not_ support an array of JSON objects. + ** Splunk has the following required names for the metric store: + ** metric_name: The name of the metric + ** _value: The value for the metric + ** time: The timestamp for the metric + ** All other index fields become deminsions. + */ + type HECTimeSeries struct { + Time float64 `json:"time"` + Event string `json:"event"` + Host string `json:"host,omitempty"` + Index string `json:"index,omitempty"` + Source string `json:"source,omitempty"` + Fields map[string]interface{} `json:"fields"` + } + + dataGroup := HECTimeSeries{} + var metricJson []byte + + for _, field := range metric.FieldList() { + + if !verifyValue(field.Value) { + log.Printf("D! Can not parse value: %v for key: %v", field.Value, field.Key) + continue + } + + obj := map[string]interface{}{} + obj["metric_name"] = metric.Name() + "." + field.Key + obj["_value"] = field.Value + + dataGroup.Event = "metric" + // Convert ns to float seconds since epoch. + dataGroup.Time = float64(metric.Time().UnixNano()) / float64(1000000000) + dataGroup.Fields = obj + + // Break tags out into key(n)=value(t) pairs + for n, t := range metric.Tags() { + if n == "host" { + dataGroup.Host = t + } else if n == "index" { + dataGroup.Index = t + } else if n == "source" { + dataGroup.Source = t + } else { + dataGroup.Fields[n] = t + } + } + dataGroup.Fields["metric_name"] = metric.Name() + "." + field.Key + dataGroup.Fields["_value"] = field.Value + + switch s.HecRouting { + case true: + // Output the data as a fields array and host,index,time,source overrides for the HEC. + metricJson, err = json.Marshal(dataGroup) + default: + // Just output the data and the time, useful for file based outuputs + dataGroup.Fields["time"] = dataGroup.Time + metricJson, err = json.Marshal(dataGroup.Fields) + } + + metricGroup = append(metricGroup, metricJson...) + + if err != nil { + return nil, err + } + } + + return metricGroup, nil +} + +func verifyValue(v interface{}) bool { + switch v.(type) { + case string: + return false + } + return true +} diff --git a/plugins/serializers/splunkmetric/splunkmetric_test.go b/plugins/serializers/splunkmetric/splunkmetric_test.go new file mode 100644 index 0000000000000..f3825d803e9dc --- /dev/null +++ b/plugins/serializers/splunkmetric/splunkmetric_test.go @@ -0,0 +1,182 @@ +package splunkmetric + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +func MustMetric(v telegraf.Metric, err error) telegraf.Metric { + if err != nil { + panic(err) + } + return v +} + +func TestSerializeMetricFloat(t *testing.T) { + // Test sub-second time + now := time.Unix(1529875740, 819000000) + tags := map[string]string{ + "cpu": "cpu0", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s, _ := NewSerializer(false) + var buf []byte + buf, err = s.Serialize(m) + assert.NoError(t, err) + expS := `{"_value":91.5,"cpu":"cpu0","metric_name":"cpu.usage_idle","time":1529875740.819}` + assert.Equal(t, string(expS), string(buf)) +} + +func TestSerializeMetricFloatHec(t *testing.T) { + // Test sub-second time + now := time.Unix(1529875740, 819000000) + tags := map[string]string{ + "cpu": "cpu0", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s, _ := NewSerializer(true) + var buf []byte + buf, err = s.Serialize(m) + assert.NoError(t, err) + expS := `{"time":1529875740.819,"event":"metric","fields":{"_value":91.5,"cpu":"cpu0","metric_name":"cpu.usage_idle"}}` + assert.Equal(t, string(expS), string(buf)) +} + +func TestSerializeMetricInt(t *testing.T) { + now := time.Unix(0, 0) + tags := map[string]string{ + "cpu": "cpu0", + } + fields := map[string]interface{}{ + "usage_idle": int64(90), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s, _ := NewSerializer(false) + var buf []byte + buf, err = s.Serialize(m) + assert.NoError(t, err) + + expS := `{"_value":90,"cpu":"cpu0","metric_name":"cpu.usage_idle","time":0}` + assert.Equal(t, string(expS), string(buf)) +} + +func TestSerializeMetricIntHec(t *testing.T) { + now := time.Unix(0, 0) + tags := map[string]string{ + "cpu": "cpu0", + } + fields := map[string]interface{}{ + "usage_idle": int64(90), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s, _ := NewSerializer(true) + var buf []byte + buf, err = s.Serialize(m) + assert.NoError(t, err) + + expS := `{"time":0,"event":"metric","fields":{"_value":90,"cpu":"cpu0","metric_name":"cpu.usage_idle"}}` + assert.Equal(t, string(expS), string(buf)) +} + +func TestSerializeMetricString(t *testing.T) { + now := time.Unix(0, 0) + tags := map[string]string{ + "cpu": "cpu0", + } + fields := map[string]interface{}{ + "processorType": "ARMv7 Processor rev 4 (v7l)", + "usage_idle": int64(5), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s, _ := NewSerializer(false) + var buf []byte + buf, err = s.Serialize(m) + assert.NoError(t, err) + + expS := `{"_value":5,"cpu":"cpu0","metric_name":"cpu.usage_idle","time":0}` + assert.Equal(t, string(expS), string(buf)) + assert.NoError(t, err) +} + +func TestSerializeBatch(t *testing.T) { + m := MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + ) + n := MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 92.0, + }, + time.Unix(0, 0), + ), + ) + + metrics := []telegraf.Metric{m, n} + s, _ := NewSerializer(false) + buf, err := s.SerializeBatch(metrics) + assert.NoError(t, err) + + expS := `{"_value":42,"metric_name":"cpu.value","time":0}` + `{"_value":92,"metric_name":"cpu.value","time":0}` + assert.Equal(t, string(expS), string(buf)) +} + +func TestSerializeBatchHec(t *testing.T) { + m := MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + ) + n := MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 92.0, + }, + time.Unix(0, 0), + ), + ) + + metrics := []telegraf.Metric{m, n} + s, _ := NewSerializer(true) + buf, err := s.SerializeBatch(metrics) + assert.NoError(t, err) + + expS := `{"time":0,"event":"metric","fields":{"_value":42,"metric_name":"cpu.value"}}` + `{"time":0,"event":"metric","fields":{"_value":92,"metric_name":"cpu.value"}}` + assert.Equal(t, string(expS), string(buf)) +}