Skip to content

Commit

Permalink
Add Splunk Metrics serializer (influxdata#4339)
Browse files Browse the repository at this point in the history
  • Loading branch information
ronnocol authored and Jean-Louis Dupond committed Apr 22, 2019
1 parent 2e6a4eb commit 839a0ec
Show file tree
Hide file tree
Showing 6 changed files with 471 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/DATA_FORMATS_OUTPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ plugins.
1. [InfluxDB Line Protocol](#influx)
1. [JSON](#json)
1. [Graphite](#graphite)
1. [SplunkMetric](../plugins/serializers/splunkmetric/README.md)

You will be able to identify the plugins with support by the presence of a
`data_format` config option, for example, in the `file` output plugin:
Expand Down
13 changes: 13 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1693,6 +1693,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
}
}

if node, ok := tbl.Fields["splunkmetric_hec_routing"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if b, ok := kv.Value.(*ast.Boolean); ok {
var err error
c.HecRouting, err = b.Boolean()
if err != nil {
return nil, err
}
}
}
}

delete(tbl.Fields, "influx_max_line_bytes")
delete(tbl.Fields, "influx_sort_fields")
delete(tbl.Fields, "influx_uint_support")
Expand All @@ -1701,6 +1713,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
delete(tbl.Fields, "prefix")
delete(tbl.Fields, "template")
delete(tbl.Fields, "json_timestamp_units")
delete(tbl.Fields, "splunkmetric_hec_routing")
return serializers.NewSerializer(c)
}

Expand Down
10 changes: 10 additions & 0 deletions plugins/serializers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/influxdata/telegraf/plugins/serializers/splunkmetric"
)

// SerializerOutput is an interface for output plugins that are able to
Expand Down Expand Up @@ -60,6 +61,9 @@ type Config struct {

// Timestamp units to use for JSON formatted output
TimestampUnits time.Duration

// Include HEC routing fields for splunkmetric output
HecRouting bool
}

// NewSerializer a Serializer interface based on the given config.
Expand All @@ -73,6 +77,8 @@ func NewSerializer(config *Config) (Serializer, error) {
serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport)
case "json":
serializer, err = NewJsonSerializer(config.TimestampUnits)
case "splunkmetric":
serializer, err = NewSplunkmetricSerializer(config.HecRouting)
default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
}
Expand All @@ -83,6 +89,10 @@ func NewJsonSerializer(timestampUnits time.Duration) (Serializer, error) {
return json.NewSerializer(timestampUnits)
}

// NewSplunkmetricSerializer returns the serializer for the "splunkmetric"
// data format. The splunkmetric_hec_routing flag is handed unchanged to
// splunkmetric.NewSerializer, whose result and error are returned as-is.
func NewSplunkmetricSerializer(splunkmetric_hec_routing bool) (Serializer, error) {
return splunkmetric.NewSerializer(splunkmetric_hec_routing)
}

func NewInfluxSerializerConfig(config *Config) (Serializer, error) {
var sort influx.FieldSortOrder
if config.InfluxSortFields {
Expand Down
139 changes: 139 additions & 0 deletions plugins/serializers/splunkmetric/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Splunk Metrics serializer

This serializer formats and outputs the metric data in a format that can be consumed by a Splunk metrics index.
It can be used to write to a file using the file output, or for sending metrics to a HEC using the standard telegraf HTTP output.

If you're using the HTTP output, this serializer knows how to batch the metrics so you don't end up with an HTTP POST per metric.

The data is output in a format that conforms to the specified Splunk HEC JSON format as found here:
[Send metrics in JSON format](http://dev.splunk.com/view/event-collector/SP-CAAAFDN).

An example event looks like:
```javascript
{
"time": 1529708430,
"event": "metric",
"host": "patas-mbp",
"fields": {
"_value": 0.6,
"cpu": "cpu0",
"dc": "mobile",
"metric_name": "cpu.usage_user",
"user": "ronnocol"
}
}
```
In the above snippet, the following keys are dimensions:
* cpu
* dc
* user

## Using with the HTTP output

To send this data to a Splunk HEC, you can use the HTTP output. There are some custom headers that you need to add
to manage the HEC authorization; here's a sample config for an HTTP output:

```toml
[[outputs.http]]
## URL is the address to send metrics to
url = "https://localhost:8088/services/collector"

## Timeout for HTTP message
# timeout = "5s"

## HTTP method, one of: "POST" or "PUT"
# method = "POST"

## HTTP Basic Auth credentials
# username = "username"
# password = "pa$$word"

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "splunkmetric"
## Provides time, index, source overrides for the HEC
splunkmetric_hec_routing = true

## Additional HTTP headers
[outputs.http.headers]
# Should be set manually to "application/json" for json data_format
Content-Type = "application/json"
Authorization = "Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
X-Splunk-Request-Channel = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
```

## Overrides
You can override the default values for the HEC token you are using by adding additional tags to the config file.

The following aspects of the token can be overridden with tags:
* index
* source

You can either use `[global_tags]` or a more advanced configuration as documented [here](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md).

Such as this example which overrides the index just on the cpu metric:
```toml
[[inputs.cpu]]
percpu = false
totalcpu = true
[inputs.cpu.tags]
index = "cpu_metrics"
```

## Using with the File output

You can use the file output when running telegraf on a machine with a Splunk forwarder.

A sample event when `splunkmetric_hec_routing` is false (or unset) looks like:
```javascript
{
"_value": 0.6,
"cpu": "cpu0",
"dc": "mobile",
"metric_name": "cpu.usage_user",
"user": "ronnocol",
"time": 1529708430
}
```
Data formatted in this manner can be ingested with a simple `props.conf` file that
looks like this:

```ini
[telegraf]
category = Metrics
description = Telegraf Metrics
pulldown_type = 1
DATETIME_CONFIG =
NO_BINARY_CHECK = true
SHOULD_LINEMERGE = true
disabled = false
INDEXED_EXTRACTIONS = json
KV_MODE = none
TIMESTAMP_FIELDS = time
TIME_FORMAT = %s.%3N
```

An example configuration of a file based output is:

```toml
# Send telegraf metrics to file(s)
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
files = ["/tmp/metrics.out"]

## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "splunkmetric"
splunkmetric_hec_routing = false
```
126 changes: 126 additions & 0 deletions plugins/serializers/splunkmetric/splunkmetric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package splunkmetric

import (
"encoding/json"
"fmt"
"log"

"github.com/influxdata/telegraf"
)

// serializer turns telegraf metrics into JSON objects for a Splunk metrics
// index.
type serializer struct {
// HecRouting selects the output shape. When true, each object is wrapped in
// the HEC envelope (time, event, host, index, source, fields); when false,
// only the flat field map plus a "time" key is emitted.
HecRouting bool
}

// NewSerializer returns a serializer whose HecRouting field is set to
// splunkmetric_hec_routing. The returned error is always nil.
func NewSerializer(splunkmetric_hec_routing bool) (*serializer, error) {
	return &serializer{HecRouting: splunkmetric_hec_routing}, nil
}

// Serialize renders a single metric as the concatenation of one JSON object
// per usable field (see createObject).
//
// It returns the encoded bytes, or an error naming the metric and the
// underlying cause when the metric could not be encoded.
func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
	m, err := s.createObject(metric)
	if err != nil {
		// Include the cause; without it the caller only learns that
		// "something" was wrong with the metric.
		return nil, fmt.Errorf("D! [serializer.splunkmetric] Dropping invalid metric: %s: %v", metric.Name(), err)
	}

	return m, nil
}

// SerializeBatch renders every metric in metrics and concatenates the results
// into a single payload, in input order.
//
// A metric that cannot be encoded is logged and skipped so that one bad
// metric does not discard the rest of the batch. The returned error is
// always nil.
func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
	var serialized []byte

	for _, metric := range metrics {
		m, err := s.createObject(metric)
		if err != nil {
			// Drop only the offending metric, as the message says, and keep
			// going with the remainder of the batch.
			log.Printf("D! [serializer.splunkmetric] Dropping invalid metric: %s: %v", metric.Name(), err)
			continue
		}
		// Appending a nil slice is a no-op, so metrics that produced no
		// output need no special case.
		serialized = append(serialized, m...)
	}

	return serialized, nil
}

// createObject renders metric as a sequence of JSON objects, one per field,
// concatenated without a separator.
//
// Fields rejected by verifyValue are logged and skipped. The tags "host",
// "index" and "source" are lifted into the HEC envelope; every other tag is
// emitted as a dimension next to "metric_name" and "_value".
//
// With HecRouting enabled the full envelope is marshalled; otherwise only the
// flat field map is marshalled, with the timestamp added under "time".
//
// It returns nil bytes and a nil error when no field was usable, and
// nil bytes plus the json.Marshal error when encoding fails.
func (s *serializer) createObject(metric telegraf.Metric) (metricGroup []byte, err error) {

	/* Splunk supports one metric json object, and does _not_ support an array of JSON objects.
	 ** Splunk has the following required names for the metric store:
	 ** metric_name: The name of the metric
	 ** _value: The value for the metric
	 ** time: The timestamp for the metric
	 ** All other index fields become dimensions.
	 */
	type HECTimeSeries struct {
		Time   float64                `json:"time"`
		Event  string                 `json:"event"`
		Host   string                 `json:"host,omitempty"`
		Index  string                 `json:"index,omitempty"`
		Source string                 `json:"source,omitempty"`
		Fields map[string]interface{} `json:"fields"`
	}

	// The envelope values and tags are the same for every field of the
	// metric, so resolve them once instead of once per field.
	dataGroup := HECTimeSeries{
		Event: "metric",
		// Convert ns to float seconds since epoch.
		Time: float64(metric.Time().UnixNano()) / float64(1000000000),
	}

	// Break tags out into routing overrides and plain dimensions.
	tags := metric.Tags()
	dimensions := make(map[string]string, len(tags))
	for n, t := range tags {
		switch n {
		case "host":
			dataGroup.Host = t
		case "index":
			dataGroup.Index = t
		case "source":
			dataGroup.Source = t
		default:
			dimensions[n] = t
		}
	}

	for _, field := range metric.FieldList() {

		if !verifyValue(field.Value) {
			log.Printf("D! Can not parse value: %v for key: %v", field.Value, field.Key)
			continue
		}

		// Each field gets its own map. The required keys are written after
		// the dimensions so a tag with the same name cannot shadow them.
		fields := make(map[string]interface{}, len(dimensions)+3)
		for n, t := range dimensions {
			fields[n] = t
		}
		fields["metric_name"] = metric.Name() + "." + field.Key
		fields["_value"] = field.Value
		dataGroup.Fields = fields

		var metricJson []byte
		if s.HecRouting {
			// Output the data as a fields array and host,index,time,source overrides for the HEC.
			metricJson, err = json.Marshal(dataGroup)
		} else {
			// Just output the data and the time, useful for file based outputs
			fields["time"] = dataGroup.Time
			metricJson, err = json.Marshal(fields)
		}
		// Check the error before touching the accumulated output.
		if err != nil {
			return nil, err
		}

		metricGroup = append(metricGroup, metricJson...)
	}

	return metricGroup, nil
}

// verifyValue reports whether v may be emitted as a metric value. It returns
// false when v holds a string and true for every other value, including nil.
func verifyValue(v interface{}) bool {
	_, isString := v.(string)
	return !isString
}
Loading

0 comments on commit 839a0ec

Please sign in to comment.