From 12e90a6e71894453fa2c89cac6e8c06e111d892f Mon Sep 17 00:00:00 2001 From: Greg Schrock Date: Tue, 24 Nov 2020 10:08:28 -0500 Subject: [PATCH] Add influx_timestamp_units to the influx output serializer Follows this change for the JSON serializer: https://github.com/influxdata/telegraf/pull/2587 --- config/config.go | 22 +++++++++++++++- plugins/serializers/influx/README.md | 9 +++++++ plugins/serializers/influx/influx.go | 20 ++++++++++++-- plugins/serializers/influx/influx_test.go | 32 ++++++++++++++++++----- plugins/serializers/registry.go | 4 +++ 5 files changed, 77 insertions(+), 10 deletions(-) diff --git a/config/config.go b/config/config.go index 4fd65139e2ab9..3f197f935ecd8 100644 --- a/config/config.go +++ b/config/config.go @@ -1892,7 +1892,10 @@ func getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) { // a serializers.Serializer object, and creates it, which can then be added onto // an Output object. func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error) { - c := &serializers.Config{TimestampUnits: time.Duration(1 * time.Second)} + c := &serializers.Config{ + TimestampUnits: time.Second, + InfluxTimestampUnits: time.Nanosecond, + } if node, ok := tbl.Fields["data_format"]; ok { if kv, ok := node.(*ast.KeyValue); ok { @@ -1978,6 +1981,22 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error } } + if node, ok := tbl.Fields["influx_timestamp_units"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + timestampVal, err := time.ParseDuration(str.Value) + if err != nil { + return nil, fmt.Errorf("Unable to parse influx_timestamp_units as a duration, %s", err) + } + // now that we have a duration, truncate it to the nearest + // power of ten (just in case) + nearestExponent := int64(math.Log10(float64(timestampVal.Nanoseconds()))) + newNanoseconds := int64(math.Pow(10.0, float64(nearestExponent))) + c.InfluxTimestampUnits = time.Duration(newNanoseconds) + } + } + } + if node, ok := tbl.Fields["graphite_tag_support"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if b, ok := kv.Value.(*ast.Boolean); ok { @@ -2102,6 +2121,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error delete(tbl.Fields, "influx_max_line_bytes") delete(tbl.Fields, "influx_sort_fields") delete(tbl.Fields, "influx_uint_support") + delete(tbl.Fields, "influx_timestamp_units") delete(tbl.Fields, "graphite_tag_support") delete(tbl.Fields, "graphite_separator") delete(tbl.Fields, "data_format") diff --git a/plugins/serializers/influx/README.md b/plugins/serializers/influx/README.md index d21ead8758f38..4a62f0985a88c 100644 --- a/plugins/serializers/influx/README.md +++ b/plugins/serializers/influx/README.md @@ -30,6 +30,15 @@ for interoperability. ## integer values. Enabling this option will result in field type errors if ## existing data has been written. influx_uint_support = false + + ## By default, the line format timestamp is at nanosecond precision. The + ## precision can be adjusted here. This parameter can be used to set the + ## timestamp units to nanoseconds (`ns`), microseconds (`us` or `µs`), + ## milliseconds (`ms`), or seconds (`s`). Note that this parameter will be + ## truncated to the nearest power of 10, so if the `influx_timestamp_units` + ## are set to `15ms` the timestamps for the serialized line will be output in + ## hundredths of a second (`10ms`). + influx_timestamp_units = "1ns" ``` ### Metrics diff --git a/plugins/serializers/influx/influx.go b/plugins/serializers/influx/influx.go index aa76b8accb8e1..3581bb5de2b68 100644 --- a/plugins/serializers/influx/influx.go +++ b/plugins/serializers/influx/influx.go @@ -9,6 +9,7 @@ import ( "sort" "strconv" "strings" + "time" "github.com/influxdata/telegraf" ) @@ -62,6 +63,7 @@ type Serializer struct { bytesWritten int fieldSortOrder FieldSortOrder fieldTypeSupport FieldTypeSupport + unitsNanoseconds int64 buf bytes.Buffer header []byte @@ -71,7 +73,8 @@ type Serializer struct { func NewSerializer() *Serializer { serializer := &Serializer{ - fieldSortOrder: NoSortFields, + fieldSortOrder: NoSortFields, + unitsNanoseconds: 1, header: make([]byte, 0, 50), footer: make([]byte, 0, 21), @@ -92,6 +95,19 @@ func (s *Serializer) SetFieldTypeSupport(typeSupport FieldTypeSupport) { s.fieldTypeSupport = typeSupport } +func (s *Serializer) SetTimestampUnits(units time.Duration) { + unitsNanoseconds := units.Nanoseconds() + fmt.Printf("Nanoseconds: %v\n", unitsNanoseconds) + + // if the units passed in were less than or equal to zero, + // then serialize the timestamp in seconds (the default) + if unitsNanoseconds <= 0 { + unitsNanoseconds = 1 + } + + s.unitsNanoseconds = unitsNanoseconds +} + // Serialize writes the telegraf.Metric to a byte slice. May produce multiple // lines of output if longer than maximum line length. Lines are terminated // with a newline (LF) char. @@ -182,7 +198,7 @@ func (s *Serializer) buildHeader(m telegraf.Metric) error { func (s *Serializer) buildFooter(m telegraf.Metric) { s.footer = s.footer[:0] s.footer = append(s.footer, ' ') - s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano(), 10) + s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano()/s.unitsNanoseconds, 10) s.footer = append(s.footer, '\n') } diff --git a/plugins/serializers/influx/influx_test.go b/plugins/serializers/influx/influx_test.go index a86215d94bf4b..0455d4ed91977 100644 --- a/plugins/serializers/influx/influx_test.go +++ b/plugins/serializers/influx/influx_test.go @@ -18,12 +18,13 @@ func MustMetric(v telegraf.Metric, err error) telegraf.Metric { } var tests = []struct { - name string - maxBytes int - typeSupport FieldTypeSupport - input telegraf.Metric - output []byte - errReason string + name string + maxBytes int + typeSupport FieldTypeSupport + input telegraf.Metric + output []byte + errReason string + timestampUnits time.Duration }{ { name: "minimal", @@ -230,7 +231,7 @@ var tests = []struct { output: []byte("cpu value=\"howdy\" 0\n"), }, { - name: "timestamp", + name: "default timestamp", input: MustMetric( metric.New( "cpu", @@ -243,6 +244,21 @@ var tests = []struct { ), output: []byte("cpu value=42 1519194109000000042\n"), }, + { + name: "timestamp second units", + input: MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(1519194109, 42), + ), + ), + timestampUnits: time.Second, + output: []byte("cpu value=42 1519194109\n"), + }, { name: "split fields exact", maxBytes: 33, @@ -539,6 +555,7 @@ func TestSerializer(t *testing.T) { serializer.SetMaxLineBytes(tt.maxBytes) serializer.SetFieldSortOrder(SortFields) serializer.SetFieldTypeSupport(tt.typeSupport) + serializer.SetTimestampUnits(tt.timestampUnits) output, err := serializer.Serialize(tt.input) if tt.errReason != "" { require.Error(t, err) @@ -555,6 +572,7 @@ func BenchmarkSerializer(b *testing.B) { serializer := NewSerializer() serializer.SetMaxLineBytes(tt.maxBytes) serializer.SetFieldTypeSupport(tt.typeSupport) + serializer.SetTimestampUnits(tt.timestampUnits) for n := 0; n < b.N; n++ { output, err := serializer.Serialize(tt.input) _ = err diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index b12ef7660b981..82ec42e2bdbe4 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -67,6 +67,9 @@ type Config struct { // Support unsigned integer output; influx format only InfluxUintSupport bool `toml:"influx_uint_support"` + // Timestamp units to use for Influx formatted output + InfluxTimestampUnits time.Duration `toml:"influx_timestamp_units"` + // Prefix to add to all measurements, only supports Graphite Prefix string `toml:"prefix"` @@ -190,6 +193,7 @@ func NewInfluxSerializerConfig(config *Config) (Serializer, error) { s.SetMaxLineBytes(config.InfluxMaxLineBytes) s.SetFieldSortOrder(sort) s.SetFieldTypeSupport(typeSupport) + s.SetTimestampUnits(config.InfluxTimestampUnits) return s, nil }