Skip to content

Commit

Permalink
pkg/storage: remove GetCompactionStats, refactor GetMetrics
Browse files Browse the repository at this point in the history
Refactor the 'compaction stats' and metrics exposed from the
storage.Engine to directly expose `*pebble.Metrics`, embedded within a
`storage.Metrics` struct.

Also, pass the `Metrics` type directly into log lines where we print the
full LSM.

Release note: None
  • Loading branch information
jbowens committed May 25, 2021
1 parent 1ae444a commit 0e0082d
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 136 deletions.
38 changes: 21 additions & 17 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1668,23 +1668,27 @@ func (sm *TenantsStorageMetrics) subtractMVCCStats(
}

func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) {
sm.RdbBlockCacheHits.Update(m.BlockCacheHits)
sm.RdbBlockCacheMisses.Update(m.BlockCacheMisses)
sm.RdbBlockCacheUsage.Update(m.BlockCacheUsage)
sm.RdbBlockCachePinnedUsage.Update(m.BlockCachePinnedUsage)
sm.RdbBloomFilterPrefixUseful.Update(m.BloomFilterPrefixUseful)
sm.RdbBloomFilterPrefixChecked.Update(m.BloomFilterPrefixChecked)
sm.RdbMemtableTotalSize.Update(m.MemtableTotalSize)
sm.RdbFlushes.Update(m.Flushes)
sm.RdbFlushedBytes.Update(m.FlushedBytes)
sm.RdbCompactions.Update(m.Compactions)
sm.RdbIngestedBytes.Update(m.IngestedBytes)
sm.RdbCompactedBytesRead.Update(m.CompactedBytesRead)
sm.RdbCompactedBytesWritten.Update(m.CompactedBytesWritten)
sm.RdbTableReadersMemEstimate.Update(m.TableReadersMemEstimate)
sm.RdbReadAmplification.Update(m.ReadAmplification)
sm.RdbPendingCompaction.Update(m.PendingCompactionBytesEstimate)
sm.RdbNumSSTables.Update(m.NumSSTables)
sm.RdbBlockCacheHits.Update(m.BlockCache.Hits)
sm.RdbBlockCacheMisses.Update(m.BlockCache.Misses)
sm.RdbBlockCacheUsage.Update(m.BlockCache.Size)
// TODO(jackson): Delete RdbBlockCachePinnedUsage or calculate the
// equivalent (the sum of IteratorMetrics.ReadAmp for all open iterators,
// times the block size).
sm.RdbBlockCachePinnedUsage.Update(0)
sm.RdbBloomFilterPrefixUseful.Update(m.Filter.Hits)
sm.RdbBloomFilterPrefixChecked.Update(m.Filter.Hits + m.Filter.Misses)
sm.RdbMemtableTotalSize.Update(int64(m.MemTable.Size))
sm.RdbFlushes.Update(m.Flush.Count)
sm.RdbFlushedBytes.Update(int64(m.Levels[0].BytesFlushed))
sm.RdbCompactions.Update(m.Compact.Count)
sm.RdbIngestedBytes.Update(int64(m.IngestedBytes()))
compactedRead, compactedWritten := m.CompactedBytes()
sm.RdbCompactedBytesRead.Update(int64(compactedRead))
sm.RdbCompactedBytesWritten.Update(int64(compactedWritten))
sm.RdbTableReadersMemEstimate.Update(m.TableCache.Size)
sm.RdbReadAmplification.Update(int64(m.ReadAmp()))
sm.RdbPendingCompaction.Update(int64(m.Compact.EstimatedDebt))
sm.RdbNumSSTables.Update(m.NumSSTables())
sm.DiskSlow.Update(m.DiskSlowCount)
sm.DiskStalled.Update(m.DiskStallCount)
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2635,11 +2635,8 @@ func (s *Store) ComputeMetrics(ctx context.Context, tick int) error {
}

// Get the latest engine metrics.
m, err := s.engine.GetMetrics()
if err != nil {
return err
}
s.metrics.updateEngineMetrics(*m)
m := s.engine.GetMetrics()
s.metrics.updateEngineMetrics(m)

// Get engine Env stats.
envStats, err := s.engine.GetEnvStats()
Expand All @@ -2653,7 +2650,9 @@ func (s *Store) ComputeMetrics(ctx context.Context, tick int) error {
// non-periodic callers of this method don't trigger expensive
// stats.
if tick%logSSTInfoTicks == 1 /* every 10m */ {
log.Infof(ctx, "%s", s.engine.GetCompactionStats())
// NB: The initial blank line ensures that compaction stats display
// will not contain the log prefix.
log.Infof(ctx, "\n%s", m.Metrics)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/debug/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (ds *Server) RegisterEngines(specs []base.StoreSpec, engines []storage.Engi
eng := engines[i]
ds.mux.HandleFunc(fmt.Sprintf("/debug/lsm/%d", id.StoreID),
func(w http.ResponseWriter, req *http.Request) {
_, _ = io.WriteString(w, eng.GetCompactionStats())
_, _ = io.WriteString(w, eng.GetMetrics().String())
})

dir := specs[i].Path
Expand Down
7 changes: 3 additions & 4 deletions pkg/server/node_engine_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,14 @@ func (n *Node) assertEngineHealth(
func() {
t := time.AfterFunc(maxDuration, func() {
n.metrics.DiskStalls.Inc(1)
stats := "\n" + eng.GetCompactionStats()
m := eng.GetMetrics()
logger := log.Warningf
if fatalOnExceeded {
logger = guaranteedExitFatal
}
// NB: the disk-stall-detected roachtest matches on this message.
logger(ctx, "disk stall detected: unable to write to %s within %s %s",
eng, storage.MaxSyncDuration, stats,
)
logger(ctx, "disk stall detected: unable to write to %s within %s\n%s",
eng, storage.MaxSyncDuration, m)
})
defer t.Stop()
if err := storage.WriteSyncNoop(ctx, eng); err != nil {
Expand Down
98 changes: 48 additions & 50 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,11 +648,8 @@ type Engine interface {
// Flush causes the engine to write all in-memory data to disk
// immediately.
Flush() error
// GetCompactionStats returns the internal RocksDB compaction stats. See
// https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#rocksdb-statistics.
GetCompactionStats() string
// GetMetrics retrieves metrics from the engine.
GetMetrics() (*Metrics, error)
GetMetrics() Metrics
// GetEncryptionRegistries returns the file and key registries when encryption is enabled
// on the store.
GetEncryptionRegistries() (*EncryptionRegistries, error)
Expand Down Expand Up @@ -765,42 +762,45 @@ type Batch interface {
Repr() []byte
}

// Metrics is a set of Engine metrics. Most are described in RocksDB.
// Some metrics (eg, `IngestedBytes`) are only exposed by Pebble.
//
// Currently, we collect stats from the following sources:
// 1. RocksDB's internal "tickers" (i.e. counters). They're defined in
// rocksdb/statistics.h
// 2. DBEventListener, which implements RocksDB's EventListener interface.
// 3. rocksdb::DB::GetProperty().
//
// This is a good resource describing RocksDB's memory-related stats:
// https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
//
// TODO(jackson): Refactor to mirror or even expose pebble.Metrics when
// RocksDB is removed.
// Metrics is a set of Engine metrics. Most are contained in the embedded
// *pebble.Metrics struct, which has its own documentation.
type Metrics struct {
BlockCacheHits int64
BlockCacheMisses int64
BlockCacheUsage int64
BlockCachePinnedUsage int64
BloomFilterPrefixChecked int64
BloomFilterPrefixUseful int64
DiskSlowCount int64
DiskStallCount int64
MemtableTotalSize int64
Flushes int64
FlushedBytes int64
Compactions int64
IngestedBytes int64 // Pebble only
CompactedBytesRead int64
CompactedBytesWritten int64
TableReadersMemEstimate int64
PendingCompactionBytesEstimate int64
L0FileCount int64
L0SublevelCount int64
ReadAmplification int64
NumSSTables int64
*pebble.Metrics
// DiskSlowCount counts the number of times Pebble records disk slowness.
DiskSlowCount int64
// DiskStallCount counts the number of times Pebble observes slow writes
// lasting longer than MaxSyncDuration (`storage.max_sync_duration`).
DiskStallCount int64
}

// NumSSTables reports how many SSTables the LSM holds in total, obtained by
// adding up the NumFiles count of every level in the embedded pebble metrics.
func (m *Metrics) NumSSTables() int64 {
	total := int64(0)
	levels := m.Metrics.Levels
	for i := range levels {
		total += levels[i].NumFiles
	}
	return total
}

// IngestedBytes reports the total BytesIngested recorded by the embedded
// pebble metrics, summed over every level of the LSM.
func (m *Metrics) IngestedBytes() uint64 {
	total := uint64(0)
	levels := m.Metrics.Levels
	for i := range levels {
		total += levels[i].BytesIngested
	}
	return total
}

// CompactedBytes reports compaction I/O totals summed over every level of
// the LSM: read accumulates each level's BytesRead and written accumulates
// each level's BytesCompacted.
func (m *Metrics) CompactedBytes() (read, written uint64) {
	levels := m.Metrics.Levels
	for i := range levels {
		read, written = read+levels[i].BytesRead, written+levels[i].BytesCompacted
	}
	return read, written
}

// EnvStats is a set of RocksDB env stats, including encryption status.
Expand Down Expand Up @@ -1026,33 +1026,31 @@ func preIngestDelay(ctx context.Context, eng Engine, settings *cluster.Settings)
if settings == nil {
return
}
metrics, err := eng.GetMetrics()
if err != nil {
log.Warningf(ctx, "failed to read metrics: %+v", err)
return
}
targetDelay := calculatePreIngestDelay(settings, metrics)
metrics := eng.GetMetrics()
targetDelay := calculatePreIngestDelay(settings, metrics.Metrics)

if targetDelay == 0 {
return
}
log.VEventf(ctx, 2, "delaying SST ingestion %s. %d L0 files, %d L0 Sublevels", targetDelay, metrics.L0FileCount, metrics.L0SublevelCount)
log.VEventf(ctx, 2, "delaying SST ingestion %s. %d L0 files, %d L0 Sublevels",
targetDelay, metrics.Levels[0].NumFiles, metrics.Levels[0].Sublevels)

select {
case <-time.After(targetDelay):
case <-ctx.Done():
}
}

func calculatePreIngestDelay(settings *cluster.Settings, metrics *Metrics) time.Duration {
func calculatePreIngestDelay(settings *cluster.Settings, metrics *pebble.Metrics) time.Duration {
maxDelay := ingestDelayTime.Get(&settings.SV)
l0ReadAmpLimit := ingestDelayL0Threshold.Get(&settings.SV)

const ramp = 10
l0ReadAmp := metrics.L0FileCount
if metrics.L0SublevelCount >= 0 {
l0ReadAmp = metrics.L0SublevelCount
l0ReadAmp := metrics.Levels[0].NumFiles
if metrics.Levels[0].Sublevels >= 0 {
l0ReadAmp = int64(metrics.Levels[0].Sublevels)
}

if l0ReadAmp > l0ReadAmpLimit {
delayPerFile := maxDelay / time.Duration(ramp)
targetDelay := time.Duration(l0ReadAmp-l0ReadAmpLimit) * delayPerFile
Expand Down
33 changes: 17 additions & 16 deletions pkg/storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,11 +790,8 @@ func TestFlushNumSSTables(t *testing.T) {
t.Fatal(err)
}

m, err := engine.GetMetrics()
if err != nil {
t.Fatal(err)
}
if m.NumSSTables == 0 {
m := engine.GetMetrics()
if m.NumSSTables() == 0 {
t.Fatal("expected non-zero sstables, got 0")
}
})
Expand Down Expand Up @@ -1192,19 +1189,23 @@ func TestIngestDelayLimit(t *testing.T) {
max, ramp := time.Second*5, time.Second*5/10

for _, tc := range []struct {
exp time.Duration
metrics Metrics
exp time.Duration
fileCount int64
sublevelCount int32
}{
{0, Metrics{}},
{0, Metrics{L0FileCount: 19, L0SublevelCount: -1}},
{0, Metrics{L0FileCount: 20, L0SublevelCount: -1}},
{ramp, Metrics{L0FileCount: 21, L0SublevelCount: -1}},
{ramp * 2, Metrics{L0FileCount: 22, L0SublevelCount: -1}},
{ramp * 2, Metrics{L0FileCount: 22, L0SublevelCount: 22}},
{ramp * 2, Metrics{L0FileCount: 55, L0SublevelCount: 22}},
{max, Metrics{L0FileCount: 55, L0SublevelCount: -1}},
{0, 0, 0},
{0, 19, -1},
{0, 20, -1},
{ramp, 21, -1},
{ramp * 2, 22, -1},
{ramp * 2, 22, 22},
{ramp * 2, 55, 22},
{max, 55, -1},
} {
require.Equal(t, tc.exp, calculatePreIngestDelay(s, &tc.metrics))
var m pebble.Metrics
m.Levels[0].NumFiles = tc.fileCount
m.Levels[0].Sublevels = tc.sublevelCount
require.Equal(t, tc.exp, calculatePreIngestDelay(s, &m))
}
}

Expand Down
48 changes: 6 additions & 42 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,14 +701,6 @@ func (p *Pebble) rawGet(key []byte) ([]byte, error) {
return ret, err
}

// GetCompactionStats implements the Engine interface.
func (p *Pebble) GetCompactionStats() string {
// NB: The initial blank line matches the formatting used by RocksDB and
// ensures that compaction stats display will not contain the log prefix
// (this method is only used for logging purposes).
return "\n" + p.db.Metrics().String()
}

// MVCCGetProto implements the Engine interface.
func (p *Pebble) MVCCGetProto(
key MVCCKey, msg protoutil.Message,
Expand Down Expand Up @@ -954,41 +946,13 @@ func (p *Pebble) Flush() error {
}

// GetMetrics implements the Engine interface.
func (p *Pebble) GetMetrics() (*Metrics, error) {
func (p *Pebble) GetMetrics() Metrics {
m := p.db.Metrics()

// Aggregate compaction stats across levels.
var ingestedBytes, compactedBytesRead, compactedBytesWritten, numSSTables int64
for _, lm := range m.Levels {
ingestedBytes += int64(lm.BytesIngested)
compactedBytesRead += int64(lm.BytesRead)
compactedBytesWritten += int64(lm.BytesCompacted)
numSSTables += lm.NumFiles
}

return &Metrics{
BlockCacheHits: m.BlockCache.Hits,
BlockCacheMisses: m.BlockCache.Misses,
BlockCacheUsage: m.BlockCache.Size,
BlockCachePinnedUsage: 0,
BloomFilterPrefixChecked: m.Filter.Hits + m.Filter.Misses,
BloomFilterPrefixUseful: m.Filter.Hits,
DiskSlowCount: int64(atomic.LoadUint64(&p.diskSlowCount)),
DiskStallCount: int64(atomic.LoadUint64(&p.diskStallCount)),
MemtableTotalSize: int64(m.MemTable.Size),
Flushes: m.Flush.Count,
FlushedBytes: int64(m.Levels[0].BytesFlushed),
Compactions: m.Compact.Count,
IngestedBytes: ingestedBytes,
CompactedBytesRead: compactedBytesRead,
CompactedBytesWritten: compactedBytesWritten,
TableReadersMemEstimate: m.TableCache.Size,
PendingCompactionBytesEstimate: int64(m.Compact.EstimatedDebt),
L0FileCount: m.Levels[0].NumFiles,
L0SublevelCount: int64(m.Levels[0].Sublevels),
ReadAmplification: int64(m.ReadAmp()),
NumSSTables: numSSTables,
}, nil
return Metrics{
Metrics: m,
DiskSlowCount: int64(atomic.LoadUint64(&p.diskSlowCount)),
DiskStallCount: int64(atomic.LoadUint64(&p.diskStallCount)),
}
}

// GetEncryptionRegistries implements the Engine interface.
Expand Down

0 comments on commit 0e0082d

Please sign in to comment.