Skip to content

Commit

Permalink
feat: metrics for cache subsystem (#22915)
Browse files Browse the repository at this point in the history
* fix: drop complicated cache metrics and document remaining

* feat: metrics for cache
lesam authored Nov 23, 2021
1 parent a74e051 commit feb459c
Showing 7 changed files with 151 additions and 167 deletions.
2 changes: 1 addition & 1 deletion cmd/influxd/inspect/build_tsi/build_tsi.go
Original file line number Diff line number Diff line change
@@ -441,7 +441,7 @@ func IndexShard(sfile *tsdb.SeriesFile, dataDir, walDir string, maxLogFileSize i
}
} else {
log.Debug("Building cache from wal files")
cache := tsm1.NewCache(maxCacheSize)
cache := tsm1.NewCache(maxCacheSize, tsm1.EngineTags{}) // tags are for metrics only
loader := tsm1.NewCacheLoader(walPaths)
loader.WithLogger(log)
if err := loader.Load(cache); err != nil {
206 changes: 102 additions & 104 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ import (
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxql"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

@@ -143,25 +144,6 @@ func (e *entry) InfluxQLType() (influxql.DataType, error) {
return e.values.InfluxQLType()
}

// Statistics gathered by the Cache.
const (
// levels - point in time measures

statCacheMemoryBytes = "memBytes" // level: Size of in-memory cache in bytes
statCacheDiskBytes = "diskBytes" // level: Size of on-disk snapshots in bytes
statSnapshots = "snapshotCount" // level: Number of active snapshots.
statCacheAgeMs = "cacheAgeMs" // level: Number of milliseconds since cache was last snapshoted at sample time

// counters - accumulative measures

statCachedBytes = "cachedBytes" // counter: Total number of bytes written into snapshots.
statWALCompactionTimeMs = "WALCompactionTimeMs" // counter: Total number of milliseconds spent compacting snapshots

statCacheWriteOK = "writeOk"
statCacheWriteErr = "writeErr"
statCacheWriteDropped = "writeDropped"
)

// storer is the interface that descibes a cache's store.
type storer interface {
entry(key []byte) *entry // Get an entry by its key.
@@ -196,8 +178,7 @@ type Cache struct {
// This number is the number of pending or failed WriteSnaphot attempts since the last successful one.
snapshotAttempts int

stats *CacheStatistics
lastSnapshot time.Time
stats *cacheMetrics
lastWriteTime time.Time

// A one time synchronization used to initial the cache with a store. Since the store can allocate a
@@ -208,52 +189,103 @@ type Cache struct {

// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
// Only used for engine caches, never for snapshots.
func NewCache(maxSize uint64) *Cache {
// Note tags are for metrics only, so if metrics are not desired tags do not have to be set.
func NewCache(maxSize uint64, tags EngineTags) *Cache {
c := &Cache{
maxSize: maxSize,
store: emptyStore{},
stats: &CacheStatistics{},
lastSnapshot: time.Now(),
maxSize: maxSize,
store: emptyStore{},
stats: newCacheMetrics(tags),
}
c.stats.LastSnapshot.SetToCurrentTime()
c.initialize.Store(&sync.Once{})
c.UpdateAge()
c.UpdateCompactTime(0)
c.updateCachedBytes(0)
c.updateMemSize(0)
c.updateSnapshots()
return c
}

// CacheStatistics hold statistics related to the cache.
type CacheStatistics struct {
MemSizeBytes int64
DiskSizeBytes int64
SnapshotCount int64
CacheAgeMs int64
CachedBytes int64
WALCompactionTimeMs int64
WriteOK int64
WriteErr int64
WriteDropped int64
}

// Statistics returns statistics for periodic monitoring.
func (c *Cache) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
Name: "tsm1_cache",
Tags: tags,
Values: map[string]interface{}{
statCacheMemoryBytes: atomic.LoadInt64(&c.stats.MemSizeBytes),
statCacheDiskBytes: atomic.LoadInt64(&c.stats.DiskSizeBytes),
statSnapshots: atomic.LoadInt64(&c.stats.SnapshotCount),
statCacheAgeMs: atomic.LoadInt64(&c.stats.CacheAgeMs),
statCachedBytes: atomic.LoadInt64(&c.stats.CachedBytes),
statWALCompactionTimeMs: atomic.LoadInt64(&c.stats.WALCompactionTimeMs),
statCacheWriteOK: atomic.LoadInt64(&c.stats.WriteOK),
statCacheWriteErr: atomic.LoadInt64(&c.stats.WriteErr),
statCacheWriteDropped: atomic.LoadInt64(&c.stats.WriteDropped),
},
}}
var globalCacheMetrics = newAllCacheMetrics()

const cacheSubsystem = "cache"

type allCacheMetrics struct {
MemBytes *prometheus.GaugeVec
DiskBytes *prometheus.GaugeVec
LastSnapshot *prometheus.GaugeVec
Writes *prometheus.CounterVec
WriteErr *prometheus.CounterVec
WriteDropped *prometheus.CounterVec
}

type cacheMetrics struct {
MemBytes prometheus.Gauge
DiskBytes prometheus.Gauge
LastSnapshot prometheus.Gauge
Writes prometheus.Counter
WriteErr prometheus.Counter
WriteDropped prometheus.Counter
}

func newAllCacheMetrics() *allCacheMetrics {
labels := engineLabelNames()
return &allCacheMetrics{
MemBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: storageNamespace,
Subsystem: cacheSubsystem,
Name: "inuse_bytes",
Help: "Gauge of current memory consumption of cache",
}, labels),
DiskBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: storageNamespace,
Subsystem: cacheSubsystem,
Name: "disk_bytes",
Help: "Gauge of size of most recent snapshot",
}, labels),
LastSnapshot: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: storageNamespace,
Subsystem: cacheSubsystem,
Name: "latest_snapshot",
Help: "Unix time of most recent snapshot",
}, labels),
Writes: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: storageNamespace,
Subsystem: cacheSubsystem,
Name: "writes_total",
Help: "Counter of all writes to cache",
}, labels),
WriteErr: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: storageNamespace,
Subsystem: cacheSubsystem,
Name: "writes_err",
Help: "Counter of failed writes to cache",
}, labels),
WriteDropped: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: storageNamespace,
Subsystem: cacheSubsystem,
Name: "writes_dropped",
Help: "Counter of writes to cache with some dropped points",
}, labels),
}
}

func CacheCollectors() []prometheus.Collector {
return []prometheus.Collector{
globalCacheMetrics.MemBytes,
globalCacheMetrics.DiskBytes,
globalCacheMetrics.LastSnapshot,
globalCacheMetrics.Writes,
globalCacheMetrics.WriteErr,
globalCacheMetrics.WriteDropped,
}
}

func newCacheMetrics(tags EngineTags) *cacheMetrics {
labels := tags.getLabels()
return &cacheMetrics{
MemBytes: globalCacheMetrics.MemBytes.With(labels),
DiskBytes: globalCacheMetrics.DiskBytes.With(labels),
LastSnapshot: globalCacheMetrics.LastSnapshot.With(labels),
Writes: globalCacheMetrics.Writes.With(labels),
WriteErr: globalCacheMetrics.WriteErr.With(labels),
WriteDropped: globalCacheMetrics.WriteDropped.With(labels),
}
}

// init initializes the cache and allocates the underlying store. Once initialized,
@@ -286,6 +318,7 @@ func (c *Cache) Free() {
// error will be returned.
func (c *Cache) WriteMulti(values map[string][]Value) error {
c.init()
c.stats.Writes.Inc()
var addedSize uint64
for _, v := range values {
addedSize += uint64(Values(v).Size())
@@ -295,7 +328,7 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
limit := c.maxSize // maxSize is safe for reading without a lock.
n := c.Size() + addedSize
if limit > 0 && n > limit {
atomic.AddInt64(&c.stats.WriteErr, 1)
c.stats.WriteErr.Inc()
return ErrCacheMemorySizeLimitExceeded(n, limit)
}

@@ -323,13 +356,12 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
// Some points in the batch were dropped. An error is returned so
// error stat is incremented as well.
if werr != nil {
atomic.AddInt64(&c.stats.WriteDropped, 1)
atomic.AddInt64(&c.stats.WriteErr, 1)
c.stats.WriteDropped.Inc()
c.stats.WriteErr.Inc()
}

// Update the memory size stat
c.updateMemSize(int64(addedSize))
atomic.AddInt64(&c.stats.WriteOK, 1)
c.stats.MemBytes.Set(float64(c.Size()))

c.mu.Lock()
c.lastWriteTime = time.Now()
@@ -382,10 +414,7 @@ func (c *Cache) Snapshot() (*Cache, error) {
// Reset the cache's store.
c.store.reset()
atomic.StoreUint64(&c.size, 0)
c.lastSnapshot = time.Now()

c.updateCachedBytes(snapshotSize) // increment the number of bytes added to the snapshot
c.updateSnapshots()
c.stats.LastSnapshot.SetToCurrentTime()

return c.snapshot, nil
}
@@ -423,16 +452,15 @@ func (c *Cache) ClearSnapshot(success bool) {

if success {
c.snapshotAttempts = 0
c.updateMemSize(-int64(atomic.LoadUint64(&c.snapshotSize))) // decrement the number of bytes in cache

// Reset the snapshot to a fresh Cache.
c.snapshot = &Cache{
store: c.snapshot.store,
}

c.stats.DiskBytes.Set(float64(atomic.LoadUint64(&c.snapshotSize)))
atomic.StoreUint64(&c.snapshotSize, 0)
c.updateSnapshots()
}
c.stats.MemBytes.Set(float64(c.Size()))
}

// Size returns the number of point-calcuated bytes the cache currently uses.
@@ -613,7 +641,7 @@ func (c *Cache) DeleteRange(keys [][]byte, min, max int64) {

c.decreaseSize(origSize - uint64(e.size()))
}
atomic.StoreInt64(&c.stats.MemSizeBytes, int64(c.Size()))
c.stats.MemBytes.Set(float64(c.Size()))
}

// SetMaxSize updates the memory limit of the cache.
@@ -740,29 +768,6 @@ func (c *Cache) LastWriteTime() time.Time {
return c.lastWriteTime
}

// UpdateAge updates the age statistic based on the current time.
func (c *Cache) UpdateAge() {
c.mu.RLock()
defer c.mu.RUnlock()
ageStat := int64(time.Since(c.lastSnapshot) / time.Millisecond)
atomic.StoreInt64(&c.stats.CacheAgeMs, ageStat)
}

// UpdateCompactTime updates WAL compaction time statistic based on d.
func (c *Cache) UpdateCompactTime(d time.Duration) {
atomic.AddInt64(&c.stats.WALCompactionTimeMs, int64(d/time.Millisecond))
}

// updateCachedBytes increases the cachedBytes counter by b.
func (c *Cache) updateCachedBytes(b uint64) {
atomic.AddInt64(&c.stats.CachedBytes, int64(b))
}

// updateMemSize updates the memSize level by b.
func (c *Cache) updateMemSize(b int64) {
atomic.AddInt64(&c.stats.MemSizeBytes, b)
}

const (
valueTypeUndefined = 0
valueTypeFloat64 = 1
@@ -789,13 +794,6 @@ func valueType(v Value) byte {
}
}

// updateSnapshots updates the snapshotsCount and the diskSize levels.
func (c *Cache) updateSnapshots() {
// Update disk stats
atomic.StoreInt64(&c.stats.DiskSizeBytes, int64(atomic.LoadUint64(&c.snapshotSize)))
atomic.StoreInt64(&c.stats.SnapshotCount, int64(c.snapshotAttempts))
}

type emptyStore struct{}

func (e emptyStore) entry(key []byte) *entry { return nil }
6 changes: 3 additions & 3 deletions tsdb/engine/tsm1/cache_race_test.go
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ func TestCacheCheckConcurrentReadsAreSafe(t *testing.T) {
}

wg := sync.WaitGroup{}
c := tsm1.NewCache(1000000)
c := tsm1.NewCache(1000000, tsm1.EngineTags{})

ch := make(chan struct{})
for _, s := range series {
@@ -71,7 +71,7 @@ func TestCacheRace(t *testing.T) {
}

wg := sync.WaitGroup{}
c := tsm1.NewCache(1000000)
c := tsm1.NewCache(1000000, tsm1.EngineTags{})

ch := make(chan struct{})
for _, s := range series {
@@ -136,7 +136,7 @@ func TestCacheRace2Compacters(t *testing.T) {
}

wg := sync.WaitGroup{}
c := tsm1.NewCache(1000000)
c := tsm1.NewCache(1000000, tsm1.EngineTags{})

ch := make(chan struct{})
for _, s := range series {
70 changes: 25 additions & 45 deletions tsdb/engine/tsm1/cache_test.go
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ func (c *Cache) Write(key []byte, values []Value) error {
}

func TestCache_NewCache(t *testing.T) {
c := NewCache(100)
c := NewCache(100, EngineTags{})
if c == nil {
t.Fatalf("failed to create new cache")
}
@@ -46,7 +46,7 @@ func TestCache_CacheWriteMulti(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

c := NewCache(30 * valuesSize)
c := NewCache(30*valuesSize, EngineTags{})

if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -63,7 +63,7 @@ func TestCache_CacheWriteMulti(t *testing.T) {
// Tests that the cache stats and size are correctly maintained during writes.
func TestCache_WriteMulti_Stats(t *testing.T) {
limit := uint64(1)
c := NewCache(limit)
c := NewCache(limit, EngineTags{})
ms := NewTestStore()
c.store = ms

@@ -75,7 +75,7 @@ func TestCache_WriteMulti_Stats(t *testing.T) {
}

// Fail one of the values in the write.
c = NewCache(50)
c = NewCache(50, EngineTags{})
c.init()
c.store = ms

@@ -95,13 +95,6 @@ func TestCache_WriteMulti_Stats(t *testing.T) {
if got, exp := c.Size(), uint64(16)+3; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}

// Write stats updated
if got, exp := c.stats.WriteDropped, int64(1); got != exp {
t.Fatalf("got %v, expected %v", got, exp)
} else if got, exp := c.stats.WriteErr, int64(1); got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
}

func TestCache_CacheWriteMulti_TypeConflict(t *testing.T) {
@@ -111,7 +104,7 @@ func TestCache_CacheWriteMulti_TypeConflict(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

c := NewCache(3 * valuesSize)
c := NewCache(3*valuesSize, EngineTags{})

if err := c.WriteMulti(map[string][]Value{"foo": values[:1], "bar": values[1:]}); err == nil {
t.Fatalf(" expected field type conflict")
@@ -133,7 +126,7 @@ func TestCache_Cache_DeleteRange(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

c := NewCache(30 * valuesSize)
c := NewCache(30*valuesSize, EngineTags{})

if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -172,7 +165,7 @@ func TestCache_DeleteRange_NoValues(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

c := NewCache(3 * valuesSize)
c := NewCache(3*valuesSize, EngineTags{})

if err := c.WriteMulti(map[string][]Value{"foo": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -207,7 +200,7 @@ func TestCache_DeleteRange_NotSorted(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

c := NewCache(3 * valuesSize)
c := NewCache(3*valuesSize, EngineTags{})

if err := c.WriteMulti(map[string][]Value{"foo": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -242,7 +235,7 @@ func TestCache_Cache_Delete(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

c := NewCache(30 * valuesSize)
c := NewCache(30*valuesSize, EngineTags{})

if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -275,7 +268,7 @@ func TestCache_Cache_Delete(t *testing.T) {
}

func TestCache_Cache_Delete_NonExistent(t *testing.T) {
c := NewCache(1024)
c := NewCache(1024, EngineTags{})

c.Delete([][]byte{[]byte("bar")})

@@ -296,7 +289,7 @@ func TestCache_CacheWriteMulti_Duplicates(t *testing.T) {
v5 := NewValue(5, 3.0)
values1 := Values{v3, v4, v5}

c := NewCache(0)
c := NewCache(0, EngineTags{})

if err := c.WriteMulti(map[string][]Value{"foo": values0}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -326,7 +319,7 @@ func TestCache_CacheValues(t *testing.T) {
v3 := NewValue(1, 1.0)
v4 := NewValue(4, 4.0)

c := NewCache(512)
c := NewCache(512, EngineTags{})
if deduped := c.Values([]byte("no such key")); deduped != nil {
t.Fatalf("Values returned for no such key")
}
@@ -354,7 +347,7 @@ func TestCache_CacheSnapshot(t *testing.T) {
v6 := NewValue(7, 5.0)
v7 := NewValue(2, 5.0)

c := NewCache(512)
c := NewCache(512, EngineTags{})
if err := c.Write([]byte("foo"), Values{v0, v1, v2, v3}); err != nil {
t.Fatalf("failed to write 3 values, key foo to cache: %s", err.Error())
}
@@ -431,17 +424,13 @@ func TestCache_CacheSnapshot(t *testing.T) {
// Tests that Snapshot updates statistics correctly.
func TestCache_Snapshot_Stats(t *testing.T) {
limit := uint64(16)
c := NewCache(limit)
c := NewCache(limit, EngineTags{})

values := map[string][]Value{"foo": {NewValue(1, 1.0)}}
if err := c.WriteMulti(values); err != nil {
t.Fatal(err)
}

if got, exp := c.stats.MemSizeBytes, int64(16)+3; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}

_, err := c.Snapshot()
if err != nil {
t.Fatal(err)
@@ -451,19 +440,10 @@ func TestCache_Snapshot_Stats(t *testing.T) {
if got, exp := c.Size(), uint64(16)+3; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}

// Cached bytes should have been increased.
if got, exp := c.stats.CachedBytes, int64(16)+3; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}

if got, exp := c.stats.MemSizeBytes, int64(16)+3; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
}

func TestCache_CacheEmptySnapshot(t *testing.T) {
c := NewCache(512)
c := NewCache(512, EngineTags{})

// Grab snapshot, and ensure it's as expected.
snapshot, err := c.Snapshot()
@@ -490,7 +470,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, 2.0)

c := NewCache(uint64(v1.Size()))
c := NewCache(uint64(v1.Size()), EngineTags{})

if err := c.Write([]byte("foo"), Values{v0}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -536,7 +516,7 @@ func TestCache_Deduplicate_Concurrent(t *testing.T) {
}

wg := sync.WaitGroup{}
c := NewCache(1000000)
c := NewCache(1000000, EngineTags{})

wg.Add(1)
go func() {
@@ -588,7 +568,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) {
}

// Load the cache using the segment.
cache := NewCache(1024)
cache := NewCache(1024, EngineTags{})
loader := NewCacheLoader([]string{f.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
@@ -611,7 +591,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) {
}

// Reload the cache using the segment.
cache = NewCache(1024)
cache = NewCache(1024, EngineTags{})
loader = NewCacheLoader([]string{f.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
@@ -673,7 +653,7 @@ func TestCacheLoader_LoadDouble(t *testing.T) {
}

// Load the cache using the segments.
cache := NewCache(1024)
cache := NewCache(1024, EngineTags{})
loader := NewCacheLoader([]string{f1.Name(), f2.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
@@ -737,7 +717,7 @@ func TestCacheLoader_LoadDeleted(t *testing.T) {
}

// Load the cache using the segment.
cache := NewCache(1024)
cache := NewCache(1024, EngineTags{})
loader := NewCacheLoader([]string{f.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
@@ -749,7 +729,7 @@ func TestCacheLoader_LoadDeleted(t *testing.T) {
}

// Reload the cache using the segment.
cache = NewCache(1024)
cache = NewCache(1024, EngineTags{})
loader = NewCacheLoader([]string{f.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
@@ -768,7 +748,7 @@ func TestCache_Split(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

c := NewCache(0)
c := NewCache(0, EngineTags{})

if err := c.Write([]byte("foo"), values); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -855,7 +835,7 @@ func (s *TestStore) count() int { return s.c
var fvSize = uint64(NewValue(1, float64(1)).Size())

func BenchmarkCacheFloatEntries(b *testing.B) {
cache := NewCache(uint64(b.N) * fvSize)
cache := NewCache(uint64(b.N)*fvSize, EngineTags{})
vals := make([][]Value, b.N)
for i := 0; i < b.N; i++ {
vals[i] = []Value{NewValue(1, float64(i))}
@@ -876,7 +856,7 @@ type points struct {

func BenchmarkCacheParallelFloatEntries(b *testing.B) {
c := b.N * runtime.GOMAXPROCS(0)
cache := NewCache(uint64(c) * fvSize * 10)
cache := NewCache(uint64(c)*fvSize*10, EngineTags{})
vals := make([]points, c)
for i := 0; i < c; i++ {
v := make([]Value, 10)
8 changes: 4 additions & 4 deletions tsdb/engine/tsm1/compact_test.go
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ func TestCompactor_Snapshot(t *testing.T) {
"cpu,host=B#!~#value": {v2, v3},
}

c := tsm1.NewCache(0)
c := tsm1.NewCache(0, tsm1.EngineTags{})
for k, v := range points1 {
if err := c.Write([]byte(k), v); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -1386,7 +1386,7 @@ func TestCacheKeyIterator_Single(t *testing.T) {
"cpu,host=A#!~#value": {v0},
}

c := tsm1.NewCache(0)
c := tsm1.NewCache(0, tsm1.EngineTags{})

for k, v := range writes {
if err := c.Write([]byte(k), v); err != nil {
@@ -1434,7 +1434,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
"cpu,host=A#!~#value": {v0, v1},
}

c := tsm1.NewCache(0)
c := tsm1.NewCache(0, tsm1.EngineTags{})

for k, v := range writes {
if err := c.Write([]byte(k), v); err != nil {
@@ -1484,7 +1484,7 @@ func TestCacheKeyIterator_Abort(t *testing.T) {
"cpu,host=A#!~#value": {v0},
}

c := tsm1.NewCache(0)
c := tsm1.NewCache(0, tsm1.EngineTags{})

for k, v := range writes {
if err := c.Write([]byte(k), v); err != nil {
19 changes: 9 additions & 10 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
@@ -179,7 +179,7 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
}
fs.tsmMMAPWillNeed = opt.Config.TSMWillNeed

cache := NewCache(uint64(opt.Config.CacheMaxMemorySize))
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), etags)

c := NewCompactor()
c.Dir = path
@@ -594,14 +594,15 @@ var globalCompactionMetrics *compactionMetrics = newAllCompactionMetrics(engineL

// PrometheusCollectors returns all prometheus metrics for the tsm1 package.
func PrometheusCollectors() []prometheus.Collector {
return []prometheus.Collector{
collectors := []prometheus.Collector{
globalCompactionMetrics.Duration,
globalCompactionMetrics.Active,
globalCompactionMetrics.Failed,
globalCompactionMetrics.Queued,
globalFileStoreMetrics.files,
globalFileStoreMetrics.size,
}
collectors = append(collectors, FileStoreCollectors()...)
collectors = append(collectors, CacheCollectors()...)
return collectors
}

const (
@@ -1841,8 +1842,10 @@ func (e *Engine) WriteSnapshot() (err error) {
log, logEnd := logger.NewOperation(context.TODO(), e.logger, "Cache snapshot", "tsm1_cache_snapshot")
defer func() {
elapsed := time.Since(started)
e.Cache.UpdateCompactTime(elapsed)

if err != nil && err != errCompactionsDisabled {
e.stats.Failed.With(prometheus.Labels{levelKey: levelCache}).Inc()
}
e.stats.Duration.With(prometheus.Labels{levelKey: levelCache}).Observe(elapsed.Seconds())
if err == nil {
log.Info("Snapshot for path written", zap.String("path", e.path), zap.Duration("duration", elapsed))
}
@@ -1978,16 +1981,12 @@ func (e *Engine) compactCache() {
return

case <-t.C:
e.Cache.UpdateAge()
if e.ShouldCompactCache(time.Now()) {
start := time.Now()
e.traceLogger.Info("Compacting cache", zap.String("path", e.path))
err := e.WriteSnapshot()
if err != nil && err != errCompactionsDisabled {
e.logger.Info("Error writing snapshot", zap.Error(err))
e.stats.Failed.With(prometheus.Labels{levelKey: levelCache}).Inc()
}
e.stats.Duration.With(prometheus.Labels{levelKey: levelCache}).Observe(time.Since(start).Seconds())
}
}
}
7 changes: 7 additions & 0 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
@@ -326,6 +326,13 @@ func newAllFileStoreMetrics() *allFileStoreMetrics {
}
}

func FileStoreCollectors() []prometheus.Collector {
return []prometheus.Collector{
globalFileStoreMetrics.files,
globalFileStoreMetrics.size,
}
}

func newFileStoreMetrics(tags EngineTags) *fileStoreMetrics {
labels := tags.getLabels()
return &fileStoreMetrics{

0 comments on commit feb459c

Please sign in to comment.