fix(bolt): prevent concurrent access to telegraf plugin metrics cache
danxmoran authored Mar 23, 2021
1 parent 4f535d2 commit 4c59a34
Showing 3 changed files with 190 additions and 59 deletions.
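In summary, the change replaces the package-level telegrafPlugins map and instaTicker (which were rebuilt from inside Client.Collect) with a dedicated pluginMetricsCollector whose cache is guarded by a sync.RWMutex and refreshed by a single background goroutine. Below is a minimal, self-contained sketch of that pattern; the names (statsCache, refresh, snapshot) are illustrative and not taken from the commit:

package main

import (
    "fmt"
    "sync"
    "time"
)

// statsCache sketches the pattern the commit introduces: a map guarded by a
// sync.RWMutex, rebuilt by one background goroutine and read by scrapes.
type statsCache struct {
    mu    sync.RWMutex
    cache map[string]float64
    done  chan struct{}
}

// refresh rebuilds the cache under the write lock, so readers never see a
// partially populated map.
func (s *statsCache) refresh() {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.cache = map[string]float64{"inputs.cpu": 1} // stand-in for the bucket scan
}

// snapshot copies the cache under the read lock, mirroring what a Prometheus
// Collect call does in the new code.
func (s *statsCache) snapshot() map[string]float64 {
    s.mu.RLock()
    defer s.mu.RUnlock()
    out := make(map[string]float64, len(s.cache))
    for k, v := range s.cache {
        out[k] = v
    }
    return out
}

func main() {
    s := &statsCache{cache: map[string]float64{}, done: make(chan struct{})}
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()

    // Background poll loop, analogous to pollTelegrafStats in the diff below.
    go func() {
        for {
            select {
            case <-s.done:
                return
            case <-ticker.C:
                s.refresh()
            }
        }
    }()

    time.Sleep(50 * time.Millisecond)
    fmt.Println(s.snapshot())
    close(s.done)
}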
7 changes: 7 additions & 0 deletions bolt/bbolt.go
@@ -25,6 +25,8 @@ type Client struct {
IDGenerator platform.IDGenerator
TokenGenerator platform.TokenGenerator
platform.TimeGenerator

pluginsCollector *pluginMetricsCollector
}

// NewClient returns an instance of a Client.
@@ -34,6 +36,8 @@ func NewClient(log *zap.Logger) *Client {
IDGenerator: snowflake.NewIDGenerator(),
TokenGenerator: rand.NewTokenGenerator(64),
TimeGenerator: platform.RealTimeGenerator{},
// Refresh telegraf plugin metrics every hour.
pluginsCollector: NewPluginMetricsCollector(time.Minute * 59),
}
}

@@ -73,6 +77,8 @@ func (c *Client) Open(ctx context.Context) error {
return err
}

c.pluginsCollector.Open(c.db)

c.log.Info("Resources opened", zap.String("path", c.Path))
return nil
}
@@ -112,6 +118,7 @@ func (c *Client) initialize(ctx context.Context) error {

// Close the connection to the bolt database
func (c *Client) Close() error {
c.pluginsCollector.Close()
if c.db != nil {
return c.db.Close()
}
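The wiring above ties the collector's lifetime to the bolt client: NewClient constructs it, Open starts its background poll once the *bolt.DB handle exists, and Close stops it before the handle is released. A hedged usage sketch of that lifecycle, assuming the client's exported Path field is set before Open as in the existing code (the file name here is made up):

package main

import (
    "context"
    "log"

    "github.com/influxdata/influxdb/v2/bolt"
    "go.uber.org/zap"
)

func main() {
    c := bolt.NewClient(zap.NewNop()) // also builds the plugin-metrics collector
    c.Path = "example.bolt"           // assumption: path chosen only for illustration

    // Open opens the bolt DB and starts the collector's background poll.
    if err := c.Open(context.Background()); err != nil {
        log.Fatal(err)
    }

    // Close stops the poller (waiting for an in-flight refresh) before
    // closing the underlying DB handle.
    defer c.Close()
}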
153 changes: 94 additions & 59 deletions bolt/metrics.go
@@ -2,6 +2,7 @@ package bolt

import (
"encoding/json"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
@@ -84,38 +85,108 @@ func (c *Client) Describe(ch chan<- *prometheus.Desc) {
ch <- dashboardsDesc
ch <- scrapersDesc
ch <- telegrafsDesc
ch <- telegrafPluginsDesc
ch <- boltWritesDesc
ch <- boltReadsDesc

c.pluginsCollector.Describe(ch)
}

type instaTicker struct {
tick chan struct{}
timeCh <-chan time.Time
type pluginMetricsCollector struct {
ticker *time.Ticker
tickerDone chan struct{}

// cacheMu protects cache
cacheMu sync.RWMutex
cache map[string]float64
}

var (
// ticker is this influx' timer for when to renew the cache of configured plugin metrics.
ticker *instaTicker
// telegrafPlugins is a cache of this influx' metrics of configured plugins.
telegrafPlugins = map[string]float64{}
)
func (c *pluginMetricsCollector) Open(db *bolt.DB) {
go c.pollTelegrafStats(db)
}

func (c *pluginMetricsCollector) pollTelegrafStats(db *bolt.DB) {
for {
select {
case <-c.tickerDone:
return
case <-c.ticker.C:
c.refreshTelegrafStats(db)
}
}
}

// Initialize a simple channel that will instantly "tick",
// backed by a time.Ticker's channel.
func init() {
ticker = &instaTicker{
tick: make(chan struct{}, 1),
timeCh: time.NewTicker(time.Minute * 59).C,
func (c *pluginMetricsCollector) refreshTelegrafStats(db *bolt.DB) {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

// Check if stats-polling got canceled between the point of receiving
// a tick and grabbing the lock.
select {
case <-c.tickerDone:
return
default:
}

ticker.tick <- struct{}{}
// Clear plugins from last check.
c.cache = map[string]float64{}

go func() {
for range ticker.timeCh {
ticker.tick <- struct{}{}
// Loop through all registered plugins.
_ = db.View(func(tx *bolt.Tx) error {
rawPlugins := [][]byte{}
if err := tx.Bucket(telegrafPluginsBucket).ForEach(func(k, v []byte) error {
rawPlugins = append(rawPlugins, v)
return nil
}); err != nil {
return err
}
}()

for _, v := range rawPlugins {
pStats := map[string]float64{}
if err := json.Unmarshal(v, &pStats); err != nil {
return err
}

for k, v := range pStats {
c.cache[k] += v
}
}

return nil
})
}

func (c *pluginMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- telegrafPluginsDesc
}

func (c *pluginMetricsCollector) Collect(ch chan<- prometheus.Metric) {
c.cacheMu.RLock()
defer c.cacheMu.RUnlock()

for k, v := range c.cache {
ch <- prometheus.MustNewConstMetric(
telegrafPluginsDesc,
prometheus.GaugeValue,
v,
k, // Adds a label for plugin type.name.
)
}
}

func (c *pluginMetricsCollector) Close() {
// Wait for any already-running cache-refresh procedures to complete.
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

close(c.tickerDone)
}

func NewPluginMetricsCollector(tickDuration time.Duration) *pluginMetricsCollector {
return &pluginMetricsCollector{
ticker: time.NewTicker(tickDuration),
tickerDone: make(chan struct{}),
cache: make(map[string]float64),
}
}

// Collect returns the current state of all metrics of the collector.
@@ -146,36 +217,7 @@ func (c *Client) Collect(ch chan<- prometheus.Metric) {
telegrafs = tx.Bucket(telegrafBucket).Stats().KeyN
tokens = tx.Bucket(authorizationBucket).Stats().KeyN
users = tx.Bucket(userBucket).Stats().KeyN

// Only process and store telegraf configs once per hour.
select {
case <-ticker.tick:
// Clear plugins from last check.
telegrafPlugins = map[string]float64{}
rawPlugins := [][]byte{}

// Loop through all reported number of plugins in the least intrusive way
// (vs a global map and locking every time a config is updated).
tx.Bucket(telegrafPluginsBucket).ForEach(func(k, v []byte) error {
rawPlugins = append(rawPlugins, v)
return nil
})

for _, v := range rawPlugins {
pStats := map[string]float64{}
if err := json.Unmarshal(v, &pStats); err != nil {
return err
}

for k, v := range pStats {
telegrafPlugins[k] += v
}
}

return nil
default:
return nil
}
return nil
})

ch <- prometheus.MustNewConstMetric(
@@ -220,12 +262,5 @@ func (c *Client) Collect(ch chan<- prometheus.Metric) {
float64(telegrafs),
)

for k, v := range telegrafPlugins {
ch <- prometheus.MustNewConstMetric(
telegrafPluginsDesc,
prometheus.GaugeValue,
v,
k, // Adds a label for plugin type.name.
)
}
c.pluginsCollector.Collect(ch)
}
89 changes: 89 additions & 0 deletions bolt/metrics_test.go
@@ -1,14 +1,23 @@
package bolt_test

import (
"context"
"testing"
"time"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/kit/prom"
"github.com/influxdata/influxdb/v2/kit/prom/promtest"
"github.com/influxdata/influxdb/v2/kv/migration/all"
telegrafservice "github.com/influxdata/influxdb/v2/telegraf/service"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)

func TestInitialMetrics(t *testing.T) {
t.Parallel()

client, teardown, err := NewTestClient(t)
if err != nil {
t.Fatalf("unable to setup bolt client: %v", err)
@@ -38,3 +47,83 @@ func TestInitialMetrics(t *testing.T) {
}
}
}

func TestPluginMetrics(t *testing.T) {
t.Parallel()

// Set up a BoltDB, and register a telegraf config.
client, teardown, err := NewTestClient(t)
require.NoError(t, err)
defer teardown()

ctx := context.Background()
log := zaptest.NewLogger(t)
kvStore := bolt.NewKVStore(log, client.Path)
kvStore.WithDB(client.DB())
require.NoError(t, all.Up(ctx, log, kvStore))

tsvc := telegrafservice.New(kvStore)
tconf := influxdb.TelegrafConfig{
Name: "test",
Config: "[[inputs.cpu]]\n[[outputs.influxdb_v2]]",
OrgID: 1,
}
require.NoError(t, tsvc.CreateTelegrafConfig(ctx, &tconf, 1))

// Run a plugin metrics collector with a quicker tick interval than the default.
pluginCollector := bolt.NewPluginMetricsCollector(time.Millisecond)
pluginCollector.Open(client.DB())
defer pluginCollector.Close()

reg := prom.NewRegistry(zaptest.NewLogger(t))
reg.MustRegister(pluginCollector)

// Run a periodic gather in the background.
gatherTick := time.NewTicker(time.Millisecond)
doneCh := make(chan struct{})
defer close(doneCh)

go func() {
for {
select {
case <-doneCh:
return
case <-gatherTick.C:
_, err := reg.Gather()
require.NoError(t, err)
}
}
}()

// Run a few gathers to see if any race conditions are flushed out.
time.Sleep(250 * time.Millisecond)

// Gather plugin metrics and ensure they're correct.
metrics, err := reg.Gather()
require.NoError(t, err)
inCpu := promtest.MustFindMetric(t, metrics, "influxdb_telegraf_plugins_count", map[string]string{"plugin": "inputs.cpu"})
outInfluxDb := promtest.MustFindMetric(t, metrics, "influxdb_telegraf_plugins_count", map[string]string{"plugin": "outputs.influxdb_v2"})
require.Equal(t, 1, int(inCpu.GetGauge().GetValue()))
require.Equal(t, 1, int(outInfluxDb.GetGauge().GetValue()))

// Register some more plugins.
tconf = influxdb.TelegrafConfig{
Name: "test",
Config: "[[inputs.mem]]\n[[outputs.influxdb_v2]]",
OrgID: 1,
}
require.NoError(t, tsvc.CreateTelegrafConfig(ctx, &tconf, 2))

// Let a few more background gathers run.
time.Sleep(250 * time.Millisecond)

// Gather again, and ensure plugin metrics have been updated.
metrics, err = reg.Gather()
require.NoError(t, err)
inCpu = promtest.MustFindMetric(t, metrics, "influxdb_telegraf_plugins_count", map[string]string{"plugin": "inputs.cpu"})
inMem := promtest.MustFindMetric(t, metrics, "influxdb_telegraf_plugins_count", map[string]string{"plugin": "inputs.mem"})
outInfluxDb = promtest.MustFindMetric(t, metrics, "influxdb_telegraf_plugins_count", map[string]string{"plugin": "outputs.influxdb_v2"})
require.Equal(t, 1, int(inCpu.GetGauge().GetValue()))
require.Equal(t, 1, int(inMem.GetGauge().GetValue()))
require.Equal(t, 2, int(outInfluxDb.GetGauge().GetValue()))
}
