From d40f46e7ce6774d8b8e031d00cb65d06935214df Mon Sep 17 00:00:00 2001 From: Joshua Powers Date: Mon, 27 Feb 2023 10:39:25 -0700 Subject: [PATCH] feat(inputs.jti_openconfig_telemetry): Set timestamp from data (#12730) --- .../inputs/jti_openconfig_telemetry/README.md | 5 +++ .../jti_openconfig_telemetry.go | 35 +++++++++++++++---- .../jti_openconfig_telemetry_test.go | 34 ++++++++++++++++++ .../jti_openconfig_telemetry/sample.conf | 5 +++ 4 files changed, 73 insertions(+), 6 deletions(-) diff --git a/plugins/inputs/jti_openconfig_telemetry/README.md b/plugins/inputs/jti_openconfig_telemetry/README.md index b784bb19dfe3f..3ee7647340ec0 100644 --- a/plugins/inputs/jti_openconfig_telemetry/README.md +++ b/plugins/inputs/jti_openconfig_telemetry/README.md @@ -54,6 +54,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. "/interfaces", ] + ## Timestamp Source + ## Set to 'collection' for time of collection, and 'data' for using the time + ## provided by the _timestamp field. + # timestamp_source = "collection" + ## Optional TLS Config # enable_tls = false # tls_ca = "/etc/telegraf/ca.pem" diff --git a/plugins/inputs/jti_openconfig_telemetry/jti_openconfig_telemetry.go b/plugins/inputs/jti_openconfig_telemetry/jti_openconfig_telemetry.go index cc703d125fc9f..7bd662df0de14 100644 --- a/plugins/inputs/jti_openconfig_telemetry/jti_openconfig_telemetry.go +++ b/plugins/inputs/jti_openconfig_telemetry/jti_openconfig_telemetry.go @@ -2,6 +2,7 @@ package jti_openconfig_telemetry import ( + "context" _ "embed" "fmt" "net" @@ -10,7 +11,6 @@ import ( "sync" "time" - "context" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -34,6 +34,7 @@ type OpenConfigTelemetry struct { Username string `toml:"username"` Password string `toml:"password"` ClientID string `toml:"client_id"` + TimestampSource string `toml:"timestamp_source"` SampleFrequency config.Duration `toml:"sample_frequency"` StrAsTags bool `toml:"str_as_tags"` RetryDelay config.Duration `toml:"retry_delay"` @@ -56,6 +57,17 @@ func (*OpenConfigTelemetry) SampleConfig() string { return sampleConfig } +func (m *OpenConfigTelemetry) Init() error { + switch m.TimestampSource { + case "", "collection": + case "data": + default: + return fmt.Errorf("unknown option for timestamp_source: %q", m.TimestampSource) + } + + return nil +} + func (m *OpenConfigTelemetry) Gather(_ telegraf.Accumulator) error { return nil } @@ -279,13 +291,23 @@ func (m *OpenConfigTelemetry) collectData( // Print final data collection m.Log.Debugf("Available collection for %s is: %v", grpcServer, dgroups) - tnow := time.Now() + timestamp := time.Now() // Iterate through data groups and add them for _, group := range dgroups { + if m.TimestampSource == "data" { + // OpenConfig timestamp is in milliseconds since epoch + ts, ok := group.data["_timestamp"].(uint64) + if ok { + timestamp = time.UnixMilli(int64(ts)) + } else { + m.Log.Warnf("invalid type %T for _timestamp %v", group.data["_timestamp"], group.data["_timestamp"]) + } + } + if len(group.tags) == 0 { - acc.AddFields(sensor.measurementName, group.data, tags, tnow) + acc.AddFields(sensor.measurementName, group.data, tags, timestamp) } else { - acc.AddFields(sensor.measurementName, group.data, group.tags, tnow) + acc.AddFields(sensor.measurementName, group.data, group.tags, timestamp) } } } @@ -363,8 +385,9 @@ func (m *OpenConfigTelemetry) Start(acc telegraf.Accumulator) error { func init() { inputs.Add("jti_openconfig_telemetry", func() telegraf.Input { return &OpenConfigTelemetry{ - RetryDelay: config.Duration(time.Second), - StrAsTags: false, + RetryDelay: config.Duration(time.Second), + StrAsTags: false, + TimestampSource: "collection", } }) } diff --git a/plugins/inputs/jti_openconfig_telemetry/jti_openconfig_telemetry_test.go b/plugins/inputs/jti_openconfig_telemetry/jti_openconfig_telemetry_test.go index 52119a823893d..12f428ac85599 100644 --- a/plugins/inputs/jti_openconfig_telemetry/jti_openconfig_telemetry_test.go +++ b/plugins/inputs/jti_openconfig_telemetry/jti_openconfig_telemetry_test.go @@ -46,6 +46,14 @@ var dataWithStringValues = &telemetry.OpenConfigData{ {Key: "strKey[tag='tagValue']/strValue", Value: &telemetry.KeyValue_StrValue{StrValue: "10"}}}, } +var dataWithTimestamp = &telemetry.OpenConfigData{ + Path: "/sensor_with_timestamp", + Kv: []*telemetry.KeyValue{ + {Key: "/sensor[tag='tagValue']/intKey", Value: &telemetry.KeyValue_IntValue{IntValue: 10}}, + }, + Timestamp: 1676560510002, +} + type openConfigTelemetryServer struct { telemetry.UnimplementedOpenConfigTelemetryServer } @@ -64,6 +72,8 @@ func (s *openConfigTelemetryServer) TelemetrySubscribe( return stream.Send(dataWithMultipleTags) case "/sensor_with_string_values": return stream.Send(dataWithStringValues) + case "/sensor_with_timestamp": + return stream.Send(dataWithTimestamp) } return nil } @@ -124,6 +134,30 @@ func TestOpenConfigTelemetryData(t *testing.T) { acc.AssertContainsTaggedFields(t, "/sensor", fields, tags) } +func TestOpenConfigTelemetryData_timestamp(t *testing.T) { + var acc testutil.Accumulator + cfg.Sensors = []string{"/sensor_with_timestamp"} + require.NoError(t, cfg.Start(&acc)) + + timestamp := int64(1676560510002) + tags := map[string]string{ + "device": "127.0.0.1", + "/sensor/@tag": "tagValue", + "system_id": "", + "path": "/sensor_with_timestamp", + } + fields := map[string]interface{}{ + "/sensor/intKey": int64(10), + "_sequence": uint64(0), + "_timestamp": uint64(timestamp), + "_component_id": uint32(0), + "_subcomponent_id": uint32(0), + } + + require.Eventually(t, func() bool { return acc.HasMeasurement("/sensor_with_timestamp") }, 5*time.Second, 10*time.Millisecond) + acc.AssertContainsTaggedFields(t, "/sensor_with_timestamp", fields, tags) +} + func TestOpenConfigTelemetryDataWithPrefix(t *testing.T) { var acc testutil.Accumulator cfg.Sensors = []string{"/sensor_with_prefix"} diff --git a/plugins/inputs/jti_openconfig_telemetry/sample.conf b/plugins/inputs/jti_openconfig_telemetry/sample.conf index 500744910a658..c2ae9da12465b 100644 --- a/plugins/inputs/jti_openconfig_telemetry/sample.conf +++ b/plugins/inputs/jti_openconfig_telemetry/sample.conf @@ -33,6 +33,11 @@ "/interfaces", ] + ## Timestamp Source + ## Set to 'collection' for time of collection, and 'data' for using the time + ## provided by the _timestamp field. + # timestamp_source = "collection" + ## Optional TLS Config # enable_tls = false # tls_ca = "/etc/telegraf/ca.pem"