diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 1ac027efcad9..aac84ca16801 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -176,6 +176,7 @@ go_library( "//pkg/util/envutil", "//pkg/util/errorutil", "//pkg/util/grpcutil", + "//pkg/util/grunning", "//pkg/util/hlc", "//pkg/util/humanizeutil", "//pkg/util/iterutil", diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 368fb07989a9..aba525901d96 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -354,6 +354,12 @@ var ( Measurement: "Bytes/Sec", Unit: metric.Unit_BYTES, } + metaAverageCPUNanosPerSecond = metric.Metadata{ + Name: "rebalancing.cpunanospersecond", + Help: "Average CPU nanoseconds spent on processing replica operations in the last 30 minutes.", + Measurement: "Nanoseconds/Sec", + Unit: metric.Unit_NANOSECONDS, + } // Metric for tracking follower reads. metaFollowerReadsCount = metric.Metadata{ @@ -1745,6 +1751,7 @@ type StoreMetrics struct { AverageRequestsPerSecond *metric.GaugeFloat64 AverageWriteBytesPerSecond *metric.GaugeFloat64 AverageReadBytesPerSecond *metric.GaugeFloat64 + AverageNanosPerSecond *metric.GaugeFloat64 // l0SublevelsWindowedMax doesn't get recorded to metrics itself, it maintains // an ad-hoc history for gosipping information for allocator use. l0SublevelsWindowedMax syncutil.AtomicFloat64 @@ -2286,6 +2293,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { AverageReadsPerSecond: metric.NewGaugeFloat64(metaAverageReadsPerSecond), AverageWriteBytesPerSecond: metric.NewGaugeFloat64(metaAverageWriteBytesPerSecond), AverageReadBytesPerSecond: metric.NewGaugeFloat64(metaAverageReadBytesPerSecond), + AverageNanosPerSecond: metric.NewGaugeFloat64(metaAverageCPUNanosPerSecond), // Follower reads metrics. FollowerReadsCount: metric.NewCounter(metaFollowerReadsCount), diff --git a/pkg/kv/kvserver/mvcc_gc_queue.go b/pkg/kv/kvserver/mvcc_gc_queue.go index 6c8d99c1368c..c849e00396b2 100644 --- a/pkg/kv/kvserver/mvcc_gc_queue.go +++ b/pkg/kv/kvserver/mvcc_gc_queue.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/contextutil" + "github.com/cockroachdb/cockroach/pkg/util/grunning" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -671,6 +672,10 @@ func (mgcq *mvccGCQueue) process( ) (processed bool, err error) { // Lookup the descriptor and GC policy for the zone containing this key range. desc, conf := repl.DescAndSpanConfig() + grunningStart := grunning.Time() + defer func() { + repl.RecordNanosRunning(grunning.Difference(grunningStart, grunning.Time()).Nanoseconds()) + }() // Consult the protected timestamp state to determine whether we can GC and // the timestamp which can be used to calculate the score and updated GC diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 53fb2545d398..2f4d69bffc9a 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -2052,6 +2052,12 @@ func init() { tracing.RegisterTagRemapping("r", "range") } +// RecordNanosRunning records the given duration against the replica's cpu time +// accounting. +func (r *Replica) RecordNanosRunning(duration int64) { + r.loadStats.cpuNanos.RecordCount(float64(duration), 0 /* nodeID */) +} + // ReadProtectedTimestampsForTesting is for use only by tests to read and update // the Replicas' cached protected timestamp state. func (r *Replica) ReadProtectedTimestampsForTesting(ctx context.Context) (err error) { diff --git a/pkg/kv/kvserver/replica_load.go b/pkg/kv/kvserver/replica_load.go index 00695b708314..1d9f6e3fc7bc 100644 --- a/pkg/kv/kvserver/replica_load.go +++ b/pkg/kv/kvserver/replica_load.go @@ -24,6 +24,7 @@ type ReplicaLoad struct { readKeys *replicastats.ReplicaStats writeBytes *replicastats.ReplicaStats readBytes *replicastats.ReplicaStats + cpuNanos *replicastats.ReplicaStats } // NewReplicaLoad returns a new ReplicaLoad, which may be used to track the @@ -40,6 +41,7 @@ func NewReplicaLoad(clock *hlc.Clock, getNodeLocality replicastats.LocalityOracl readKeys: replicastats.NewReplicaStats(clock, nil), writeBytes: replicastats.NewReplicaStats(clock, nil), readBytes: replicastats.NewReplicaStats(clock, nil), + cpuNanos: replicastats.NewReplicaStats(clock, nil), } } @@ -52,6 +54,7 @@ func (rl *ReplicaLoad) split(other *ReplicaLoad) { rl.readKeys.SplitRequestCounts(other.readKeys) rl.writeBytes.SplitRequestCounts(other.writeBytes) rl.readBytes.SplitRequestCounts(other.readBytes) + rl.cpuNanos.SplitRequestCounts(other.cpuNanos) } // merge will combine the tracked load in other, into the calling struct. @@ -62,6 +65,7 @@ func (rl *ReplicaLoad) merge(other *ReplicaLoad) { rl.readKeys.MergeRequestCounts(other.readKeys) rl.writeBytes.MergeRequestCounts(other.writeBytes) rl.readBytes.MergeRequestCounts(other.readBytes) + rl.cpuNanos.MergeRequestCounts(other.cpuNanos) } // reset will clear all recorded history. @@ -72,4 +76,5 @@ func (rl *ReplicaLoad) reset() { rl.readKeys.ResetRequestCounts() rl.writeBytes.ResetRequestCounts() rl.readBytes.ResetRequestCounts() + rl.cpuNanos.ResetRequestCounts() } diff --git a/pkg/kv/kvserver/replica_metrics.go b/pkg/kv/kvserver/replica_metrics.go index 8cc8156b6acf..fedef15a9367 100644 --- a/pkg/kv/kvserver/replica_metrics.go +++ b/pkg/kv/kvserver/replica_metrics.go @@ -331,6 +331,13 @@ func (r *Replica) ReadBytesPerSecond() float64 { return rbps } +// CPUNanosPerSecond tracks the time this replica spent on-processor averaged +// per second. +func (r *Replica) CPUNanosPerSecond() float64 { + cpus, _ := r.loadStats.cpuNanos.AverageRatePerSecond() + return cpus +} + func (r *Replica) needsSplitBySizeRLocked() bool { exceeded, _ := r.exceedsMultipleOfSplitSizeRLocked(1) return exceeded diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 1e950873401a..e52603f76e4d 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -3212,6 +3212,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { averageWritesPerSecond float64 averageReadBytesPerSecond float64 averageWriteBytesPerSecond float64 + averageCPUNanosPerSecond float64 rangeCount int64 unavailableRangeCount int64 @@ -3309,6 +3310,10 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { if wbps, dur := rep.loadStats.writeBytes.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { averageWriteBytesPerSecond += wbps } + if nps, dur := rep.loadStats.cpuNanos.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { + averageCPUNanosPerSecond += nps + } + locks += metrics.LockTableMetrics.Locks totalLockHoldDurationNanos += metrics.LockTableMetrics.TotalLockHoldDurationNanos locksWithWaitQueues += metrics.LockTableMetrics.LocksWithWaitQueues @@ -3344,6 +3349,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { s.metrics.AverageReadsPerSecond.Update(averageReadsPerSecond) s.metrics.AverageReadBytesPerSecond.Update(averageReadBytesPerSecond) s.metrics.AverageWriteBytesPerSecond.Update(averageWriteBytesPerSecond) + s.metrics.AverageNanosPerSecond.Update(averageCPUNanosPerSecond) s.recordNewPerSecondStats(averageQueriesPerSecond, averageWritesPerSecond) s.metrics.RangeCount.Update(rangeCount) @@ -3547,6 +3553,7 @@ type HotReplicaInfo struct { WriteKeysPerSecond float64 WriteBytesPerSecond float64 ReadBytesPerSecond float64 + CPUNanosPerSecond float64 } // HottestReplicas returns the hottest replicas on a store, sorted by their diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 51acf460fc74..9518a39556e2 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/grunning" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -346,6 +347,12 @@ func (s *Store) withReplicaForRequest( func (s *Store) processRaftRequestWithReplica( ctx context.Context, r *Replica, req *kvserverpb.RaftMessageRequest, ) *roachpb.Error { + + // Record the CPU time processing the request for this replica. This is + // recorded regardless of errors that are encountered. + grunningStart := grunning.Time() + defer r.RecordNanosRunning(grunning.Difference(grunningStart, grunning.Time()).Nanoseconds()) + if verboseRaftLoggingEnabled() { log.Infof(ctx, "incoming raft message:\n%s", raftDescribeMessage(req.Message, raftEntryFormatter)) } @@ -633,9 +640,11 @@ func (s *Store) processReady(rangeID roachpb.RangeID) { ctx := r.raftCtx stats, err := r.handleRaftReady(ctx, noSnap) maybeFatalOnRaftReadyErr(ctx, err) + grunningStart := grunning.Time() elapsed := stats.tEnd.Sub(stats.tBegin) s.metrics.RaftWorkingDurationNanos.Inc(elapsed.Nanoseconds()) s.metrics.RaftHandleReadyLatency.RecordValue(elapsed.Nanoseconds()) + r.RecordNanosRunning(grunning.Difference(grunningStart, grunning.Time()).Nanoseconds()) // Warn if Raft processing took too long. We use the same duration as we // use for warning about excessive raft mutex lock hold times. Long // processing time means we'll have starved local replicas of ticks and @@ -654,12 +663,14 @@ func (s *Store) processTick(_ context.Context, rangeID roachpb.RangeID) bool { ioThresholds := s.ioThresholds.Current() start := timeutil.Now() + startGRunning := grunning.Time() ctx := r.raftCtx exists, err := r.tick(ctx, livenessMap, ioThresholds) if err != nil { log.Errorf(ctx, "%v", err) } + r.RecordNanosRunning(grunning.Difference(startGRunning, grunning.Time()).Nanoseconds()) s.metrics.RaftTickingDurationNanos.Inc(timeutil.Since(start).Nanoseconds()) return exists // ready } diff --git a/pkg/kv/kvserver/store_send.go b/pkg/kv/kvserver/store_send.go index 9431c5595b86..e26d91608531 100644 --- a/pkg/kv/kvserver/store_send.go +++ b/pkg/kv/kvserver/store_send.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/grunning" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/limit" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -166,6 +167,19 @@ func (s *Store) SendWithWriteBytes( log.Eventf(ctx, "executing %s", ba) } + // Track the time spent on the goroutine, processing this batch. + startGrunning := grunning.Time() + defer func() { + repl, err := s.GetReplica(ba.RangeID) + if err != nil { + return + } + if !repl.IsInitialized() { + return + } + repl.RecordNanosRunning(grunning.Difference(startGrunning, grunning.Time()).Nanoseconds()) + }() + // Tracks suggested ranges to return to the caller. Suggested ranges are aggregated from // two sources. // (1): On a RangeKeyMismatchError that is retriable. diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index e14d41286880..561793e658a1 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -686,6 +686,7 @@ var charts = []sectionDescription{ Metrics: []string{"rebalancing.readspersecond"}, }, { + Title: "Bytes Read Per Second", Metrics: []string{"rebalancing.readbytespersecond"}, }, @@ -693,6 +694,10 @@ var charts = []sectionDescription{ Title: "Bytes Written Per Second", Metrics: []string{"rebalancing.writebytespersecond"}, }, + { + Title: "CPU Nanos Used Per Second", + Metrics: []string{"rebalancing.cpunanospersecond"}, + }, }, }, {