Skip to content

Commit

Permalink
feat: report telegraf plugin usage metrics (#16378)
Browse files Browse the repository at this point in the history
* Begin implementing retrieval of telegraf plugin stats

* Implement storing/deletion of telegraf plugin stats

* Test plugin stats

* Initialize plugins bucket for tests

* Add comment

* Shorten time and frequency in bolt when providing telegraf plugins metrics

* Simplify ticker loop

* Leak underlying ticker while still satisfying linter
  • Loading branch information
glinton authored Feb 4, 2020
1 parent b6b5b4d commit e593119
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 11 deletions.
1 change: 1 addition & 0 deletions bolt/bbolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (c *Client) initialize(ctx context.Context) error {
organizationBucket,
scraperBucket,
telegrafBucket,
telegrafPluginsBucket,
userBucket,
}
for _, bktName := range bkts {
Expand Down
98 changes: 90 additions & 8 deletions bolt/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package bolt

import (
"encoding/json"
"time"

bolt "github.com/coreos/bbolt"
"github.com/prometheus/client_golang/prometheus"
)
Expand All @@ -10,13 +13,14 @@ var _ prometheus.Collector = (*Client)(nil)
// available buckets
// TODO: nuke this whole thing?
var (
authorizationBucket = []byte("authorizationsv1")
bucketBucket = []byte("bucketsv1")
dashboardBucket = []byte("dashboardsv2")
organizationBucket = []byte("organizationsv1")
scraperBucket = []byte("scraperv2")
telegrafBucket = []byte("telegrafv1")
userBucket = []byte("usersv1")
authorizationBucket = []byte("authorizationsv1")
bucketBucket = []byte("bucketsv1")
dashboardBucket = []byte("dashboardsv2")
organizationBucket = []byte("organizationsv1")
scraperBucket = []byte("scraperv2")
telegrafBucket = []byte("telegrafv1")
telegrafPluginsBucket = []byte("telegrafPluginsv1")
userBucket = []byte("usersv1")
)

var (
Expand Down Expand Up @@ -55,6 +59,11 @@ var (
"Number of total telegraf configurations on the server",
nil, nil)

telegrafPluginsDesc = prometheus.NewDesc(
"influxdb_telegraf_plugins_count",
"Number of individual telegraf plugins configured",
[]string{"plugin"}, nil)

boltWritesDesc = prometheus.NewDesc(
"boltdb_writes_total",
"Total number of boltdb writes",
Expand All @@ -75,10 +84,40 @@ func (c *Client) Describe(ch chan<- *prometheus.Desc) {
ch <- dashboardsDesc
ch <- scrapersDesc
ch <- telegrafsDesc
ch <- telegrafPluginsDesc
ch <- boltWritesDesc
ch <- boltReadsDesc
}

// instaTicker pairs a signal channel with the channel of a time-based source
// so that a signal can be made available immediately and again whenever the
// time source fires.
type instaTicker struct {
// tick carries the refresh signals; init sends on it and Collect receives.
tick chan struct{}
// timeCh is the receive-only time channel whose events are forwarded to tick.
timeCh <-chan time.Time
}

var (
// ticker signals when the cached telegraf plugin metrics should be rebuilt.
// Collect performs a non-blocking receive on ticker.tick to decide whether
// to refresh the cache.
ticker *instaTicker
// telegrafPlugins caches the summed usage count of each configured telegraf
// plugin, keyed by the plugin's "type.name" string.
// NOTE(review): no lock guarding this map is visible here — confirm that
// Collect is never invoked concurrently.
telegrafPlugins = map[string]float64{}
)

// init sets up the package-level ticker: a one-slot signal channel that is
// ready to be received from straight away, and that is re-armed each time the
// backing time.Ticker fires.
func init() {
	const refreshEvery = 59 * time.Minute

	ticker = &instaTicker{
		tick:   make(chan struct{}, 1),
		timeCh: time.NewTicker(refreshEvery).C,
	}

	// Pre-load a single signal so the very first receive succeeds without
	// waiting for the time-based source.
	ticker.tick <- struct{}{}

	// Forward every event of the time source as a signal on tick.
	forward := func() {
		for range ticker.timeCh {
			ticker.tick <- struct{}{}
		}
	}
	go forward()
}

// Collect returns the current state of all metrics of the collector.
func (c *Client) Collect(ch chan<- prometheus.Metric) {
stats := c.db.Stats()
Expand Down Expand Up @@ -107,7 +146,41 @@ func (c *Client) Collect(ch chan<- prometheus.Metric) {
telegrafs = tx.Bucket(telegrafBucket).Stats().KeyN
tokens = tx.Bucket(authorizationBucket).Stats().KeyN
users = tx.Bucket(userBucket).Stats().KeyN
return nil

// Only process and store telegraf configs once per hour.
select {
case <-ticker.tick:
// Clear plugins from last check.
telegrafPlugins = map[string]float64{}
rawPlugins := [][]byte{}

// Loop through all reported number of plugins in the least intrusive way
// (vs a global map and locking every time a config is updated).
tx.Bucket(telegrafPluginsBucket).ForEach(func(k, v []byte) error {
rawPlugins = append(rawPlugins, v)
return nil
})

for _, v := range rawPlugins {
pStats := map[string]float64{}
err := json.Unmarshal(v, &pStats)
if err != nil {
return err
}

for k, v := range pStats {
if _, ok := telegrafPlugins[k]; ok {
telegrafPlugins[k] += v
} else {
telegrafPlugins[k] = v
}
}
}

return nil
default:
return nil
}
})

ch <- prometheus.MustNewConstMetric(
Expand Down Expand Up @@ -151,4 +224,13 @@ func (c *Client) Collect(ch chan<- prometheus.Metric) {
prometheus.CounterValue,
float64(telegrafs),
)

for k, v := range telegrafPlugins {
ch <- prometheus.MustNewConstMetric(
telegrafPluginsDesc,
prometheus.GaugeValue,
v,
k, // Adds a label for plugin type.name.
)
}
}
66 changes: 63 additions & 3 deletions kv/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ func ErrUnprocessableTelegraf(err error) *influxdb.Error {
}

var (
telegrafBucket = []byte("telegrafv1")
telegrafBucket = []byte("telegrafv1")
telegrafPluginsBucket = []byte("telegrafPluginsv1")
)

var _ influxdb.TelegrafConfigStore = (*Service)(nil)
Expand All @@ -77,6 +78,9 @@ func (s *Service) initializeTelegraf(ctx context.Context, tx Tx) error {
if _, err := s.telegrafBucket(tx); err != nil {
return err
}
if _, err := s.telegrafPluginsBucket(tx); err != nil {
return err
}
return nil
}

Expand All @@ -88,6 +92,14 @@ func (s *Service) telegrafBucket(tx Tx) (Bucket, error) {
return b, nil
}

// telegrafPluginsBucket looks up the telegraf plugins bucket in tx. A lookup
// failure is reported as an unavailable-telegraf-service error.
func (s *Service) telegrafPluginsBucket(tx Tx) (Bucket, error) {
	bkt, lookupErr := tx.Bucket(telegrafPluginsBucket)
	if lookupErr == nil {
		return bkt, nil
	}
	return nil, UnavailableTelegrafServiceError(lookupErr)
}

// FindTelegrafConfigByID returns a single telegraf config by ID.
func (s *Service) FindTelegrafConfigByID(ctx context.Context, id influxdb.ID) (*influxdb.TelegrafConfig, error) {
var (
Expand Down Expand Up @@ -195,6 +207,25 @@ func (s *Service) putTelegrafConfig(ctx context.Context, tx Tx, tc *influxdb.Tel
if err := bucket.Put(encodedID, v); err != nil {
return UnavailableTelegrafServiceError(err)
}

return s.putTelegrafConfigStats(encodedID, tx, tc)
}

// putTelegrafConfigStats stores the JSON-encoded plugin counts of tc in the
// telegraf plugins bucket under encodedID.
func (s *Service) putTelegrafConfigStats(encodedID []byte, tx Tx, tc *influxdb.TelegrafConfig) error {
	statsBucket, err := s.telegrafPluginsBucket(tx)
	if err != nil {
		return err
	}

	encoded, err := marshalTelegrafPlugins(tc.CountPlugins())
	if err != nil {
		return err
	}

	// A failed write is surfaced as an unavailable-service error.
	if putErr := statsBucket.Put(encodedID, encoded); putErr != nil {
		return UnavailableTelegrafServiceError(putErr)
	}
	return nil
}

Expand Down Expand Up @@ -274,10 +305,31 @@ func (s *Service) deleteTelegrafConfig(ctx context.Context, tx Tx, id influxdb.I
return UnavailableTelegrafServiceError(err)
}

return s.deleteUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{
if err := s.deleteUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{
ResourceID: id,
ResourceType: influxdb.TelegrafsResourceType,
})
}); err != nil {
return err
}

return s.deleteTelegrafConfigStats(encodedID, tx)
}

// deleteTelegrafConfigStats removes the plugin counts stored under encodedID
// from the telegraf plugins bucket. A failed delete is returned as an
// internal influxdb.Error.
func (s *Service) deleteTelegrafConfigStats(encodedID []byte, tx Tx) error {
	statsBucket, err := s.telegrafPluginsBucket(tx)
	if err != nil {
		return err
	}

	delErr := statsBucket.Delete(encodedID)
	if delErr == nil {
		return nil
	}

	return &influxdb.Error{
		Code: influxdb.EInternal,
		Msg:  fmt.Sprintf("Unable to connect to telegraf config stats service. Please try again; Err: %v", delErr),
		Op:   "kv/telegraf",
	}
}

// unmarshalTelegraf turns the stored byte slice in the kv into a *influxdb.TelegrafConfig.
Expand All @@ -296,3 +348,11 @@ func marshalTelegraf(tc *influxdb.TelegrafConfig) ([]byte, error) {
}
return v, nil
}

// marshalTelegrafPlugins encodes the plugin usage counts as JSON. An encoding
// failure is returned as an unprocessable-telegraf error.
func marshalTelegrafPlugins(plugins map[string]float64) ([]byte, error) {
	encoded, err := json.Marshal(plugins)
	if err == nil {
		return encoded, nil
	}
	return nil, ErrUnprocessableTelegraf(err)
}
22 changes: 22 additions & 0 deletions telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"regexp"

"github.com/BurntSushi/toml"
"github.com/influxdata/influxdb/telegraf/plugins"
Expand Down Expand Up @@ -70,6 +71,27 @@ type TelegrafConfig struct {
Metadata map[string]interface{} `json:"metadata,omitempty"` // Metadata for the config.
}

// pluginCount matches telegraf plugin table headers such as [[inputs.cpu]]
// and captures the "type.name" portion in its single sub-match. Only the
// inputs, outputs, aggregators and processors plugin types are recognized.
//
// The name portion deliberately excludes ']' (and newline): with POSIX
// leftmost-longest matching, a greedy ".*" would run to the LAST "]]" on the
// line, so a header followed by a comment containing "]]" would be captured
// as one bogus plugin name.
var pluginCount = regexp.MustCompilePOSIX(`\[\[(inputs\.[^]\n]*|outputs\.[^]\n]*|aggregators\.[^]\n]*|processors\.[^]\n]*)\]\]`)

// CountPlugins tallies how many times each plugin header matched by
// pluginCount occurs in the config text. The result is keyed by the captured
// plugin identifier and is empty (never nil) when nothing matches.
func (tc *TelegrafConfig) CountPlugins() map[string]float64 {
	counts := map[string]float64{}

	for _, match := range pluginCount.FindAllStringSubmatch(tc.Config, -1) {
		// match[0] is the whole header; match[1] is the captured identifier.
		if len(match) >= 2 {
			// A missing key reads as zero, so the first sighting becomes 1.
			counts[match[1]]++
		}
	}

	return counts
}

// UnmarshalJSON implement the json.Unmarshaler interface.
// Gets called when reading from the kv db. mostly legacy so loading old/stored configs still work.
// May not remove for a while. Primarily will get hit when user views/downloads config.
Expand Down
32 changes: 32 additions & 0 deletions telegraf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,3 +610,35 @@ func TestLegacyStruct(t *testing.T) {
t.Fatalf("telegraf config's toml is incorrect, got %+v", tc.Config)
}
}

// TestCountPlugins checks that CountPlugins reports one entry per distinct
// recognized plugin header and counts repeated headers. The serializers
// header is not a counted plugin type, which leaves six distinct entries.
func TestCountPlugins(t *testing.T) {
	const config = `
[[inputs.file]]
some = "config"
[[inputs.file]]
some = "config"
[[outputs.influxdb_v2]]
some = "config"
[[inputs.cpu]]
some = "config"
[[outputs.stuff]]
some = "config"
[[aggregators.thing]]
some = "config"
[[processors.thing]]
some = "config"
[[serializers.thing]]
some = "config"
[[inputs.file]]
some = "config"
`

	cfg := TelegrafConfig{Name: "test", Config: config}
	counts := cfg.CountPlugins()

	require.Equal(t, 6, len(counts))

	require.Equal(t, float64(3), counts["inputs.file"])
}
1 change: 1 addition & 0 deletions telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var telemetryMatcher = pr.NewMatcher().
Family("influxdb_dashboards_total").
Family("influxdb_scrapers_total").
Family("influxdb_telegrafs_total").
Family("influxdb_telegraf_plugins_count").
Family("task_scheduler_claims_active"). // Count of currently active tasks
/*
* Count of API requests including success and failure
Expand Down

0 comments on commit e593119

Please sign in to comment.