Skip to content

Commit

Permalink
kvserver: add raft.storage.read_bytes
Browse files Browse the repository at this point in the history
This tracks the cases where we fall back to reading log entries from storage
(i.e. pebble) in `(raft.Storage.Entries)`

Ideally this is zero, as everything ought to be served from the raft entry cache. We know that this cache is not configured well[^1] and so we can't really expect this to work, but you can't improve what you don't measure.

Additionally, this metric has been useful in investigations related to raft overload[^2].

[^1]: cockroachdb#98666
[^2]: cockroachdb#98576

Epic: none
Release note: None
  • Loading branch information
tbg committed Mar 28, 2023
1 parent 2bd2c80 commit 4849055
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 18 deletions.
32 changes: 17 additions & 15 deletions pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,9 +496,9 @@ func LoadEntries(
eCache *raftentry.Cache,
sideloaded SideloadStorage,
lo, hi, maxBytes uint64,
) ([]raftpb.Entry, error) {
) (_ []raftpb.Entry, _cachedSize int, _loadedSize int, _ error) {
if lo > hi {
return nil, errors.Errorf("lo:%d is greater than hi:%d", lo, hi)
return nil, 0, 0, errors.Errorf("lo:%d is greater than hi:%d", lo, hi)
}

n := hi - lo
Expand All @@ -507,14 +507,16 @@ func LoadEntries(
}
ents := make([]raftpb.Entry, 0, n)

ents, size, hitIndex, exceededMaxBytes := eCache.Scan(ents, rangeID, lo, hi, maxBytes)
ents, cachedSize, hitIndex, exceededMaxBytes := eCache.Scan(ents, rangeID, lo, hi, maxBytes)

// Return results if the correct number of results came back or if
// we ran into the max bytes limit.
if uint64(len(ents)) == hi-lo || exceededMaxBytes {
return ents, nil
return ents, int(cachedSize), 0, nil
}

combinedSize := cachedSize // size tracks total size of ents.

// Scan over the log to find the requested entries in the range [lo, hi),
// stopping once we have enough.
expectedIndex := hitIndex
Expand Down Expand Up @@ -543,8 +545,8 @@ func LoadEntries(
}

// Note that we track the size of proposals with payloads inlined.
size += uint64(ent.Size())
if size > maxBytes {
combinedSize += uint64(ent.Size())
if combinedSize > maxBytes {
exceededMaxBytes = true
if len(ents) == 0 { // make sure to return at least one entry
ents = append(ents, ent)
Expand All @@ -559,44 +561,44 @@ func LoadEntries(
reader := eng.NewReadOnly(storage.StandardDurability)
defer reader.Close()
if err := raftlog.Visit(reader, rangeID, expectedIndex, hi, scanFunc); err != nil {
return nil, err
return nil, 0, 0, err
}
eCache.Add(rangeID, ents, false /* truncate */)

// Did the correct number of results come back? If so, we're all good.
if uint64(len(ents)) == hi-lo {
return ents, nil
return ents, int(cachedSize), int(combinedSize - cachedSize), nil
}

// Did we hit the size limit? If so, return what we have.
if exceededMaxBytes {
return ents, nil
return ents, int(cachedSize), int(combinedSize - cachedSize), nil
}

// Did we get any results at all? Because something went wrong.
if len(ents) > 0 {
// Was the missing index after the last index?
lastIndex, err := rsl.LoadLastIndex(ctx, reader)
if err != nil {
return nil, err
return nil, 0, 0, err
}
if lastIndex <= expectedIndex {
return nil, raft.ErrUnavailable
return nil, 0, 0, raft.ErrUnavailable
}

// We have a gap in the record, if so, return a nasty error.
return nil, errors.Errorf("there is a gap in the index record between lo:%d and hi:%d at index:%d", lo, hi, expectedIndex)
return nil, 0, 0, errors.Errorf("there is a gap in the index record between lo:%d and hi:%d at index:%d", lo, hi, expectedIndex)
}

// No results, was it due to unavailability or truncation?
ts, err := rsl.LoadRaftTruncatedState(ctx, reader)
if err != nil {
return nil, err
return nil, 0, 0, err
}
if ts.Index >= lo {
// The requested lo index has already been truncated.
return nil, raft.ErrCompacted
return nil, 0, 0, raft.ErrCompacted
}
// The requested lo index does not yet exist.
return nil, raft.ErrUnavailable
return nil, 0, 0, raft.ErrUnavailable
}
32 changes: 31 additions & 1 deletion pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,34 @@ of processing.
Measurement: "Elections called after timeout",
Unit: metric.Unit_COUNT,
}
metaRaftStorageReadBytes = metric.Metadata{
Name: "raft.storage.read_bytes",
Help: `Counter of raftpb.Entry.Size() read from pebble for raft log entries.
These are the bytes returned from the (raft.Storage).Entries method that were not
returned via the raft entry cache. This metric plus the raft.storage.read_bytes
metric represent the total bytes returned from the Entries method.
Since pebble might serve these entries from the block cache, only a fraction of this
throughput might manifest in disk metrics.
Entries tracked in this metric incur an unmarshalling-related CPU and memory
overhead that would not be incurred would the entries be served from the raft
entry cache.
The bytes returned here do not correspond 1:1 to bytes read from pebble. This
metric measures the in-memory size of the raftpb.Entry, whereas we read its
encoded representation from pebble. As there is no compression involved, these
will generally be comparable.
A common reason for elevated measurements on this metric is that a store is
falling behind on raft log application. The raft entry cache generally tracks
entries that were recently appended, so if log application falls behind the
cache will already have moved on to newer entries.
`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}

// Raft message metrics.
metaRaftRcvdProp = metric.Metadata{
Expand Down Expand Up @@ -1938,6 +1966,7 @@ type StoreMetrics struct {
RaftApplyCommittedLatency metric.IHistogram
RaftSchedulerLatency metric.IHistogram
RaftTimeoutCampaign *metric.Counter
RaftStorageReadBytes *metric.Counter

// Raft message metrics.
//
Expand Down Expand Up @@ -2504,7 +2533,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
Duration: histogramWindow,
Buckets: metric.IOLatencyBuckets,
}),
RaftTimeoutCampaign: metric.NewCounter(metaRaftTimeoutCampaign),
RaftTimeoutCampaign: metric.NewCounter(metaRaftTimeoutCampaign),
RaftStorageReadBytes: metric.NewCounter(metaRaftStorageReadBytes),

// Raft message metrics.
RaftRcvdMessages: [maxRaftMsgType + 1]*metric.Counter{
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ func (r *replicaRaftStorage) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, e
if r.raftMu.sideloaded == nil {
return nil, errors.New("sideloaded storage is uninitialized")
}
return logstore.LoadEntries(ctx, r.mu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
ents, _, loadedSize, err := logstore.LoadEntries(ctx, r.mu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
r.store.raftEntryCache, r.raftMu.sideloaded, lo, hi, maxBytes)
r.store.metrics.RaftStorageReadBytes.Inc(int64(loadedSize))
return ents, err
}

// raftEntriesLocked requires that r.mu is held for writing.
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,13 @@ func TestRaftSSTableSideloading(t *testing.T) {
hi := tc.repl.mu.lastIndexNotDurable + 1

tc.store.raftEntryCache.Clear(tc.repl.RangeID, hi)
ents, err := logstore.LoadEntries(
ents, cachedBytes, _, err := logstore.LoadEntries(
ctx, rsl, tc.store.TODOEngine(), tc.repl.RangeID, tc.store.raftEntryCache,
tc.repl.raftMu.sideloaded, lo, hi, math.MaxUint64,
)
require.NoError(t, err)
require.Len(t, ents, int(hi-lo))
require.Zero(t, cachedBytes)

// Check that the Raft entry cache was populated.
_, okLo := tc.store.raftEntryCache.Get(tc.repl.RangeID, lo)
Expand Down
11 changes: 11 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -3981,6 +3981,17 @@ var charts = []sectionDescription{
},
},
},
{
Organization: [][]string{{ReplicationLayer, "Raft reads"}},
Charts: []chartDescription{
{
Title: "raft.Storage.Engine Read bytes",
Metrics: []string{
"raft.storage.read_bytes",
},
},
},
},
}

func jobTypeCharts(title string, varName string) chartDescription {
Expand Down

0 comments on commit 4849055

Please sign in to comment.