Skip to content

Commit

Permalink
Add parse_multivalue to collectd parser (influxdata#4403)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxunt authored and otherpirate committed Mar 15, 2019
1 parent 74602a6 commit 8b6b741
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 35 deletions.
6 changes: 6 additions & 0 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,12 @@ You can also change the path to the typesdb or add additional typesdb using
collectd_security_level = "encrypt"
## Path of to TypesDB specifications
collectd_typesdb = ["/usr/share/collectd/types.db"]

# Multi-value plugins can be handled two ways.
# "split" will parse and store the multi-value plugin data into separate measurements
# "join" will parse and store the multi-value plugin as a single multi-value measurement.
# "split" is the default behavior for backward compatability with previous versions of influxdb.
collectd_parse_multivalue = "split"
```

# Dropwizard:
Expand Down
9 changes: 9 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,14 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
}
}

if node, ok := tbl.Fields["collectd_parse_multivalue"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CollectdSplit = str.Value
}
}
}

if node, ok := tbl.Fields["collectd_typesdb"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
Expand Down Expand Up @@ -1348,6 +1356,7 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "collectd_auth_file")
delete(tbl.Fields, "collectd_security_level")
delete(tbl.Fields, "collectd_typesdb")
delete(tbl.Fields, "collectd_parse_multivalue")
delete(tbl.Fields, "dropwizard_metric_registry_path")
delete(tbl.Fields, "dropwizard_time_path")
delete(tbl.Fields, "dropwizard_time_format")
Expand Down
109 changes: 79 additions & 30 deletions plugins/parsers/collectd/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ type CollectdParser struct {
// DefaultTags will be added to every parsed metric
DefaultTags map[string]string

popts network.ParseOpts
//whether or not to split multi value metric into multiple metrics
//default value is split
ParseMultiValue string
popts network.ParseOpts
}

func (p *CollectdParser) SetParseOpts(popts *network.ParseOpts) {
Expand All @@ -32,6 +35,7 @@ func NewCollectdParser(
authFile string,
securityLevel string,
typesDB []string,
split string,
) (*CollectdParser, error) {
popts := network.ParseOpts{}

Expand Down Expand Up @@ -64,7 +68,8 @@ func NewCollectdParser(
}
}

parser := CollectdParser{popts: popts}
parser := CollectdParser{popts: popts,
ParseMultiValue: split}
return &parser, nil
}

Expand All @@ -76,7 +81,7 @@ func (p *CollectdParser) Parse(buf []byte) ([]telegraf.Metric, error) {

metrics := []telegraf.Metric{}
for _, valueList := range valueLists {
metrics = append(metrics, UnmarshalValueList(valueList)...)
metrics = append(metrics, UnmarshalValueList(valueList, p.ParseMultiValue)...)
}

if len(p.DefaultTags) > 0 {
Expand Down Expand Up @@ -111,47 +116,91 @@ func (p *CollectdParser) SetDefaultTags(tags map[string]string) {
}

// UnmarshalValueList translates a ValueList into a Telegraf metric.
func UnmarshalValueList(vl *api.ValueList) []telegraf.Metric {
func UnmarshalValueList(vl *api.ValueList, multiValue string) []telegraf.Metric {
timestamp := vl.Time.UTC()

var metrics []telegraf.Metric
for i := range vl.Values {
var name string
name = fmt.Sprintf("%s_%s", vl.Identifier.Plugin, vl.DSName(i))
tags := make(map[string]string)
fields := make(map[string]interface{})

// Convert interface back to actual type, then to float64
switch value := vl.Values[i].(type) {
case api.Gauge:
fields["value"] = float64(value)
case api.Derive:
fields["value"] = float64(value)
case api.Counter:
fields["value"] = float64(value)
}
//set multiValue to default "split" if nothing is specified
if multiValue == "" {
multiValue = "split"
}
switch multiValue {
case "split":
for i := range vl.Values {
var name string
name = fmt.Sprintf("%s_%s", vl.Identifier.Plugin, vl.DSName(i))
tags := make(map[string]string)
fields := make(map[string]interface{})

// Convert interface back to actual type, then to float64
switch value := vl.Values[i].(type) {
case api.Gauge:
fields["value"] = float64(value)
case api.Derive:
fields["value"] = float64(value)
case api.Counter:
fields["value"] = float64(value)
}

if vl.Identifier.Host != "" {
tags["host"] = vl.Identifier.Host
}
if vl.Identifier.PluginInstance != "" {
tags["instance"] = vl.Identifier.PluginInstance
}
if vl.Identifier.Type != "" {
tags["type"] = vl.Identifier.Type
if vl.Identifier.Host != "" {
tags["host"] = vl.Identifier.Host
}
if vl.Identifier.PluginInstance != "" {
tags["instance"] = vl.Identifier.PluginInstance
}
if vl.Identifier.Type != "" {
tags["type"] = vl.Identifier.Type
}
if vl.Identifier.TypeInstance != "" {
tags["type_instance"] = vl.Identifier.TypeInstance
}

// Drop invalid points
m, err := metric.New(name, tags, fields, timestamp)
if err != nil {
log.Printf("E! Dropping metric %v: %v", name, err)
continue
}

metrics = append(metrics, m)
}
if vl.Identifier.TypeInstance != "" {
tags["type_instance"] = vl.Identifier.TypeInstance
case "join":
name := vl.Identifier.Plugin
tags := make(map[string]string)
fields := make(map[string]interface{})
for i := range vl.Values {
switch value := vl.Values[i].(type) {
case api.Gauge:
fields[vl.DSName(i)] = float64(value)
case api.Derive:
fields[vl.DSName(i)] = float64(value)
case api.Counter:
fields[vl.DSName(i)] = float64(value)
}

if vl.Identifier.Host != "" {
tags["host"] = vl.Identifier.Host
}
if vl.Identifier.PluginInstance != "" {
tags["instance"] = vl.Identifier.PluginInstance
}
if vl.Identifier.Type != "" {
tags["type"] = vl.Identifier.Type
}
if vl.Identifier.TypeInstance != "" {
tags["type_instance"] = vl.Identifier.TypeInstance
}
}

// Drop invalid points
m, err := metric.New(name, tags, fields, timestamp)
if err != nil {
log.Printf("E! Dropping metric %v: %v", name, err)
continue
}

metrics = append(metrics, m)
default:
log.Printf("parse-multi-value config can only be 'split' or 'join'")
}
return metrics
}
Expand Down
20 changes: 17 additions & 3 deletions plugins/parsers/collectd/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"collectd.org/api"
"collectd.org/network"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -76,7 +77,7 @@ var multiMetric = testCase{
api.Derive(42),
api.Gauge(42),
},
DSNames: []string(nil),
DSNames: []string{"t1", "t2"},
},
},
[]metricData{
Expand Down Expand Up @@ -108,7 +109,7 @@ var multiMetric = testCase{
}

func TestNewCollectdParser(t *testing.T) {
parser, err := NewCollectdParser("", "", []string{})
parser, err := NewCollectdParser("", "", []string{}, "join")
require.Nil(t, err)
require.Equal(t, parser.popts.SecurityLevel, network.None)
require.NotNil(t, parser.popts.PasswordLookup)
Expand All @@ -133,6 +134,19 @@ func TestParse(t *testing.T) {
}
}

func TestParseMultiValueSplit(t *testing.T) {
buf, err := writeValueList(multiMetric.vl)
require.Nil(t, err)
bytes, err := buf.Bytes()
require.Nil(t, err)

parser := &CollectdParser{ParseMultiValue: "split"}
metrics, err := parser.Parse(bytes)
require.Nil(t, err)

assert.Equal(t, 2, len(metrics))
}

func TestParse_DefaultTags(t *testing.T) {
buf, err := writeValueList(singleMetric.vl)
require.Nil(t, err)
Expand Down Expand Up @@ -266,7 +280,7 @@ func TestParseLine(t *testing.T) {
bytes, err := buf.Bytes()
require.Nil(t, err)

parser, err := NewCollectdParser("", "", []string{})
parser, err := NewCollectdParser("", "", []string{}, "split")
require.Nil(t, err)
metric, err := parser.ParseLine(string(bytes))
require.Nil(t, err)
Expand Down
8 changes: 6 additions & 2 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type Config struct {
// Dataset specification for collectd
CollectdTypesDB []string

// whether to split or join multivalue metrics
CollectdSplit string

// DataType only applies to value, this will be the type to parse value to
DataType string

Expand Down Expand Up @@ -109,7 +112,7 @@ func NewParser(config *Config) (Parser, error) {
config.Templates, config.DefaultTags)
case "collectd":
parser, err = NewCollectdParser(config.CollectdAuthFile,
config.CollectdSecurityLevel, config.CollectdTypesDB)
config.CollectdSecurityLevel, config.CollectdTypesDB, config.CollectdSplit)
case "dropwizard":
parser, err = NewDropwizardParser(
config.DropwizardMetricRegistryPath,
Expand Down Expand Up @@ -172,8 +175,9 @@ func NewCollectdParser(
authFile string,
securityLevel string,
typesDB []string,
split string,
) (Parser, error) {
return collectd.NewCollectdParser(authFile, securityLevel, typesDB)
return collectd.NewCollectdParser(authFile, securityLevel, typesDB, split)
}

func NewDropwizardParser(
Expand Down

0 comments on commit 8b6b741

Please sign in to comment.