Skip to content

Commit

Permalink
kvserver: fix deadlock on rebalance obj change
Browse files Browse the repository at this point in the history
Previously, changing the rebalance objective could lead to an inconsistent
lock ordering between the load-based splitter and the rebalance objective.
When the objective was updated, the previous approach also blocked
batch requests from completing until every replica's load-based splitter
was reset.

This commit makes the split objective a variable owned by the decider,
rather than one inferred on each decider operation. On a rebalance
objective change, the split objective is updated atomically for each
replica, but not atomically across the whole store. This removes the need
to block batch requests until every replica is updated.

Resolves: cockroachdb#97000
Resolves: cockroachdb#97445
Resolves: cockroachdb#97450
Resolves: cockroachdb#97452
Resolves: cockroachdb#97457

Release note: None
  • Loading branch information
kvoli committed Feb 23, 2023
1 parent e67e396 commit 03cac61
Show file tree
Hide file tree
Showing 11 changed files with 318 additions and 217 deletions.
20 changes: 12 additions & 8 deletions pkg/kv/kvserver/asim/state/split_decider.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ type loadSplitConfig struct {

// NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to
// find the midpoint based on recorded load.
func (lsc loadSplitConfig) NewLoadBasedSplitter(startTime time.Time) split.LoadBasedSplitter {
func (lsc loadSplitConfig) NewLoadBasedSplitter(
startTime time.Time, _ split.SplitObjective,
) split.LoadBasedSplitter {
return split.NewUnweightedFinder(startTime, lsc.randSource)
}

Expand All @@ -57,7 +59,7 @@ func (lsc loadSplitConfig) StatRetention() time.Duration {

// StatThreshold returns the threshold for load above which the range
// should be considered for splitting.
func (lsc loadSplitConfig) StatThreshold() float64 {
func (lsc loadSplitConfig) StatThreshold(_ split.SplitObjective) float64 {
return lsc.settings.SplitQPSThreshold
}

Expand Down Expand Up @@ -85,7 +87,7 @@ func (s *SplitDecider) newDecider() *split.Decider {
split.Init(decider, s.splitConfig, &split.LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
}, split.SplitQPS)
return decider
}

Expand All @@ -100,11 +102,13 @@ func (s *SplitDecider) Record(tick time.Time, rangeID RangeID, le workload.LoadE
}

qps := LoadEventQPS(le)
shouldSplit := decider.Record(context.Background(), tick, int(qps), func() roachpb.Span {
return roachpb.Span{
Key: Key(le.Key).ToRKey().AsRawKey(),
}
})
shouldSplit := decider.Record(
context.Background(),
tick,
func(_ split.SplitObjective) int { return int(qps) },
func() roachpb.Span {
return roachpb.Span{Key: Key(le.Key).ToRKey().AsRawKey()}
})

if shouldSplit {
s.suggestions = append(s.suggestions, rangeID)
Expand Down
97 changes: 54 additions & 43 deletions pkg/kv/kvserver/merge_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
Expand Down Expand Up @@ -185,7 +186,12 @@ var _ PurgatoryError = rangeMergePurgatoryError{}

func (mq *mergeQueue) requestRangeStats(
ctx context.Context, key roachpb.Key,
) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, ls loadSplitStat, err error) {
) (
desc *roachpb.RangeDescriptor,
stats enginepb.MVCCStats,
lbSnap split.LoadSplitSnapshot,
err error,
) {

ba := &kvpb.BatchRequest{}
ba.Add(&kvpb.RangeStatsRequest{
Expand All @@ -194,31 +200,36 @@ func (mq *mergeQueue) requestRangeStats(

br, pErr := mq.db.NonTransactionalSender().Send(ctx, ba)
if pErr != nil {
return nil, enginepb.MVCCStats{}, loadSplitStat{}, pErr.GoError()
return nil, enginepb.MVCCStats{}, lbSnap, pErr.GoError()
}
res := br.Responses[0].GetInner().(*kvpb.RangeStatsResponse)

desc = &res.RangeInfo.Desc
stats = res.MVCCStats

// The load based splitter will only track the max of at most one statistic
// at a time for load based splitting. This is either CPU or QPS. However
// we don't enforce that only one max stat is returned. If the split
// objective is currently CPU, it must be the case that every store in the
// cluster is running a version that populates MaxCPUPerSecond so we don't
// need to worry about it being 0 as default and passing the >= 0 check.
switch mq.store.splitConfig.SplitObjective() {
case SplitCPU:
ls.max = res.MaxCPUPerSecond
ls.ok = res.MaxCPUPerSecond >= 0
ls.typ = SplitCPU
case SplitQPS:
ls.max = res.MaxQueriesPerSecond
ls.ok = res.MaxQueriesPerSecond >= 0
ls.typ = SplitQPS
// at a time for load based splitting. This is either CPU or QPS. However, we
// don't enforce that only one max stat is returned. We set QPS after CPU,
// possibly overwriting it if both are set. A default value of 0 could be
// returned in the response if the RHS node is on a pre-23.1 version. To
// avoid merging the range due to low load (0 CPU), always overwrite the
// value if MaxQPS is set. If neither is >= 0, Ok won't be set and the
// objective will be the default value, QPS.
if res.MaxCPUPerSecond >= 0 {
lbSnap = split.LoadSplitSnapshot{
SplitObjective: split.SplitCPU,
Max: res.MaxCPUPerSecond,
Ok: true,
}
}

return desc, stats, ls, nil
if res.MaxQueriesPerSecond >= 0 {
lbSnap = split.LoadSplitSnapshot{
SplitObjective: split.SplitQPS,
Max: res.MaxQueriesPerSecond,
Ok: true,
}
}
return desc, stats, lbSnap, nil
}

func (mq *mergeQueue) process(
Expand All @@ -231,15 +242,14 @@ func (mq *mergeQueue) process(

lhsDesc := lhsRepl.Desc()
lhsStats := lhsRepl.GetMVCCStats()
lhsLoadSplitStat := lhsRepl.loadSplitStat(ctx)
minBytes := lhsRepl.GetMinBytes()
if lhsStats.Total() >= minBytes {
log.VEventf(ctx, 2, "skipping merge: LHS meets minimum size threshold %d with %d bytes",
minBytes, lhsStats.Total())
return false, nil
}

rhsDesc, rhsStats, rhsLoadSplitStat, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
rhsDesc, rhsStats, rhsLoadSplitSnap, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
if err != nil {
return false, err
}
Expand All @@ -265,11 +275,12 @@ func (mq *mergeQueue) process(
mergedStats := lhsStats
mergedStats.Add(rhsStats)

lhsLoadSplitSnap := lhsRepl.loadBasedSplitter.Snapshot(ctx, now.ToTimestamp().GoTime())
var loadMergeReason string
if lhsRepl.SplitByLoadEnabled() {
var canMergeLoad bool
if canMergeLoad, loadMergeReason = canMergeRangeLoad(
ctx, lhsLoadSplitStat, rhsLoadSplitStat, mq.store.splitConfig,
ctx, lhsLoadSplitSnap, rhsLoadSplitSnap,
); !canMergeLoad {
log.VEventf(ctx, 2, "skipping merge to avoid thrashing: merged range %s may split %s",
mergedDesc, loadMergeReason)
Expand Down Expand Up @@ -415,7 +426,7 @@ func (mq *mergeQueue) process(
// Adjust the splitter to account for the additional load from the RHS. We
// could just Reset the splitter, but then we'd need to wait out a full
// measurement period (default of 5m) before merging this range again.
if mergedLoadSplitStat := lhsLoadSplitStat.max + rhsLoadSplitStat.max; mergedLoadSplitStat != 0 {
if mergedLoadSplitStat := lhsLoadSplitSnap.Max + rhsLoadSplitSnap.Max; mergedLoadSplitStat != 0 {
lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedLoadSplitStat)
}
return true, nil
Expand All @@ -439,7 +450,7 @@ func (mq *mergeQueue) updateChan() <-chan time.Time {
}

func canMergeRangeLoad(
ctx context.Context, lhs, rhs loadSplitStat, rsc *replicaSplitConfig,
ctx context.Context, lhs, rhs split.LoadSplitSnapshot,
) (can bool, reason string) {
// When load is a consideration for splits and, by extension, merges, the
// mergeQueue is fairly conservative. In an effort to avoid thrashing and to
Expand All @@ -451,47 +462,47 @@ func canMergeRangeLoad(
// maximum qps measurement from both sides to be sufficiently stable and
// reliable, meaning that it was a maximum measurement over some extended
// period of time.
if !lhs.ok {
if !lhs.Ok {
return false, "LHS load measurement not yet reliable"
}
if !rhs.ok {
if !rhs.Ok {
return false, "RHS load measurement not yet reliable"
}

// When the lhs and rhs split stats are of different types, or do not match
// the current split objective they cannot merge together. This could occur
// just after changing the split objective to a different value, where
// there is a mismatch.
splitObjective, splitThreshold := rsc.SplitObjective(), rsc.StatThreshold()
if lhs.typ != splitObjective {
return false, "LHS load measurement is a different type (%s) than current split objective (%s)"
}
if rhs.typ != splitObjective {
return false, "RHS load measurement is a different type (%s) than current split objective (%s)"
if lhs.SplitObjective != rhs.SplitObjective {
return false, fmt.Sprintf("LHS load measurement is a different type (%s) than the RHS (%s)",
lhs.SplitObjective,
rhs.SplitObjective,
)
}

obj := lhs.SplitObjective
// Check if the merged range would need to be split, if so, skip merge.
// Use a lower threshold for load based splitting so we don't find ourselves
// in a situation where we keep merging ranges that would be split soon after
// by a small increase in load.
merged := lhs.max + rhs.max
conservativeLoadBasedSplitThreshold := 0.5 * splitThreshold
merged := lhs.Max + rhs.Max
conservativeLoadBasedSplitThreshold := 0.5 * lhs.Threshold

if merged >= conservativeLoadBasedSplitThreshold {
return false, fmt.Sprintf("lhs+rhs %s (%s+%s=%s) above threshold (%s)",
splitObjective,
splitObjective.Format(lhs.max),
splitObjective.Format(lhs.max),
splitObjective.Format(merged),
splitObjective.Format(conservativeLoadBasedSplitThreshold),
obj,
obj.Format(lhs.Max),
obj.Format(rhs.Max),
obj.Format(merged),
obj.Format(conservativeLoadBasedSplitThreshold),
)
}

return true, fmt.Sprintf("lhs+rhs %s (%s+%s=%s) below threshold (%s)",
splitObjective,
splitObjective.Format(lhs.max),
splitObjective.Format(lhs.max),
splitObjective.Format(merged),
splitObjective.Format(conservativeLoadBasedSplitThreshold),
obj,
obj.Format(lhs.Max),
obj.Format(rhs.Max),
obj.Format(merged),
obj.Format(conservativeLoadBasedSplitThreshold),
)
}
12 changes: 8 additions & 4 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1301,10 +1301,12 @@ func (r *Replica) SetMVCCStatsForTesting(stats *enginepb.MVCCStats) {
//
// Use LoadStats.QueriesPerSecond for all other purposes.
func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) {
if r.store.splitConfig.SplitObjective() != SplitQPS {
snap := r.loadBasedSplitter.Snapshot(ctx, r.Clock().PhysicalTime())

if snap.SplitObjective != split.SplitQPS {
return 0, false
}
return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
return snap.Max, snap.Ok
}

// GetMaxSplitCPU returns the Replica's maximum CPU/s rate over a configured
Expand All @@ -1317,10 +1319,12 @@ func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) {
// Use LoadStats.RaftCPUNanosPerSecond and RequestCPUNanosPerSecond for current
// CPU stats for all other purposes.
func (r *Replica) GetMaxSplitCPU(ctx context.Context) (float64, bool) {
if r.store.splitConfig.SplitObjective() != SplitCPU {
snap := r.loadBasedSplitter.Snapshot(ctx, r.Clock().PhysicalTime())

if snap.SplitObjective != split.SplitCPU {
return 0, false
}
return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
return snap.Max, snap.Ok
}

// ContainsKey returns whether this range contains the specified key.
Expand Down
12 changes: 7 additions & 5 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,6 @@ func newUninitializedReplica(
r.mu.stateLoader = stateloader.Make(rangeID)
r.mu.quiescent = true
r.mu.conf = store.cfg.DefaultSpanConfig
split.Init(
&r.loadBasedSplitter,
store.splitConfig,
store.metrics.LoadSplitterMetrics,
)

r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{}
r.mu.checksums = map[uuid.UUID]*replicaChecksum{}
Expand All @@ -123,8 +118,15 @@ func newUninitializedReplica(
if leaseHistoryMaxEntries > 0 {
r.leaseHistory = newLeaseHistory()
}

if store.cfg.StorePool != nil {
r.loadStats = load.NewReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString)
split.Init(
&r.loadBasedSplitter,
newReplicaSplitConfig(store.ClusterSettings()),
store.metrics.LoadSplitterMetrics,
store.rebalanceObjManager.Objective().ToSplitObjective(),
)
}

// NB: the state will be loaded when the replica gets initialized.
Expand Down
Loading

0 comments on commit 03cac61

Please sign in to comment.