From 4849055d168764e46a0fae650d35d27d772f5de8 Mon Sep 17 00:00:00 2001
From: Tobias Grieger
Date: Tue, 28 Mar 2023 11:22:43 +0200
Subject: [PATCH] kvserver: add raft.storage.read_bytes

This metric tracks the cases in which we fall back to reading log
entries from storage (i.e. pebble) in `(raft.Storage).Entries`.

Ideally this is zero, as everything ought to be served from the raft
entry cache. We know that this cache is not configured well[^1], so we
can't really expect the metric to stay at zero, but you can't improve
what you don't measure. Additionally, this metric has been useful in
investigations related to raft overload[^2].

[^1]: https://github.com/cockroachdb/cockroach/issues/98666
[^2]: https://github.com/cockroachdb/cockroach/pull/98576

Epic: none
Release note: None
---
 pkg/kv/kvserver/logstore/logstore.go     | 32 +++++++++++++-----------
 pkg/kv/kvserver/metrics.go               | 32 +++++++++++++++++++++++-
 pkg/kv/kvserver/replica_raftstorage.go   |  4 ++-
 pkg/kv/kvserver/replica_sideload_test.go |  3 ++-
 pkg/ts/catalog/chart_catalog.go          | 11 ++++++++
 5 files changed, 64 insertions(+), 18 deletions(-)

diff --git a/pkg/kv/kvserver/logstore/logstore.go b/pkg/kv/kvserver/logstore/logstore.go
index a621f71fb284..924394ed85f7 100644
--- a/pkg/kv/kvserver/logstore/logstore.go
+++ b/pkg/kv/kvserver/logstore/logstore.go
@@ -496,9 +496,9 @@ func LoadEntries(
 	eCache *raftentry.Cache,
 	sideloaded SideloadStorage,
 	lo, hi, maxBytes uint64,
-) ([]raftpb.Entry, error) {
+) (_ []raftpb.Entry, _cachedSize int, _loadedSize int, _ error) {
 	if lo > hi {
-		return nil, errors.Errorf("lo:%d is greater than hi:%d", lo, hi)
+		return nil, 0, 0, errors.Errorf("lo:%d is greater than hi:%d", lo, hi)
 	}
 
 	n := hi - lo
@@ -507,14 +507,16 @@ func LoadEntries(
 	}
 
 	ents := make([]raftpb.Entry, 0, n)
-	ents, size, hitIndex, exceededMaxBytes := eCache.Scan(ents, rangeID, lo, hi, maxBytes)
+	ents, cachedSize, hitIndex, exceededMaxBytes := eCache.Scan(ents, rangeID, lo, hi, maxBytes)
 
 	// Return results if the correct number of results came back or if
 	// we ran into the max bytes limit.
 	if uint64(len(ents)) == hi-lo || exceededMaxBytes {
-		return ents, nil
+		return ents, int(cachedSize), 0, nil
 	}
 
+	combinedSize := cachedSize // combinedSize tracks the total size of ents.
+
 	// Scan over the log to find the requested entries in the range [lo, hi),
 	// stopping once we have enough.
 	expectedIndex := hitIndex
@@ -543,8 +545,8 @@
 		}
 
 		// Note that we track the size of proposals with payloads inlined.
-		size += uint64(ent.Size())
-		if size > maxBytes {
+		combinedSize += uint64(ent.Size())
+		if combinedSize > maxBytes {
 			exceededMaxBytes = true
 			if len(ents) == 0 { // make sure to return at least one entry
 				ents = append(ents, ent)
			}
@@ -559,18 +561,18 @@
 	reader := eng.NewReadOnly(storage.StandardDurability)
 	defer reader.Close()
 	if err := raftlog.Visit(reader, rangeID, expectedIndex, hi, scanFunc); err != nil {
-		return nil, err
+		return nil, 0, 0, err
 	}
 	eCache.Add(rangeID, ents, false /* truncate */)
 
 	// Did the correct number of results come back? If so, we're all good.
 	if uint64(len(ents)) == hi-lo {
-		return ents, nil
+		return ents, int(cachedSize), int(combinedSize - cachedSize), nil
 	}
 
 	// Did we hit the size limit? If so, return what we have.
 	if exceededMaxBytes {
-		return ents, nil
+		return ents, int(cachedSize), int(combinedSize - cachedSize), nil
 	}
 
 	// Did we get any results at all? Because something went wrong.
@@ -578,25 +580,25 @@
 		// Was the missing index after the last index?
 		lastIndex, err := rsl.LoadLastIndex(ctx, reader)
 		if err != nil {
-			return nil, err
+			return nil, 0, 0, err
 		}
 		if lastIndex <= expectedIndex {
-			return nil, raft.ErrUnavailable
+			return nil, 0, 0, raft.ErrUnavailable
 		}
 
 		// We have a gap in the record, if so, return a nasty error.
-		return nil, errors.Errorf("there is a gap in the index record between lo:%d and hi:%d at index:%d", lo, hi, expectedIndex)
+		return nil, 0, 0, errors.Errorf("there is a gap in the index record between lo:%d and hi:%d at index:%d", lo, hi, expectedIndex)
 	}
 
 	// No results, was it due to unavailability or truncation?
 	ts, err := rsl.LoadRaftTruncatedState(ctx, reader)
 	if err != nil {
-		return nil, err
+		return nil, 0, 0, err
 	}
 	if ts.Index >= lo {
 		// The requested lo index has already been truncated.
-		return nil, raft.ErrCompacted
+		return nil, 0, 0, raft.ErrCompacted
 	}
 	// The requested lo index does not yet exist.
-	return nil, raft.ErrUnavailable
+	return nil, 0, 0, raft.ErrUnavailable
 }
diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go
index abaf1fc3d982..377bcb562fbe 100644
--- a/pkg/kv/kvserver/metrics.go
+++ b/pkg/kv/kvserver/metrics.go
@@ -979,6 +979,34 @@ of processing.
 		Measurement: "Elections called after timeout",
 		Unit:        metric.Unit_COUNT,
 	}
+	metaRaftStorageReadBytes = metric.Metadata{
+		Name: "raft.storage.read_bytes",
+		Help: `Counter of raftpb.Entry.Size() read from pebble for raft log entries.
+
+These are the bytes returned from the (raft.Storage).Entries method that were
+not served from the raft entry cache. Together with the bytes served from the
+entry cache, they make up the total bytes returned from the Entries method.
+
+Since pebble might serve these entries from the block cache, only a fraction of
+this throughput might manifest in disk metrics.
+
+Entries tracked in this metric incur an unmarshalling-related CPU and memory
+overhead that would not be incurred were the entries served from the raft
+entry cache.
+
+The bytes returned here do not correspond 1:1 to bytes read from pebble. This
+metric measures the in-memory size of the raftpb.Entry, whereas we read its
+encoded representation from pebble. As there is no compression involved, these
+will generally be comparable.
+
+A common reason for elevated measurements on this metric is that a store is
+falling behind on raft log application. The raft entry cache generally tracks
+entries that were recently appended, so if log application falls behind, the
+cache will already have moved on to newer entries.
+`,
+		Measurement: "Bytes",
+		Unit:        metric.Unit_BYTES,
+	}
 
 	// Raft message metrics.
 	metaRaftRcvdProp = metric.Metadata{
@@ -1938,6 +1966,7 @@ type StoreMetrics struct {
 	RaftApplyCommittedLatency metric.IHistogram
 	RaftSchedulerLatency      metric.IHistogram
 	RaftTimeoutCampaign       *metric.Counter
+	RaftStorageReadBytes      *metric.Counter
 
 	// Raft message metrics.
 	//
@@ -2504,7 +2533,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
 			Duration: histogramWindow,
 			Buckets:  metric.IOLatencyBuckets,
 		}),
-		RaftTimeoutCampaign: metric.NewCounter(metaRaftTimeoutCampaign),
+		RaftTimeoutCampaign:  metric.NewCounter(metaRaftTimeoutCampaign),
+		RaftStorageReadBytes: metric.NewCounter(metaRaftStorageReadBytes),
 
 		// Raft message metrics.
 		RaftRcvdMessages: [maxRaftMsgType + 1]*metric.Counter{
diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go
index a8dc4298bda9..a8dc29036293 100644
--- a/pkg/kv/kvserver/replica_raftstorage.go
+++ b/pkg/kv/kvserver/replica_raftstorage.go
@@ -84,8 +84,10 @@ func (r *replicaRaftStorage) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error) {
 	if r.raftMu.sideloaded == nil {
 		return nil, errors.New("sideloaded storage is uninitialized")
 	}
-	return logstore.LoadEntries(ctx, r.mu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
+	ents, _, loadedSize, err := logstore.LoadEntries(ctx, r.mu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
 		r.store.raftEntryCache, r.raftMu.sideloaded, lo, hi, maxBytes)
+	r.store.metrics.RaftStorageReadBytes.Inc(int64(loadedSize))
+	return ents, err
 }
 
 // raftEntriesLocked requires that r.mu is held for writing.
diff --git a/pkg/kv/kvserver/replica_sideload_test.go b/pkg/kv/kvserver/replica_sideload_test.go
index 6375b3953b28..d8b4146ad8ab 100644
--- a/pkg/kv/kvserver/replica_sideload_test.go
+++ b/pkg/kv/kvserver/replica_sideload_test.go
@@ -183,12 +183,13 @@ func TestRaftSSTableSideloading(t *testing.T) {
 	hi := tc.repl.mu.lastIndexNotDurable + 1
 	tc.store.raftEntryCache.Clear(tc.repl.RangeID, hi)
 
-	ents, err := logstore.LoadEntries(
+	ents, cachedBytes, _, err := logstore.LoadEntries(
 		ctx, rsl, tc.store.TODOEngine(), tc.repl.RangeID,
 		tc.store.raftEntryCache, tc.repl.raftMu.sideloaded, lo, hi, math.MaxUint64,
 	)
 	require.NoError(t, err)
 	require.Len(t, ents, int(hi-lo))
+	require.Zero(t, cachedBytes)
 
 	// Check that the Raft entry cache was populated.
 	_, okLo := tc.store.raftEntryCache.Get(tc.repl.RangeID, lo)
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index 54ed9ff70c4c..274ca314e6b4 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -3981,6 +3981,17 @@ var charts = []sectionDescription{
 			},
 		},
 	},
+	{
+		Organization: [][]string{{ReplicationLayer, "Raft reads"}},
+		Charts: []chartDescription{
+			{
+				Title: "raft.Storage engine read bytes",
+				Metrics: []string{
+					"raft.storage.read_bytes",
+				},
+			},
+		},
+	},
 }
 
 func jobTypeCharts(title string, varName string) chartDescription {
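
To make the new return values concrete: LoadEntries now reports how many entry
bytes came from the raft entry cache (_cachedSize) and how many had to be read
from pebble (_loadedSize). Below is a minimal standalone Go sketch of that
accounting, under simplifying assumptions: loadEntries, cacheScan, and
storageScan are hypothetical stand-ins for logstore.LoadEntries,
(*raftentry.Cache).Scan, and raftlog.Visit, and the sketch ignores the
max-bytes limit and sideloading.

    package main

    import "fmt"

    // entry stands in for raftpb.Entry; size plays the role of Entry.Size().
    type entry struct {
    	index uint64
    	size  int
    }

    // loadEntries mirrors the accounting in the patched LoadEntries: serve
    // what we can from the entry cache, fall back to storage for the rest,
    // and report the bytes from each source separately.
    func loadEntries(
    	cacheScan, storageScan func(lo, hi uint64) []entry,
    	lo, hi uint64,
    ) (ents []entry, cachedSize, loadedSize int) {
    	ents = cacheScan(lo, hi)
    	for _, e := range ents {
    		cachedSize += e.size
    	}
    	if uint64(len(ents)) == hi-lo {
    		return ents, cachedSize, 0 // cache hit; nothing read from storage
    	}
    	// The cache returns a contiguous prefix [lo, hit); load [hit, hi)
    	// from storage and track those bytes separately.
    	hit := lo + uint64(len(ents))
    	for _, e := range storageScan(hit, hi) {
    		ents = append(ents, e)
    		loadedSize += e.size
    	}
    	return ents, cachedSize, loadedSize
    }

    func main() {
    	// The cache only holds entries [5, 8); storage holds everything.
    	cache := func(lo, hi uint64) []entry {
    		var out []entry
    		for i := lo; i < hi && i < 8; i++ {
    			out = append(out, entry{index: i, size: 100})
    		}
    		return out
    	}
    	store := func(lo, hi uint64) []entry {
    		var out []entry
    		for i := lo; i < hi; i++ {
    			out = append(out, entry{index: i, size: 100})
    		}
    		return out
    	}
    	ents, cached, loaded := loadEntries(cache, store, 5, 10)
    	fmt.Println(len(ents), cached, loaded) // prints: 5 300 200
    }

The cache hit is always a contiguous prefix starting at lo, which is why a
single hit index suffices to describe where the storage scan has to pick up.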
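
On the caller side, only the storage-read portion feeds the counter, so a
replica whose reads are fully served by the entry cache contributes nothing to
raft.storage.read_bytes. A hedged sketch of that wiring, with counter standing
in for *metric.Counter from pkg/util/metric and loadEntriesStub a hypothetical
stand-in for the call above:

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    // counter is a stand-in for *metric.Counter; only Inc matters here.
    type counter struct{ n atomic.Int64 }

    func (c *counter) Inc(d int64) { c.n.Add(d) }

    // raftStorageReadBytes plays the role of StoreMetrics.RaftStorageReadBytes.
    var raftStorageReadBytes counter

    // loadEntriesStub is a hypothetical stand-in for logstore.LoadEntries,
    // returning cached and storage-read byte counts alongside the entries.
    func loadEntriesStub(lo, hi uint64) (ents []struct{}, cachedSize, loadedSize int) {
    	return nil, 300, 200 // pretend 300B came from the cache, 200B from pebble
    }

    // entries sketches the patched (*replicaRaftStorage).Entries: bytes that
    // had to be read from storage are counted; cache-served bytes are not.
    func entries(lo, hi uint64) {
    	_, _, loadedSize := loadEntriesStub(lo, hi)
    	raftStorageReadBytes.Inc(int64(loadedSize))
    }

    func main() {
    	entries(5, 10)
    	fmt.Println(raftStorageReadBytes.n.Load()) // prints: 200
    }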
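
The test change relies on the same split: clearing the raft entry cache forces
the next LoadEntries call to read everything from pebble, so the cached-bytes
return value must be zero. A sketch of that assertion pattern against the
loadEntries sketch above (plain testing instead of testify; place it in a
_test.go file alongside the first sketch):

    package main

    import "testing"

    // TestLoadFromStorageOnly mirrors the pattern in TestRaftSSTableSideloading:
    // with an empty cache, all bytes must be attributed to storage reads.
    func TestLoadFromStorageOnly(t *testing.T) {
    	emptyCache := func(lo, hi uint64) []entry { return nil }
    	store := func(lo, hi uint64) []entry {
    		var out []entry
    		for i := lo; i < hi; i++ {
    			out = append(out, entry{index: i, size: 100})
    		}
    		return out
    	}
    	ents, cached, loaded := loadEntries(emptyCache, store, 5, 10)
    	if len(ents) != 5 || cached != 0 || loaded != 500 {
    		t.Fatalf("got %d entries, cached=%d, loaded=%d", len(ents), cached, loaded)
    	}
    }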