From 0e0082d16bd9a19b5c11662c3d83cdfe54c5e506 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Fri, 23 Apr 2021 11:01:48 -0400 Subject: [PATCH] pkg/storage: remove GetCompactionStats, refactor GetMetrics Refactor the 'compaction stats' and metrics exposed from the storage.Engine to directly expose `*pebble.Metrics`, embedded within a `storage.Metrics` struct. Also, pass the `Metrics` type directly into log lines where we print the full LSM. Release note: None --- pkg/kv/kvserver/metrics.go | 38 +++++++------ pkg/kv/kvserver/store.go | 11 ++-- pkg/server/debug/server.go | 2 +- pkg/server/node_engine_health.go | 7 +-- pkg/storage/engine.go | 98 ++++++++++++++++---------------- pkg/storage/engine_test.go | 33 +++++------ pkg/storage/pebble.go | 48 ++-------------- 7 files changed, 101 insertions(+), 136 deletions(-) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 691dfef3a58a..8fa2a5292ca4 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -1668,23 +1668,27 @@ func (sm *TenantsStorageMetrics) subtractMVCCStats( } func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) { - sm.RdbBlockCacheHits.Update(m.BlockCacheHits) - sm.RdbBlockCacheMisses.Update(m.BlockCacheMisses) - sm.RdbBlockCacheUsage.Update(m.BlockCacheUsage) - sm.RdbBlockCachePinnedUsage.Update(m.BlockCachePinnedUsage) - sm.RdbBloomFilterPrefixUseful.Update(m.BloomFilterPrefixUseful) - sm.RdbBloomFilterPrefixChecked.Update(m.BloomFilterPrefixChecked) - sm.RdbMemtableTotalSize.Update(m.MemtableTotalSize) - sm.RdbFlushes.Update(m.Flushes) - sm.RdbFlushedBytes.Update(m.FlushedBytes) - sm.RdbCompactions.Update(m.Compactions) - sm.RdbIngestedBytes.Update(m.IngestedBytes) - sm.RdbCompactedBytesRead.Update(m.CompactedBytesRead) - sm.RdbCompactedBytesWritten.Update(m.CompactedBytesWritten) - sm.RdbTableReadersMemEstimate.Update(m.TableReadersMemEstimate) - sm.RdbReadAmplification.Update(m.ReadAmplification) - 
sm.RdbPendingCompaction.Update(m.PendingCompactionBytesEstimate) - sm.RdbNumSSTables.Update(m.NumSSTables) + sm.RdbBlockCacheHits.Update(m.BlockCache.Hits) + sm.RdbBlockCacheMisses.Update(m.BlockCache.Misses) + sm.RdbBlockCacheUsage.Update(m.BlockCache.Size) + // TODO(jackson): Delete RdbBlockCachePinnedUsage or calculate the + // equivalent (the sum of IteratorMetrics.ReadAmp for all open iterators, + // times the block size). + sm.RdbBlockCachePinnedUsage.Update(0) + sm.RdbBloomFilterPrefixUseful.Update(m.Filter.Hits) + sm.RdbBloomFilterPrefixChecked.Update(m.Filter.Hits + m.Filter.Misses) + sm.RdbMemtableTotalSize.Update(int64(m.MemTable.Size)) + sm.RdbFlushes.Update(m.Flush.Count) + sm.RdbFlushedBytes.Update(int64(m.Levels[0].BytesFlushed)) + sm.RdbCompactions.Update(m.Compact.Count) + sm.RdbIngestedBytes.Update(int64(m.IngestedBytes())) + compactedRead, compactedWritten := m.CompactedBytes() + sm.RdbCompactedBytesRead.Update(int64(compactedRead)) + sm.RdbCompactedBytesWritten.Update(int64(compactedWritten)) + sm.RdbTableReadersMemEstimate.Update(m.TableCache.Size) + sm.RdbReadAmplification.Update(int64(m.ReadAmp())) + sm.RdbPendingCompaction.Update(int64(m.Compact.EstimatedDebt)) + sm.RdbNumSSTables.Update(m.NumSSTables()) sm.DiskSlow.Update(m.DiskSlowCount) sm.DiskStalled.Update(m.DiskStallCount) } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 142274140840..af9bb10980fd 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2635,11 +2635,8 @@ func (s *Store) ComputeMetrics(ctx context.Context, tick int) error { } // Get the latest engine metrics. - m, err := s.engine.GetMetrics() - if err != nil { - return err - } - s.metrics.updateEngineMetrics(*m) + m := s.engine.GetMetrics() + s.metrics.updateEngineMetrics(m) // Get engine Env stats. 
envStats, err := s.engine.GetEnvStats() @@ -2653,7 +2650,9 @@ func (s *Store) ComputeMetrics(ctx context.Context, tick int) error { // non-periodic callers of this method don't trigger expensive // stats. if tick%logSSTInfoTicks == 1 /* every 10m */ { - log.Infof(ctx, "%s", s.engine.GetCompactionStats()) + // NB: The initial blank line ensures that compaction stats display + // will not contain the log prefix. + log.Infof(ctx, "\n%s", m.Metrics) } return nil } diff --git a/pkg/server/debug/server.go b/pkg/server/debug/server.go index feb9480ccdab..b793ebac6382 100644 --- a/pkg/server/debug/server.go +++ b/pkg/server/debug/server.go @@ -211,7 +211,7 @@ func (ds *Server) RegisterEngines(specs []base.StoreSpec, engines []storage.Engi eng := engines[i] ds.mux.HandleFunc(fmt.Sprintf("/debug/lsm/%d", id.StoreID), func(w http.ResponseWriter, req *http.Request) { - _, _ = io.WriteString(w, eng.GetCompactionStats()) + _, _ = io.WriteString(w, eng.GetMetrics().String()) }) dir := specs[i].Path diff --git a/pkg/server/node_engine_health.go b/pkg/server/node_engine_health.go index efff31544fef..76a3a661c18e 100644 --- a/pkg/server/node_engine_health.go +++ b/pkg/server/node_engine_health.go @@ -58,15 +58,14 @@ func (n *Node) assertEngineHealth( func() { t := time.AfterFunc(maxDuration, func() { n.metrics.DiskStalls.Inc(1) - stats := "\n" + eng.GetCompactionStats() + m := eng.GetMetrics() logger := log.Warningf if fatalOnExceeded { logger = guaranteedExitFatal } // NB: the disk-stall-detected roachtest matches on this message. 
- logger(ctx, "disk stall detected: unable to write to %s within %s %s", - eng, storage.MaxSyncDuration, stats, - ) + logger(ctx, "disk stall detected: unable to write to %s within %s\n%s", + eng, storage.MaxSyncDuration, m) }) defer t.Stop() if err := storage.WriteSyncNoop(ctx, eng); err != nil { diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 9e9ca4fa5d20..2b24dc7e9e0d 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -648,11 +648,8 @@ type Engine interface { // Flush causes the engine to write all in-memory data to disk // immediately. Flush() error - // GetCompactionStats returns the internal RocksDB compaction stats. See - // https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#rocksdb-statistics. - GetCompactionStats() string // GetMetrics retrieves metrics from the engine. - GetMetrics() (*Metrics, error) + GetMetrics() Metrics // GetEncryptionRegistries returns the file and key registries when encryption is enabled // on the store. GetEncryptionRegistries() (*EncryptionRegistries, error) @@ -765,42 +762,45 @@ type Batch interface { Repr() []byte } -// Metrics is a set of Engine metrics. Most are described in RocksDB. -// Some metrics (eg, `IngestedBytes`) are only exposed by Pebble. -// -// Currently, we collect stats from the following sources: -// 1. RocksDB's internal "tickers" (i.e. counters). They're defined in -// rocksdb/statistics.h -// 2. DBEventListener, which implements RocksDB's EventListener interface. -// 3. rocksdb::DB::GetProperty(). -// -// This is a good resource describing RocksDB's memory-related stats: -// https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB -// -// TODO(jackson): Refactor to mirror or even expose pebble.Metrics when -// RocksDB is removed. +// Metrics is a set of Engine metrics. Most are contained in the embedded +// *pebble.Metrics struct, which has its own documentation. 
type Metrics struct { - BlockCacheHits int64 - BlockCacheMisses int64 - BlockCacheUsage int64 - BlockCachePinnedUsage int64 - BloomFilterPrefixChecked int64 - BloomFilterPrefixUseful int64 - DiskSlowCount int64 - DiskStallCount int64 - MemtableTotalSize int64 - Flushes int64 - FlushedBytes int64 - Compactions int64 - IngestedBytes int64 // Pebble only - CompactedBytesRead int64 - CompactedBytesWritten int64 - TableReadersMemEstimate int64 - PendingCompactionBytesEstimate int64 - L0FileCount int64 - L0SublevelCount int64 - ReadAmplification int64 - NumSSTables int64 + *pebble.Metrics + // DiskSlowCount counts the number of times Pebble records disk slowness. + DiskSlowCount int64 + // DiskStallCount counts the number of times Pebble observes slow writes + // lasting longer than MaxSyncDuration (`storage.max_sync_duration`). + DiskStallCount int64 +} + +// NumSSTables returns the total number of SSTables in the LSM, aggregated +// across levels. +func (m *Metrics) NumSSTables() int64 { + var num int64 + for _, lm := range m.Metrics.Levels { + num += lm.NumFiles + } + return num +} + +// IngestedBytes returns the sum of all ingested tables, aggregated across all +// levels of the LSM. +func (m *Metrics) IngestedBytes() uint64 { + var ingestedBytes uint64 + for _, lm := range m.Metrics.Levels { + ingestedBytes += lm.BytesIngested + } + return ingestedBytes +} + +// CompactedBytes returns the sum of bytes read and written during +// compactions across all levels of the LSM. +func (m *Metrics) CompactedBytes() (read, written uint64) { + for _, lm := range m.Metrics.Levels { + read += lm.BytesRead + written += lm.BytesCompacted + } + return read, written } // EnvStats is a set of RocksDB env stats, including encryption status. 
@@ -1026,17 +1026,14 @@ func preIngestDelay(ctx context.Context, eng Engine, settings *cluster.Settings) if settings == nil { return } - metrics, err := eng.GetMetrics() - if err != nil { - log.Warningf(ctx, "failed to read metrics: %+v", err) - return - } - targetDelay := calculatePreIngestDelay(settings, metrics) + metrics := eng.GetMetrics() + targetDelay := calculatePreIngestDelay(settings, metrics.Metrics) if targetDelay == 0 { return } - log.VEventf(ctx, 2, "delaying SST ingestion %s. %d L0 files, %d L0 Sublevels", targetDelay, metrics.L0FileCount, metrics.L0SublevelCount) + log.VEventf(ctx, 2, "delaying SST ingestion %s. %d L0 files, %d L0 Sublevels", + targetDelay, metrics.Levels[0].NumFiles, metrics.Levels[0].Sublevels) select { case <-time.After(targetDelay): @@ -1044,15 +1041,16 @@ func preIngestDelay(ctx context.Context, eng Engine, settings *cluster.Settings) } } -func calculatePreIngestDelay(settings *cluster.Settings, metrics *Metrics) time.Duration { +func calculatePreIngestDelay(settings *cluster.Settings, metrics *pebble.Metrics) time.Duration { maxDelay := ingestDelayTime.Get(&settings.SV) l0ReadAmpLimit := ingestDelayL0Threshold.Get(&settings.SV) const ramp = 10 - l0ReadAmp := metrics.L0FileCount - if metrics.L0SublevelCount >= 0 { - l0ReadAmp = metrics.L0SublevelCount + l0ReadAmp := metrics.Levels[0].NumFiles + if metrics.Levels[0].Sublevels >= 0 { + l0ReadAmp = int64(metrics.Levels[0].Sublevels) } + if l0ReadAmp > l0ReadAmpLimit { delayPerFile := maxDelay / time.Duration(ramp) targetDelay := time.Duration(l0ReadAmp-l0ReadAmpLimit) * delayPerFile diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go index e91cf5d0dc5a..3bad9afc0ec0 100644 --- a/pkg/storage/engine_test.go +++ b/pkg/storage/engine_test.go @@ -790,11 +790,8 @@ func TestFlushNumSSTables(t *testing.T) { t.Fatal(err) } - m, err := engine.GetMetrics() - if err != nil { - t.Fatal(err) - } - if m.NumSSTables == 0 { + m := engine.GetMetrics() + if m.NumSSTables() == 0 { 
t.Fatal("expected non-zero sstables, got 0") } }) @@ -1192,19 +1189,23 @@ func TestIngestDelayLimit(t *testing.T) { max, ramp := time.Second*5, time.Second*5/10 for _, tc := range []struct { - exp time.Duration - metrics Metrics + exp time.Duration + fileCount int64 + sublevelCount int32 }{ - {0, Metrics{}}, - {0, Metrics{L0FileCount: 19, L0SublevelCount: -1}}, - {0, Metrics{L0FileCount: 20, L0SublevelCount: -1}}, - {ramp, Metrics{L0FileCount: 21, L0SublevelCount: -1}}, - {ramp * 2, Metrics{L0FileCount: 22, L0SublevelCount: -1}}, - {ramp * 2, Metrics{L0FileCount: 22, L0SublevelCount: 22}}, - {ramp * 2, Metrics{L0FileCount: 55, L0SublevelCount: 22}}, - {max, Metrics{L0FileCount: 55, L0SublevelCount: -1}}, + {0, 0, 0}, + {0, 19, -1}, + {0, 20, -1}, + {ramp, 21, -1}, + {ramp * 2, 22, -1}, + {ramp * 2, 22, 22}, + {ramp * 2, 55, 22}, + {max, 55, -1}, } { - require.Equal(t, tc.exp, calculatePreIngestDelay(s, &tc.metrics)) + var m pebble.Metrics + m.Levels[0].NumFiles = tc.fileCount + m.Levels[0].Sublevels = tc.sublevelCount + require.Equal(t, tc.exp, calculatePreIngestDelay(s, &m)) } } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 230adb22934c..29c7a8a360e8 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -701,14 +701,6 @@ func (p *Pebble) rawGet(key []byte) ([]byte, error) { return ret, err } -// GetCompactionStats implements the Engine interface. -func (p *Pebble) GetCompactionStats() string { - // NB: The initial blank line matches the formatting used by RocksDB and - // ensures that compaction stats display will not contain the log prefix - // (this method is only used for logging purposes). - return "\n" + p.db.Metrics().String() -} - // MVCCGetProto implements the Engine interface. func (p *Pebble) MVCCGetProto( key MVCCKey, msg protoutil.Message, @@ -954,41 +946,13 @@ func (p *Pebble) Flush() error { } // GetMetrics implements the Engine interface. 
-func (p *Pebble) GetMetrics() (*Metrics, error) { +func (p *Pebble) GetMetrics() Metrics { m := p.db.Metrics() - - // Aggregate compaction stats across levels. - var ingestedBytes, compactedBytesRead, compactedBytesWritten, numSSTables int64 - for _, lm := range m.Levels { - ingestedBytes += int64(lm.BytesIngested) - compactedBytesRead += int64(lm.BytesRead) - compactedBytesWritten += int64(lm.BytesCompacted) - numSSTables += lm.NumFiles - } - - return &Metrics{ - BlockCacheHits: m.BlockCache.Hits, - BlockCacheMisses: m.BlockCache.Misses, - BlockCacheUsage: m.BlockCache.Size, - BlockCachePinnedUsage: 0, - BloomFilterPrefixChecked: m.Filter.Hits + m.Filter.Misses, - BloomFilterPrefixUseful: m.Filter.Hits, - DiskSlowCount: int64(atomic.LoadUint64(&p.diskSlowCount)), - DiskStallCount: int64(atomic.LoadUint64(&p.diskStallCount)), - MemtableTotalSize: int64(m.MemTable.Size), - Flushes: m.Flush.Count, - FlushedBytes: int64(m.Levels[0].BytesFlushed), - Compactions: m.Compact.Count, - IngestedBytes: ingestedBytes, - CompactedBytesRead: compactedBytesRead, - CompactedBytesWritten: compactedBytesWritten, - TableReadersMemEstimate: m.TableCache.Size, - PendingCompactionBytesEstimate: int64(m.Compact.EstimatedDebt), - L0FileCount: m.Levels[0].NumFiles, - L0SublevelCount: int64(m.Levels[0].Sublevels), - ReadAmplification: int64(m.ReadAmp()), - NumSSTables: numSSTables, - }, nil + return Metrics{ + Metrics: m, + DiskSlowCount: int64(atomic.LoadUint64(&p.diskSlowCount)), + DiskStallCount: int64(atomic.LoadUint64(&p.diskStallCount)), + } } // GetEncryptionRegistries implements the Engine interface.