From e593119c3ddcd79531ab90d5aff7800778787fe4 Mon Sep 17 00:00:00 2001 From: Greg <2653109+glinton@users.noreply.github.com> Date: Tue, 4 Feb 2020 08:24:58 -0700 Subject: [PATCH] feat: report telegraf plugin usage metrics (#16378) * Begin implementing retreival of telegraf plugin stats * Implement storing/deletion of telegraf plugin stats * Test plugin stats * Initialize plugins bucket for tests * Add comment * Shorten time and frequency in bolt when providing telegraf plugins metrics * Simplify ticker loop * Leak underlying ticker while still satisfying linter --- bolt/bbolt.go | 1 + bolt/metrics.go | 98 ++++++++++++++++++++++++++++++++++++++++---- kv/telegraf.go | 66 +++++++++++++++++++++++++++-- telegraf.go | 22 ++++++++++ telegraf_test.go | 32 +++++++++++++++ telemetry/metrics.go | 1 + 6 files changed, 209 insertions(+), 11 deletions(-) diff --git a/bolt/bbolt.go b/bolt/bbolt.go index 6b97ed0d6c5..9088d5f54d2 100644 --- a/bolt/bbolt.go +++ b/bolt/bbolt.go @@ -85,6 +85,7 @@ func (c *Client) initialize(ctx context.Context) error { organizationBucket, scraperBucket, telegrafBucket, + telegrafPluginsBucket, userBucket, } for _, bktName := range bkts { diff --git a/bolt/metrics.go b/bolt/metrics.go index 968c96ea49e..af8e46f7aa1 100644 --- a/bolt/metrics.go +++ b/bolt/metrics.go @@ -1,6 +1,9 @@ package bolt import ( + "encoding/json" + "time" + bolt "github.com/coreos/bbolt" "github.com/prometheus/client_golang/prometheus" ) @@ -10,13 +13,14 @@ var _ prometheus.Collector = (*Client)(nil) // available buckets // TODO: nuke this whole thing? var ( - authorizationBucket = []byte("authorizationsv1") - bucketBucket = []byte("bucketsv1") - dashboardBucket = []byte("dashboardsv2") - organizationBucket = []byte("organizationsv1") - scraperBucket = []byte("scraperv2") - telegrafBucket = []byte("telegrafv1") - userBucket = []byte("usersv1") + authorizationBucket = []byte("authorizationsv1") + bucketBucket = []byte("bucketsv1") + dashboardBucket = []byte("dashboardsv2") + organizationBucket = []byte("organizationsv1") + scraperBucket = []byte("scraperv2") + telegrafBucket = []byte("telegrafv1") + telegrafPluginsBucket = []byte("telegrafPluginsv1") + userBucket = []byte("usersv1") ) var ( @@ -55,6 +59,11 @@ var ( "Number of total telegraf configurations on the server", nil, nil) + telegrafPluginsDesc = prometheus.NewDesc( + "influxdb_telegraf_plugins_count", + "Number of individual telegraf plugins configured", + []string{"plugin"}, nil) + boltWritesDesc = prometheus.NewDesc( "boltdb_writes_total", "Total number of boltdb writes", @@ -75,10 +84,40 @@ func (c *Client) Describe(ch chan<- *prometheus.Desc) { ch <- dashboardsDesc ch <- scrapersDesc ch <- telegrafsDesc + ch <- telegrafPluginsDesc ch <- boltWritesDesc ch <- boltReadsDesc } +type instaTicker struct { + tick chan struct{} + timeCh <-chan time.Time +} + +var ( + // ticker is this influx' timer for when to renew the cache of configured plugin metrics. + ticker *instaTicker + // telegrafPlugins is a cache of this influx' metrics of configured plugins. + telegrafPlugins = map[string]float64{} +) + +// Initialize a simple channel that will instantly "tick", +// backed by a time.Ticker's channel. +func init() { + ticker = &instaTicker{ + tick: make(chan struct{}, 1), + timeCh: time.NewTicker(time.Minute * 59).C, + } + + ticker.tick <- struct{}{} + + go func() { + for range ticker.timeCh { + ticker.tick <- struct{}{} + } + }() +} + // Collect returns the current state of all metrics of the collector. func (c *Client) Collect(ch chan<- prometheus.Metric) { stats := c.db.Stats() @@ -107,7 +146,41 @@ func (c *Client) Collect(ch chan<- prometheus.Metric) { telegrafs = tx.Bucket(telegrafBucket).Stats().KeyN tokens = tx.Bucket(authorizationBucket).Stats().KeyN users = tx.Bucket(userBucket).Stats().KeyN - return nil + + // Only process and store telegraf configs once per hour. + select { + case <-ticker.tick: + // Clear plugins from last check. + telegrafPlugins = map[string]float64{} + rawPlugins := [][]byte{} + + // Loop through all reported number of plugins in the least intrusive way + // (vs a global map and locking every time a config is updated). + tx.Bucket(telegrafPluginsBucket).ForEach(func(k, v []byte) error { + rawPlugins = append(rawPlugins, v) + return nil + }) + + for _, v := range rawPlugins { + pStats := map[string]float64{} + err := json.Unmarshal(v, &pStats) + if err != nil { + return err + } + + for k, v := range pStats { + if _, ok := telegrafPlugins[k]; ok { + telegrafPlugins[k] += v + } else { + telegrafPlugins[k] = v + } + } + } + + return nil + default: + return nil + } }) ch <- prometheus.MustNewConstMetric( @@ -151,4 +224,13 @@ func (c *Client) Collect(ch chan<- prometheus.Metric) { prometheus.CounterValue, float64(telegrafs), ) + + for k, v := range telegrafPlugins { + ch <- prometheus.MustNewConstMetric( + telegrafPluginsDesc, + prometheus.GaugeValue, + v, + k, // Adds a label for plugin type.name. + ) + } } diff --git a/kv/telegraf.go b/kv/telegraf.go index b5071c7b490..19ee91d1ebf 100644 --- a/kv/telegraf.go +++ b/kv/telegraf.go @@ -68,7 +68,8 @@ func ErrUnprocessableTelegraf(err error) *influxdb.Error { } var ( - telegrafBucket = []byte("telegrafv1") + telegrafBucket = []byte("telegrafv1") + telegrafPluginsBucket = []byte("telegrafPluginsv1") ) var _ influxdb.TelegrafConfigStore = (*Service)(nil) @@ -77,6 +78,9 @@ func (s *Service) initializeTelegraf(ctx context.Context, tx Tx) error { if _, err := s.telegrafBucket(tx); err != nil { return err } + if _, err := s.telegrafPluginsBucket(tx); err != nil { + return err + } return nil } @@ -88,6 +92,14 @@ func (s *Service) telegrafBucket(tx Tx) (Bucket, error) { return b, nil } +func (s *Service) telegrafPluginsBucket(tx Tx) (Bucket, error) { + b, err := tx.Bucket(telegrafPluginsBucket) + if err != nil { + return nil, UnavailableTelegrafServiceError(err) + } + return b, nil +} + // FindTelegrafConfigByID returns a single telegraf config by ID. func (s *Service) FindTelegrafConfigByID(ctx context.Context, id influxdb.ID) (*influxdb.TelegrafConfig, error) { var ( @@ -195,6 +207,25 @@ func (s *Service) putTelegrafConfig(ctx context.Context, tx Tx, tc *influxdb.Tel if err := bucket.Put(encodedID, v); err != nil { return UnavailableTelegrafServiceError(err) } + + return s.putTelegrafConfigStats(encodedID, tx, tc) +} + +func (s *Service) putTelegrafConfigStats(encodedID []byte, tx Tx, tc *influxdb.TelegrafConfig) error { + bucket, err := s.telegrafPluginsBucket(tx) + if err != nil { + return err + } + + v, err := marshalTelegrafPlugins(tc.CountPlugins()) + if err != nil { + return err + } + + if err := bucket.Put(encodedID, v); err != nil { + return UnavailableTelegrafServiceError(err) + } + return nil } @@ -274,10 +305,31 @@ func (s *Service) deleteTelegrafConfig(ctx context.Context, tx Tx, id influxdb.I return UnavailableTelegrafServiceError(err) } - return s.deleteUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{ + if err := s.deleteUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{ ResourceID: id, ResourceType: influxdb.TelegrafsResourceType, - }) + }); err != nil { + return err + } + + return s.deleteTelegrafConfigStats(encodedID, tx) +} + +func (s *Service) deleteTelegrafConfigStats(encodedID []byte, tx Tx) error { + bucket, err := s.telegrafPluginsBucket(tx) + if err != nil { + return err + } + + if err := bucket.Delete(encodedID); err != nil { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: fmt.Sprintf("Unable to connect to telegraf config stats service. Please try again; Err: %v", err), + Op: "kv/telegraf", + } + } + + return nil } // unmarshalTelegraf turns the stored byte slice in the kv into a *influxdb.TelegrafConfig. @@ -296,3 +348,11 @@ func marshalTelegraf(tc *influxdb.TelegrafConfig) ([]byte, error) { } return v, nil } + +func marshalTelegrafPlugins(plugins map[string]float64) ([]byte, error) { + v, err := json.Marshal(plugins) + if err != nil { + return nil, ErrUnprocessableTelegraf(err) + } + return v, nil +} diff --git a/telegraf.go b/telegraf.go index 27f90c05536..62dafda94b0 100644 --- a/telegraf.go +++ b/telegraf.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "regexp" "github.com/BurntSushi/toml" "github.com/influxdata/influxdb/telegraf/plugins" @@ -70,6 +71,27 @@ type TelegrafConfig struct { Metadata map[string]interface{} `json:"metadata,omitempty"` // Metadata for the config. } +var pluginCount = regexp.MustCompilePOSIX(`\[\[(inputs\..*|outputs\..*|aggregators\..*|processors\..*)\]\]`) + +// CountPlugins returns a map of the number of times each plugin is used. +func (tc *TelegrafConfig) CountPlugins() map[string]float64 { + plugins := map[string]float64{} + founds := pluginCount.FindAllStringSubmatch(tc.Config, -1) + + for _, v := range founds { + if len(v) < 2 { + continue + } + if _, ok := plugins[v[1]]; ok { + plugins[v[1]]++ + } else { + plugins[v[1]] = 1 + } + } + + return plugins +} + // UnmarshalJSON implement the json.Unmarshaler interface. // Gets called when reading from the kv db. mostly legacy so loading old/stored configs still work. // May not remove for a while. Primarily will get hit when user views/downloads config. diff --git a/telegraf_test.go b/telegraf_test.go index cb153c619c6..637db7fb642 100644 --- a/telegraf_test.go +++ b/telegraf_test.go @@ -610,3 +610,35 @@ func TestLegacyStruct(t *testing.T) { t.Fatalf("telegraf config's toml is incorrect, got %+v", tc.Config) } } + +func TestCountPlugins(t *testing.T) { + tc := TelegrafConfig{ + Name: "test", + Config: ` +[[inputs.file]] + some = "config" +[[inputs.file]] + some = "config" +[[outputs.influxdb_v2]] + some = "config" +[[inputs.cpu]] + some = "config" +[[outputs.stuff]] + some = "config" +[[aggregators.thing]] + some = "config" +[[processors.thing]] + some = "config" +[[serializers.thing]] + some = "config" +[[inputs.file]] + some = "config" +`, + } + + pCount := tc.CountPlugins() + + require.Equal(t, 6, len(pCount)) + + require.Equal(t, float64(3), pCount["inputs.file"]) +} diff --git a/telemetry/metrics.go b/telemetry/metrics.go index eb01b96e2cb..97837a62b1c 100644 --- a/telemetry/metrics.go +++ b/telemetry/metrics.go @@ -20,6 +20,7 @@ var telemetryMatcher = pr.NewMatcher(). Family("influxdb_dashboards_total"). Family("influxdb_scrapers_total"). Family("influxdb_telegrafs_total"). + Family("influxdb_telegraf_plugins_count"). Family("task_scheduler_claims_active"). // Count of currently active tasks /* * Count of API requests including success and failure