From 9b44dc957542388de10cf85e661477e094934f96 Mon Sep 17 00:00:00 2001 From: Brandon Ewing Date: Fri, 8 May 2020 16:23:26 -0500 Subject: [PATCH] Add dynamic tagging to cisco_telemetry_gnmi --- plugins/inputs/cisco_telemetry_gnmi/README.md | 16 ++- .../cisco_telemetry_gnmi.go | 59 +++++++++ .../cisco_telemetry_gnmi_test.go | 117 ++++++++++++++++++ 3 files changed, 190 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/cisco_telemetry_gnmi/README.md b/plugins/inputs/cisco_telemetry_gnmi/README.md index d12817da1f0ad..6d3d7c93648af 100644 --- a/plugins/inputs/cisco_telemetry_gnmi/README.md +++ b/plugins/inputs/cisco_telemetry_gnmi/README.md @@ -63,10 +63,22 @@ It has been optimized to support GNMI telemetry as produced by Cisco IOS XR (64- ## If suppression is enabled, send updates at least every X seconds anyway # heartbeat_interval = "60s" + + [[inputs.cisco_telemetry_gnmi.subscription]] + name = "descr" + origin = "openconfig-interfaces" + path = "/interfaces/interface/state/description" + subscription_mode = "on_change" + + ## If tag_only is set, the subscription in question will be utilized to maintain a map of + ## tags to apply to other measurements emitted by the plugin, by matching path keys + ## All fields from the tag-only subscription will be applied as tags to other readings, + ## in the format _. + tag_only = true ``` ### Example Output ``` -ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=MgmtEth0/RP0/CPU0/0,source=10.49.234.115 in-multicast-pkts=0i,out-multicast-pkts=0i,out-errors=0i,out-discards=0i,in-broadcast-pkts=0i,out-broadcast-pkts=0i,in-discards=0i,in-unknown-protos=0i,in-errors=0i,out-unicast-pkts=0i,in-octets=0i,out-octets=0i,last-clear="2019-05-22T16:53:21Z",in-unicast-pkts=0i 1559145777425000000 -ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=GigabitEthernet0/0/0/0,source=10.49.234.115 out-multicast-pkts=0i,out-broadcast-pkts=0i,in-errors=0i,out-errors=0i,in-discards=0i,out-octets=0i,in-unknown-protos=0i,in-unicast-pkts=0i,in-octets=0i,in-multicast-pkts=0i,in-broadcast-pkts=0i,last-clear="2019-05-22T16:54:50Z",out-unicast-pkts=0i,out-discards=0i 1559145777425000000 +ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=MgmtEth0/RP0/CPU0/0,source=10.49.234.115,descr_description=Foo in-multicast-pkts=0i,out-multicast-pkts=0i,out-errors=0i,out-discards=0i,in-broadcast-pkts=0i,out-broadcast-pkts=0i,in-discards=0i,in-unknown-protos=0i,in-errors=0i,out-unicast-pkts=0i,in-octets=0i,out-octets=0i,last-clear="2019-05-22T16:53:21Z",in-unicast-pkts=0i 1559145777425000000 +ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=GigabitEthernet0/0/0/0,source=10.49.234.115,descr_description=Bar out-multicast-pkts=0i,out-broadcast-pkts=0i,in-errors=0i,out-errors=0i,in-discards=0i,out-octets=0i,in-unknown-protos=0i,in-unicast-pkts=0i,in-octets=0i,in-multicast-pkts=0i,in-broadcast-pkts=0i,last-clear="2019-05-22T16:54:50Z",out-unicast-pkts=0i,out-discards=0i 1559145777425000000 ``` diff --git a/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi.go b/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi.go index 894b7feb0d196..af78830fbfd5c 100644 --- a/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi.go +++ b/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi.go @@ -55,6 +55,8 @@ type CiscoTelemetryGNMI struct { acc telegraf.Accumulator cancel context.CancelFunc wg sync.WaitGroup + // Lookup/device/name/key/value + lookup map[string]map[string]map[string]map[string]interface{} Log telegraf.Logger } @@ -72,6 +74,9 @@ type Subscription struct { // Duplicate suppression SuppressRedundant bool `toml:"suppress_redundant"` HeartbeatInterval internal.Duration `toml:"heartbeat_interval"` + + // Tag-only identification + TagOnly bool `toml:"tag_only"` } // Start the http listener service @@ -82,6 +87,7 @@ func (c *CiscoTelemetryGNMI) Start(acc telegraf.Accumulator) error { var request *gnmi.SubscribeRequest c.acc = acc ctx, c.cancel = context.WithCancel(context.Background()) + c.lookup = make(map[string]map[string]map[string]map[string]interface{}) // Validate configuration if request, err = c.newSubscribeRequest(); err != nil { @@ -126,6 +132,11 @@ func (c *CiscoTelemetryGNMI) Start(acc telegraf.Accumulator) error { c.aliases[longPath] = name c.aliases[shortPath] = name } + + if subscription.TagOnly { + // Create the top-level lookup for this tag + c.lookup[name] = make(map[string]map[string]map[string]interface{}) + } } for alias, path := range c.Aliases { c.aliases[path] = alias @@ -134,6 +145,11 @@ func (c *CiscoTelemetryGNMI) Start(acc telegraf.Accumulator) error { // Create a goroutine for each device, dial and subscribe c.wg.Add(len(c.Addresses)) for _, addr := range c.Addresses { + // Update the lookup table with this address + for lu := range c.lookup { + hostname, _, _ := net.SplitHostPort(addr) + c.lookup[lu][hostname] = make(map[string]map[string]interface{}) + } go func(address string) { defer c.wg.Done() for ctx.Err() == nil { @@ -280,6 +296,24 @@ func (c *CiscoTelemetryGNMI) handleSubscribeResponse(address string, reply *gnmi } } + // Update tag lookups and discard rest of update + if lu, ok := c.lookup[name]; ok { + if err := updateLookups(lu[tags["source"]], tags, fields); err != nil { + c.Log.Debugf("Error updating lookups") + } + continue + } + + // Apply lookups if present + for k, v := range c.lookup { + if t, ok := v[tags["source"]][tags["name"]]; ok { + for name, val := range t { + tagName := fmt.Sprintf("%s_%s", k, name) + tags[tagName] = val.(string) + } + } + } + // Group metrics for k, v := range fields { key := k @@ -312,6 +346,19 @@ func (c *CiscoTelemetryGNMI) handleSubscribeResponse(address string, reply *gnmi } } +func updateLookups(lu map[string]map[string]interface{}, tags map[string]string, fields map[string]interface{}) error { + name, ok := lu[tags["name"]] + if !ok { + name = make(map[string]interface{}) + lu[tags["name"]] = name + } + for k, v := range fields { + shortName := path.Base(k) + name[shortName] = v + } + return nil +} + // HandleTelemetryField and add it to a measurement func (c *CiscoTelemetryGNMI) handleTelemetryField(update *gnmi.Update, tags map[string]string, prefix string) (string, map[string]interface{}) { path, aliasPath := c.handlePath(update.Path, tags, prefix) @@ -529,6 +576,18 @@ const sampleConfig = ` ## If suppression is enabled, send updates at least every X seconds anyway # heartbeat_interval = "60s" + + [[inputs.cisco_telemetry_gnmi.subscription]] + name = "descr" + origin = "openconfig-interfaces" + path = "/interfaces/interface/state/description" + subscription_mode = "on_change" + + ## If tag_only is set, the subscription in question will be utilized to maintain a map of + ## tags to apply to other measurements emitted by the plugin, by matching path keys + ## All fields from the tag-only subscription will be applied as tags to other readings, + ## in the format _. + tag_only = true ` // SampleConfig of plugin diff --git a/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi_test.go b/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi_test.go index 1b12886b9efcf..6e364fdb86746 100644 --- a/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi_test.go +++ b/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi_test.go @@ -367,6 +367,123 @@ func TestNotification(t *testing.T) { ), }, }, + { + name: "tagged update pair", + plugin: &CiscoTelemetryGNMI{ + Log: testutil.Logger{}, + Encoding: "proto", + Redial: internal.Duration{Duration: 1 * time.Second}, + Subscriptions: []Subscription{ + { + Name: "oc-intf-desc", + Origin: "openconfig-interfaces", + Path: "/interfaces/interface/state/description", + SubscriptionMode: "on_change", + TagOnly: true, + }, + { + Name: "oc-intf-counters", + Origin: "openconfig-interfaces", + Path: "/interfaces/interface/state/counters", + SubscriptionMode: "sample", + }, + }, + }, + server: &MockServer{ + SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + tagResponse := &gnmi.SubscribeResponse{ + Response: &gnmi.SubscribeResponse_Update{ + Update: &gnmi.Notification{ + Timestamp: 1543236571000000000, + Prefix: &gnmi.Path{}, + Update: []*gnmi.Update{ + { + Path: &gnmi.Path{ + Origin: "", + Elem: []*gnmi.PathElem{ + { + Name: "interfaces", + }, + { + Name: "interface", + Key: map[string]string{"name": "Ethernet1"}, + }, + { + Name: "state", + }, + { + Name: "description", + }, + }, + Target: "", + }, + Val: &gnmi.TypedValue{ + Value: &gnmi.TypedValue_StringVal{StringVal: "foo"}, + }, + }, + }, + }, + }, + } + server.Send(tagResponse) + server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}}) + taggedResponse := &gnmi.SubscribeResponse{ + Response: &gnmi.SubscribeResponse_Update{ + Update: &gnmi.Notification{ + Timestamp: 1543236572000000000, + Prefix: &gnmi.Path{}, + Update: []*gnmi.Update{ + { + Path: &gnmi.Path{ + Origin: "", + Elem: []*gnmi.PathElem{ + { + Name: "interfaces", + }, + { + Name: "interface", + Key: map[string]string{"name": "Ethernet1"}, + }, + { + Name: "state", + }, + { + Name: "counters", + }, + { + Name: "in-broadcast-pkts", + }, + }, + Target: "", + }, + Val: &gnmi.TypedValue{ + Value: &gnmi.TypedValue_IntVal{IntVal: 42}, + }, + }, + }, + }, + }, + } + server.Send(taggedResponse) + return nil + }, + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "oc-intf-counters", + map[string]string{ + "path": "", + "source": "127.0.0.1", + "name": "Ethernet1", + "oc-intf-desc_description": "foo", + }, + map[string]interface{}{ + "in_broadcast_pkts": 42, + }, + time.Unix(0, 0), + ), + }, + }, } for _, tt := range tests {