kvserver: fix deadlock on rebalance obj change
Previously, changing the rebalance objective could lead to an inconsistent
lock ordering between the load-based splitter and the rebalance objective,
which could deadlock.

The split config is now created per replica rather than per store. The
split config and split decider are bundled under a new mutex, which
ensures consistent access.

Resolves: #97000

Release note: None
kvoli committed Feb 22, 2023
1 parent 286b3e2 commit ee624b8
Showing 8 changed files with 113 additions and 61 deletions.
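
To make the locking change described in the commit message concrete, here is a minimal Go sketch of the pattern the diff below introduces. It reuses the field and type names visible in the diff (loadBasedSplitterMu, split.Decider, replicaSplitConfig, SplitObjective) and assumes it lives in package kvserver; the sketchReplica type and the setSplitObjective helper are illustrative assumptions, not code from this commit.

package kvserver

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// sketchReplica is a stand-in for the real Replica, which has many more
// fields; only the part relevant to this commit is shown.
type sketchReplica struct {
	// loadBasedSplitterMu bundles the load-based split decider with its
	// per-replica split config so that both are always read and written
	// under the same lock.
	loadBasedSplitterMu struct {
		syncutil.Mutex
		loadBasedSplitter split.Decider
		splitConfig       *replicaSplitConfig
	}
}

// setSplitObjective is a hypothetical helper mirroring what the store's
// rebalance-objective callback does in store.go below: it updates the
// config and resets the decider while holding the single bundled mutex,
// so readers such as the merge and split queues never observe the two
// out of sync.
func (r *sketchReplica) setSplitObjective(obj SplitObjective, now time.Time) {
	r.loadBasedSplitterMu.Lock()
	defer r.loadBasedSplitterMu.Unlock()
	r.loadBasedSplitterMu.splitConfig.splitObjective = obj
	r.loadBasedSplitterMu.loadBasedSplitter.Reset(now)
}

With the decider and its config per replica and behind one mutex, the store-level splitConfig field is removed and the objective-change handler simply visits every replica, taking only this per-replica lock, which removes the cross-lock ordering that could previously deadlock.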
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/helpers_test.go
@@ -388,7 +388,7 @@ func (r *Replica) LargestPreviousMaxRangeSizeBytes() int64 {
// LoadBasedSplitter returns the replica's split.Decider, which is used to
// assist load-based split (and merge) decisions.
func (r *Replica) LoadBasedSplitter() *split.Decider {
return &r.loadBasedSplitter
return &r.loadBasedSplitterMu.loadBasedSplitter
}

func MakeSSTable(
55 changes: 35 additions & 20 deletions pkg/kv/kvserver/merge_queue.go
@@ -185,7 +185,7 @@ var _ PurgatoryError = rangeMergePurgatoryError{}

func (mq *mergeQueue) requestRangeStats(
ctx context.Context, key roachpb.Key,
) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, ls loadSplitStat, err error) {
) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, lss []loadSplitStat, err error) {

ba := &kvpb.BatchRequest{}
ba.Add(&kvpb.RangeStatsRequest{
@@ -194,7 +194,7 @@ func (mq *mergeQueue) requestRangeStats(

br, pErr := mq.db.NonTransactionalSender().Send(ctx, ba)
if pErr != nil {
return nil, enginepb.MVCCStats{}, loadSplitStat{}, pErr.GoError()
return nil, enginepb.MVCCStats{}, []loadSplitStat{}, pErr.GoError()
}
res := br.Responses[0].GetInner().(*kvpb.RangeStatsResponse)

@@ -207,18 +207,18 @@ func (mq *mergeQueue) requestRangeStats(
// objective is currently CPU, it must be the case that every store in the
// cluster is running a version that populates MaxCPUPerSecond so we don't
// need to worry about it being 0 as default and passing the >= 0 check.
switch mq.store.splitConfig.SplitObjective() {
case SplitCPU:
ls.max = res.MaxCPUPerSecond
ls.ok = res.MaxCPUPerSecond >= 0
ls.typ = SplitCPU
case SplitQPS:
ls.max = res.MaxQueriesPerSecond
ls.ok = res.MaxQueriesPerSecond >= 0
ls.typ = SplitQPS
}

return desc, stats, ls, nil
lss = append(lss,
loadSplitStat{
max: res.MaxCPUPerSecond,
ok: res.MaxCPUPerSecond >= 0,
typ: SplitCPU,
},
loadSplitStat{
max: res.MaxQueriesPerSecond,
ok: res.MaxQueriesPerSecond >= 0,
typ: SplitQPS,
})
return desc, stats, lss, nil
}

func (mq *mergeQueue) process(
@@ -231,15 +231,14 @@

lhsDesc := lhsRepl.Desc()
lhsStats := lhsRepl.GetMVCCStats()
lhsLoadSplitStat := lhsRepl.loadSplitStat(ctx)
minBytes := lhsRepl.GetMinBytes()
if lhsStats.Total() >= minBytes {
log.VEventf(ctx, 2, "skipping merge: LHS meets minimum size threshold %d with %d bytes",
minBytes, lhsStats.Total())
return false, nil
}

rhsDesc, rhsStats, rhsLoadSplitStat, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
rhsDesc, rhsStats, rhsLoadSplitStats, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
if err != nil {
return false, err
}
@@ -265,11 +264,23 @@ func (mq *mergeQueue) process(
mergedStats := lhsStats
mergedStats.Add(rhsStats)

var rhsLoadSplitStat loadSplitStat
lhsLoadSplitStat := lhsRepl.loadSplitStat(ctx)
lhsRepl.loadBasedSplitterMu.Lock()
splitObjective := lhsRepl.loadBasedSplitterMu.splitConfig.SplitObjective()
splitThreshold := lhsRepl.loadBasedSplitterMu.splitConfig.StatThreshold()
lhsRepl.loadBasedSplitterMu.Unlock()
for _, splitStat := range rhsLoadSplitStats {
if splitStat.typ == splitObjective {
rhsLoadSplitStat = splitStat
}
}

var loadMergeReason string
if lhsRepl.SplitByLoadEnabled() {
var canMergeLoad bool
if canMergeLoad, loadMergeReason = canMergeRangeLoad(
ctx, lhsLoadSplitStat, rhsLoadSplitStat, mq.store.splitConfig,
ctx, lhsLoadSplitStat, rhsLoadSplitStat, splitObjective, splitThreshold,
); !canMergeLoad {
log.VEventf(ctx, 2, "skipping merge to avoid thrashing: merged range %s may split %s",
mergedDesc, loadMergeReason)
@@ -416,7 +427,9 @@ func (mq *mergeQueue) process(
// could just Reset the splitter, but then we'd need to wait out a full
// measurement period (default of 5m) before merging this range again.
if mergedLoadSplitStat := lhsLoadSplitStat.max + rhsLoadSplitStat.max; mergedLoadSplitStat != 0 {
lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedLoadSplitStat)
lhsRepl.loadBasedSplitterMu.Lock()
lhsRepl.loadBasedSplitterMu.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedLoadSplitStat)
lhsRepl.loadBasedSplitterMu.Unlock()
}
return true, nil
}
@@ -439,7 +452,10 @@ func (mq *mergeQueue) updateChan() <-chan time.Time {
}

func canMergeRangeLoad(
ctx context.Context, lhs, rhs loadSplitStat, rsc *replicaSplitConfig,
ctx context.Context,
lhs, rhs loadSplitStat,
splitObjective SplitObjective,
splitThreshold float64,
) (can bool, reason string) {
// When load is a consideration for splits and, by extension, merges, the
// mergeQueue is fairly conservative. In an effort to avoid thrashing and to
@@ -462,7 +478,6 @@ func canMergeRangeLoad(
// the current split objective they cannot merge together. This could occur
// just after changing the split objective to a different value, where
// there is a mismatch.
splitObjective, splitThreshold := rsc.SplitObjective(), rsc.StatThreshold()
if lhs.typ != splitObjective {
return false, "LHS load measurement is a different type (%s) than current split objective (%s)"
}
23 changes: 17 additions & 6 deletions pkg/kv/kvserver/replica.go
@@ -819,8 +819,13 @@ type Replica struct {
// semaphores.
splitQueueThrottle, mergeQueueThrottle util.EveryN

// loadBasedSplitter keeps information about load-based splitting.
loadBasedSplitter split.Decider
// loadBasedSplitterMu protects access to the load based split decider and
// configuration.
loadBasedSplitterMu struct {
syncutil.Mutex
loadBasedSplitter split.Decider
splitConfig *replicaSplitConfig
}

// unreachablesMu contains a set of remote ReplicaIDs that are to be reported
// as unreachable on the next raft tick.
@@ -1301,10 +1306,13 @@ func (r *Replica) SetMVCCStatsForTesting(stats *enginepb.MVCCStats) {
//
// Use LoadStats.QueriesPerSecond for all other purposes.
func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) {
if r.store.splitConfig.SplitObjective() != SplitQPS {
r.loadBasedSplitterMu.Lock()
defer r.loadBasedSplitterMu.Unlock()

if r.loadBasedSplitterMu.splitConfig.SplitObjective() != SplitQPS {
return 0, false
}
return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
return r.loadBasedSplitterMu.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
}

// GetMaxSplitCPU returns the Replica's maximum CPU/s rate over a configured
@@ -1317,10 +1325,13 @@ func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) {
// Use LoadStats.RaftCPUNanosPerSecond and RequestCPUNanosPerSecond for current
// CPU stats for all other purposes.
func (r *Replica) GetMaxSplitCPU(ctx context.Context) (float64, bool) {
if r.store.splitConfig.SplitObjective() != SplitCPU {
r.loadBasedSplitterMu.Lock()
defer r.loadBasedSplitterMu.Unlock()

if r.loadBasedSplitterMu.splitConfig.SplitObjective() != SplitCPU {
return 0, false
}
return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
return r.loadBasedSplitterMu.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
}

// ContainsKey returns whether this range contains the specified key.
14 changes: 9 additions & 5 deletions pkg/kv/kvserver/replica_init.go
@@ -107,11 +107,6 @@ func newUninitializedReplica(
r.mu.stateLoader = stateloader.Make(rangeID)
r.mu.quiescent = true
r.mu.conf = store.cfg.DefaultSpanConfig
split.Init(
&r.loadBasedSplitter,
store.splitConfig,
store.metrics.LoadSplitterMetrics,
)

r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{}
r.mu.checksums = map[uuid.UUID]*replicaChecksum{}
@@ -125,6 +120,15 @@
}
if store.cfg.StorePool != nil {
r.loadStats = load.NewReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString)
r.loadBasedSplitterMu.splitConfig = newReplicaSplitConfig(
store.ClusterSettings(),
rebalanceToSplitObjective(store.rebalanceObjManager.Objective()),
)
split.Init(
&r.loadBasedSplitterMu.loadBasedSplitter,
r.loadBasedSplitterMu.splitConfig,
store.metrics.LoadSplitterMetrics,
)
}

// NB: the state will be loaded when the replica gets initialized.
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/replica_proposal.go
@@ -343,7 +343,10 @@ func (r *Replica) leasePostApplyLocked(
if r.loadStats != nil {
r.loadStats.Reset()
}
r.loadBasedSplitter.Reset(r.Clock().PhysicalTime())
r.loadBasedSplitterMu.Lock()
r.loadBasedSplitterMu.loadBasedSplitter.Reset(r.Clock().PhysicalTime())
r.loadBasedSplitterMu.Unlock()

}

// Inform the concurrency manager that the lease holder has been updated.
49 changes: 29 additions & 20 deletions pkg/kv/kvserver/replica_split_load.go
@@ -71,6 +71,17 @@
SplitCPU
)

func rebalanceToSplitObjective(ro LBRebalancingObjective) SplitObjective {
switch ro {
case LBRebalancingQueries:
return SplitQPS
case LBRebalancingCPU:
return SplitCPU
default:
panic(fmt.Sprintf("unknown objective %d", ro))
}
}

// String returns a human readable string representation of the dimension.
func (d SplitObjective) String() string {
switch d {
@@ -97,34 +108,26 @@ func (d SplitObjective) Format(value float64) string {

// replicaSplitConfig implements the split.SplitConfig interface.
type replicaSplitConfig struct {
randSource split.RandSource
rebalanceObjectiveProvider RebalanceObjectiveProvider
st *cluster.Settings
randSource split.RandSource
splitObjective SplitObjective
st *cluster.Settings
}

func newReplicaSplitConfig(
st *cluster.Settings, rebalanceObjectiveProvider RebalanceObjectiveProvider,
st *cluster.Settings, splitObjective SplitObjective,
) *replicaSplitConfig {
return &replicaSplitConfig{
randSource: split.GlobalRandSource(),
rebalanceObjectiveProvider: rebalanceObjectiveProvider,
st: st,
randSource: split.GlobalRandSource(),
st: st,
splitObjective: splitObjective,
}
}

// SplitObjective returns the current split objective. Currently this tracks
// 1:1 to the rebalance objective e.g. balancing QPS means also load based
// splitting on QPS.
func (c *replicaSplitConfig) SplitObjective() SplitObjective {
obj := c.rebalanceObjectiveProvider.Objective()
switch obj {
case LBRebalancingQueries:
return SplitQPS
case LBRebalancingCPU:
return SplitCPU
default:
panic(errors.AssertionFailedf("Unkown split objective %d", obj))
}
return c.splitObjective
}

// NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to
@@ -175,11 +178,13 @@ type loadSplitStat struct {
}

func (r *Replica) loadSplitStat(ctx context.Context) loadSplitStat {
max, ok := r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
r.loadBasedSplitterMu.Lock()
defer r.loadBasedSplitterMu.Unlock()
max, ok := r.loadBasedSplitterMu.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
lss := loadSplitStat{
max: max,
ok: ok,
typ: r.store.splitConfig.SplitObjective(),
typ: r.loadBasedSplitterMu.splitConfig.SplitObjective(),
}
return lss
}
@@ -278,15 +283,19 @@ func (r *Replica) recordBatchForLoadBasedSplitting(
len(ba.Requests), len(br.Responses))
}

r.loadBasedSplitterMu.Lock()
defer r.loadBasedSplitterMu.Unlock()

// When QPS splitting is enabled, use the number of requests rather than
// the given stat for recording load.
if r.store.splitConfig.SplitObjective() == SplitQPS {
if r.loadBasedSplitterMu.splitConfig.SplitObjective() == SplitQPS {
stat = len(ba.Requests)
}

shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), stat, func() roachpb.Span {
shouldInitSplit := r.loadBasedSplitterMu.loadBasedSplitter.Record(ctx, timeutil.Now(), stat, func() roachpb.Span {
return getResponseBoundarySpan(ba, br)
})

if shouldInitSplit {
r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().NowAsClockTimestamp())
}
19 changes: 14 additions & 5 deletions pkg/kv/kvserver/split_queue.go
@@ -183,7 +183,9 @@ func (sq *splitQueue) shouldQueue(
repl.GetMaxBytes(), repl.shouldBackpressureWrites(), confReader)

if !shouldQ && repl.SplitByLoadEnabled() {
if splitKey := repl.loadBasedSplitter.MaybeSplitKey(ctx, timeutil.Now()); splitKey != nil {
repl.loadBasedSplitterMu.Lock()
defer repl.loadBasedSplitterMu.Unlock()
if splitKey := repl.loadBasedSplitterMu.loadBasedSplitter.MaybeSplitKey(ctx, timeutil.Now()); splitKey != nil {
shouldQ, priority = true, 1.0 // default priority
}
}
@@ -262,13 +264,16 @@ func (sq *splitQueue) processAttempt(
return true, nil
}

r.loadBasedSplitterMu.Lock()
now := timeutil.Now()
if splitByLoadKey := r.loadBasedSplitter.MaybeSplitKey(ctx, now); splitByLoadKey != nil {
splitByLoadKey := r.loadBasedSplitterMu.loadBasedSplitter.MaybeSplitKey(ctx, now)
if splitByLoadKey != nil {
loadStats := r.loadStats.Stats()
batchHandledQPS := loadStats.QueriesPerSecond
raftAppliedQPS := loadStats.WriteKeysPerSecond
lastSplitStat := r.loadBasedSplitter.LastStat(ctx, now)
splitObj := sq.store.splitConfig.SplitObjective()
lastSplitStat := r.loadBasedSplitterMu.loadBasedSplitter.LastStat(ctx, now)
splitObj := r.loadBasedSplitterMu.splitConfig.SplitObjective()
r.loadBasedSplitterMu.Unlock()

reason := fmt.Sprintf(
"load at key %s (%s %s, %.2f batches/sec, %.2f raft mutations/sec)",
@@ -310,9 +315,13 @@ func (sq *splitQueue) processAttempt(
sq.metrics.LoadBasedSplitCount.Inc(1)

// Reset the splitter now that the bounds of the range changed.
r.loadBasedSplitter.Reset(sq.store.Clock().PhysicalTime())
r.loadBasedSplitterMu.Lock()
r.loadBasedSplitterMu.loadBasedSplitter.Reset(sq.store.Clock().PhysicalTime())
r.loadBasedSplitterMu.Unlock()
return true, nil
}

r.loadBasedSplitterMu.Unlock()
return false, nil
}

7 changes: 4 additions & 3 deletions pkg/kv/kvserver/store.go
@@ -775,7 +775,6 @@ type Store struct {
ctSender *sidetransport.Sender
storeGossip *StoreGossip
rebalanceObjManager *RebalanceObjectiveManager
splitConfig *replicaSplitConfig

coalescedMu struct {
syncutil.Mutex
@@ -1234,14 +1233,16 @@ func NewStore(
s.rebalanceObjManager = newRebalanceObjectiveManager(ctx, s.cfg.Settings,
func(ctx context.Context, obj LBRebalancingObjective) {
s.VisitReplicas(func(r *Replica) (wantMore bool) {
r.loadBasedSplitter.Reset(s.Clock().PhysicalTime())
r.loadBasedSplitterMu.Lock()
r.loadBasedSplitterMu.splitConfig.splitObjective = rebalanceToSplitObjective(obj)
r.loadBasedSplitterMu.loadBasedSplitter.Reset(s.Clock().PhysicalTime())
r.loadBasedSplitterMu.Unlock()
return true
})
},
allocatorStorePool, /* storeDescProvider */
allocatorStorePool, /* capacityChangeNotifier */
)
s.splitConfig = newReplicaSplitConfig(s.cfg.Settings, s.rebalanceObjManager)
}
if cfg.RPCContext != nil {
s.allocator = allocatorimpl.MakeAllocator(
