Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add dynamic tagging to gnmi plugin #7484

Merged
merged 3 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions plugins/inputs/gnmi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,23 @@ It has been optimized to support gNMI telemetry as produced by Cisco IOS XR (64-

## If suppression is enabled, send updates at least every X seconds anyway
# heartbeat_interval = "60s"

#[[inputs.gnmi.subscription]]
# name = "descr"
# origin = "openconfig-interfaces"
# path = "/interfaces/interface/state/description"
# subscription_mode = "on_change"

## If tag_only is set, the subscription in question will be utilized to maintain a map of
## tags to apply to other measurements emitted by the plugin, by matching path keys
## All fields from the tag-only subscription will be applied as tags to other readings,
## in the format <name>_<fieldBase>.
# tag_only = true
```

## Example Output

```shell
ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=MgmtEth0/RP0/CPU0/0,source=10.49.234.115 in-multicast-pkts=0i,out-multicast-pkts=0i,out-errors=0i,out-discards=0i,in-broadcast-pkts=0i,out-broadcast-pkts=0i,in-discards=0i,in-unknown-protos=0i,in-errors=0i,out-unicast-pkts=0i,in-octets=0i,out-octets=0i,last-clear="2019-05-22T16:53:21Z",in-unicast-pkts=0i 1559145777425000000
ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=GigabitEthernet0/0/0/0,source=10.49.234.115 out-multicast-pkts=0i,out-broadcast-pkts=0i,in-errors=0i,out-errors=0i,in-discards=0i,out-octets=0i,in-unknown-protos=0i,in-unicast-pkts=0i,in-octets=0i,in-multicast-pkts=0i,in-broadcast-pkts=0i,last-clear="2019-05-22T16:54:50Z",out-unicast-pkts=0i,out-discards=0i 1559145777425000000
ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=MgmtEth0/RP0/CPU0/0,source=10.49.234.115,descr/description=Foo in-multicast-pkts=0i,out-multicast-pkts=0i,out-errors=0i,out-discards=0i,in-broadcast-pkts=0i,out-broadcast-pkts=0i,in-discards=0i,in-unknown-protos=0i,in-errors=0i,out-unicast-pkts=0i,in-octets=0i,out-octets=0i,last-clear="2019-05-22T16:53:21Z",in-unicast-pkts=0i 1559145777425000000
ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=GigabitEthernet0/0/0/0,source=10.49.234.115,descr/description=Bar out-multicast-pkts=0i,out-broadcast-pkts=0i,in-errors=0i,out-errors=0i,in-discards=0i,out-octets=0i,in-unknown-protos=0i,in-unicast-pkts=0i,in-octets=0i,in-multicast-pkts=0i,in-broadcast-pkts=0i,last-clear="2019-05-22T16:54:50Z",out-unicast-pkts=0i,out-discards=0i 1559145777425000000
```
46 changes: 46 additions & 0 deletions plugins/inputs/gnmi/gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type GNMI struct {
acc telegraf.Accumulator
cancel context.CancelFunc
wg sync.WaitGroup
// Lookup/device+name/key/value
lookup map[string]map[string]map[string]interface{}

Log telegraf.Logger
}
Expand All @@ -73,6 +75,9 @@ type Subscription struct {
// Duplicate suppression
SuppressRedundant bool `toml:"suppress_redundant"`
HeartbeatInterval config.Duration `toml:"heartbeat_interval"`

// Mark this subscription as a tag-only lookup source, not emitting any metric
TagOnly bool `toml:"tag_only"`
srebhan marked this conversation as resolved.
Show resolved Hide resolved
}

// Start the http listener service
Expand All @@ -83,6 +88,7 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
var request *gnmiLib.SubscribeRequest
c.acc = acc
ctx, c.cancel = context.WithCancel(context.Background())
c.lookup = make(map[string]map[string]map[string]interface{})

// Validate configuration
if request, err = c.newSubscribeRequest(); err != nil {
Expand Down Expand Up @@ -133,6 +139,11 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
c.internalAliases[longPath] = name
c.internalAliases[shortPath] = name
}

if subscription.TagOnly {
// Create the top-level lookup for this tag
c.lookup[name] = make(map[string]map[string]interface{})
}
}
for alias, encodingPath := range c.Aliases {
c.internalAliases[encodingPath] = alias
Expand Down Expand Up @@ -297,6 +308,29 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S
}
}

// Update tag lookups and discard rest of update
subscriptionKey := tags["source"] + "/" + tags["name"]
if _, ok := c.lookup[name]; ok {
// We are subscribed to this, so add the fields to the lookup-table
if _, ok := c.lookup[name][subscriptionKey]; !ok {
c.lookup[name][subscriptionKey] = make(map[string]interface{})
}
for k, v := range fields {
c.lookup[name][subscriptionKey][path.Base(k)] = v
}
// Do not process the data further as we only subscribed here for the lookup table
continue
}

// Apply lookups if present
for subscriptionName, values := range c.lookup {
if annotations, ok := values[subscriptionKey]; ok {
for k, v := range annotations {
tags[subscriptionName+"/"+k] = v.(string)
}
}
}

// Group metrics
for k, v := range fields {
key := k
Expand Down Expand Up @@ -559,6 +593,18 @@ const sampleConfig = `

## If suppression is enabled, send updates at least every X seconds anyway
# heartbeat_interval = "60s"

#[[inputs.gnmi.subscription]]
# name = "descr"
# origin = "openconfig-interfaces"
# path = "/interfaces/interface/state/description"
# subscription_mode = "on_change"

## If tag_only is set, the subscription in question will be utilized to maintain a map of
## tags to apply to other measurements emitted by the plugin, by matching path keys
## All fields from the tag-only subscription will be applied as tags to other readings,
## in the format <name>_<fieldBase>.
# tag_only = true
`

// SampleConfig of plugin
Expand Down
120 changes: 120 additions & 0 deletions plugins/inputs/gnmi/gnmi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,126 @@ func TestNotification(t *testing.T) {
),
},
},
{
name: "tagged update pair",
plugin: &GNMI{
Log: testutil.Logger{},
Encoding: "proto",
Redial: config.Duration(1 * time.Second),
Subscriptions: []Subscription{
{
Name: "oc-intf-desc",
Origin: "openconfig-interfaces",
Path: "/interfaces/interface/state/description",
SubscriptionMode: "on_change",
TagOnly: true,
},
{
Name: "oc-intf-counters",
Origin: "openconfig-interfaces",
Path: "/interfaces/interface/state/counters",
SubscriptionMode: "sample",
},
},
},
server: &MockServer{
SubscribeF: func(server gnmiLib.GNMI_SubscribeServer) error {
tagResponse := &gnmiLib.SubscribeResponse{
Response: &gnmiLib.SubscribeResponse_Update{
Update: &gnmiLib.Notification{
Timestamp: 1543236571000000000,
Prefix: &gnmiLib.Path{},
Update: []*gnmiLib.Update{
{
Path: &gnmiLib.Path{
Origin: "",
Elem: []*gnmiLib.PathElem{
{
Name: "interfaces",
},
{
Name: "interface",
Key: map[string]string{"name": "Ethernet1"},
},
{
Name: "state",
},
{
Name: "description",
},
},
Target: "",
},
Val: &gnmiLib.TypedValue{
Value: &gnmiLib.TypedValue_StringVal{StringVal: "foo"},
},
},
},
},
},
}
if err := server.Send(tagResponse); err != nil {
return err
}
if err := server.Send(&gnmiLib.SubscribeResponse{Response: &gnmiLib.SubscribeResponse_SyncResponse{SyncResponse: true}}); err != nil {
return err
}
taggedResponse := &gnmiLib.SubscribeResponse{
Response: &gnmiLib.SubscribeResponse_Update{
Update: &gnmiLib.Notification{
Timestamp: 1543236572000000000,
Prefix: &gnmiLib.Path{},
Update: []*gnmiLib.Update{
{
Path: &gnmiLib.Path{
Origin: "",
Elem: []*gnmiLib.PathElem{
{
Name: "interfaces",
},
{
Name: "interface",
Key: map[string]string{"name": "Ethernet1"},
},
{
Name: "state",
},
{
Name: "counters",
},
{
Name: "in-broadcast-pkts",
},
},
Target: "",
},
Val: &gnmiLib.TypedValue{
Value: &gnmiLib.TypedValue_IntVal{IntVal: 42},
},
},
},
},
},
}
return server.Send(taggedResponse)
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"oc-intf-counters",
map[string]string{
"path": "",
"source": "127.0.0.1",
"name": "Ethernet1",
"oc-intf-desc/description": "foo",
},
map[string]interface{}{
"in_broadcast_pkts": 42,
},
time.Unix(0, 0),
),
},
},
}

for _, tt := range tests {
Expand Down