Skip to content

Commit

Permalink
feat(inputs.jti_openconfig_telemetry): Set timestamp from data (#12730)
Browse files Browse the repository at this point in the history
  • Loading branch information
powersj authored Feb 27, 2023
1 parent a90b6eb commit d40f46e
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 6 deletions.
5 changes: 5 additions & 0 deletions plugins/inputs/jti_openconfig_telemetry/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
"/interfaces",
]

## Timestamp Source
## Set to 'collection' for time of collection, and 'data' for using the time
## provided by the _timestamp field.
# timestamp_source = "collection"

## Optional TLS Config
# enable_tls = false
# tls_ca = "/etc/telegraf/ca.pem"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package jti_openconfig_telemetry

import (
"context"
_ "embed"
"fmt"
"net"
Expand All @@ -10,7 +11,6 @@ import (
"sync"
"time"

"context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand All @@ -34,6 +34,7 @@ type OpenConfigTelemetry struct {
Username string `toml:"username"`
Password string `toml:"password"`
ClientID string `toml:"client_id"`
TimestampSource string `toml:"timestamp_source"`
SampleFrequency config.Duration `toml:"sample_frequency"`
StrAsTags bool `toml:"str_as_tags"`
RetryDelay config.Duration `toml:"retry_delay"`
Expand All @@ -56,6 +57,17 @@ func (*OpenConfigTelemetry) SampleConfig() string {
return sampleConfig
}

func (m *OpenConfigTelemetry) Init() error {
switch m.TimestampSource {
case "", "collection":
case "data":
default:
return fmt.Errorf("unknown option for timestamp_source: %q", m.TimestampSource)
}

return nil
}

func (m *OpenConfigTelemetry) Gather(_ telegraf.Accumulator) error {
return nil
}
Expand Down Expand Up @@ -279,13 +291,23 @@ func (m *OpenConfigTelemetry) collectData(
// Print final data collection
m.Log.Debugf("Available collection for %s is: %v", grpcServer, dgroups)

tnow := time.Now()
timestamp := time.Now()
// Iterate through data groups and add them
for _, group := range dgroups {
if m.TimestampSource == "data" {
// OpenConfig timestamp is in milliseconds since epoch
ts, ok := group.data["_timestamp"].(uint64)
if ok {
timestamp = time.UnixMilli(int64(ts))
} else {
m.Log.Warnf("invalid type %T for _timestamp %v", group.data["_timestamp"], group.data["_timestamp"])
}
}

if len(group.tags) == 0 {
acc.AddFields(sensor.measurementName, group.data, tags, tnow)
acc.AddFields(sensor.measurementName, group.data, tags, timestamp)
} else {
acc.AddFields(sensor.measurementName, group.data, group.tags, tnow)
acc.AddFields(sensor.measurementName, group.data, group.tags, timestamp)
}
}
}
Expand Down Expand Up @@ -363,8 +385,9 @@ func (m *OpenConfigTelemetry) Start(acc telegraf.Accumulator) error {
func init() {
inputs.Add("jti_openconfig_telemetry", func() telegraf.Input {
return &OpenConfigTelemetry{
RetryDelay: config.Duration(time.Second),
StrAsTags: false,
RetryDelay: config.Duration(time.Second),
StrAsTags: false,
TimestampSource: "collection",
}
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ var dataWithStringValues = &telemetry.OpenConfigData{
{Key: "strKey[tag='tagValue']/strValue", Value: &telemetry.KeyValue_StrValue{StrValue: "10"}}},
}

var dataWithTimestamp = &telemetry.OpenConfigData{
Path: "/sensor_with_timestamp",
Kv: []*telemetry.KeyValue{
{Key: "/sensor[tag='tagValue']/intKey", Value: &telemetry.KeyValue_IntValue{IntValue: 10}},
},
Timestamp: 1676560510002,
}

type openConfigTelemetryServer struct {
telemetry.UnimplementedOpenConfigTelemetryServer
}
Expand All @@ -64,6 +72,8 @@ func (s *openConfigTelemetryServer) TelemetrySubscribe(
return stream.Send(dataWithMultipleTags)
case "/sensor_with_string_values":
return stream.Send(dataWithStringValues)
case "/sensor_with_timestamp":
return stream.Send(dataWithTimestamp)
}
return nil
}
Expand Down Expand Up @@ -124,6 +134,30 @@ func TestOpenConfigTelemetryData(t *testing.T) {
acc.AssertContainsTaggedFields(t, "/sensor", fields, tags)
}

func TestOpenConfigTelemetryData_timestamp(t *testing.T) {
var acc testutil.Accumulator
cfg.Sensors = []string{"/sensor_with_timestamp"}
require.NoError(t, cfg.Start(&acc))

timestamp := int64(1676560510002)
tags := map[string]string{
"device": "127.0.0.1",
"/sensor/@tag": "tagValue",
"system_id": "",
"path": "/sensor_with_timestamp",
}
fields := map[string]interface{}{
"/sensor/intKey": int64(10),
"_sequence": uint64(0),
"_timestamp": uint64(timestamp),
"_component_id": uint32(0),
"_subcomponent_id": uint32(0),
}

require.Eventually(t, func() bool { return acc.HasMeasurement("/sensor_with_timestamp") }, 5*time.Second, 10*time.Millisecond)
acc.AssertContainsTaggedFields(t, "/sensor_with_timestamp", fields, tags)
}

func TestOpenConfigTelemetryDataWithPrefix(t *testing.T) {
var acc testutil.Accumulator
cfg.Sensors = []string{"/sensor_with_prefix"}
Expand Down
5 changes: 5 additions & 0 deletions plugins/inputs/jti_openconfig_telemetry/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
"/interfaces",
]

## Timestamp Source
## Set to 'collection' for time of collection, and 'data' for using the time
## provided by the _timestamp field.
# timestamp_source = "collection"

## Optional TLS Config
# enable_tls = false
# tls_ca = "/etc/telegraf/ca.pem"
Expand Down

0 comments on commit d40f46e

Please sign in to comment.