diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go
index 243c6d9d773c..add89769008e 100644
--- a/pkg/kv/kvserver/helpers_test.go
+++ b/pkg/kv/kvserver/helpers_test.go
@@ -388,7 +388,7 @@ func (r *Replica) LargestPreviousMaxRangeSizeBytes() int64 {
 // LoadBasedSplitter returns the replica's split.Decider, which is used to
 // assist load-based split (and merge) decisions.
 func (r *Replica) LoadBasedSplitter() *split.Decider {
-	return &r.loadBasedSplitter
+	return &r.loadBasedSplitterMu.loadBasedSplitter
 }
 
 func MakeSSTable(
diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go
index 1f5097d998f7..4d8da1862a33 100644
--- a/pkg/kv/kvserver/merge_queue.go
+++ b/pkg/kv/kvserver/merge_queue.go
@@ -185,7 +185,7 @@ var _ PurgatoryError = rangeMergePurgatoryError{}
 
 func (mq *mergeQueue) requestRangeStats(
 	ctx context.Context, key roachpb.Key,
-) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, ls loadSplitStat, err error) {
+) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, lss []loadSplitStat, err error) {
 
 	ba := &kvpb.BatchRequest{}
 	ba.Add(&kvpb.RangeStatsRequest{
@@ -194,7 +194,7 @@ func (mq *mergeQueue) requestRangeStats(
 
 	br, pErr := mq.db.NonTransactionalSender().Send(ctx, ba)
 	if pErr != nil {
-		return nil, enginepb.MVCCStats{}, loadSplitStat{}, pErr.GoError()
+		return nil, enginepb.MVCCStats{}, []loadSplitStat{}, pErr.GoError()
 	}
 	res := br.Responses[0].GetInner().(*kvpb.RangeStatsResponse)
 
@@ -207,18 +207,18 @@ func (mq *mergeQueue) requestRangeStats(
 	// objective is currently CPU, it must be the case that every store in the
 	// cluster is running a version that populates MaxCPUPerSecond so we don't
 	// need to worry about it being 0 as default and passing the >= 0 check.
-	switch mq.store.splitConfig.SplitObjective() {
-	case SplitCPU:
-		ls.max = res.MaxCPUPerSecond
-		ls.ok = res.MaxCPUPerSecond >= 0
-		ls.typ = SplitCPU
-	case SplitQPS:
-		ls.max = res.MaxQueriesPerSecond
-		ls.ok = res.MaxQueriesPerSecond >= 0
-		ls.typ = SplitQPS
-	}
-
-	return desc, stats, ls, nil
+	lss = append(lss,
+		loadSplitStat{
+			max: res.MaxCPUPerSecond,
+			ok:  res.MaxCPUPerSecond >= 0,
+			typ: SplitCPU,
+		},
+		loadSplitStat{
+			max: res.MaxQueriesPerSecond,
+			ok:  res.MaxQueriesPerSecond >= 0,
+			typ: SplitQPS,
+		})
+	return desc, stats, lss, nil
 }
 
 func (mq *mergeQueue) process(
@@ -231,7 +231,6 @@ func (mq *mergeQueue) process(
 
 	lhsDesc := lhsRepl.Desc()
 	lhsStats := lhsRepl.GetMVCCStats()
-	lhsLoadSplitStat := lhsRepl.loadSplitStat(ctx)
 	minBytes := lhsRepl.GetMinBytes()
 	if lhsStats.Total() >= minBytes {
 		log.VEventf(ctx, 2, "skipping merge: LHS meets minimum size threshold %d with %d bytes",
@@ -239,7 +238,7 @@ func (mq *mergeQueue) process(
 		return false, nil
 	}
 
-	rhsDesc, rhsStats, rhsLoadSplitStat, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
+	rhsDesc, rhsStats, rhsLoadSplitStats, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
 	if err != nil {
 		return false, err
 	}
@@ -265,11 +264,23 @@ func (mq *mergeQueue) process(
 	mergedStats := lhsStats
 	mergedStats.Add(rhsStats)
 
+	var rhsLoadSplitStat loadSplitStat
+	lhsLoadSplitStat := lhsRepl.loadSplitStat(ctx)
+	lhsRepl.loadBasedSplitterMu.Lock()
+	splitObjective := lhsRepl.loadBasedSplitterMu.splitConfig.SplitObjective()
+	splitThreshold := lhsRepl.loadBasedSplitterMu.splitConfig.StatThreshold()
+	lhsRepl.loadBasedSplitterMu.Unlock()
+	for _, splitStat := range rhsLoadSplitStats {
+		if splitStat.typ == splitObjective {
+			rhsLoadSplitStat = splitStat
+		}
+	}
+
 	var loadMergeReason string
 	if lhsRepl.SplitByLoadEnabled() {
 		var canMergeLoad bool
 		if canMergeLoad, loadMergeReason = canMergeRangeLoad(
-			ctx, lhsLoadSplitStat, rhsLoadSplitStat, mq.store.splitConfig,
+			ctx, lhsLoadSplitStat, rhsLoadSplitStat, splitObjective, splitThreshold,
 		); !canMergeLoad {
 			log.VEventf(ctx, 2, "skipping merge to avoid thrashing: merged range %s may split %s",
 				mergedDesc, loadMergeReason)
@@ -416,7 +427,9 @@ func (mq *mergeQueue) process(
 	// could just Reset the splitter, but then we'd need to wait out a full
 	// measurement period (default of 5m) before merging this range again.
 	if mergedLoadSplitStat := lhsLoadSplitStat.max + rhsLoadSplitStat.max; mergedLoadSplitStat != 0 {
-		lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedLoadSplitStat)
+		lhsRepl.loadBasedSplitterMu.Lock()
+		lhsRepl.loadBasedSplitterMu.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedLoadSplitStat)
+		lhsRepl.loadBasedSplitterMu.Unlock()
 	}
 	return true, nil
 }
@@ -439,7 +452,10 @@ func (mq *mergeQueue) updateChan() <-chan time.Time {
 }
 
 func canMergeRangeLoad(
-	ctx context.Context, lhs, rhs loadSplitStat, rsc *replicaSplitConfig,
+	ctx context.Context,
+	lhs, rhs loadSplitStat,
+	splitObjective SplitObjective,
+	splitThreshold float64,
 ) (can bool, reason string) {
 	// When load is a consideration for splits and, by extension, merges, the
 	// mergeQueue is fairly conservative. In an effort to avoid thrashing and to
@@ -462,7 +478,6 @@ func canMergeRangeLoad(
 	// the current split objective they cannot merge together. This could occur
 	// just after changing the split objective to a different value, where
 	// there is a mismatch.
-	splitObjective, splitThreshold := rsc.SplitObjective(), rsc.StatThreshold()
 	if lhs.typ != splitObjective {
 		return false, "LHS load measurement is a different type (%s) than current split objective (%s)"
 	}
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index a523d1985f59..1cf2c3cf677a 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -819,8 +819,13 @@ type Replica struct {
 	// semaphores.
 	splitQueueThrottle, mergeQueueThrottle util.EveryN
 
-	// loadBasedSplitter keeps information about load-based splitting.
-	loadBasedSplitter split.Decider
+	// loadBasedSplitterMu protects access to the load based split decider and
+	// configuration.
+	loadBasedSplitterMu struct {
+		syncutil.Mutex
+		loadBasedSplitter split.Decider
+		splitConfig       *replicaSplitConfig
+	}
 
 	// unreachablesMu contains a set of remote ReplicaIDs that are to be reported
 	// as unreachable on the next raft tick.
@@ -1301,10 +1306,13 @@ func (r *Replica) SetMVCCStatsForTesting(stats *enginepb.MVCCStats) {
 //
 // Use LoadStats.QueriesPerSecond for all other purposes.
 func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) {
-	if r.store.splitConfig.SplitObjective() != SplitQPS {
+	r.loadBasedSplitterMu.Lock()
+	defer r.loadBasedSplitterMu.Unlock()
+
+	if r.loadBasedSplitterMu.splitConfig.SplitObjective() != SplitQPS {
 		return 0, false
 	}
-	return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
+	return r.loadBasedSplitterMu.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
 }
 
 // GetMaxSplitCPU returns the Replica's maximum CPU/s rate over a configured
@@ -1317,10 +1325,13 @@ func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) {
 // Use LoadStats.RaftCPUNanosPerSecond and RequestCPUNanosPerSecond for current
 // CPU stats for all other purposes.
 func (r *Replica) GetMaxSplitCPU(ctx context.Context) (float64, bool) {
-	if r.store.splitConfig.SplitObjective() != SplitCPU {
+	r.loadBasedSplitterMu.Lock()
+	defer r.loadBasedSplitterMu.Unlock()
+
+	if r.loadBasedSplitterMu.splitConfig.SplitObjective() != SplitCPU {
 		return 0, false
 	}
-	return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
+	return r.loadBasedSplitterMu.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
 }
 
 // ContainsKey returns whether this range contains the specified key.
diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go
index 6bb104457c74..d27c04580767 100644
--- a/pkg/kv/kvserver/replica_init.go
+++ b/pkg/kv/kvserver/replica_init.go
@@ -107,11 +107,6 @@ func newUninitializedReplica(
 	r.mu.stateLoader = stateloader.Make(rangeID)
 	r.mu.quiescent = true
 	r.mu.conf = store.cfg.DefaultSpanConfig
-	split.Init(
-		&r.loadBasedSplitter,
-		store.splitConfig,
-		store.metrics.LoadSplitterMetrics,
-	)
 	r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{}
 	r.mu.checksums = map[uuid.UUID]*replicaChecksum{}
 
@@ -125,6 +120,15 @@ func newUninitializedReplica(
 	}
 	if store.cfg.StorePool != nil {
 		r.loadStats = load.NewReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString)
+		r.loadBasedSplitterMu.splitConfig = newReplicaSplitConfig(
+			store.ClusterSettings(),
+			rebalanceToSplitObjective(store.rebalanceObjManager.Objective()),
+		)
+		split.Init(
+			&r.loadBasedSplitterMu.loadBasedSplitter,
+			r.loadBasedSplitterMu.splitConfig,
+			store.metrics.LoadSplitterMetrics,
+		)
 	}
 
 	// NB: the state will be loaded when the replica gets initialized.
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 6e96d1cdadcb..74022a807218 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -343,7 +343,10 @@ func (r *Replica) leasePostApplyLocked(
 		if r.loadStats != nil {
 			r.loadStats.Reset()
 		}
-		r.loadBasedSplitter.Reset(r.Clock().PhysicalTime())
+		r.loadBasedSplitterMu.Lock()
+		r.loadBasedSplitterMu.loadBasedSplitter.Reset(r.Clock().PhysicalTime())
+		r.loadBasedSplitterMu.Unlock()
+
 	}
 
 	// Inform the concurrency manager that the lease holder has been updated.
diff --git a/pkg/kv/kvserver/replica_split_load.go b/pkg/kv/kvserver/replica_split_load.go
index 035cf82be008..75446dde2b3d 100644
--- a/pkg/kv/kvserver/replica_split_load.go
+++ b/pkg/kv/kvserver/replica_split_load.go
@@ -71,6 +71,17 @@ const (
 	SplitCPU
 )
 
+func rebalanceToSplitObjective(ro LBRebalancingObjective) SplitObjective {
+	switch ro {
+	case LBRebalancingQueries:
+		return SplitQPS
+	case LBRebalancingCPU:
+		return SplitCPU
+	default:
+		panic(fmt.Sprintf("unknown objective %d", ro))
+	}
+}
+
 // String returns a human readable string representation of the dimension.
 func (d SplitObjective) String() string {
 	switch d {
@@ -97,18 +108,18 @@ func (d SplitObjective) Format(value float64) string {
 
 // replicaSplitConfig implements the split.SplitConfig interface.
 type replicaSplitConfig struct {
-	randSource                 split.RandSource
-	rebalanceObjectiveProvider RebalanceObjectiveProvider
-	st                         *cluster.Settings
+	randSource     split.RandSource
+	splitObjective SplitObjective
+	st             *cluster.Settings
 }
 
 func newReplicaSplitConfig(
-	st *cluster.Settings, rebalanceObjectiveProvider RebalanceObjectiveProvider,
+	st *cluster.Settings, splitObjective SplitObjective,
 ) *replicaSplitConfig {
 	return &replicaSplitConfig{
-		randSource:                 split.GlobalRandSource(),
-		rebalanceObjectiveProvider: rebalanceObjectiveProvider,
-		st:                         st,
+		randSource:     split.GlobalRandSource(),
+		st:             st,
+		splitObjective: splitObjective,
 	}
 }
 
@@ -116,15 +127,7 @@ func newReplicaSplitConfig(
 // 1:1 to the rebalance objective e.g. balancing QPS means also load based
 // splitting on QPS.
 func (c *replicaSplitConfig) SplitObjective() SplitObjective {
-	obj := c.rebalanceObjectiveProvider.Objective()
-	switch obj {
-	case LBRebalancingQueries:
-		return SplitQPS
-	case LBRebalancingCPU:
-		return SplitCPU
-	default:
-		panic(errors.AssertionFailedf("Unkown split objective %d", obj))
-	}
+	return c.splitObjective
 }
 
 // NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to
@@ -175,11 +178,13 @@ type loadSplitStat struct {
 }
 
 func (r *Replica) loadSplitStat(ctx context.Context) loadSplitStat {
-	max, ok := r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
+	r.loadBasedSplitterMu.Lock()
+	defer r.loadBasedSplitterMu.Unlock()
+	max, ok := r.loadBasedSplitterMu.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
 	lss := loadSplitStat{
 		max: max,
 		ok:  ok,
-		typ: r.store.splitConfig.SplitObjective(),
+		typ: r.loadBasedSplitterMu.splitConfig.SplitObjective(),
 	}
 	return lss
 }
@@ -278,15 +283,19 @@ func (r *Replica) recordBatchForLoadBasedSplitting(
 			len(ba.Requests), len(br.Responses))
 	}
 
+	r.loadBasedSplitterMu.Lock()
+	defer r.loadBasedSplitterMu.Unlock()
+
 	// When QPS splitting is enabled, use the number of requests rather than
 	// the given stat for recording load.
-	if r.store.splitConfig.SplitObjective() == SplitQPS {
+	if r.loadBasedSplitterMu.splitConfig.SplitObjective() == SplitQPS {
 		stat = len(ba.Requests)
 	}
 
-	shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), stat, func() roachpb.Span {
+	shouldInitSplit := r.loadBasedSplitterMu.loadBasedSplitter.Record(ctx, timeutil.Now(), stat, func() roachpb.Span {
 		return getResponseBoundarySpan(ba, br)
 	})
+
 	if shouldInitSplit {
 		r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().NowAsClockTimestamp())
 	}
diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go
index 33771f922361..489b5895abd3 100644
--- a/pkg/kv/kvserver/split_queue.go
+++ b/pkg/kv/kvserver/split_queue.go
@@ -183,7 +183,9 @@ func (sq *splitQueue) shouldQueue(
 		repl.GetMaxBytes(), repl.shouldBackpressureWrites(), confReader)
 
 	if !shouldQ && repl.SplitByLoadEnabled() {
-		if splitKey := repl.loadBasedSplitter.MaybeSplitKey(ctx, timeutil.Now()); splitKey != nil {
+		repl.loadBasedSplitterMu.Lock()
+		defer repl.loadBasedSplitterMu.Unlock()
+		if splitKey := repl.loadBasedSplitterMu.loadBasedSplitter.MaybeSplitKey(ctx, timeutil.Now()); splitKey != nil {
 			shouldQ, priority = true, 1.0 // default priority
 		}
 	}
@@ -262,13 +264,16 @@ func (sq *splitQueue) processAttempt(
 		return true, nil
 	}
 
+	r.loadBasedSplitterMu.Lock()
 	now := timeutil.Now()
-	if splitByLoadKey := r.loadBasedSplitter.MaybeSplitKey(ctx, now); splitByLoadKey != nil {
+	splitByLoadKey := r.loadBasedSplitterMu.loadBasedSplitter.MaybeSplitKey(ctx, now)
+	if splitByLoadKey != nil {
 		loadStats := r.loadStats.Stats()
 		batchHandledQPS := loadStats.QueriesPerSecond
 		raftAppliedQPS := loadStats.WriteKeysPerSecond
-		lastSplitStat := r.loadBasedSplitter.LastStat(ctx, now)
-		splitObj := sq.store.splitConfig.SplitObjective()
+		lastSplitStat := r.loadBasedSplitterMu.loadBasedSplitter.LastStat(ctx, now)
+		splitObj := r.loadBasedSplitterMu.splitConfig.SplitObjective()
+		r.loadBasedSplitterMu.Unlock()
 
 		reason := fmt.Sprintf(
 			"load at key %s (%s %s, %.2f batches/sec, %.2f raft mutations/sec)",
@@ -310,9 +315,13 @@ func (sq *splitQueue) processAttempt(
 		sq.metrics.LoadBasedSplitCount.Inc(1)
 
 		// Reset the splitter now that the bounds of the range changed.
-		r.loadBasedSplitter.Reset(sq.store.Clock().PhysicalTime())
+		r.loadBasedSplitterMu.Lock()
+		r.loadBasedSplitterMu.loadBasedSplitter.Reset(sq.store.Clock().PhysicalTime())
+		r.loadBasedSplitterMu.Unlock()
 		return true, nil
 	}
+
+	r.loadBasedSplitterMu.Unlock()
 	return false, nil
 }
 
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index aab168d91406..4b9dcdfb0c1f 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -775,7 +775,6 @@ type Store struct {
 	ctSender            *sidetransport.Sender
 	storeGossip         *StoreGossip
 	rebalanceObjManager *RebalanceObjectiveManager
-	splitConfig         *replicaSplitConfig
 
 	coalescedMu struct {
 		syncutil.Mutex
@@ -1234,14 +1233,16 @@ func NewStore(
 		s.rebalanceObjManager = newRebalanceObjectiveManager(ctx, s.cfg.Settings,
 			func(ctx context.Context, obj LBRebalancingObjective) {
 				s.VisitReplicas(func(r *Replica) (wantMore bool) {
-					r.loadBasedSplitter.Reset(s.Clock().PhysicalTime())
+					r.loadBasedSplitterMu.Lock()
+					r.loadBasedSplitterMu.splitConfig.splitObjective = rebalanceToSplitObjective(obj)
+					r.loadBasedSplitterMu.loadBasedSplitter.Reset(s.Clock().PhysicalTime())
+					r.loadBasedSplitterMu.Unlock()
 					return true
 				})
 			},
 			allocatorStorePool, /* storeDescProvider */
 			allocatorStorePool, /* capacityChangeNotifier */
 		)
-		s.splitConfig = newReplicaSplitConfig(s.cfg.Settings, s.rebalanceObjManager)
 	}
 	if cfg.RPCContext != nil {
 		s.allocator = allocatorimpl.MakeAllocator(
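Standalone illustration (not part of the patch above): a minimal, self-contained Go sketch of the locking pattern the diff introduces, where a named mutex struct bundles the split decider with its split objective so both are read and updated under one lock. The types below (replica, decider, objective) are simplified stand-ins invented for this sketch, not the real kvserver or split APIs.

// Sketch of the "mutex struct groups the fields it guards" pattern.
package main

import (
	"fmt"
	"sync"
)

type objective int

const (
	objQPS objective = iota
	objCPU
)

// decider is a toy stand-in for split.Decider: it only tracks a running max.
type decider struct {
	max float64
}

func (d *decider) recordMax(v float64) {
	if v > d.max {
		d.max = v
	}
}

type replica struct {
	// splitterMu guards both the decider and its objective so callers always
	// observe a consistent pair, mirroring loadBasedSplitterMu in the diff.
	splitterMu struct {
		sync.Mutex
		decider   decider
		objective objective
	}
}

// setObjective mirrors the store-level callback in the diff: swap the
// objective and reset the decider inside a single critical section.
func (r *replica) setObjective(o objective) {
	r.splitterMu.Lock()
	defer r.splitterMu.Unlock()
	r.splitterMu.objective = o
	r.splitterMu.decider = decider{} // reset recorded stats on objective change
}

func main() {
	var r replica
	r.setObjective(objCPU)

	// Readers take the same lock, so they never see a decider populated under
	// one objective paired with a different objective.
	r.splitterMu.Lock()
	r.splitterMu.decider.recordMax(42)
	fmt.Println(r.splitterMu.objective, r.splitterMu.decider.max)
	r.splitterMu.Unlock()
}

Keeping the objective inside the same mutex-guarded struct as the decider is what allows the objective-change callback in the patch to update the split configuration and reset the decider atomically, so readers such as GetMaxSplitQPS and GetMaxSplitCPU observe a consistent pair.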