Skip to content

Commit

Permalink
Add influx_timestamp_units to the influx output serializer
Browse files Browse the repository at this point in the history
Follows this change for the JSON serializer:
influxdata#2587
  • Loading branch information
gregschrock committed Dec 1, 2020
1 parent a9e6abc commit 12e90a6
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 10 deletions.
22 changes: 21 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1892,7 +1892,10 @@ func getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) {
// a serializers.Serializer object, and creates it, which can then be added onto
// an Output object.
func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error) {
c := &serializers.Config{TimestampUnits: time.Duration(1 * time.Second)}
c := &serializers.Config{
TimestampUnits: time.Second,
InfluxTimestampUnits: time.Nanosecond,
}

if node, ok := tbl.Fields["data_format"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
Expand Down Expand Up @@ -1978,6 +1981,22 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
}
}

if node, ok := tbl.Fields["influx_timestamp_units"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
timestampVal, err := time.ParseDuration(str.Value)
if err != nil {
return nil, fmt.Errorf("Unable to parse influx_timestamp_units as a duration, %s", err)
}
// now that we have a duration, truncate it to the nearest
// power of ten (just in case)
nearestExponent := int64(math.Log10(float64(timestampVal.Nanoseconds())))
newNanoseconds := int64(math.Pow(10.0, float64(nearestExponent)))
c.InfluxTimestampUnits = time.Duration(newNanoseconds)
}
}
}

if node, ok := tbl.Fields["graphite_tag_support"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if b, ok := kv.Value.(*ast.Boolean); ok {
Expand Down Expand Up @@ -2102,6 +2121,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
delete(tbl.Fields, "influx_max_line_bytes")
delete(tbl.Fields, "influx_sort_fields")
delete(tbl.Fields, "influx_uint_support")
delete(tbl.Fields, "influx_timestamp_units")
delete(tbl.Fields, "graphite_tag_support")
delete(tbl.Fields, "graphite_separator")
delete(tbl.Fields, "data_format")
Expand Down
9 changes: 9 additions & 0 deletions plugins/serializers/influx/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ for interoperability.
## integer values. Enabling this option will result in field type errors if
## existing data has been written.
influx_uint_support = false

## By default, the line format timestamp is at nanosecond precision. The
## precision can be adjusted here. This parameter can be used to set the
## timestamp units to nanoseconds (`ns`), microseconds (`us` or `µs`),
## milliseconds (`ms`), or seconds (`s`). Note that this parameter will be
## truncated to the nearest power of 10, so if the `influx_timestamp_units`
## are set to `15ms` the timestamps for the serialized line will be output in
## hundredths of a second (`10ms`).
influx_timestamp_units = "1ns"
```

### Metrics
Expand Down
20 changes: 18 additions & 2 deletions plugins/serializers/influx/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sort"
"strconv"
"strings"
"time"

"github.com/influxdata/telegraf"
)
Expand Down Expand Up @@ -62,6 +63,7 @@ type Serializer struct {
bytesWritten int
fieldSortOrder FieldSortOrder
fieldTypeSupport FieldTypeSupport
unitsNanoseconds int64

buf bytes.Buffer
header []byte
Expand All @@ -71,7 +73,8 @@ type Serializer struct {

func NewSerializer() *Serializer {
serializer := &Serializer{
fieldSortOrder: NoSortFields,
fieldSortOrder: NoSortFields,
unitsNanoseconds: 1,

header: make([]byte, 0, 50),
footer: make([]byte, 0, 21),
Expand All @@ -92,6 +95,19 @@ func (s *Serializer) SetFieldTypeSupport(typeSupport FieldTypeSupport) {
s.fieldTypeSupport = typeSupport
}

func (s *Serializer) SetTimestampUnits(units time.Duration) {
unitsNanoseconds := units.Nanoseconds()
fmt.Printf("Nanoseconds: %v\n", unitsNanoseconds)

// if the units passed in were less than or equal to zero,
// then serialize the timestamp in seconds (the default)
if unitsNanoseconds <= 0 {
unitsNanoseconds = 1
}

s.unitsNanoseconds = unitsNanoseconds
}

// Serialize writes the telegraf.Metric to a byte slice. May produce multiple
// lines of output if longer than maximum line length. Lines are terminated
// with a newline (LF) char.
Expand Down Expand Up @@ -182,7 +198,7 @@ func (s *Serializer) buildHeader(m telegraf.Metric) error {
func (s *Serializer) buildFooter(m telegraf.Metric) {
s.footer = s.footer[:0]
s.footer = append(s.footer, ' ')
s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano(), 10)
s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano()/s.unitsNanoseconds, 10)
s.footer = append(s.footer, '\n')
}

Expand Down
32 changes: 25 additions & 7 deletions plugins/serializers/influx/influx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ func MustMetric(v telegraf.Metric, err error) telegraf.Metric {
}

var tests = []struct {
name string
maxBytes int
typeSupport FieldTypeSupport
input telegraf.Metric
output []byte
errReason string
name string
maxBytes int
typeSupport FieldTypeSupport
input telegraf.Metric
output []byte
errReason string
timestampUnits time.Duration
}{
{
name: "minimal",
Expand Down Expand Up @@ -230,7 +231,7 @@ var tests = []struct {
output: []byte("cpu value=\"howdy\" 0\n"),
},
{
name: "timestamp",
name: "default timestamp",
input: MustMetric(
metric.New(
"cpu",
Expand All @@ -243,6 +244,21 @@ var tests = []struct {
),
output: []byte("cpu value=42 1519194109000000042\n"),
},
{
name: "timestamp second units",
input: MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(1519194109, 42),
),
),
timestampUnits: time.Second,
output: []byte("cpu value=42 1519194109\n"),
},
{
name: "split fields exact",
maxBytes: 33,
Expand Down Expand Up @@ -539,6 +555,7 @@ func TestSerializer(t *testing.T) {
serializer.SetMaxLineBytes(tt.maxBytes)
serializer.SetFieldSortOrder(SortFields)
serializer.SetFieldTypeSupport(tt.typeSupport)
serializer.SetTimestampUnits(tt.timestampUnits)
output, err := serializer.Serialize(tt.input)
if tt.errReason != "" {
require.Error(t, err)
Expand All @@ -555,6 +572,7 @@ func BenchmarkSerializer(b *testing.B) {
serializer := NewSerializer()
serializer.SetMaxLineBytes(tt.maxBytes)
serializer.SetFieldTypeSupport(tt.typeSupport)
serializer.SetTimestampUnits(tt.timestampUnits)
for n := 0; n < b.N; n++ {
output, err := serializer.Serialize(tt.input)
_ = err
Expand Down
4 changes: 4 additions & 0 deletions plugins/serializers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type Config struct {
// Support unsigned integer output; influx format only
InfluxUintSupport bool `toml:"influx_uint_support"`

// Timestamp units to use for Influx formatted output
InfluxTimestampUnits time.Duration `toml:"influx_timestamp_units"`

// Prefix to add to all measurements, only supports Graphite
Prefix string `toml:"prefix"`

Expand Down Expand Up @@ -190,6 +193,7 @@ func NewInfluxSerializerConfig(config *Config) (Serializer, error) {
s.SetMaxLineBytes(config.InfluxMaxLineBytes)
s.SetFieldSortOrder(sort)
s.SetFieldTypeSupport(typeSupport)
s.SetTimestampUnits(config.InfluxTimestampUnits)
return s, nil
}

Expand Down

0 comments on commit 12e90a6

Please sign in to comment.