diff --git a/bolt/bbolt.go b/bolt/bbolt.go
index 8a566029956..d1ab6863e00 100644
--- a/bolt/bbolt.go
+++ b/bolt/bbolt.go
@@ -25,6 +25,8 @@ type Client struct {
 	IDGenerator    platform.IDGenerator
 	TokenGenerator platform.TokenGenerator
 	platform.TimeGenerator
+
+	pluginsCollector *pluginMetricsCollector
 }
 
 // NewClient returns an instance of a Client.
@@ -34,6 +36,8 @@ func NewClient(log *zap.Logger) *Client {
 		IDGenerator:    snowflake.NewIDGenerator(),
 		TokenGenerator: rand.NewTokenGenerator(64),
 		TimeGenerator:  platform.RealTimeGenerator{},
+		// Refresh telegraf plugin metrics every hour.
+		pluginsCollector: NewPluginMetricsCollector(time.Minute * 59),
 	}
 }
 
@@ -73,6 +77,8 @@ func (c *Client) Open(ctx context.Context) error {
 		return err
 	}
 
+	c.pluginsCollector.Open(c.db)
+
 	c.log.Info("Resources opened", zap.String("path", c.Path))
 	return nil
 }
@@ -112,6 +118,7 @@ func (c *Client) initialize(ctx context.Context) error {
 
 // Close the connection to the bolt database
 func (c *Client) Close() error {
+	c.pluginsCollector.Close()
 	if c.db != nil {
 		return c.db.Close()
 	}
diff --git a/bolt/metrics.go b/bolt/metrics.go
index 535827e4661..6a88e26ae23 100644
--- a/bolt/metrics.go
+++ b/bolt/metrics.go
@@ -2,6 +2,7 @@ package bolt
 
 import (
 	"encoding/json"
+	"sync"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -84,38 +85,136 @@ func (c *Client) Describe(ch chan<- *prometheus.Desc) {
 	ch <- dashboardsDesc
 	ch <- scrapersDesc
 	ch <- telegrafsDesc
-	ch <- telegrafPluginsDesc
 	ch <- boltWritesDesc
 	ch <- boltReadsDesc
+
+	c.pluginsCollector.Describe(ch)
 }
 
-type instaTicker struct {
-	tick   chan struct{}
-	timeCh <-chan time.Time
-}
-
-var (
-	// ticker is this influx' timer for when to renew the cache of configured plugin metrics.
-	ticker *instaTicker
-	// telegrafPlugins is a cache of this influx' metrics of configured plugins.
-	telegrafPlugins = map[string]float64{}
-)
-
-// Initialize a simple channel that will instantly "tick",
-// backed by a time.Ticker's channel.
-func init() {
-	ticker = &instaTicker{
-		tick:   make(chan struct{}, 1),
-		timeCh: time.NewTicker(time.Minute * 59).C,
-	}
-
-	ticker.tick <- struct{}{}
-
-	go func() {
-		for range ticker.timeCh {
-			ticker.tick <- struct{}{}
-		}
-	}()
+// pluginMetricsCollector caches per-plugin counts of the registered telegraf
+// configs, refreshing them on a timer in a background goroutine.
+type pluginMetricsCollector struct {
+	ticker     *time.Ticker
+	tickerDone chan struct{}
+
+	// closeOnce makes Close idempotent; closing tickerDone twice would panic.
+	closeOnce sync.Once
+
+	// cacheMu protects cache
+	cacheMu sync.RWMutex
+	cache   map[string]float64
+}
+
+// Open starts the background goroutine that keeps the cache up to date from db.
+func (c *pluginMetricsCollector) Open(db *bolt.DB) {
+	go c.pollTelegrafStats(db)
+}
+
+// pollTelegrafStats refreshes the cache once right away and then on every
+// tick, returning once Close has been called.
+func (c *pluginMetricsCollector) pollTelegrafStats(db *bolt.DB) {
+	// Don't leave the metrics empty until the first tick fires.
+	c.refreshTelegrafStats(db)
+
+	for {
+		select {
+		case <-c.tickerDone:
+			return
+		case <-c.ticker.C:
+			c.refreshTelegrafStats(db)
+		}
+	}
+}
+
+// refreshTelegrafStats re-reads the telegraf plugins bucket and replaces the
+// cache with the summed per-plugin counts. If the bucket can't be read or
+// decoded, the previous contents of the cache are kept.
+func (c *pluginMetricsCollector) refreshTelegrafStats(db *bolt.DB) {
+	c.cacheMu.Lock()
+	defer c.cacheMu.Unlock()
+
+	// Check if stats-polling got canceled between the point of receiving
+	// a tick and grabbing the lock.
+	select {
+	case <-c.tickerDone:
+		return
+	default:
+	}
+
+	// Build the new counts on the side so a failure part-way through doesn't
+	// leave the cache half-populated.
+	plugins := map[string]float64{}
+
+	// Loop through all registered plugins.
+	err := db.View(func(tx *bolt.Tx) error {
+		bkt := tx.Bucket(telegrafPluginsBucket)
+		if bkt == nil {
+			// Bucket returns nil for a missing bucket; nothing to count.
+			return nil
+		}
+
+		return bkt.ForEach(func(_, v []byte) error {
+			pStats := map[string]float64{}
+			if err := json.Unmarshal(v, &pStats); err != nil {
+				return err
+			}
+
+			for name, n := range pStats {
+				plugins[name] += n
+			}
+			return nil
+		})
+	})
+	if err != nil {
+		// No logger is available here; keep serving the previous counts.
+		return
+	}
+
+	c.cache = plugins
+}
+
+// Describe sends the descriptor of the plugin-count metric to ch.
+func (c *pluginMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- telegrafPluginsDesc
+}
+
+// Collect sends one gauge per cached plugin to ch.
+func (c *pluginMetricsCollector) Collect(ch chan<- prometheus.Metric) {
+	c.cacheMu.RLock()
+	defer c.cacheMu.RUnlock()
+
+	for k, v := range c.cache {
+		ch <- prometheus.MustNewConstMetric(
+			telegrafPluginsDesc,
+			prometheus.GaugeValue,
+			v,
+			k, // Adds a label for plugin type.name.
+		)
+	}
+}
+
+// Close stops the background refresh. It is safe to call more than once.
+func (c *pluginMetricsCollector) Close() {
+	c.closeOnce.Do(func() {
+		// Wait for any already-running cache-refresh procedures to complete.
+		c.cacheMu.Lock()
+		defer c.cacheMu.Unlock()
+
+		// Stop releases the ticker but doesn't close ticker.C, so the polling
+		// goroutine is told to exit through tickerDone.
+		c.ticker.Stop()
+		close(c.tickerDone)
+	})
+}
+
+// NewPluginMetricsCollector returns a collector that refreshes its cache every
+// tickDuration once Open has been called.
+func NewPluginMetricsCollector(tickDuration time.Duration) *pluginMetricsCollector {
+	return &pluginMetricsCollector{
+		ticker:     time.NewTicker(tickDuration),
+		tickerDone: make(chan struct{}),
+		cache:      make(map[string]float64),
+	}
 }
 
 // Collect returns the current state of all metrics of the collector.
@@ -146,36 +245,7 @@ func (c *Client) Collect(ch chan<- prometheus.Metric) {
 		telegrafs = tx.Bucket(telegrafBucket).Stats().KeyN
 		tokens = tx.Bucket(authorizationBucket).Stats().KeyN
 		users = tx.Bucket(userBucket).Stats().KeyN
-
-		// Only process and store telegraf configs once per hour.
-		select {
-		case <-ticker.tick:
-			// Clear plugins from last check.
-			telegrafPlugins = map[string]float64{}
-			rawPlugins := [][]byte{}
-
-			// Loop through all reported number of plugins in the least intrusive way
-			// (vs a global map and locking every time a config is updated).
-			tx.Bucket(telegrafPluginsBucket).ForEach(func(k, v []byte) error {
-				rawPlugins = append(rawPlugins, v)
-				return nil
-			})
-
-			for _, v := range rawPlugins {
-				pStats := map[string]float64{}
-				if err := json.Unmarshal(v, &pStats); err != nil {
-					return err
-				}
-
-				for k, v := range pStats {
-					telegrafPlugins[k] += v
-				}
-			}
-
-			return nil
-		default:
-			return nil
-		}
+		return nil
 	})
 
 	ch <- prometheus.MustNewConstMetric(
@@ -220,12 +290,5 @@ func (c *Client) Collect(ch chan<- prometheus.Metric) {
 		float64(telegrafs),
 	)
 
-	for k, v := range telegrafPlugins {
-		ch <- prometheus.MustNewConstMetric(
-			telegrafPluginsDesc,
-			prometheus.GaugeValue,
-			v,
-			k, // Adds a label for plugin type.name.
-		)
-	}
+	c.pluginsCollector.Collect(ch)
 }
diff --git a/bolt/metrics_test.go b/bolt/metrics_test.go
index d72bdbd82f7..1c177196b61 100644
--- a/bolt/metrics_test.go
+++ b/bolt/metrics_test.go
@@ -1,14 +1,23 @@
 package bolt_test
 
 import (
+	"context"
 	"testing"
+	"time"
 
+	"github.com/influxdata/influxdb/v2"
+	"github.com/influxdata/influxdb/v2/bolt"
 	"github.com/influxdata/influxdb/v2/kit/prom"
 	"github.com/influxdata/influxdb/v2/kit/prom/promtest"
+	"github.com/influxdata/influxdb/v2/kv/migration/all"
+	telegrafservice "github.com/influxdata/influxdb/v2/telegraf/service"
+	"github.com/stretchr/testify/require"
 	"go.uber.org/zap/zaptest"
 )
 
 func TestInitialMetrics(t *testing.T) {
+	t.Parallel()
+
 	client, teardown, err := NewTestClient(t)
 	if err != nil {
 		t.Fatalf("unable to setup bolt client: %v", err)
@@ -38,3 +47,92 @@ func TestInitialMetrics(t *testing.T) {
 		}
 	}
 }
+
+func TestPluginMetrics(t *testing.T) {
+	t.Parallel()
+
+	// Set up a BoltDB, and register a telegraf config.
+	client, teardown, err := NewTestClient(t)
+	require.NoError(t, err)
+	defer teardown()
+
+	ctx := context.Background()
+	log := zaptest.NewLogger(t)
+	kvStore := bolt.NewKVStore(log, client.Path)
+	kvStore.WithDB(client.DB())
+	require.NoError(t, all.Up(ctx, log, kvStore))
+
+	tsvc := telegrafservice.New(kvStore)
+	tconf := influxdb.TelegrafConfig{
+		Name:   "test",
+		Config: "[[inputs.cpu]]\n[[outputs.influxdb_v2]]",
+		OrgID:  1,
+	}
+	require.NoError(t, tsvc.CreateTelegrafConfig(ctx, &tconf, 1))
+
+	// Run a plugin metrics collector with a quicker tick interval than the default.
+	pluginCollector := bolt.NewPluginMetricsCollector(time.Millisecond)
+	pluginCollector.Open(client.DB())
+	defer pluginCollector.Close()
+
+	reg := prom.NewRegistry(zaptest.NewLogger(t))
+	reg.MustRegister(pluginCollector)
+
+	// Run a periodic gather in the background.
+	gatherTick := time.NewTicker(time.Millisecond)
+	defer gatherTick.Stop()
+	doneCh := make(chan struct{})
+	gatherDone := make(chan struct{})
+	defer func() {
+		// Don't let the gatherer outlive the test; it reports through t.
+		close(doneCh)
+		<-gatherDone
+	}()
+
+	go func() {
+		defer close(gatherDone)
+		for {
+			select {
+			case <-doneCh:
+				return
+			case <-gatherTick.C:
+				// Only the test goroutine may call FailNow, so don't use require here.
+				if _, err := reg.Gather(); err != nil {
+					t.Errorf("background gather failed: %v", err)
+				}
+			}
+		}
+	}()
+
+	// Run a few gathers to see if any race conditions are flushed out.
+	time.Sleep(250 * time.Millisecond)
+
+	// Gather plugin metrics and ensure they're correct.
+	metrics, err := reg.Gather()
+	require.NoError(t, err)
+	inCpu := promtest.MustFindMetric(t, metrics, "influxdb_telegraf_plugins_count", map[string]string{"plugin": "inputs.cpu"})
+	outInfluxDb := promtest.MustFindMetric(t, metrics, "influxdb_telegraf_plugins_count", map[string]string{"plugin": "outputs.influxdb_v2"})
+	require.Equal(t, 1, int(inCpu.GetGauge().GetValue()))
+	require.Equal(t, 1, int(outInfluxDb.GetGauge().GetValue()))
+
+	// Register some more plugins.
+	tconf = influxdb.TelegrafConfig{
+		Name:   "test",
+		Config: "[[inputs.mem]]\n[[outputs.influxdb_v2]]",
+		OrgID:  1,
+	}
+	require.NoError(t, tsvc.CreateTelegrafConfig(ctx, &tconf, 2))
+
+	// Let a few more background gathers run.
+	time.Sleep(250 * time.Millisecond)
+
+	// Gather again, and ensure plugin metrics have been updated.
+	metrics, err = reg.Gather()
+	require.NoError(t, err)
+	inCpu = promtest.MustFindMetric(t, metrics, "influxdb_telegraf_plugins_count", map[string]string{"plugin": "inputs.cpu"})
+	inMem := promtest.MustFindMetric(t, metrics, "influxdb_telegraf_plugins_count", map[string]string{"plugin": "inputs.mem"})
+	outInfluxDb = promtest.MustFindMetric(t, metrics, "influxdb_telegraf_plugins_count", map[string]string{"plugin": "outputs.influxdb_v2"})
+	require.Equal(t, 1, int(inCpu.GetGauge().GetValue()))
+	require.Equal(t, 1, int(inMem.GetGauge().GetValue()))
+	require.Equal(t, 2, int(outInfluxDb.GetGauge().GetValue()))
+}