diff --git a/pkg/kv/kvserver/asim/state/split_decider.go b/pkg/kv/kvserver/asim/state/split_decider.go index 822ea36c1b5d..b30a3848214f 100644 --- a/pkg/kv/kvserver/asim/state/split_decider.go +++ b/pkg/kv/kvserver/asim/state/split_decider.go @@ -46,7 +46,9 @@ type loadSplitConfig struct { // NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to // find the midpoint based on recorded load. -func (lsc loadSplitConfig) NewLoadBasedSplitter(startTime time.Time) split.LoadBasedSplitter { +func (lsc loadSplitConfig) NewLoadBasedSplitter( + startTime time.Time, _ split.SplitObjective, +) split.LoadBasedSplitter { return split.NewUnweightedFinder(startTime, lsc.randSource) } @@ -57,7 +59,7 @@ func (lsc loadSplitConfig) StatRetention() time.Duration { // StatThreshold returns the threshold for load above which the range // should be considered split. -func (lsc loadSplitConfig) StatThreshold() float64 { +func (lsc loadSplitConfig) StatThreshold(_ split.SplitObjective) float64 { return lsc.settings.SplitQPSThreshold } @@ -85,7 +87,7 @@ func (s *SplitDecider) newDecider() *split.Decider { split.Init(decider, s.splitConfig, &split.LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), - }) + }, split.SplitQPS) return decider } @@ -100,11 +102,13 @@ func (s *SplitDecider) Record(tick time.Time, rangeID RangeID, le workload.LoadE } qps := LoadEventQPS(le) - shouldSplit := decider.Record(context.Background(), tick, int(qps), func() roachpb.Span { - return roachpb.Span{ - Key: Key(le.Key).ToRKey().AsRawKey(), - } - }) + shouldSplit := decider.Record( + context.Background(), + tick, + func(_ split.SplitObjective) int { return int(qps) }, + func() roachpb.Span { + return roachpb.Span{Key: Key(le.Key).ToRKey().AsRawKey()} + }) if shouldSplit { s.suggestions = append(s.suggestions, rangeID) diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go index 1f5097d998f7..3b686e62861a 100644 --- a/pkg/kv/kvserver/merge_queue.go +++ b/pkg/kv/kvserver/merge_queue.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -185,7 +186,12 @@ var _ PurgatoryError = rangeMergePurgatoryError{} func (mq *mergeQueue) requestRangeStats( ctx context.Context, key roachpb.Key, -) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, ls loadSplitStat, err error) { +) ( + desc *roachpb.RangeDescriptor, + stats enginepb.MVCCStats, + lbSnap split.LoadSplitSnapshot, + err error, +) { ba := &kvpb.BatchRequest{} ba.Add(&kvpb.RangeStatsRequest{ @@ -194,7 +200,7 @@ func (mq *mergeQueue) requestRangeStats( br, pErr := mq.db.NonTransactionalSender().Send(ctx, ba) if pErr != nil { - return nil, enginepb.MVCCStats{}, loadSplitStat{}, pErr.GoError() + return nil, enginepb.MVCCStats{}, lbSnap, pErr.GoError() } res := br.Responses[0].GetInner().(*kvpb.RangeStatsResponse) @@ -202,23 +208,28 @@ func (mq *mergeQueue) requestRangeStats( stats = res.MVCCStats // The load based splitter will only track the max of at most one statistic - // at a time for load based splitting. This is either CPU or QPS. However - // we don't enforce that only one max stat is returned. If the split - // objective is currently CPU, it must be the case that every store in the - // cluster is running a version that populates MaxCPUPerSecond so we don't - // need to worry about it being 0 as default and passing the >= 0 check. - switch mq.store.splitConfig.SplitObjective() { - case SplitCPU: - ls.max = res.MaxCPUPerSecond - ls.ok = res.MaxCPUPerSecond >= 0 - ls.typ = SplitCPU - case SplitQPS: - ls.max = res.MaxQueriesPerSecond - ls.ok = res.MaxQueriesPerSecond >= 0 - ls.typ = SplitQPS + // at a time for load based splitting. This is either CPU or QPS. However we + // don't enforce that only one max stat is returned. We set QPS after CPU, + // possibly overwriting if both are set. A default value of 0 could be + // returned in the response if the rhs node is on a pre 23.1 version. To + // avoid merging the range due to low load (0 CPU), always overwrite the + // value if MaxQPS is set. If neither are >= 0, OK won't be set and the + // objective will be default value, QPS. + if res.MaxCPUPerSecond >= 0 { + lbSnap = split.LoadSplitSnapshot{ + SplitObjective: split.SplitCPU, + Max: res.MaxCPUPerSecond, + Ok: true, + } } - - return desc, stats, ls, nil + if res.MaxQueriesPerSecond >= 0 { + lbSnap = split.LoadSplitSnapshot{ + SplitObjective: split.SplitQPS, + Max: res.MaxQueriesPerSecond, + Ok: true, + } + } + return desc, stats, lbSnap, nil } func (mq *mergeQueue) process( @@ -231,7 +242,6 @@ func (mq *mergeQueue) process( lhsDesc := lhsRepl.Desc() lhsStats := lhsRepl.GetMVCCStats() - lhsLoadSplitStat := lhsRepl.loadSplitStat(ctx) minBytes := lhsRepl.GetMinBytes() if lhsStats.Total() >= minBytes { log.VEventf(ctx, 2, "skipping merge: LHS meets minimum size threshold %d with %d bytes", @@ -239,7 +249,7 @@ func (mq *mergeQueue) process( return false, nil } - rhsDesc, rhsStats, rhsLoadSplitStat, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey()) + rhsDesc, rhsStats, rhsLoadSplitSnap, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey()) if err != nil { return false, err } @@ -265,11 +275,12 @@ func (mq *mergeQueue) process( mergedStats := lhsStats mergedStats.Add(rhsStats) + lhsLoadSplitSnap := lhsRepl.loadBasedSplitter.Snapshot(ctx, now.ToTimestamp().GoTime()) var loadMergeReason string if lhsRepl.SplitByLoadEnabled() { var canMergeLoad bool if canMergeLoad, loadMergeReason = canMergeRangeLoad( - ctx, lhsLoadSplitStat, rhsLoadSplitStat, mq.store.splitConfig, + ctx, lhsLoadSplitSnap, rhsLoadSplitSnap, ); !canMergeLoad { log.VEventf(ctx, 2, "skipping merge to avoid thrashing: merged range %s may split %s", mergedDesc, loadMergeReason) @@ -415,7 +426,7 @@ func (mq *mergeQueue) process( // Adjust the splitter to account for the additional load from the RHS. We // could just Reset the splitter, but then we'd need to wait out a full // measurement period (default of 5m) before merging this range again. - if mergedLoadSplitStat := lhsLoadSplitStat.max + rhsLoadSplitStat.max; mergedLoadSplitStat != 0 { + if mergedLoadSplitStat := lhsLoadSplitSnap.Max + rhsLoadSplitSnap.Max; mergedLoadSplitStat != 0 { lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedLoadSplitStat) } return true, nil @@ -439,7 +450,7 @@ func (mq *mergeQueue) updateChan() <-chan time.Time { } func canMergeRangeLoad( - ctx context.Context, lhs, rhs loadSplitStat, rsc *replicaSplitConfig, + ctx context.Context, lhs, rhs split.LoadSplitSnapshot, ) (can bool, reason string) { // When load is a consideration for splits and, by extension, merges, the // mergeQueue is fairly conservative. In an effort to avoid thrashing and to @@ -451,10 +462,10 @@ func canMergeRangeLoad( // maximum qps measurement from both sides to be sufficiently stable and // reliable, meaning that it was a maximum measurement over some extended // period of time. - if !lhs.ok { + if !lhs.Ok { return false, "LHS load measurement not yet reliable" } - if !rhs.ok { + if !rhs.Ok { return false, "RHS load measurement not yet reliable" } @@ -462,36 +473,36 @@ func canMergeRangeLoad( // the current split objective they cannot merge together. This could occur // just after changing the split objective to a different value, where // there is a mismatch. - splitObjective, splitThreshold := rsc.SplitObjective(), rsc.StatThreshold() - if lhs.typ != splitObjective { - return false, "LHS load measurement is a different type (%s) than current split objective (%s)" - } - if rhs.typ != splitObjective { - return false, "RHS load measurement is a different type (%s) than current split objective (%s)" + if lhs.SplitObjective != rhs.SplitObjective { + return false, fmt.Sprintf("LHS load measurement is a different type (%s) than the RHS (%s)", + lhs.SplitObjective, + rhs.SplitObjective, + ) } + obj := lhs.SplitObjective // Check if the merged range would need to be split, if so, skip merge. // Use a lower threshold for load based splitting so we don't find ourselves // in a situation where we keep merging ranges that would be split soon after // by a small increase in load. - merged := lhs.max + rhs.max - conservativeLoadBasedSplitThreshold := 0.5 * splitThreshold + merged := lhs.Max + rhs.Max + conservativeLoadBasedSplitThreshold := 0.5 * lhs.Threshold if merged >= conservativeLoadBasedSplitThreshold { return false, fmt.Sprintf("lhs+rhs %s (%s+%s=%s) above threshold (%s)", - splitObjective, - splitObjective.Format(lhs.max), - splitObjective.Format(lhs.max), - splitObjective.Format(merged), - splitObjective.Format(conservativeLoadBasedSplitThreshold), + obj, + obj.Format(lhs.Max), + obj.Format(rhs.Max), + obj.Format(merged), + obj.Format(conservativeLoadBasedSplitThreshold), ) } return true, fmt.Sprintf("lhs+rhs %s (%s+%s=%s) below threshold (%s)", - splitObjective, - splitObjective.Format(lhs.max), - splitObjective.Format(lhs.max), - splitObjective.Format(merged), - splitObjective.Format(conservativeLoadBasedSplitThreshold), + obj, + obj.Format(lhs.Max), + obj.Format(rhs.Max), + obj.Format(merged), + obj.Format(conservativeLoadBasedSplitThreshold), ) } diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index a523d1985f59..007a9d70d8ac 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1301,10 +1301,12 @@ func (r *Replica) SetMVCCStatsForTesting(stats *enginepb.MVCCStats) { // // Use LoadStats.QueriesPerSecond for all other purposes. func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) { - if r.store.splitConfig.SplitObjective() != SplitQPS { + snap := r.loadBasedSplitter.Snapshot(ctx, r.Clock().PhysicalTime()) + + if snap.SplitObjective != split.SplitQPS { return 0, false } - return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime()) + return snap.Max, snap.Ok } // GetMaxSplitCPU returns the Replica's maximum CPU/s rate over a configured @@ -1317,10 +1319,12 @@ func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) { // Use LoadStats.RaftCPUNanosPerSecond and RequestCPUNanosPerSecond for current // CPU stats for all other purposes. func (r *Replica) GetMaxSplitCPU(ctx context.Context) (float64, bool) { - if r.store.splitConfig.SplitObjective() != SplitCPU { + snap := r.loadBasedSplitter.Snapshot(ctx, r.Clock().PhysicalTime()) + + if snap.SplitObjective != split.SplitCPU { return 0, false } - return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime()) + return snap.Max, snap.Ok } // ContainsKey returns whether this range contains the specified key. diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 6bb104457c74..2407d55eab3e 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -107,11 +107,6 @@ func newUninitializedReplica( r.mu.stateLoader = stateloader.Make(rangeID) r.mu.quiescent = true r.mu.conf = store.cfg.DefaultSpanConfig - split.Init( - &r.loadBasedSplitter, - store.splitConfig, - store.metrics.LoadSplitterMetrics, - ) r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{} r.mu.checksums = map[uuid.UUID]*replicaChecksum{} @@ -123,8 +118,15 @@ func newUninitializedReplica( if leaseHistoryMaxEntries > 0 { r.leaseHistory = newLeaseHistory() } + if store.cfg.StorePool != nil { r.loadStats = load.NewReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString) + split.Init( + &r.loadBasedSplitter, + newReplicaSplitConfig(store.ClusterSettings()), + store.metrics.LoadSplitterMetrics, + store.rebalanceObjManager.Objective().ToSplitObjective(), + ) } // NB: the state will be loaded when the replica gets initialized. diff --git a/pkg/kv/kvserver/replica_split_load.go b/pkg/kv/kvserver/replica_split_load.go index 035cf82be008..dae2a327a80e 100644 --- a/pkg/kv/kvserver/replica_split_load.go +++ b/pkg/kv/kvserver/replica_split_load.go @@ -21,7 +21,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -61,80 +60,39 @@ var SplitByLoadCPUThreshold = settings.RegisterDurationSetting( 500*time.Millisecond, ).WithPublic() -// SplitObjective is a type that specifies a load based splitting objective. -type SplitObjective int - -const ( - // SplitQPS will track and split QPS (queries-per-second) over a range. - SplitQPS SplitObjective = iota - // SplitCPU will track and split CPU (cpu-per-second) over a range. - SplitCPU -) - -// String returns a human readable string representation of the dimension. -func (d SplitObjective) String() string { - switch d { - case SplitQPS: - return "qps" - case SplitCPU: - return "cpu" - default: - panic(fmt.Sprintf("cannot name: unknown objective with ordinal %d", d)) - } -} - -// Format returns a formatted string for a value. -func (d SplitObjective) Format(value float64) string { - switch d { - case SplitQPS: - return fmt.Sprintf("%.1f", value) - case SplitCPU: - return string(humanizeutil.Duration(time.Duration(int64(value)))) +func (obj LBRebalancingObjective) ToSplitObjective() split.SplitObjective { + switch obj { + case LBRebalancingQueries: + return split.SplitQPS + case LBRebalancingCPU: + return split.SplitCPU default: - panic(fmt.Sprintf("cannot format value: unknown objective with ordinal %d", d)) + panic(fmt.Sprintf("unknown objective %d", obj)) } } // replicaSplitConfig implements the split.SplitConfig interface. type replicaSplitConfig struct { - randSource split.RandSource - rebalanceObjectiveProvider RebalanceObjectiveProvider - st *cluster.Settings + randSource split.RandSource + st *cluster.Settings } -func newReplicaSplitConfig( - st *cluster.Settings, rebalanceObjectiveProvider RebalanceObjectiveProvider, -) *replicaSplitConfig { +func newReplicaSplitConfig(st *cluster.Settings) *replicaSplitConfig { return &replicaSplitConfig{ - randSource: split.GlobalRandSource(), - rebalanceObjectiveProvider: rebalanceObjectiveProvider, - st: st, - } -} - -// SplitObjective returns the current split objective. Currently this tracks -// 1:1 to the rebalance objective e.g. balancing QPS means also load based -// splitting on QPS. -func (c *replicaSplitConfig) SplitObjective() SplitObjective { - obj := c.rebalanceObjectiveProvider.Objective() - switch obj { - case LBRebalancingQueries: - return SplitQPS - case LBRebalancingCPU: - return SplitCPU - default: - panic(errors.AssertionFailedf("Unkown split objective %d", obj)) + randSource: split.GlobalRandSource(), + st: st, } } // NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to // find the midpoint based on recorded load. -func (c *replicaSplitConfig) NewLoadBasedSplitter(startTime time.Time) split.LoadBasedSplitter { - obj := c.SplitObjective() +func (c *replicaSplitConfig) NewLoadBasedSplitter( + startTime time.Time, obj split.SplitObjective, +) split.LoadBasedSplitter { switch obj { - case SplitQPS: + case split.SplitQPS: return split.NewUnweightedFinder(startTime, c.randSource) - case SplitCPU: + case split.SplitCPU: return split.NewWeightedFinder(startTime, c.randSource) default: panic(errors.AssertionFailedf("Unkown rebalance objective %d", obj)) @@ -148,12 +106,11 @@ func (c *replicaSplitConfig) StatRetention() time.Duration { // StatThreshold returns the threshold for load above which the range should be // considered split. -func (c *replicaSplitConfig) StatThreshold() float64 { - obj := c.SplitObjective() +func (c *replicaSplitConfig) StatThreshold(obj split.SplitObjective) float64 { switch obj { - case SplitQPS: + case split.SplitQPS: return float64(SplitByLoadQPSThreshold.Get(&c.st.SV)) - case SplitCPU: + case split.SplitCPU: return float64(SplitByLoadCPUThreshold.Get(&c.st.SV)) default: panic(errors.AssertionFailedf("Unkown rebalance objective %d", obj)) @@ -168,22 +125,6 @@ func (r *Replica) SplitByLoadEnabled() bool { !r.store.TestingKnobs().DisableLoadBasedSplitting } -type loadSplitStat struct { - max float64 - ok bool - typ SplitObjective -} - -func (r *Replica) loadSplitStat(ctx context.Context) loadSplitStat { - max, ok := r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime()) - lss := loadSplitStat{ - max: max, - ok: ok, - typ: r.store.splitConfig.SplitObjective(), - } - return lss -} - // getResponseBoundarySpan computes the union span of the true spans that were // iterated over using the request span and the response's resumeSpan. // @@ -260,7 +201,7 @@ func getResponseBoundarySpan( // recordBatchForLoadBasedSplitting records the batch's spans to be considered // for load based splitting. func (r *Replica) recordBatchForLoadBasedSplitting( - ctx context.Context, ba *kvpb.BatchRequest, br *kvpb.BatchResponse, stat int, + ctx context.Context, ba *kvpb.BatchRequest, br *kvpb.BatchResponse, cpu int, ) { if !r.SplitByLoadEnabled() { return @@ -278,15 +219,18 @@ func (r *Replica) recordBatchForLoadBasedSplitting( len(ba.Requests), len(br.Responses)) } - // When QPS splitting is enabled, use the number of requests rather than - // the given stat for recording load. - if r.store.splitConfig.SplitObjective() == SplitQPS { - stat = len(ba.Requests) - } + shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), + func(obj split.SplitObjective) int { + switch obj { + case split.SplitCPU: + return cpu + default: + return len(ba.Requests) + } + }, func() roachpb.Span { + return getResponseBoundarySpan(ba, br) + }) - shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), stat, func() roachpb.Span { - return getResponseBoundarySpan(ba, br) - }) if shouldInitSplit { r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().NowAsClockTimestamp()) } diff --git a/pkg/kv/kvserver/split/BUILD.bazel b/pkg/kv/kvserver/split/BUILD.bazel index 514c4112e635..26de0dd675e3 100644 --- a/pkg/kv/kvserver/split/BUILD.bazel +++ b/pkg/kv/kvserver/split/BUILD.bazel @@ -5,6 +5,7 @@ go_library( name = "split", srcs = [ "decider.go", + "objective.go", "unweighted_finder.go", "weighted_finder.go", ], @@ -13,6 +14,7 @@ go_library( deps = [ "//pkg/keys", "//pkg/roachpb", + "//pkg/util/humanizeutil", "//pkg/util/log", "//pkg/util/metric", "//pkg/util/syncutil", diff --git a/pkg/kv/kvserver/split/decider.go b/pkg/kv/kvserver/split/decider.go index e13e89c302be..ac9402a86578 100644 --- a/pkg/kv/kvserver/split/decider.go +++ b/pkg/kv/kvserver/split/decider.go @@ -56,12 +56,12 @@ type LoadBasedSplitter interface { type LoadSplitConfig interface { // NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to // find the midpoint based on recorded load. - NewLoadBasedSplitter(time.Time) LoadBasedSplitter + NewLoadBasedSplitter(time.Time, SplitObjective) LoadBasedSplitter // StatRetention returns the duration that recorded load is to be retained. StatRetention() time.Duration // StatThreshold returns the threshold for load above which the range // should be considered split. - StatThreshold() float64 + StatThreshold(SplitObjective) float64 } type RandSource interface { @@ -95,17 +95,17 @@ func GlobalRandSource() RandSource { return globalRandSource{} } -// A Decider collects measurements about the activity (measured in qps) on a -// Replica and, assuming that qps thresholds are exceeded, tries to determine a +// A Decider collects measurements about the load activity on a +// Replica and, assuming that load thresholds are exceeded, tries to determine a // split key that would approximately result in halving the load on each of the // resultant ranges. Similarly, these measurements are used to determine when a // range is serving sufficiently little load, such that it should be allowed to // merge with its left or right hand neighbor. // // Operations should call `Record` with a current timestamp. Operation counts -// are aggregated over a second and a QPS is computed. +// are aggregated over a second and a load-per-second is computed. // -// If the QPS is above a threshold, a split finder is instantiated and the spans +// If the load is above a threshold, a split finder is instantiated and the spans // supplied to Record are sampled for a duration (on the order of ten seconds). // Assuming that load consistently remains over threshold, and the workload // touches a diverse enough set of keys to benefit from a split, sampling will @@ -114,12 +114,17 @@ func GlobalRandSource() RandSource { // (which may have disappeared either due to a drop in qps or a change in the // workload). // -// These second-long QPS samples are also aggregated together to track the -// maximum historical QPS over a configurable retention period. This maximum QPS -// measurement, which is accessible through the MaxQPS method, can be used to +// These second-long load samples are also aggregated together to track the +// maximum historical load over a configurable retention period. This maximum load +// measurement, which is accessible through the MaxStat method, can be used to // prevent load-based splits from being merged away until the resulting ranges -// have consistently remained below a certain QPS threshold for a sufficiently +// have consistently remained below a certain load threshold for a sufficiently // long period of time. +// +// The Decider also maintains ownership of the SplitObjective. The +// SplitObjective controls which load stat threshold and split finder are used. +// We keep the SplitObjective under the finder mutex to prevent inconsistency +// that could result from separate calls to the decider, then split objective. // LoadSplitterMetrics consists of metrics for load-based splitter split key. type LoadSplitterMetrics struct { @@ -127,15 +132,16 @@ type LoadSplitterMetrics struct { NoSplitKeyCount *metric.Counter } -// Decider tracks the latest QPS and if certain conditions are met, records +// Decider tracks the latest load and if certain conditions are met, records // incoming requests to find potential split keys and checks if sampled // candidate split keys satisfy certain requirements. type Decider struct { - config LoadSplitConfig // supplied to Init loadSplitterMetrics *LoadSplitterMetrics // supplied to Init + config LoadSplitConfig // supplied to Init mu struct { syncutil.Mutex + objective SplitObjective // supplied to Init // Fields tracking the current qps sample. lastStatRollover time.Time // most recent time recorded by requests. @@ -158,9 +164,15 @@ type Decider struct { // embedding the Decider into a larger struct outside of the scope of this package // without incurring a pointer reference. This is relevant since many Deciders // may exist in the system at any given point in time. -func Init(lbs *Decider, config LoadSplitConfig, loadSplitterMetrics *LoadSplitterMetrics) { +func Init( + lbs *Decider, + config LoadSplitConfig, + loadSplitterMetrics *LoadSplitterMetrics, + objective SplitObjective, +) { lbs.loadSplitterMetrics = loadSplitterMetrics lbs.config = config + lbs.mu.objective = objective } // Record notifies the Decider that 'n' operations are being carried out which @@ -171,11 +183,13 @@ func Init(lbs *Decider, config LoadSplitConfig, loadSplitterMetrics *LoadSplitte // If the returned boolean is true, a split key is available (though it may // disappear as more keys are sampled) and should be initiated by the caller, // which can call MaybeSplitKey to retrieve the suggested key. -func (d *Decider) Record(ctx context.Context, now time.Time, n int, span func() roachpb.Span) bool { +func (d *Decider) Record( + ctx context.Context, now time.Time, load func(SplitObjective) int, span func() roachpb.Span, +) bool { d.mu.Lock() defer d.mu.Unlock() - return d.recordLocked(ctx, now, n, span) + return d.recordLocked(ctx, now, load(d.mu.objective), span) } func (d *Decider) recordLocked( @@ -203,9 +217,9 @@ func (d *Decider) recordLocked( // begin to Record requests so it can find a split point. If a // splitFinder already exists, we check if a split point is ready // to be used. - if d.mu.lastStatVal >= d.config.StatThreshold() { + if d.mu.lastStatVal >= d.config.StatThreshold(d.mu.objective) { if d.mu.splitFinder == nil { - d.mu.splitFinder = d.config.NewLoadBasedSplitter(now) + d.mu.splitFinder = d.config.NewLoadBasedSplitter(now, d.mu.objective) } } else { d.mu.splitFinder = nil @@ -258,6 +272,10 @@ func (d *Decider) LastStat(ctx context.Context, now time.Time) float64 { d.mu.Lock() defer d.mu.Unlock() + return d.lastStatLocked(ctx, now) +} + +func (d *Decider) lastStatLocked(ctx context.Context, now time.Time) float64 { d.recordLocked(ctx, now, 0, nil) // force stat computation return d.mu.lastStatVal } @@ -269,6 +287,10 @@ func (d *Decider) MaxStat(ctx context.Context, now time.Time) (float64, bool) { d.mu.Lock() defer d.mu.Unlock() + return d.maxStatLocked(ctx, now) +} + +func (d *Decider) maxStatLocked(ctx context.Context, now time.Time) (float64, bool) { d.recordLocked(ctx, now, 0, nil) // force stat computation return d.mu.maxStat.max(now, d.config.StatRetention()) } @@ -332,6 +354,10 @@ func (d *Decider) Reset(now time.Time) { d.mu.Lock() defer d.mu.Unlock() + d.resetLocked(now) +} + +func (d *Decider) resetLocked(now time.Time) { d.mu.lastStatRollover = time.Time{} d.mu.lastStatVal = 0 d.mu.count = 0 @@ -341,6 +367,49 @@ func (d *Decider) Reset(now time.Time) { d.mu.lastNoSplitKeyLoggingMetrics = time.Time{} } +// SetSplitObjective sets the decider split objective to the given value and +// discards any existing state. +func (d *Decider) SetSplitObjective(now time.Time, obj SplitObjective) { + d.mu.Lock() + defer d.mu.Unlock() + + d.mu.objective = obj + d.resetLocked(now) +} + +// LoadSplitSnapshot contains a consistent snapshot of the decider state. It +// should be used when comparing the threshold, last value, max value or split +// objective; it is possible for inconsistent values otherwise e.g. +// p1 max_stat = decider.MaxStat() +// p2 decider.SetSplitObjective(...) +// p1 threshold = decider.threshold(..) (doesn't exist for this reason) +// +// p1 then asserts that max_stat < threshold, however the threhsold value will +// be in terms of the split objective set by p2; which could be widly wrong. +type LoadSplitSnapshot struct { + SplitObjective + Max, Last, Threshold float64 + Ok bool +} + +// Snapshot returns a consistent snapshot of the decider state. +func (d *Decider) Snapshot(ctx context.Context, now time.Time) LoadSplitSnapshot { + d.mu.Lock() + defer d.mu.Unlock() + + maxStat, ok := d.maxStatLocked(ctx, now) + lastStat := d.lastStatLocked(ctx, now) + threshold := d.config.StatThreshold(d.mu.objective) + + return LoadSplitSnapshot{ + SplitObjective: d.mu.objective, + Max: maxStat, + Last: lastStat, + Ok: ok, + Threshold: threshold, + } +} + // maxStatTracker collects a series of stat per-second measurement samples and // tracks the maximum observed over a period of time. // diff --git a/pkg/kv/kvserver/split/decider_test.go b/pkg/kv/kvserver/split/decider_test.go index 8b362d6748b4..80a7aede38f3 100644 --- a/pkg/kv/kvserver/split/decider_test.go +++ b/pkg/kv/kvserver/split/decider_test.go @@ -37,7 +37,9 @@ type testLoadSplitConfig struct { // NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to // find the midpoint based on recorded load. -func (t *testLoadSplitConfig) NewLoadBasedSplitter(startTime time.Time) LoadBasedSplitter { +func (t *testLoadSplitConfig) NewLoadBasedSplitter( + startTime time.Time, _ SplitObjective, +) LoadBasedSplitter { if t.useWeighted { return NewWeightedFinder(startTime, t.randSource) } @@ -51,10 +53,16 @@ func (t *testLoadSplitConfig) StatRetention() time.Duration { // StatThreshold returns the threshold for load above which the range // should be considered split. -func (t *testLoadSplitConfig) StatThreshold() float64 { +func (t *testLoadSplitConfig) StatThreshold(_ SplitObjective) float64 { return t.statThreshold } +func ld(n int) func(SplitObjective) int { + return func(_ SplitObjective) int { + return n + } +} + func ms(i int) time.Time { ts, err := time.Parse(time.RFC3339, "2000-01-01T00:00:00Z") if err != nil { @@ -78,7 +86,9 @@ func TestDecider(t *testing.T) { Init(&d, &loadSplitConfig, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), - }) + }, + SplitQPS, + ) op := func(s string) func() roachpb.Span { return func() roachpb.Span { return roachpb.Span{Key: roachpb.Key(s)} } @@ -97,27 +107,27 @@ func TestDecider(t *testing.T) { assert.Equal(t, expOK, ok) } - assert.Equal(t, false, d.Record(context.Background(), ms(100), 1, nil)) + assert.Equal(t, false, d.Record(context.Background(), ms(100), ld(1), nil)) assertStat(100, 0) assertMaxStat(100, 0, false) assert.Equal(t, ms(100), d.mu.lastStatRollover) assert.EqualValues(t, 1, d.mu.count) - assert.Equal(t, false, d.Record(context.Background(), ms(400), 3, nil)) + assert.Equal(t, false, d.Record(context.Background(), ms(400), ld(3), nil)) assertStat(100, 0) assertStat(700, 0) assertMaxStat(400, 0, false) - assert.Equal(t, false, d.Record(context.Background(), ms(300), 3, nil)) + assert.Equal(t, false, d.Record(context.Background(), ms(300), ld(3), nil)) assertStat(100, 0) assertMaxStat(300, 0, false) - assert.Equal(t, false, d.Record(context.Background(), ms(900), 1, nil)) + assert.Equal(t, false, d.Record(context.Background(), ms(900), ld(1), nil)) assertStat(0, 0) assertMaxStat(900, 0, false) - assert.Equal(t, false, d.Record(context.Background(), ms(1099), 1, nil)) + assert.Equal(t, false, d.Record(context.Background(), ms(1099), ld(1), nil)) assertStat(0, 0) assertMaxStat(1099, 0, false) @@ -126,18 +136,18 @@ func TestDecider(t *testing.T) { // It won't engage because the duration between the rollovers is 1.1s, and // we had 10 events over that interval. - assert.Equal(t, false, d.Record(context.Background(), ms(1200), 1, nil)) + assert.Equal(t, false, d.Record(context.Background(), ms(1200), ld(1), nil)) assertStat(0, float64(10)/float64(1.1)) assert.Equal(t, ms(1200), d.mu.lastStatRollover) assertMaxStat(1099, 0, false) assert.Equal(t, nil, d.mu.splitFinder) - assert.Equal(t, false, d.Record(context.Background(), ms(2199), 12, nil)) + assert.Equal(t, false, d.Record(context.Background(), ms(2199), ld(12), nil)) assert.Equal(t, nil, d.mu.splitFinder) // 2200 is the next rollover point, and 12+1=13 stat should be computed. - assert.Equal(t, false, d.Record(context.Background(), ms(2200), 1, op("a"))) + assert.Equal(t, false, d.Record(context.Background(), ms(2200), ld(1), op("a"))) assert.Equal(t, ms(2200), d.mu.lastStatRollover) assertStat(0, float64(13)) assertMaxStat(2200, 13, true) @@ -149,7 +159,7 @@ func TestDecider(t *testing.T) { // to split. We don't test the details of exactly when that happens because // this is done in the finder tests. tick := 2200 - for o := op("a"); !d.Record(context.Background(), ms(tick), 11, o); tick += 1000 { + for o := op("a"); !d.Record(context.Background(), ms(tick), ld(11), o); tick += 1000 { if tick/1000%2 == 0 { o = op("z") } else { @@ -166,27 +176,27 @@ func TestDecider(t *testing.T) { if i%2 != 0 { o = op("a") } - assert.False(t, d.Record(context.Background(), ms(tick), 11, o)) + assert.False(t, d.Record(context.Background(), ms(tick), ld(11), o)) assert.True(t, d.LastStat(context.Background(), ms(tick)) > 1.0) // Even though the split key remains. assert.Equal(t, roachpb.Key("z"), d.MaybeSplitKey(context.Background(), ms(tick+999))) tick += 1000 } // But after minSplitSuggestionInterval of ticks, we get another one. - assert.True(t, d.Record(context.Background(), ms(tick), 11, op("a"))) + assert.True(t, d.Record(context.Background(), ms(tick), ld(11), op("a"))) assertStat(tick, float64(11)) assertMaxStat(tick, 11, true) // Split key suggestion vanishes once stat drops. tick += 1000 - assert.False(t, d.Record(context.Background(), ms(tick), 9, op("a"))) + assert.False(t, d.Record(context.Background(), ms(tick), ld(9), op("a"))) assert.Equal(t, roachpb.Key(nil), d.MaybeSplitKey(context.Background(), ms(tick))) assert.Equal(t, nil, d.mu.splitFinder) // Hammer a key with writes above threshold. There shouldn't be a split // since everyone is hitting the same key and load can't be balanced. for i := 0; i < 1000; i++ { - assert.False(t, d.Record(context.Background(), ms(tick), 11, op("q"))) + assert.False(t, d.Record(context.Background(), ms(tick), ld(11), op("q"))) tick += 1000 } assert.True(t, d.mu.splitFinder.Ready(ms(tick))) @@ -194,7 +204,7 @@ func TestDecider(t *testing.T) { // But the finder keeps sampling to adapt to changing workload... for i := 0; i < 1000; i++ { - assert.False(t, d.Record(context.Background(), ms(tick), 11, op("p"))) + assert.False(t, d.Record(context.Background(), ms(tick), ld(11), op("p"))) tick += 1000 } @@ -214,7 +224,7 @@ func TestDecider(t *testing.T) { if i%2 != 0 { o = op("a") } - d.Record(context.Background(), ms(tick), 11, o) + d.Record(context.Background(), ms(tick), ld(11), o) tick += 500 } @@ -242,7 +252,7 @@ func TestDecider_MaxStat(t *testing.T) { Init(&d, &loadSplitConfig, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), - }) + }, SplitQPS) assertMaxStat := func(i int, expMaxStat float64, expOK bool) { t.Helper() @@ -254,22 +264,22 @@ func TestDecider_MaxStat(t *testing.T) { assertMaxStat(1000, 0, false) // Record a large number of samples. - d.Record(context.Background(), ms(1500), 5, nil) - d.Record(context.Background(), ms(2000), 5, nil) - d.Record(context.Background(), ms(4500), 1, nil) - d.Record(context.Background(), ms(5000), 15, nil) - d.Record(context.Background(), ms(5500), 2, nil) - d.Record(context.Background(), ms(8000), 5, nil) - d.Record(context.Background(), ms(10000), 9, nil) + d.Record(context.Background(), ms(1500), ld(5), nil) + d.Record(context.Background(), ms(2000), ld(5), nil) + d.Record(context.Background(), ms(4500), ld(1), nil) + d.Record(context.Background(), ms(5000), ld(15), nil) + d.Record(context.Background(), ms(5500), ld(2), nil) + d.Record(context.Background(), ms(8000), ld(5), nil) + d.Record(context.Background(), ms(10000), ld(9), nil) assertMaxStat(10000, 0, false) assertMaxStat(11000, 17, true) // Record more samples with a lower Stat. - d.Record(context.Background(), ms(12000), 1, nil) - d.Record(context.Background(), ms(13000), 4, nil) - d.Record(context.Background(), ms(15000), 2, nil) - d.Record(context.Background(), ms(19000), 3, nil) + d.Record(context.Background(), ms(12000), ld(1), nil) + d.Record(context.Background(), ms(13000), ld(4), nil) + d.Record(context.Background(), ms(15000), ld(2), nil) + d.Record(context.Background(), ms(19000), ld(3), nil) assertMaxStat(20000, 4.5, true) assertMaxStat(21000, 4, true) @@ -295,7 +305,7 @@ func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) { Init(&d, &loadSplitConfig, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), - }) + }, SplitCPU) baseKey := keys.SystemSQLCodec.TablePrefix(51) for i := 0; i < 4; i++ { @@ -311,9 +321,9 @@ func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) { var now time.Time for i := 0; i < 2*int(minSplitSuggestionInterval/time.Second); i++ { now = now.Add(500 * time.Millisecond) - d.Record(context.Background(), now, 1, c0) + d.Record(context.Background(), now, ld(1), c0) now = now.Add(500 * time.Millisecond) - d.Record(context.Background(), now, 1, c1) + d.Record(context.Background(), now, ld(1), c1) k = d.MaybeSplitKey(context.Background(), now) if len(k) != 0 { break @@ -338,7 +348,7 @@ func TestDeciderIgnoresEnsureSafeSplitKeyOnError(t *testing.T) { Init(&d, &loadSplitConfig, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), - }) + }, SplitCPU) baseKey := keys.SystemSQLCodec.TablePrefix(51) for i := 0; i < 4; i++ { @@ -358,9 +368,9 @@ func TestDeciderIgnoresEnsureSafeSplitKeyOnError(t *testing.T) { var now time.Time for i := 0; i < 2*int(minSplitSuggestionInterval/time.Second); i++ { now = now.Add(500 * time.Millisecond) - d.Record(context.Background(), now, 1, c0) + d.Record(context.Background(), now, ld(1), c0) now = now.Add(500 * time.Millisecond) - d.Record(context.Background(), now, 1, c1) + d.Record(context.Background(), now, ld(1), c1) k = d.MaybeSplitKey(context.Background(), now) if len(k) != 0 { break @@ -477,16 +487,16 @@ func TestDeciderMetrics(t *testing.T) { Init(&dPopular, &loadSplitConfig, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), - }) + }, SplitCPU) // No split key, popular key for i := 0; i < 20; i++ { - dPopular.Record(context.Background(), ms(timeStart), 1, func() roachpb.Span { + dPopular.Record(context.Background(), ms(timeStart), ld(1), func() roachpb.Span { return roachpb.Span{Key: keys.SystemSQLCodec.TablePrefix(uint32(0))} }) } for i := 1; i <= 2000; i++ { - dPopular.Record(context.Background(), ms(timeStart+i*50), 1, func() roachpb.Span { + dPopular.Record(context.Background(), ms(timeStart+i*50), ld(1), func() roachpb.Span { return roachpb.Span{Key: keys.SystemSQLCodec.TablePrefix(uint32(0))} }) } @@ -499,15 +509,15 @@ func TestDeciderMetrics(t *testing.T) { Init(&dNotPopular, &loadSplitConfig, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), - }) + }, SplitCPU) for i := 0; i < 20; i++ { - dNotPopular.Record(context.Background(), ms(timeStart), 1, func() roachpb.Span { + dNotPopular.Record(context.Background(), ms(timeStart), ld(1), func() roachpb.Span { return roachpb.Span{Key: keys.SystemSQLCodec.TablePrefix(uint32(0))} }) } for i := 1; i <= 2000; i++ { - dNotPopular.Record(context.Background(), ms(timeStart+i*50), 1, func() roachpb.Span { + dNotPopular.Record(context.Background(), ms(timeStart+i*50), ld(1), func() roachpb.Span { return roachpb.Span{Key: keys.SystemSQLCodec.TablePrefix(uint32(i))} }) } @@ -520,14 +530,14 @@ func TestDeciderMetrics(t *testing.T) { Init(&dAllInsufficientCounters, &loadSplitConfig, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), - }) + }, SplitCPU) for i := 0; i < 20; i++ { - dAllInsufficientCounters.Record(context.Background(), ms(timeStart), 1, func() roachpb.Span { + dAllInsufficientCounters.Record(context.Background(), ms(timeStart), ld(1), func() roachpb.Span { return roachpb.Span{Key: keys.SystemSQLCodec.TablePrefix(uint32(0))} }) } for i := 1; i <= 80; i++ { - dAllInsufficientCounters.Record(context.Background(), ms(timeStart+i*1000), 1, func() roachpb.Span { + dAllInsufficientCounters.Record(context.Background(), ms(timeStart+i*1000), ld(1), func() roachpb.Span { return roachpb.Span{Key: keys.SystemSQLCodec.TablePrefix(uint32(0))} }) } diff --git a/pkg/kv/kvserver/split/objective.go b/pkg/kv/kvserver/split/objective.go new file mode 100644 index 000000000000..85aa0f9b5d4d --- /dev/null +++ b/pkg/kv/kvserver/split/objective.go @@ -0,0 +1,52 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package split + +import ( + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" +) + +// SplitObjective is a type that specifies a load based splitting objective. +type SplitObjective int + +const ( + // SplitQPS will track and split QPS (queries-per-second) over a range. + SplitQPS SplitObjective = iota + // SplitCPU will track and split CPU (cpu-per-second) over a range. + SplitCPU +) + +// String returns a human readable string representation of the dimension. +func (d SplitObjective) String() string { + switch d { + case SplitQPS: + return "qps" + case SplitCPU: + return "cpu" + default: + panic(fmt.Sprintf("cannot name: unknown objective with ordinal %d", d)) + } +} + +// Format returns a formatted string for a value. +func (d SplitObjective) Format(value float64) string { + switch d { + case SplitQPS: + return fmt.Sprintf("%.1f", value) + case SplitCPU: + return string(humanizeutil.Duration(time.Duration(int64(value)))) + default: + panic(fmt.Sprintf("cannot format value: unknown objective with ordinal %d", d)) + } +} diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go index 33771f922361..8817b1cb3998 100644 --- a/pkg/kv/kvserver/split_queue.go +++ b/pkg/kv/kvserver/split_queue.go @@ -263,18 +263,19 @@ func (sq *splitQueue) processAttempt( } now := timeutil.Now() - if splitByLoadKey := r.loadBasedSplitter.MaybeSplitKey(ctx, now); splitByLoadKey != nil { + splitByLoadKey := r.loadBasedSplitter.MaybeSplitKey(ctx, now) + if splitByLoadKey != nil { loadStats := r.loadStats.Stats() batchHandledQPS := loadStats.QueriesPerSecond raftAppliedQPS := loadStats.WriteKeysPerSecond - lastSplitStat := r.loadBasedSplitter.LastStat(ctx, now) - splitObj := sq.store.splitConfig.SplitObjective() + lbSplitSnap := r.loadBasedSplitter.Snapshot(ctx, now) + splitObj := lbSplitSnap.SplitObjective reason := fmt.Sprintf( "load at key %s (%s %s, %.2f batches/sec, %.2f raft mutations/sec)", splitByLoadKey, splitObj, - splitObj.Format(lastSplitStat), + splitObj.Format(lbSplitSnap.Last), batchHandledQPS, raftAppliedQPS, ) @@ -313,6 +314,7 @@ func (sq *splitQueue) processAttempt( r.loadBasedSplitter.Reset(sq.store.Clock().PhysicalTime()) return true, nil } + return false, nil } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 9f6069076b8b..fbcbeaa3eda7 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -775,7 +775,6 @@ type Store struct { ctSender *sidetransport.Sender storeGossip *StoreGossip rebalanceObjManager *RebalanceObjectiveManager - splitConfig *replicaSplitConfig coalescedMu struct { syncutil.Mutex @@ -1234,14 +1233,16 @@ func NewStore( s.rebalanceObjManager = newRebalanceObjectiveManager(ctx, s.cfg.Settings, func(ctx context.Context, obj LBRebalancingObjective) { s.VisitReplicas(func(r *Replica) (wantMore bool) { - r.loadBasedSplitter.Reset(s.Clock().PhysicalTime()) + r.loadBasedSplitter.SetSplitObjective( + s.Clock().PhysicalTime(), + obj.ToSplitObjective(), + ) return true }) }, allocatorStorePool, /* storeDescProvider */ allocatorStorePool, /* capacityChangeNotifier */ ) - s.splitConfig = newReplicaSplitConfig(s.cfg.Settings, s.rebalanceObjManager) } if cfg.RPCContext != nil { s.allocator = allocatorimpl.MakeAllocator(