Skip to content

Commit

Permalink
Merge #103776
Browse files Browse the repository at this point in the history
103776: kvserver: fix rebalance obj callback ctx r=knz a=kvoli

The `RebalanceObjectiveManager` updates the rebalance objective by using callbacks on cluster setting, cluster version and store descriptor gossip changes. The callback on store descriptor changes was re-using the initialization function's (`newRebalanceObjectiveManager`) context. This re-use could cause use of tracing spans after finish, if the callback path evaluates when initialized with a context containing tracing spans.

Use a background context in the store descriptor callback to prevent this problem.

This PR also adds an ambient context to the `RebalanceObjectiveManager`
and uses it to annotate callbacks.

Fixes: #103763

Release note: None

Co-authored-by: Austen McClernon <[email protected]>
  • Loading branch information
craig[bot] and kvoli committed May 24, 2023
2 parents ff6a4ac + 74e9d05 commit b737031
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 10 deletions.
31 changes: 23 additions & 8 deletions pkg/kv/kvserver/rebalance_objective.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ type gossipStoreCapacityChangeNotifier interface {
// interface and registers a callback at creation time, that will be called on
// a reblanace objective change.
type RebalanceObjectiveManager struct {
log.AmbientContext
st *cluster.Settings
storeDescProvider gossipStoreDescriptorProvider

Expand All @@ -181,20 +182,28 @@ type RebalanceObjectiveManager struct {

func newRebalanceObjectiveManager(
ctx context.Context,
ambientCtx log.AmbientContext,
st *cluster.Settings,
onChange func(ctx context.Context, obj LBRebalancingObjective),
storeDescProvider gossipStoreDescriptorProvider,
capacityChangeNotifier gossipStoreCapacityChangeNotifier,
) *RebalanceObjectiveManager {
rom := &RebalanceObjectiveManager{st: st, storeDescProvider: storeDescProvider}
rom := &RebalanceObjectiveManager{
st: st,
storeDescProvider: storeDescProvider,
AmbientContext: ambientCtx,
}
rom.AddLogTag("rebalance-objective", nil)
ctx = rom.AnnotateCtx(ctx)

rom.mu.obj = ResolveLBRebalancingObjective(ctx, st, storeDescProvider.GetStores())
rom.mu.onChange = onChange

LoadBasedRebalancingObjective.SetOnChange(&rom.st.SV, func(ctx context.Context) {
rom.maybeUpdateRebalanceObjective(ctx)
rom.maybeUpdateRebalanceObjective(rom.AnnotateCtx(ctx))
})
rom.st.Version.SetOnChange(func(ctx context.Context, _ clusterversion.ClusterVersion) {
rom.maybeUpdateRebalanceObjective(ctx)
rom.maybeUpdateRebalanceObjective(rom.AnnotateCtx(ctx))
})
// Rather than caching each capacity locally, use the callback as a trigger
// to recalculate the objective. This is less expensive than recacluating
Expand All @@ -208,7 +217,11 @@ func newRebalanceObjectiveManager(
capacityChangeNotifier.SetOnCapacityChange(
func(storeID roachpb.StoreID, old, cur roachpb.StoreCapacity) {
if (old.CPUPerSecond < 0) != (cur.CPUPerSecond < 0) {
rom.maybeUpdateRebalanceObjective(ctx)
// NB: On capacity changes we don't have access to a context. Create a
// background context on callback.
cbCtx, span := rom.AnnotateCtxWithSpan(context.Background(), "capacity-change")
defer span.Finish()
rom.maybeUpdateRebalanceObjective(cbCtx)
}
})

Expand All @@ -227,16 +240,18 @@ func (rom *RebalanceObjectiveManager) maybeUpdateRebalanceObjective(ctx context.
rom.mu.Lock()
defer rom.mu.Unlock()

ctx = rom.AnnotateCtx(ctx)
prev := rom.mu.obj
new := ResolveLBRebalancingObjective(ctx, rom.st, rom.storeDescProvider.GetStores())
next := ResolveLBRebalancingObjective(ctx, rom.st, rom.storeDescProvider.GetStores())
// Nothing to do when the objective hasn't changed.
if prev == new {
if prev == next {
return
}

log.Infof(ctx, "Updating the rebalance objective from %s to %s", prev.ToDimension(), new.ToDimension())
log.Infof(ctx, "Updating the rebalance objective from %s to %s",
prev, next)

rom.mu.obj = new
rom.mu.obj = next
rom.mu.onChange(ctx, rom.mu.obj)
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/rebalance_objective_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ func TestRebalanceObjectiveManager(t *testing.T) {
callbacks = append(callbacks, obj)
}
return newRebalanceObjectiveManager(
ctx, st, cb, providerNotifier, providerNotifier,
ctx, log.MakeTestingAmbientCtxWithNewTracer(),
st, cb,
providerNotifier, providerNotifier,
), &callbacks
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1289,7 +1289,10 @@ func NewStore(
allocatorStorePool = cfg.StorePool
storePoolIsDeterministic = allocatorStorePool.IsDeterministic()

s.rebalanceObjManager = newRebalanceObjectiveManager(ctx, s.cfg.Settings,
s.rebalanceObjManager = newRebalanceObjectiveManager(
ctx,
s.cfg.AmbientCtx,
s.cfg.Settings,
func(ctx context.Context, obj LBRebalancingObjective) {
s.VisitReplicas(func(r *Replica) (wantMore bool) {
r.loadBasedSplitter.SetSplitObjective(
Expand Down

0 comments on commit b737031

Please sign in to comment.