kvserver: fix deadlock on rebalance obj change #97539

Merged: 1 commit, Feb 24, 2023
20 changes: 12 additions & 8 deletions pkg/kv/kvserver/asim/state/split_decider.go
@@ -46,7 +46,9 @@ type loadSplitConfig struct {

// NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to
// find the midpoint based on recorded load.
func (lsc loadSplitConfig) NewLoadBasedSplitter(startTime time.Time) split.LoadBasedSplitter {
func (lsc loadSplitConfig) NewLoadBasedSplitter(
startTime time.Time, _ split.SplitObjective,
) split.LoadBasedSplitter {
return split.NewUnweightedFinder(startTime, lsc.randSource)
}

@@ -57,7 +59,7 @@ func (lsc loadSplitConfig) StatRetention() time.Duration {

// StatThreshold returns the threshold for load above which the range
// should be considered split.
func (lsc loadSplitConfig) StatThreshold() float64 {
func (lsc loadSplitConfig) StatThreshold(_ split.SplitObjective) float64 {
return lsc.settings.SplitQPSThreshold
}

@@ -85,7 +87,7 @@ func (s *SplitDecider) newDecider() *split.Decider {
split.Init(decider, s.splitConfig, &split.LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
}, split.SplitQPS)
return decider
}

@@ -100,11 +102,13 @@ func (s *SplitDecider) Record(tick time.Time, rangeID RangeID, le workload.LoadE
}

qps := LoadEventQPS(le)
shouldSplit := decider.Record(context.Background(), tick, int(qps), func() roachpb.Span {
return roachpb.Span{
Key: Key(le.Key).ToRKey().AsRawKey(),
}
})
shouldSplit := decider.Record(
context.Background(),
tick,
func(_ split.SplitObjective) int { return int(qps) },
func() roachpb.Span {
return roachpb.Span{Key: Key(le.Key).ToRKey().AsRawKey()}
})

if shouldSplit {
s.suggestions = append(s.suggestions, rangeID)
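For orientation, the simulator changes above adapt loadSplitConfig to an objective-aware interface: the splitter constructor and StatThreshold now take a split.SplitObjective, and Decider.Record takes a per-objective load callback instead of a plain count. Below is a minimal, self-contained sketch of that shape; the type names and threshold values are stand-ins for illustration, not the real kvserver/split API.

```go
package main

import (
	"fmt"
	"time"
)

// SplitObjective stands in for split.SplitObjective (QPS or CPU).
type SplitObjective int

const (
	SplitQPS SplitObjective = iota
	SplitCPU
)

// loadSplitConfig is an illustrative config whose methods are parameterized by
// the objective, mirroring the new interface shape in the diff above.
type loadSplitConfig struct {
	qpsThreshold float64
	cpuThreshold float64
}

// StatThreshold returns the split threshold for the given objective.
func (c loadSplitConfig) StatThreshold(obj SplitObjective) float64 {
	if obj == SplitCPU {
		return c.cpuThreshold
	}
	return c.qpsThreshold
}

// record stands in for Decider.Record: the caller supplies a callback that
// reports load for whatever objective the decider is currently using.
func record(cfg loadSplitConfig, obj SplitObjective, loadFn func(SplitObjective) float64) bool {
	return loadFn(obj) >= cfg.StatThreshold(obj)
}

func main() {
	cfg := loadSplitConfig{qpsThreshold: 2500, cpuThreshold: 500 * float64(time.Millisecond)}
	qps := 3000.0
	// The callback ignores the objective and always reports QPS, just as the
	// simulator's Record passes func(_ split.SplitObjective) int above.
	should := record(cfg, SplitQPS, func(_ SplitObjective) float64 { return qps })
	fmt.Println("should split:", should)
}
```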
97 changes: 54 additions & 43 deletions pkg/kv/kvserver/merge_queue.go
@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
@@ -185,7 +186,12 @@ var _ PurgatoryError = rangeMergePurgatoryError{}

func (mq *mergeQueue) requestRangeStats(
ctx context.Context, key roachpb.Key,
) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, ls loadSplitStat, err error) {
) (
desc *roachpb.RangeDescriptor,
stats enginepb.MVCCStats,
lbSnap split.LoadSplitSnapshot,
err error,
) {

ba := &kvpb.BatchRequest{}
ba.Add(&kvpb.RangeStatsRequest{
@@ -194,31 +200,36 @@ func (mq *mergeQueue) requestRangeStats(

br, pErr := mq.db.NonTransactionalSender().Send(ctx, ba)
if pErr != nil {
return nil, enginepb.MVCCStats{}, loadSplitStat{}, pErr.GoError()
return nil, enginepb.MVCCStats{}, lbSnap, pErr.GoError()
}
res := br.Responses[0].GetInner().(*kvpb.RangeStatsResponse)

desc = &res.RangeInfo.Desc
stats = res.MVCCStats

// The load based splitter will only track the max of at most one statistic
// at a time for load based splitting. This is either CPU or QPS. However
// we don't enforce that only one max stat is returned. If the split
// objective is currently CPU, it must be the case that every store in the
// cluster is running a version that populates MaxCPUPerSecond so we don't
// need to worry about it being 0 as default and passing the >= 0 check.
switch mq.store.splitConfig.SplitObjective() {
case SplitCPU:
ls.max = res.MaxCPUPerSecond
ls.ok = res.MaxCPUPerSecond >= 0
ls.typ = SplitCPU
case SplitQPS:
ls.max = res.MaxQueriesPerSecond
ls.ok = res.MaxQueriesPerSecond >= 0
ls.typ = SplitQPS
// at a time for load based splitting. This is either CPU or QPS. However we
// don't enforce that only one max stat is returned. We set QPS after CPU,
// possibly overwriting if both are set. A default value of 0 could be
// returned in the response if the rhs node is on a pre 23.1 version. To
// avoid merging the range due to low load (0 CPU), always overwrite the
// value if MaxQPS is set. If neither are >= 0, OK won't be set and the
// objective will be default value, QPS.
if res.MaxCPUPerSecond >= 0 {
lbSnap = split.LoadSplitSnapshot{
SplitObjective: split.SplitCPU,
Max: res.MaxCPUPerSecond,
Ok: true,
}
}

return desc, stats, ls, nil
if res.MaxQueriesPerSecond >= 0 {
lbSnap = split.LoadSplitSnapshot{
SplitObjective: split.SplitQPS,
Max: res.MaxQueriesPerSecond,
Ok: true,
}
}
return desc, stats, lbSnap, nil
}

func (mq *mergeQueue) process(
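The CPU-then-QPS assignment above can be read as a small selection function: take a valid CPU measurement, but let a valid QPS measurement overwrite it, because a pre-23.1 RHS node may report a default 0 for MaxCPUPerSecond. A hedged sketch with a stand-in snapshot type (not the real split.LoadSplitSnapshot):

```go
package main

import "fmt"

type SplitObjective int

const (
	SplitQPS SplitObjective = iota
	SplitCPU
)

// loadSplitSnapshot is a stand-in for split.LoadSplitSnapshot.
type loadSplitSnapshot struct {
	Objective SplitObjective
	Max       float64
	Ok        bool
}

// snapshotFromRangeStats mirrors the comment in requestRangeStats: take the
// CPU measurement if it is valid, then let a valid QPS measurement overwrite
// it, so a mixed-version cluster never merges a range because the RHS
// reported a default 0 CPU.
func snapshotFromRangeStats(maxCPU, maxQPS float64) loadSplitSnapshot {
	var snap loadSplitSnapshot
	if maxCPU >= 0 {
		snap = loadSplitSnapshot{Objective: SplitCPU, Max: maxCPU, Ok: true}
	}
	if maxQPS >= 0 {
		snap = loadSplitSnapshot{Objective: SplitQPS, Max: maxQPS, Ok: true}
	}
	return snap
}

func main() {
	// Mixed-version response: CPU defaults to 0, QPS is populated.
	fmt.Printf("%+v\n", snapshotFromRangeStats(0, 1800))
	// Neither valid: Ok stays false and the objective stays at its zero value (QPS).
	fmt.Printf("%+v\n", snapshotFromRangeStats(-1, -1))
}
```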
@@ -231,15 +242,14 @@ func (mq *mergeQueue) process(

lhsDesc := lhsRepl.Desc()
lhsStats := lhsRepl.GetMVCCStats()
lhsLoadSplitStat := lhsRepl.loadSplitStat(ctx)
minBytes := lhsRepl.GetMinBytes()
if lhsStats.Total() >= minBytes {
log.VEventf(ctx, 2, "skipping merge: LHS meets minimum size threshold %d with %d bytes",
minBytes, lhsStats.Total())
return false, nil
}

rhsDesc, rhsStats, rhsLoadSplitStat, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
rhsDesc, rhsStats, rhsLoadSplitSnap, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
if err != nil {
return false, err
}
@@ -265,11 +275,12 @@ func (mq *mergeQueue) process(
mergedStats := lhsStats
mergedStats.Add(rhsStats)

lhsLoadSplitSnap := lhsRepl.loadBasedSplitter.Snapshot(ctx, mq.store.Clock().PhysicalTime())
var loadMergeReason string
if lhsRepl.SplitByLoadEnabled() {
var canMergeLoad bool
if canMergeLoad, loadMergeReason = canMergeRangeLoad(
ctx, lhsLoadSplitStat, rhsLoadSplitStat, mq.store.splitConfig,
ctx, lhsLoadSplitSnap, rhsLoadSplitSnap,
); !canMergeLoad {
log.VEventf(ctx, 2, "skipping merge to avoid thrashing: merged range %s may split %s",
mergedDesc, loadMergeReason)
@@ -415,7 +426,7 @@ func (mq *mergeQueue) process(
// Adjust the splitter to account for the additional load from the RHS. We
// could just Reset the splitter, but then we'd need to wait out a full
// measurement period (default of 5m) before merging this range again.
if mergedLoadSplitStat := lhsLoadSplitStat.max + rhsLoadSplitStat.max; mergedLoadSplitStat != 0 {
if mergedLoadSplitStat := lhsLoadSplitSnap.Max + rhsLoadSplitSnap.Max; mergedLoadSplitStat != 0 {
lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedLoadSplitStat)
}
return true, nil
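The RecordMax call above seeds the LHS splitter with the combined load instead of resetting it, so the merged range does not have to wait out a full measurement window before it can be considered again. A toy sketch of that idea, with a stand-in tracker and a made-up retention period:

```go
package main

import (
	"fmt"
	"time"
)

// maxTracker is a stand-in for the splitter's max-stat tracking: it keeps the
// largest value recorded within a retention window.
type maxTracker struct {
	retention time.Duration
	max       float64
	since     time.Time
}

// recordMax mirrors the idea of loadBasedSplitter.RecordMax: seed the tracker
// with the combined LHS+RHS load so merge decisions stay informed without
// waiting out a full measurement period.
func (t *maxTracker) recordMax(now time.Time, v float64) {
	if now.Sub(t.since) > t.retention {
		t.max, t.since = 0, now
	}
	if v > t.max {
		t.max = v
	}
}

func main() {
	t := &maxTracker{retention: 5 * time.Minute, since: time.Now()}
	lhsMax, rhsMax := 700.0, 300.0
	if merged := lhsMax + rhsMax; merged != 0 {
		t.recordMax(time.Now(), merged)
	}
	fmt.Println("seeded max:", t.max) // 1000
}
```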
@@ -439,7 +450,7 @@ func (mq *mergeQueue) updateChan() <-chan time.Time {
}

func canMergeRangeLoad(
ctx context.Context, lhs, rhs loadSplitStat, rsc *replicaSplitConfig,
ctx context.Context, lhs, rhs split.LoadSplitSnapshot,
) (can bool, reason string) {
// When load is a consideration for splits and, by extension, merges, the
// mergeQueue is fairly conservative. In an effort to avoid thrashing and to
@@ -451,47 +462,47 @@ func canMergeRangeLoad(
// maximum qps measurement from both sides to be sufficiently stable and
// reliable, meaning that it was a maximum measurement over some extended
// period of time.
if !lhs.ok {
if !lhs.Ok {
return false, "LHS load measurement not yet reliable"
}
if !rhs.ok {
if !rhs.Ok {
return false, "RHS load measurement not yet reliable"
}

// When the lhs and rhs split stats are of different types, or do not match
// the current split objective they cannot merge together. This could occur
// just after changing the split objective to a different value, where
// there is a mismatch.
splitObjective, splitThreshold := rsc.SplitObjective(), rsc.StatThreshold()
if lhs.typ != splitObjective {
return false, "LHS load measurement is a different type (%s) than current split objective (%s)"
}
if rhs.typ != splitObjective {
return false, "RHS load measurement is a different type (%s) than current split objective (%s)"
if lhs.SplitObjective != rhs.SplitObjective {
return false, fmt.Sprintf("LHS load measurement is a different type (%s) than the RHS (%s)",
lhs.SplitObjective,
rhs.SplitObjective,
)
}

obj := lhs.SplitObjective
// Check if the merged range would need to be split, if so, skip merge.
// Use a lower threshold for load based splitting so we don't find ourselves
// in a situation where we keep merging ranges that would be split soon after
// by a small increase in load.
merged := lhs.max + rhs.max
conservativeLoadBasedSplitThreshold := 0.5 * splitThreshold
merged := lhs.Max + rhs.Max
conservativeLoadBasedSplitThreshold := 0.5 * lhs.Threshold

if merged >= conservativeLoadBasedSplitThreshold {
return false, fmt.Sprintf("lhs+rhs %s (%s+%s=%s) above threshold (%s)",
splitObjective,
splitObjective.Format(lhs.max),
splitObjective.Format(lhs.max),
splitObjective.Format(merged),
splitObjective.Format(conservativeLoadBasedSplitThreshold),
obj,
obj.Format(lhs.Max),
obj.Format(rhs.Max),
obj.Format(merged),
obj.Format(conservativeLoadBasedSplitThreshold),
)
}

return true, fmt.Sprintf("lhs+rhs %s (%s+%s=%s) below threshold (%s)",
splitObjective,
splitObjective.Format(lhs.max),
splitObjective.Format(lhs.max),
splitObjective.Format(merged),
splitObjective.Format(conservativeLoadBasedSplitThreshold),
obj,
obj.Format(lhs.Max),
obj.Format(rhs.Max),
obj.Format(merged),
obj.Format(conservativeLoadBasedSplitThreshold),
)
}
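Stripped of logging and formatting, canMergeRangeLoad gates a merge on three conditions: both snapshots are reliable, both measure the same objective, and the combined load stays under half the split threshold. A worked sketch with stand-in types and made-up numbers:

```go
package main

import "fmt"

type SplitObjective int

const (
	SplitQPS SplitObjective = iota
	SplitCPU
)

// snapshot is a stand-in for split.LoadSplitSnapshot.
type snapshot struct {
	Objective SplitObjective
	Max       float64
	Ok        bool
	Threshold float64
}

// canMergeLoad mirrors the structure of canMergeRangeLoad above, minus logging.
func canMergeLoad(lhs, rhs snapshot) (bool, string) {
	if !lhs.Ok {
		return false, "LHS load measurement not yet reliable"
	}
	if !rhs.Ok {
		return false, "RHS load measurement not yet reliable"
	}
	if lhs.Objective != rhs.Objective {
		return false, "LHS and RHS measure different objectives"
	}
	// Use half the split threshold so a merged range is not immediately
	// re-split by a small bump in load.
	merged := lhs.Max + rhs.Max
	conservative := 0.5 * lhs.Threshold
	if merged >= conservative {
		return false, fmt.Sprintf("merged load %.0f >= conservative threshold %.0f", merged, conservative)
	}
	return true, fmt.Sprintf("merged load %.0f < conservative threshold %.0f", merged, conservative)
}

func main() {
	lhs := snapshot{Objective: SplitQPS, Max: 400, Ok: true, Threshold: 2500}
	rhs := snapshot{Objective: SplitQPS, Max: 600, Ok: true, Threshold: 2500}
	ok, why := canMergeLoad(lhs, rhs)
	fmt.Println(ok, "-", why) // true - merged load 1000 < conservative threshold 1250
}
```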
12 changes: 8 additions & 4 deletions pkg/kv/kvserver/replica.go
@@ -1301,10 +1301,12 @@ func (r *Replica) SetMVCCStatsForTesting(stats *enginepb.MVCCStats) {
//
// Use LoadStats.QueriesPerSecond for all other purposes.
func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) {
if r.store.splitConfig.SplitObjective() != SplitQPS {
snap := r.loadBasedSplitter.Snapshot(ctx, r.Clock().PhysicalTime())

if snap.SplitObjective != split.SplitQPS {
return 0, false
}
return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
return snap.Max, snap.Ok
}

// GetMaxSplitCPU returns the Replica's maximum CPU/s rate over a configured
@@ -1317,10 +1319,12 @@ func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) {
// Use LoadStats.RaftCPUNanosPerSecond and RequestCPUNanosPerSecond for current
// CPU stats for all other purposes.
func (r *Replica) GetMaxSplitCPU(ctx context.Context) (float64, bool) {
if r.store.splitConfig.SplitObjective() != SplitCPU {
snap := r.loadBasedSplitter.Snapshot(ctx, r.Clock().PhysicalTime())

if snap.SplitObjective != split.SplitCPU {
return 0, false
}
return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
return snap.Max, snap.Ok
}

// ContainsKey returns whether this range contains the specified key.
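Both accessors now take a single consistent snapshot from the replica's own splitter rather than first consulting store-level split configuration, which removes the cross-component dependency the PR title refers to. A hedged sketch of that snapshot pattern, using stand-in types rather than the real split.Decider:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type SplitObjective int

const (
	SplitQPS SplitObjective = iota
	SplitCPU
)

type loadSplitSnapshot struct {
	Objective SplitObjective
	Max       float64
	Ok        bool
}

// decider is a stand-in for split.Decider: it owns its objective and max stat,
// and hands both out together under one lock acquisition, so callers never
// need to consult store-level state (and its locks) to interpret the max.
type decider struct {
	mu        sync.Mutex
	objective SplitObjective
	max       float64
	ok        bool
}

func (d *decider) Snapshot(_ time.Time) loadSplitSnapshot {
	d.mu.Lock()
	defer d.mu.Unlock()
	return loadSplitSnapshot{Objective: d.objective, Max: d.max, Ok: d.ok}
}

// getMaxSplitQPS mirrors Replica.GetMaxSplitQPS: return the max only if the
// splitter is currently measuring QPS.
func getMaxSplitQPS(d *decider, now time.Time) (float64, bool) {
	snap := d.Snapshot(now)
	if snap.Objective != SplitQPS {
		return 0, false
	}
	return snap.Max, snap.Ok
}

func main() {
	d := &decider{objective: SplitQPS, max: 1200, ok: true}
	fmt.Println(getMaxSplitQPS(d, time.Now())) // 1200 true
}
```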
12 changes: 7 additions & 5 deletions pkg/kv/kvserver/replica_init.go
@@ -107,11 +107,6 @@ func newUninitializedReplica(
r.mu.stateLoader = stateloader.Make(rangeID)
r.mu.quiescent = true
r.mu.conf = store.cfg.DefaultSpanConfig
split.Init(
&r.loadBasedSplitter,
store.splitConfig,
store.metrics.LoadSplitterMetrics,
)

r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{}
r.mu.checksums = map[uuid.UUID]*replicaChecksum{}
@@ -123,8 +118,15 @@ func newUninitializedReplica(
if leaseHistoryMaxEntries > 0 {
r.leaseHistory = newLeaseHistory()
}

if store.cfg.StorePool != nil {
r.loadStats = load.NewReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString)
split.Init(
&r.loadBasedSplitter,
newReplicaSplitConfig(store.ClusterSettings()),
store.metrics.LoadSplitterMetrics,
store.rebalanceObjManager.Objective().ToSplitObjective(),
)
}

// NB: the state will be loaded when the replica gets initialized.
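The splitter is now initialized alongside the replica's load stats and is seeded with the store's current rebalance objective converted to a split objective, so the decider starts out consistent with what the store is balancing on. A small illustrative sketch of that seeding step; the type names and conversion below are stand-ins, not the real kvserver API:

```go
package main

import "fmt"

// These types stand in for the store's rebalance objective and for
// split.SplitObjective; the real conversion is Objective().ToSplitObjective().
type RebalanceObjective int
type SplitObjective int

const (
	RebalanceQPS RebalanceObjective = iota
	RebalanceCPU
)

const (
	SplitQPS SplitObjective = iota
	SplitCPU
)

// toSplitObjective mirrors the idea of ToSplitObjective: map whatever the
// store is currently rebalancing on to the corresponding split objective.
func toSplitObjective(o RebalanceObjective) SplitObjective {
	if o == RebalanceCPU {
		return SplitCPU
	}
	return SplitQPS
}

// initSplitter stands in for split.Init: the starting objective is passed in
// once at construction, so the decider never reaches back into store-level
// state later.
func initSplitter(obj SplitObjective) {
	fmt.Println("splitter initialized with objective", obj)
}

func main() {
	initSplitter(toSplitObjective(RebalanceCPU))
}
```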