From feb459c785de6bc4f780bd25482b4c8105c35cd7 Mon Sep 17 00:00:00 2001 From: Sam Arnold Date: Tue, 23 Nov 2021 10:11:22 -0500 Subject: [PATCH] feat: metrics for cache subsystem (#22915) * fix: drop complicated cache metrics and document remaining * feat: metrics for cache --- cmd/influxd/inspect/build_tsi/build_tsi.go | 2 +- tsdb/engine/tsm1/cache.go | 206 ++++++++++----------- tsdb/engine/tsm1/cache_race_test.go | 6 +- tsdb/engine/tsm1/cache_test.go | 70 +++---- tsdb/engine/tsm1/compact_test.go | 8 +- tsdb/engine/tsm1/engine.go | 19 +- tsdb/engine/tsm1/file_store.go | 7 + 7 files changed, 151 insertions(+), 167 deletions(-) diff --git a/cmd/influxd/inspect/build_tsi/build_tsi.go b/cmd/influxd/inspect/build_tsi/build_tsi.go index 5bb26df0a99..e24a65efe50 100644 --- a/cmd/influxd/inspect/build_tsi/build_tsi.go +++ b/cmd/influxd/inspect/build_tsi/build_tsi.go @@ -441,7 +441,7 @@ func IndexShard(sfile *tsdb.SeriesFile, dataDir, walDir string, maxLogFileSize i } } else { log.Debug("Building cache from wal files") - cache := tsm1.NewCache(maxCacheSize) + cache := tsm1.NewCache(maxCacheSize, tsm1.EngineTags{}) // tags are for metrics only loader := tsm1.NewCacheLoader(walPaths) loader.WithLogger(log) if err := loader.Load(cache); err != nil { diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 44a6e1aba54..be5c0514a41 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxql" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -143,25 +144,6 @@ func (e *entry) InfluxQLType() (influxql.DataType, error) { return e.values.InfluxQLType() } -// Statistics gathered by the Cache. 
-const ( - // levels - point in time measures - - statCacheMemoryBytes = "memBytes" // level: Size of in-memory cache in bytes - statCacheDiskBytes = "diskBytes" // level: Size of on-disk snapshots in bytes - statSnapshots = "snapshotCount" // level: Number of active snapshots. - statCacheAgeMs = "cacheAgeMs" // level: Number of milliseconds since cache was last snapshoted at sample time - - // counters - accumulative measures - - statCachedBytes = "cachedBytes" // counter: Total number of bytes written into snapshots. - statWALCompactionTimeMs = "WALCompactionTimeMs" // counter: Total number of milliseconds spent compacting snapshots - - statCacheWriteOK = "writeOk" - statCacheWriteErr = "writeErr" - statCacheWriteDropped = "writeDropped" -) - // storer is the interface that descibes a cache's store. type storer interface { entry(key []byte) *entry // Get an entry by its key. @@ -196,8 +178,7 @@ type Cache struct { // This number is the number of pending or failed WriteSnaphot attempts since the last successful one. snapshotAttempts int - stats *CacheStatistics - lastSnapshot time.Time + stats *cacheMetrics lastWriteTime time.Time // A one time synchronization used to initial the cache with a store. Since the store can allocate a @@ -208,52 +189,103 @@ type Cache struct { // NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory. // Only used for engine caches, never for snapshots. -func NewCache(maxSize uint64) *Cache { +// Note: tags are used only to label metrics; callers that do not need metrics may pass an empty EngineTags. 
+func NewCache(maxSize uint64, tags EngineTags) *Cache { c := &Cache{ - maxSize: maxSize, - store: emptyStore{}, - stats: &CacheStatistics{}, - lastSnapshot: time.Now(), + maxSize: maxSize, + store: emptyStore{}, + stats: newCacheMetrics(tags), } + c.stats.LastSnapshot.SetToCurrentTime() c.initialize.Store(&sync.Once{}) - c.UpdateAge() - c.UpdateCompactTime(0) - c.updateCachedBytes(0) - c.updateMemSize(0) - c.updateSnapshots() return c } -// CacheStatistics hold statistics related to the cache. -type CacheStatistics struct { - MemSizeBytes int64 - DiskSizeBytes int64 - SnapshotCount int64 - CacheAgeMs int64 - CachedBytes int64 - WALCompactionTimeMs int64 - WriteOK int64 - WriteErr int64 - WriteDropped int64 -} - -// Statistics returns statistics for periodic monitoring. -func (c *Cache) Statistics(tags map[string]string) []models.Statistic { - return []models.Statistic{{ - Name: "tsm1_cache", - Tags: tags, - Values: map[string]interface{}{ - statCacheMemoryBytes: atomic.LoadInt64(&c.stats.MemSizeBytes), - statCacheDiskBytes: atomic.LoadInt64(&c.stats.DiskSizeBytes), - statSnapshots: atomic.LoadInt64(&c.stats.SnapshotCount), - statCacheAgeMs: atomic.LoadInt64(&c.stats.CacheAgeMs), - statCachedBytes: atomic.LoadInt64(&c.stats.CachedBytes), - statWALCompactionTimeMs: atomic.LoadInt64(&c.stats.WALCompactionTimeMs), - statCacheWriteOK: atomic.LoadInt64(&c.stats.WriteOK), - statCacheWriteErr: atomic.LoadInt64(&c.stats.WriteErr), - statCacheWriteDropped: atomic.LoadInt64(&c.stats.WriteDropped), - }, - }} +var globalCacheMetrics = newAllCacheMetrics() + +const cacheSubsystem = "cache" + +type allCacheMetrics struct { + MemBytes *prometheus.GaugeVec + DiskBytes *prometheus.GaugeVec + LastSnapshot *prometheus.GaugeVec + Writes *prometheus.CounterVec + WriteErr *prometheus.CounterVec + WriteDropped *prometheus.CounterVec +} + +type cacheMetrics struct { + MemBytes prometheus.Gauge + DiskBytes prometheus.Gauge + LastSnapshot prometheus.Gauge + Writes prometheus.Counter + 
WriteErr prometheus.Counter + WriteDropped prometheus.Counter +} + +func newAllCacheMetrics() *allCacheMetrics { + labels := engineLabelNames() + return &allCacheMetrics{ + MemBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: storageNamespace, + Subsystem: cacheSubsystem, + Name: "inuse_bytes", + Help: "Gauge of current memory consumption of cache", + }, labels), + DiskBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: storageNamespace, + Subsystem: cacheSubsystem, + Name: "disk_bytes", + Help: "Gauge of size of most recent snapshot", + }, labels), + LastSnapshot: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: storageNamespace, + Subsystem: cacheSubsystem, + Name: "latest_snapshot", + Help: "Unix time of most recent snapshot", + }, labels), + Writes: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: storageNamespace, + Subsystem: cacheSubsystem, + Name: "writes_total", + Help: "Counter of all writes to cache", + }, labels), + WriteErr: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: storageNamespace, + Subsystem: cacheSubsystem, + Name: "writes_err", + Help: "Counter of failed writes to cache", + }, labels), + WriteDropped: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: storageNamespace, + Subsystem: cacheSubsystem, + Name: "writes_dropped", + Help: "Counter of writes to cache with some dropped points", + }, labels), + } +} + +func CacheCollectors() []prometheus.Collector { + return []prometheus.Collector{ + globalCacheMetrics.MemBytes, + globalCacheMetrics.DiskBytes, + globalCacheMetrics.LastSnapshot, + globalCacheMetrics.Writes, + globalCacheMetrics.WriteErr, + globalCacheMetrics.WriteDropped, + } +} + +func newCacheMetrics(tags EngineTags) *cacheMetrics { + labels := tags.getLabels() + return &cacheMetrics{ + MemBytes: globalCacheMetrics.MemBytes.With(labels), + DiskBytes: globalCacheMetrics.DiskBytes.With(labels), + LastSnapshot: globalCacheMetrics.LastSnapshot.With(labels), + 
Writes: globalCacheMetrics.Writes.With(labels), + WriteErr: globalCacheMetrics.WriteErr.With(labels), + WriteDropped: globalCacheMetrics.WriteDropped.With(labels), + } } // init initializes the cache and allocates the underlying store. Once initialized, @@ -286,6 +318,7 @@ func (c *Cache) Free() { // error will be returned. func (c *Cache) WriteMulti(values map[string][]Value) error { c.init() + c.stats.Writes.Inc() var addedSize uint64 for _, v := range values { addedSize += uint64(Values(v).Size()) @@ -295,7 +328,7 @@ func (c *Cache) WriteMulti(values map[string][]Value) error { limit := c.maxSize // maxSize is safe for reading without a lock. n := c.Size() + addedSize if limit > 0 && n > limit { - atomic.AddInt64(&c.stats.WriteErr, 1) + c.stats.WriteErr.Inc() return ErrCacheMemorySizeLimitExceeded(n, limit) } @@ -323,13 +356,12 @@ func (c *Cache) WriteMulti(values map[string][]Value) error { // Some points in the batch were dropped. An error is returned so // error stat is incremented as well. if werr != nil { - atomic.AddInt64(&c.stats.WriteDropped, 1) - atomic.AddInt64(&c.stats.WriteErr, 1) + c.stats.WriteDropped.Inc() + c.stats.WriteErr.Inc() } // Update the memory size stat - c.updateMemSize(int64(addedSize)) - atomic.AddInt64(&c.stats.WriteOK, 1) + c.stats.MemBytes.Set(float64(c.Size())) c.mu.Lock() c.lastWriteTime = time.Now() @@ -382,10 +414,7 @@ func (c *Cache) Snapshot() (*Cache, error) { // Reset the cache's store. c.store.reset() atomic.StoreUint64(&c.size, 0) - c.lastSnapshot = time.Now() - - c.updateCachedBytes(snapshotSize) // increment the number of bytes added to the snapshot - c.updateSnapshots() + c.stats.LastSnapshot.SetToCurrentTime() return c.snapshot, nil } @@ -423,16 +452,15 @@ func (c *Cache) ClearSnapshot(success bool) { if success { c.snapshotAttempts = 0 - c.updateMemSize(-int64(atomic.LoadUint64(&c.snapshotSize))) // decrement the number of bytes in cache // Reset the snapshot to a fresh Cache. 
c.snapshot = &Cache{ store: c.snapshot.store, } - + c.stats.DiskBytes.Set(float64(atomic.LoadUint64(&c.snapshotSize))) atomic.StoreUint64(&c.snapshotSize, 0) - c.updateSnapshots() } + c.stats.MemBytes.Set(float64(c.Size())) } // Size returns the number of point-calcuated bytes the cache currently uses. @@ -613,7 +641,7 @@ func (c *Cache) DeleteRange(keys [][]byte, min, max int64) { c.decreaseSize(origSize - uint64(e.size())) } - atomic.StoreInt64(&c.stats.MemSizeBytes, int64(c.Size())) + c.stats.MemBytes.Set(float64(c.Size())) } // SetMaxSize updates the memory limit of the cache. @@ -740,29 +768,6 @@ func (c *Cache) LastWriteTime() time.Time { return c.lastWriteTime } -// UpdateAge updates the age statistic based on the current time. -func (c *Cache) UpdateAge() { - c.mu.RLock() - defer c.mu.RUnlock() - ageStat := int64(time.Since(c.lastSnapshot) / time.Millisecond) - atomic.StoreInt64(&c.stats.CacheAgeMs, ageStat) -} - -// UpdateCompactTime updates WAL compaction time statistic based on d. -func (c *Cache) UpdateCompactTime(d time.Duration) { - atomic.AddInt64(&c.stats.WALCompactionTimeMs, int64(d/time.Millisecond)) -} - -// updateCachedBytes increases the cachedBytes counter by b. -func (c *Cache) updateCachedBytes(b uint64) { - atomic.AddInt64(&c.stats.CachedBytes, int64(b)) -} - -// updateMemSize updates the memSize level by b. -func (c *Cache) updateMemSize(b int64) { - atomic.AddInt64(&c.stats.MemSizeBytes, b) -} - const ( valueTypeUndefined = 0 valueTypeFloat64 = 1 @@ -789,13 +794,6 @@ func valueType(v Value) byte { } } -// updateSnapshots updates the snapshotsCount and the diskSize levels. 
-func (c *Cache) updateSnapshots() { - // Update disk stats - atomic.StoreInt64(&c.stats.DiskSizeBytes, int64(atomic.LoadUint64(&c.snapshotSize))) - atomic.StoreInt64(&c.stats.SnapshotCount, int64(c.snapshotAttempts)) -} - type emptyStore struct{} func (e emptyStore) entry(key []byte) *entry { return nil } diff --git a/tsdb/engine/tsm1/cache_race_test.go b/tsdb/engine/tsm1/cache_race_test.go index 3393eb98b13..6e6830424e4 100644 --- a/tsdb/engine/tsm1/cache_race_test.go +++ b/tsdb/engine/tsm1/cache_race_test.go @@ -26,7 +26,7 @@ func TestCacheCheckConcurrentReadsAreSafe(t *testing.T) { } wg := sync.WaitGroup{} - c := tsm1.NewCache(1000000) + c := tsm1.NewCache(1000000, tsm1.EngineTags{}) ch := make(chan struct{}) for _, s := range series { @@ -71,7 +71,7 @@ func TestCacheRace(t *testing.T) { } wg := sync.WaitGroup{} - c := tsm1.NewCache(1000000) + c := tsm1.NewCache(1000000, tsm1.EngineTags{}) ch := make(chan struct{}) for _, s := range series { @@ -136,7 +136,7 @@ func TestCacheRace2Compacters(t *testing.T) { } wg := sync.WaitGroup{} - c := tsm1.NewCache(1000000) + c := tsm1.NewCache(1000000, tsm1.EngineTags{}) ch := make(chan struct{}) for _, s := range series { diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go index bfb46f67cea..3dafba46e04 100644 --- a/tsdb/engine/tsm1/cache_test.go +++ b/tsdb/engine/tsm1/cache_test.go @@ -23,7 +23,7 @@ func (c *Cache) Write(key []byte, values []Value) error { } func TestCache_NewCache(t *testing.T) { - c := NewCache(100) + c := NewCache(100, EngineTags{}) if c == nil { t.Fatalf("failed to create new cache") } @@ -46,7 +46,7 @@ func TestCache_CacheWriteMulti(t *testing.T) { values := Values{v0, v1, v2} valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) - c := NewCache(30 * valuesSize) + c := NewCache(30*valuesSize, EngineTags{}) if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -63,7 +63,7 @@ 
func TestCache_CacheWriteMulti(t *testing.T) { // Tests that the cache stats and size are correctly maintained during writes. func TestCache_WriteMulti_Stats(t *testing.T) { limit := uint64(1) - c := NewCache(limit) + c := NewCache(limit, EngineTags{}) ms := NewTestStore() c.store = ms @@ -75,7 +75,7 @@ func TestCache_WriteMulti_Stats(t *testing.T) { } // Fail one of the values in the write. - c = NewCache(50) + c = NewCache(50, EngineTags{}) c.init() c.store = ms @@ -95,13 +95,6 @@ func TestCache_WriteMulti_Stats(t *testing.T) { if got, exp := c.Size(), uint64(16)+3; got != exp { t.Fatalf("got %v, expected %v", got, exp) } - - // Write stats updated - if got, exp := c.stats.WriteDropped, int64(1); got != exp { - t.Fatalf("got %v, expected %v", got, exp) - } else if got, exp := c.stats.WriteErr, int64(1); got != exp { - t.Fatalf("got %v, expected %v", got, exp) - } } func TestCache_CacheWriteMulti_TypeConflict(t *testing.T) { @@ -111,7 +104,7 @@ func TestCache_CacheWriteMulti_TypeConflict(t *testing.T) { values := Values{v0, v1, v2} valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) - c := NewCache(3 * valuesSize) + c := NewCache(3*valuesSize, EngineTags{}) if err := c.WriteMulti(map[string][]Value{"foo": values[:1], "bar": values[1:]}); err == nil { t.Fatalf(" expected field type conflict") @@ -133,7 +126,7 @@ func TestCache_Cache_DeleteRange(t *testing.T) { values := Values{v0, v1, v2} valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) - c := NewCache(30 * valuesSize) + c := NewCache(30*valuesSize, EngineTags{}) if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -172,7 +165,7 @@ func TestCache_DeleteRange_NoValues(t *testing.T) { values := Values{v0, v1, v2} valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) - c := NewCache(3 * valuesSize) + c := NewCache(3*valuesSize, EngineTags{}) if err := c.WriteMulti(map[string][]Value{"foo": values}); err != 
nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -207,7 +200,7 @@ func TestCache_DeleteRange_NotSorted(t *testing.T) { values := Values{v0, v1, v2} valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) - c := NewCache(3 * valuesSize) + c := NewCache(3*valuesSize, EngineTags{}) if err := c.WriteMulti(map[string][]Value{"foo": values}); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -242,7 +235,7 @@ func TestCache_Cache_Delete(t *testing.T) { values := Values{v0, v1, v2} valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) - c := NewCache(30 * valuesSize) + c := NewCache(30*valuesSize, EngineTags{}) if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -275,7 +268,7 @@ func TestCache_Cache_Delete(t *testing.T) { } func TestCache_Cache_Delete_NonExistent(t *testing.T) { - c := NewCache(1024) + c := NewCache(1024, EngineTags{}) c.Delete([][]byte{[]byte("bar")}) @@ -296,7 +289,7 @@ func TestCache_CacheWriteMulti_Duplicates(t *testing.T) { v5 := NewValue(5, 3.0) values1 := Values{v3, v4, v5} - c := NewCache(0) + c := NewCache(0, EngineTags{}) if err := c.WriteMulti(map[string][]Value{"foo": values0}); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -326,7 +319,7 @@ func TestCache_CacheValues(t *testing.T) { v3 := NewValue(1, 1.0) v4 := NewValue(4, 4.0) - c := NewCache(512) + c := NewCache(512, EngineTags{}) if deduped := c.Values([]byte("no such key")); deduped != nil { t.Fatalf("Values returned for no such key") } @@ -354,7 +347,7 @@ func TestCache_CacheSnapshot(t *testing.T) { v6 := NewValue(7, 5.0) v7 := NewValue(2, 5.0) - c := NewCache(512) + c := NewCache(512, EngineTags{}) if err := c.Write([]byte("foo"), Values{v0, v1, v2, v3}); err != nil { t.Fatalf("failed to write 3 values, key foo to cache: %s", err.Error()) } @@ -431,17 +424,13 @@ func TestCache_CacheSnapshot(t *testing.T) { 
// Tests that Snapshot updates statistics correctly. func TestCache_Snapshot_Stats(t *testing.T) { limit := uint64(16) - c := NewCache(limit) + c := NewCache(limit, EngineTags{}) values := map[string][]Value{"foo": {NewValue(1, 1.0)}} if err := c.WriteMulti(values); err != nil { t.Fatal(err) } - if got, exp := c.stats.MemSizeBytes, int64(16)+3; got != exp { - t.Fatalf("got %v, expected %v", got, exp) - } - _, err := c.Snapshot() if err != nil { t.Fatal(err) @@ -451,19 +440,10 @@ func TestCache_Snapshot_Stats(t *testing.T) { if got, exp := c.Size(), uint64(16)+3; got != exp { t.Fatalf("got %v, expected %v", got, exp) } - - // Cached bytes should have been increased. - if got, exp := c.stats.CachedBytes, int64(16)+3; got != exp { - t.Fatalf("got %v, expected %v", got, exp) - } - - if got, exp := c.stats.MemSizeBytes, int64(16)+3; got != exp { - t.Fatalf("got %v, expected %v", got, exp) - } } func TestCache_CacheEmptySnapshot(t *testing.T) { - c := NewCache(512) + c := NewCache(512, EngineTags{}) // Grab snapshot, and ensure it's as expected. snapshot, err := c.Snapshot() @@ -490,7 +470,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) { v0 := NewValue(1, 1.0) v1 := NewValue(2, 2.0) - c := NewCache(uint64(v1.Size())) + c := NewCache(uint64(v1.Size()), EngineTags{}) if err := c.Write([]byte("foo"), Values{v0}); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -536,7 +516,7 @@ func TestCache_Deduplicate_Concurrent(t *testing.T) { } wg := sync.WaitGroup{} - c := NewCache(1000000) + c := NewCache(1000000, EngineTags{}) wg.Add(1) go func() { @@ -588,7 +568,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) { } // Load the cache using the segment. 
- cache := NewCache(1024) + cache := NewCache(1024, EngineTags{}) loader := NewCacheLoader([]string{f.Name()}) if err := loader.Load(cache); err != nil { t.Fatalf("failed to load cache: %s", err.Error()) @@ -611,7 +591,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) { } // Reload the cache using the segment. - cache = NewCache(1024) + cache = NewCache(1024, EngineTags{}) loader = NewCacheLoader([]string{f.Name()}) if err := loader.Load(cache); err != nil { t.Fatalf("failed to load cache: %s", err.Error()) @@ -673,7 +653,7 @@ func TestCacheLoader_LoadDouble(t *testing.T) { } // Load the cache using the segments. - cache := NewCache(1024) + cache := NewCache(1024, EngineTags{}) loader := NewCacheLoader([]string{f1.Name(), f2.Name()}) if err := loader.Load(cache); err != nil { t.Fatalf("failed to load cache: %s", err.Error()) @@ -737,7 +717,7 @@ func TestCacheLoader_LoadDeleted(t *testing.T) { } // Load the cache using the segment. - cache := NewCache(1024) + cache := NewCache(1024, EngineTags{}) loader := NewCacheLoader([]string{f.Name()}) if err := loader.Load(cache); err != nil { t.Fatalf("failed to load cache: %s", err.Error()) @@ -749,7 +729,7 @@ func TestCacheLoader_LoadDeleted(t *testing.T) { } // Reload the cache using the segment. 
- cache = NewCache(1024) + cache = NewCache(1024, EngineTags{}) loader = NewCacheLoader([]string{f.Name()}) if err := loader.Load(cache); err != nil { t.Fatalf("failed to load cache: %s", err.Error()) @@ -768,7 +748,7 @@ func TestCache_Split(t *testing.T) { values := Values{v0, v1, v2} valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) - c := NewCache(0) + c := NewCache(0, EngineTags{}) if err := c.Write([]byte("foo"), values); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -855,7 +835,7 @@ func (s *TestStore) count() int { return s.c var fvSize = uint64(NewValue(1, float64(1)).Size()) func BenchmarkCacheFloatEntries(b *testing.B) { - cache := NewCache(uint64(b.N) * fvSize) + cache := NewCache(uint64(b.N)*fvSize, EngineTags{}) vals := make([][]Value, b.N) for i := 0; i < b.N; i++ { vals[i] = []Value{NewValue(1, float64(i))} @@ -876,7 +856,7 @@ type points struct { func BenchmarkCacheParallelFloatEntries(b *testing.B) { c := b.N * runtime.GOMAXPROCS(0) - cache := NewCache(uint64(c) * fvSize * 10) + cache := NewCache(uint64(c)*fvSize*10, EngineTags{}) vals := make([]points, c) for i := 0; i < c; i++ { v := make([]Value, 10) diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index feb2402d9b8..f62358aaf17 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -29,7 +29,7 @@ func TestCompactor_Snapshot(t *testing.T) { "cpu,host=B#!~#value": {v2, v3}, } - c := tsm1.NewCache(0) + c := tsm1.NewCache(0, tsm1.EngineTags{}) for k, v := range points1 { if err := c.Write([]byte(k), v); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -1386,7 +1386,7 @@ func TestCacheKeyIterator_Single(t *testing.T) { "cpu,host=A#!~#value": {v0}, } - c := tsm1.NewCache(0) + c := tsm1.NewCache(0, tsm1.EngineTags{}) for k, v := range writes { if err := c.Write([]byte(k), v); err != nil { @@ -1434,7 +1434,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) { 
"cpu,host=A#!~#value": {v0, v1}, } - c := tsm1.NewCache(0) + c := tsm1.NewCache(0, tsm1.EngineTags{}) for k, v := range writes { if err := c.Write([]byte(k), v); err != nil { @@ -1484,7 +1484,7 @@ func TestCacheKeyIterator_Abort(t *testing.T) { "cpu,host=A#!~#value": {v0}, } - c := tsm1.NewCache(0) + c := tsm1.NewCache(0, tsm1.EngineTags{}) for k, v := range writes { if err := c.Write([]byte(k), v); err != nil { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index ed01908bdfc..cdeb1017b0f 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -179,7 +179,7 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts } fs.tsmMMAPWillNeed = opt.Config.TSMWillNeed - cache := NewCache(uint64(opt.Config.CacheMaxMemorySize)) + cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), etags) c := NewCompactor() c.Dir = path @@ -594,14 +594,15 @@ var globalCompactionMetrics *compactionMetrics = newAllCompactionMetrics(engineL // PrometheusCollectors returns all prometheus metrics for the tsm1 package. func PrometheusCollectors() []prometheus.Collector { - return []prometheus.Collector{ + collectors := []prometheus.Collector{ globalCompactionMetrics.Duration, globalCompactionMetrics.Active, globalCompactionMetrics.Failed, globalCompactionMetrics.Queued, - globalFileStoreMetrics.files, - globalFileStoreMetrics.size, } + collectors = append(collectors, FileStoreCollectors()...) + collectors = append(collectors, CacheCollectors()...) 
+ return collectors } const ( @@ -1841,8 +1842,10 @@ func (e *Engine) WriteSnapshot() (err error) { log, logEnd := logger.NewOperation(context.TODO(), e.logger, "Cache snapshot", "tsm1_cache_snapshot") defer func() { elapsed := time.Since(started) - e.Cache.UpdateCompactTime(elapsed) - + if err != nil && err != errCompactionsDisabled { + e.stats.Failed.With(prometheus.Labels{levelKey: levelCache}).Inc() + } + e.stats.Duration.With(prometheus.Labels{levelKey: levelCache}).Observe(elapsed.Seconds()) if err == nil { log.Info("Snapshot for path written", zap.String("path", e.path), zap.Duration("duration", elapsed)) } @@ -1978,16 +1981,12 @@ func (e *Engine) compactCache() { return case <-t.C: - e.Cache.UpdateAge() if e.ShouldCompactCache(time.Now()) { - start := time.Now() e.traceLogger.Info("Compacting cache", zap.String("path", e.path)) err := e.WriteSnapshot() if err != nil && err != errCompactionsDisabled { e.logger.Info("Error writing snapshot", zap.Error(err)) - e.stats.Failed.With(prometheus.Labels{levelKey: levelCache}).Inc() } - e.stats.Duration.With(prometheus.Labels{levelKey: levelCache}).Observe(time.Since(start).Seconds()) } } } diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 9553a6e0825..2a7161cd69c 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -326,6 +326,13 @@ func newAllFileStoreMetrics() *allFileStoreMetrics { } } +func FileStoreCollectors() []prometheus.Collector { + return []prometheus.Collector{ + globalFileStoreMetrics.files, + globalFileStoreMetrics.size, + } +} + func newFileStoreMetrics(tags EngineTags) *fileStoreMetrics { labels := tags.getLabels() return &fileStoreMetrics{