Skip to content

Commit

Permalink
kvserver: add average cpu nanos per replica
Browse files Browse the repository at this point in the history
This patch adds partial accounting for the CPU time spent processing
requests on behalf of a specific replica. The time is recorded in
`ReplicaStats`, which makes it possible to observe the aggregate CPU time
spent on replicas via the exported metric `rebalancing.cpunanospersecond`.

resolves #90589

Release note: None
  • Loading branch information
kvoli committed Dec 5, 2022
1 parent 80c4287 commit 2063297
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ go_library(
"//pkg/util/envutil",
"//pkg/util/errorutil",
"//pkg/util/grpcutil",
"//pkg/util/grunning",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/iterutil",
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,12 @@ var (
Measurement: "Bytes/Sec",
Unit: metric.Unit_BYTES,
}
metaAverageCPUNanosPerSecond = metric.Metadata{
Name: "rebalancing.cpunanospersecond",
Help: "Average CPU nanoseconds spent on processing replica operations in the last 30 minutes.",
Measurement: "Nanoseconds/Sec",
Unit: metric.Unit_NANOSECONDS,
}

// Metric for tracking follower reads.
metaFollowerReadsCount = metric.Metadata{
Expand Down Expand Up @@ -1745,6 +1751,7 @@ type StoreMetrics struct {
AverageRequestsPerSecond *metric.GaugeFloat64
AverageWriteBytesPerSecond *metric.GaugeFloat64
AverageReadBytesPerSecond *metric.GaugeFloat64
AverageNanosPerSecond *metric.GaugeFloat64
// l0SublevelsWindowedMax doesn't get recorded to metrics itself, it maintains
// an ad-hoc history for gossiping information for allocator use.
l0SublevelsWindowedMax syncutil.AtomicFloat64
Expand Down Expand Up @@ -2286,6 +2293,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
AverageReadsPerSecond: metric.NewGaugeFloat64(metaAverageReadsPerSecond),
AverageWriteBytesPerSecond: metric.NewGaugeFloat64(metaAverageWriteBytesPerSecond),
AverageReadBytesPerSecond: metric.NewGaugeFloat64(metaAverageReadBytesPerSecond),
AverageNanosPerSecond: metric.NewGaugeFloat64(metaAverageCPUNanosPerSecond),

// Follower reads metrics.
FollowerReadsCount: metric.NewCounter(metaFollowerReadsCount),
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/mvcc_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/grunning"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -671,6 +672,10 @@ func (mgcq *mvccGCQueue) process(
) (processed bool, err error) {
// Lookup the descriptor and GC policy for the zone containing this key range.
desc, conf := repl.DescAndSpanConfig()
grunningStart := grunning.Time()
defer func() {
repl.RecordNanosRunning(grunning.Difference(grunningStart, grunning.Time()).Nanoseconds())
}()

// Consult the protected timestamp state to determine whether we can GC and
// the timestamp which can be used to calculate the score and updated GC
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2052,6 +2052,12 @@ func init() {
tracing.RegisterTagRemapping("r", "range")
}

// RecordNanosRunning adds duration, expressed in nanoseconds, to this
// replica's cpuNanos load statistic.
func (r *Replica) RecordNanosRunning(duration int64) {
	nanos := float64(duration)
	r.loadStats.cpuNanos.RecordCount(nanos, 0 /* nodeID */)
}

// ReadProtectedTimestampsForTesting is for use only by tests to read and update
// the Replicas' cached protected timestamp state.
func (r *Replica) ReadProtectedTimestampsForTesting(ctx context.Context) (err error) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type ReplicaLoad struct {
readKeys *replicastats.ReplicaStats
writeBytes *replicastats.ReplicaStats
readBytes *replicastats.ReplicaStats
cpuNanos *replicastats.ReplicaStats
}

// NewReplicaLoad returns a new ReplicaLoad, which may be used to track the
Expand All @@ -40,6 +41,7 @@ func NewReplicaLoad(clock *hlc.Clock, getNodeLocality replicastats.LocalityOracl
readKeys: replicastats.NewReplicaStats(clock, nil),
writeBytes: replicastats.NewReplicaStats(clock, nil),
readBytes: replicastats.NewReplicaStats(clock, nil),
cpuNanos: replicastats.NewReplicaStats(clock, nil),
}
}

Expand All @@ -52,6 +54,7 @@ func (rl *ReplicaLoad) split(other *ReplicaLoad) {
rl.readKeys.SplitRequestCounts(other.readKeys)
rl.writeBytes.SplitRequestCounts(other.writeBytes)
rl.readBytes.SplitRequestCounts(other.readBytes)
rl.cpuNanos.SplitRequestCounts(other.cpuNanos)
}

// merge will combine the tracked load in other, into the calling struct.
Expand All @@ -62,6 +65,7 @@ func (rl *ReplicaLoad) merge(other *ReplicaLoad) {
rl.readKeys.MergeRequestCounts(other.readKeys)
rl.writeBytes.MergeRequestCounts(other.writeBytes)
rl.readBytes.MergeRequestCounts(other.readBytes)
rl.cpuNanos.MergeRequestCounts(other.cpuNanos)
}

// reset will clear all recorded history.
Expand All @@ -72,4 +76,5 @@ func (rl *ReplicaLoad) reset() {
rl.readKeys.ResetRequestCounts()
rl.writeBytes.ResetRequestCounts()
rl.readBytes.ResetRequestCounts()
rl.cpuNanos.ResetRequestCounts()
}
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/replica_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,13 @@ func (r *Replica) ReadBytesPerSecond() float64 {
return rbps
}

// CPUNanosPerSecond returns the average per-second rate reported by this
// replica's cpuNanos load statistic. The second value returned by
// AverageRatePerSecond is discarded.
func (r *Replica) CPUNanosPerSecond() float64 {
	rate, _ := r.loadStats.cpuNanos.AverageRatePerSecond()
	return rate
}

func (r *Replica) needsSplitBySizeRLocked() bool {
exceeded, _ := r.exceedsMultipleOfSplitSizeRLocked(1)
return exceeded
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3212,6 +3212,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
averageWritesPerSecond float64
averageReadBytesPerSecond float64
averageWriteBytesPerSecond float64
averageCPUNanosPerSecond float64

rangeCount int64
unavailableRangeCount int64
Expand Down Expand Up @@ -3309,6 +3310,10 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
if wbps, dur := rep.loadStats.writeBytes.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
averageWriteBytesPerSecond += wbps
}
if nps, dur := rep.loadStats.cpuNanos.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
averageCPUNanosPerSecond += nps
}

locks += metrics.LockTableMetrics.Locks
totalLockHoldDurationNanos += metrics.LockTableMetrics.TotalLockHoldDurationNanos
locksWithWaitQueues += metrics.LockTableMetrics.LocksWithWaitQueues
Expand Down Expand Up @@ -3344,6 +3349,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
s.metrics.AverageReadsPerSecond.Update(averageReadsPerSecond)
s.metrics.AverageReadBytesPerSecond.Update(averageReadBytesPerSecond)
s.metrics.AverageWriteBytesPerSecond.Update(averageWriteBytesPerSecond)
s.metrics.AverageNanosPerSecond.Update(averageCPUNanosPerSecond)
s.recordNewPerSecondStats(averageQueriesPerSecond, averageWritesPerSecond)

s.metrics.RangeCount.Update(rangeCount)
Expand Down Expand Up @@ -3547,6 +3553,7 @@ type HotReplicaInfo struct {
WriteKeysPerSecond float64
WriteBytesPerSecond float64
ReadBytesPerSecond float64
CPUNanosPerSecond float64
}

// HottestReplicas returns the hottest replicas on a store, sorted by their
Expand Down
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/grunning"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -346,6 +347,12 @@ func (s *Store) withReplicaForRequest(
func (s *Store) processRaftRequestWithReplica(
ctx context.Context, r *Replica, req *kvserverpb.RaftMessageRequest,
) *roachpb.Error {

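// Record the CPU time processing the request for this replica. This is
// recorded regardless of errors that are encountered.
// NOTE(review): the arguments of the deferred call below are evaluated when
// the defer statement executes, not when this function returns, so the
// recorded difference is approximately zero. Wrapping the call in a closure
// (defer func() { ... }()) would measure the whole function instead.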
grunningStart := grunning.Time()
defer r.RecordNanosRunning(grunning.Difference(grunningStart, grunning.Time()).Nanoseconds())

if verboseRaftLoggingEnabled() {
log.Infof(ctx, "incoming raft message:\n%s", raftDescribeMessage(req.Message, raftEntryFormatter))
}
Expand Down Expand Up @@ -633,9 +640,11 @@ func (s *Store) processReady(rangeID roachpb.RangeID) {
ctx := r.raftCtx
stats, err := r.handleRaftReady(ctx, noSnap)
maybeFatalOnRaftReadyErr(ctx, err)
grunningStart := grunning.Time()
elapsed := stats.tEnd.Sub(stats.tBegin)
s.metrics.RaftWorkingDurationNanos.Inc(elapsed.Nanoseconds())
s.metrics.RaftHandleReadyLatency.RecordValue(elapsed.Nanoseconds())
r.RecordNanosRunning(grunning.Difference(grunningStart, grunning.Time()).Nanoseconds())
// Warn if Raft processing took too long. We use the same duration as we
// use for warning about excessive raft mutex lock hold times. Long
// processing time means we'll have starved local replicas of ticks and
Expand All @@ -654,12 +663,14 @@ func (s *Store) processTick(_ context.Context, rangeID roachpb.RangeID) bool {
ioThresholds := s.ioThresholds.Current()

start := timeutil.Now()
startGRunning := grunning.Time()
ctx := r.raftCtx

exists, err := r.tick(ctx, livenessMap, ioThresholds)
if err != nil {
log.Errorf(ctx, "%v", err)
}
r.RecordNanosRunning(grunning.Difference(startGRunning, grunning.Time()).Nanoseconds())
s.metrics.RaftTickingDurationNanos.Inc(timeutil.Since(start).Nanoseconds())
return exists // ready
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/store_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/grunning"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -166,6 +167,19 @@ func (s *Store) SendWithWriteBytes(
log.Eventf(ctx, "executing %s", ba)
}

// Track the time spent on the goroutine, processing this batch.
startGrunning := grunning.Time()
defer func() {
repl, err := s.GetReplica(ba.RangeID)
if err != nil {
return
}
if !repl.IsInitialized() {
return
}
repl.RecordNanosRunning(grunning.Difference(startGrunning, grunning.Time()).Nanoseconds())
}()

// Tracks suggested ranges to return to the caller. Suggested ranges are aggregated from
// two sources.
// (1): On a RangeKeyMismatchError that is retriable.
Expand Down
5 changes: 5 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,13 +686,18 @@ var charts = []sectionDescription{
Metrics: []string{"rebalancing.readspersecond"},
},
{

Title: "Bytes Read Per Second",
Metrics: []string{"rebalancing.readbytespersecond"},
},
{
Title: "Bytes Written Per Second",
Metrics: []string{"rebalancing.writebytespersecond"},
},
{
Title: "CPU Nanos Used Per Second",
Metrics: []string{"rebalancing.cpunanospersecond"},
},
},
},
{
Expand Down

0 comments on commit 2063297

Please sign in to comment.