From b7e8046c6f2a1de1c8069a0f21f03aca2b30692f Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Mon, 7 Nov 2022 23:20:32 -0500 Subject: [PATCH 1/4] allocator: refactor StorePool usage to interface This change refactors the usage of `StorePool` in the allocator to a new interface, `AllocatorStorePool`, in order to be able to utilize a store pool with overriden liveness to properly evaluate decommission pre-flight checks. Part of #91570. Release note: None --- pkg/kv/kvserver/allocation_op.go | 18 +- .../allocator/allocatorimpl/allocator.go | 34 +-- .../allocator/allocatorimpl/allocator_test.go | 46 +++-- .../allocator/allocatorimpl/test_helpers.go | 6 +- .../kvserver/allocator/storepool/BUILD.bazel | 2 + .../allocator/storepool/store_pool.go | 193 ++++++++++++++---- .../allocator/storepool/store_pool_test.go | 42 ++-- .../allocator/storepool/test_helpers.go | 2 +- pkg/kv/kvserver/allocator_impl_test.go | 7 +- pkg/kv/kvserver/asim/state/impl.go | 1 + pkg/kv/kvserver/store.go | 8 +- pkg/kv/kvserver/store_pool_test.go | 17 +- pkg/kv/kvserver/store_rebalancer.go | 6 +- pkg/kv/kvserver/store_rebalancer_test.go | 10 +- 14 files changed, 266 insertions(+), 126 deletions(-) diff --git a/pkg/kv/kvserver/allocation_op.go b/pkg/kv/kvserver/allocation_op.go index a6f91e14ca5d..9ef3587a35c5 100644 --- a/pkg/kv/kvserver/allocation_op.go +++ b/pkg/kv/kvserver/allocation_op.go @@ -27,7 +27,7 @@ type AllocationOp interface { trackPlanningMetrics() // applyImpact updates the given storepool to reflect the result of // applying this operation. - applyImpact(storepool *storepool.StorePool) + applyImpact(storepool storepool.AllocatorStorePool) // lhBeingRemoved returns true when the leaseholder is will be removed if // this operation succeeds, otherwise false. lhBeingRemoved() bool @@ -49,7 +49,7 @@ func (o AllocationTransferLeaseOp) lhBeingRemoved() bool { return true } -func (o AllocationTransferLeaseOp) applyImpact(storepool *storepool.StorePool) { +func (o AllocationTransferLeaseOp) applyImpact(storepool storepool.AllocatorStorePool) { // TODO(kvoli): Currently the local storepool is updated directly in the // lease transfer call, rather than in this function. Move the storepool // tracking from rq.TransferLease to this function once #89771 is merged. @@ -89,7 +89,7 @@ func (o AllocationChangeReplicasOp) lhBeingRemoved() bool { // applyEstimatedImpact updates the given storepool to reflect the result // of applying this operation. -func (o AllocationChangeReplicasOp) applyImpact(storepool *storepool.StorePool) { +func (o AllocationChangeReplicasOp) applyImpact(storepool storepool.AllocatorStorePool) { for _, chg := range o.chgs { storepool.UpdateLocalStoreAfterRebalance(chg.Target.StoreID, o.usage, chg.ChangeType) } @@ -109,16 +109,16 @@ type AllocationFinalizeAtomicReplicationOp struct{} // TODO(kvoli): This always returns false, however it is possible that the LH // may have been removed here. -func (o AllocationFinalizeAtomicReplicationOp) lhBeingRemoved() bool { return false } -func (o AllocationFinalizeAtomicReplicationOp) applyImpact(storepool *storepool.StorePool) {} -func (o AllocationFinalizeAtomicReplicationOp) trackPlanningMetrics() {} +func (o AllocationFinalizeAtomicReplicationOp) lhBeingRemoved() bool { return false } +func (o AllocationFinalizeAtomicReplicationOp) applyImpact(storepool storepool.AllocatorStorePool) {} +func (o AllocationFinalizeAtomicReplicationOp) trackPlanningMetrics() {} // AllocationNoop represents no operation. 
type AllocationNoop struct{} -func (o AllocationNoop) lhBeingRemoved() bool { return false } -func (o AllocationNoop) applyImpact(storepool *storepool.StorePool) {} -func (o AllocationNoop) trackPlanningMetrics() {} +func (o AllocationNoop) lhBeingRemoved() bool { return false } +func (o AllocationNoop) applyImpact(storepool storepool.AllocatorStorePool) {} +func (o AllocationNoop) trackPlanningMetrics() {} // effectBuilder is a utility struct to track a list of effects, which may be // used to construct a single effect function that in turn calls all tracked diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index bd61ed4c9cf7..ddd6881f3f53 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -477,7 +477,8 @@ type AllocatorMetrics struct { // Allocator tries to spread replicas as evenly as possible across the stores // in the cluster. type Allocator struct { - StorePool *storepool.StorePool + st *cluster.Settings + StorePool storepool.AllocatorStorePool nodeLatencyFn func(addr string) (time.Duration, bool) // TODO(aayush): Let's replace this with a *rand.Rand that has a rand.Source // wrapped inside a mutex, to avoid misuse. @@ -509,7 +510,8 @@ func makeAllocatorMetrics() AllocatorMetrics { // MakeAllocator creates a new allocator using the specified StorePool. func MakeAllocator( - storePool *storepool.StorePool, + st *cluster.Settings, + storePool storepool.AllocatorStorePool, nodeLatencyFn func(addr string) (time.Duration, bool), knobs *allocator.TestingKnobs, ) Allocator { @@ -517,12 +519,14 @@ func MakeAllocator( // There are number of test cases that make a test store but don't add // gossip or a store pool. So we can't rely on the existence of the // store pool in those cases. - if storePool != nil && storePool.Deterministic { + if storePool != nil && storePool.IsDeterministic() { randSource = rand.NewSource(777) } else { + randSource = rand.NewSource(rand.Int63()) } allocator := Allocator{ + st: st, StorePool: storePool, nodeLatencyFn: nodeLatencyFn, randGen: makeAllocatorRand(randSource), @@ -931,7 +935,7 @@ func (a *Allocator) allocateTarget( // as possible, and therefore any store that is good enough will be // considered. 
var selector CandidateSelector - if replicaStatus == Alive || recoveryStoreSelector.Get(&a.StorePool.St.SV) == "best" { + if replicaStatus == Alive || recoveryStoreSelector.Get(&a.st.SV) == "best" { selector = a.NewBestCandidateSelector() } else { selector = a.NewGoodCandidateSelector() @@ -1515,8 +1519,8 @@ func (a Allocator) RebalanceNonVoter( func (a *Allocator) ScorerOptions(ctx context.Context) *RangeCountScorerOptions { return &RangeCountScorerOptions{ StoreHealthOptions: a.StoreHealthOptions(ctx), - deterministic: a.StorePool.Deterministic, - rangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.StorePool.St.SV), + deterministic: a.StorePool.IsDeterministic(), + rangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.st.SV), } } @@ -1525,7 +1529,7 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerO return &ScatterScorerOptions{ RangeCountScorerOptions: RangeCountScorerOptions{ StoreHealthOptions: a.StoreHealthOptions(ctx), - deterministic: a.StorePool.Deterministic, + deterministic: a.StorePool.IsDeterministic(), rangeRebalanceThreshold: 0, }, // We set jitter to be equal to the padding around replica-count rebalancing @@ -1534,7 +1538,7 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerO // made by the replicateQueue during normal course of operations. In other // words, we don't want stores that are too far away from the mean to be // affected by the jitter. - jitter: RangeRebalanceThreshold.Get(&a.StorePool.St.SV), + jitter: RangeRebalanceThreshold.Get(&a.st.SV), } } @@ -1691,10 +1695,10 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences( // storeHealthLogOnly. By default storeHealthBlockRebalanceTo is the action taken. When // there is a mixed version cluster, storeHealthNoAction is set instead. func (a *Allocator) StoreHealthOptions(_ context.Context) StoreHealthOptions { - enforcementLevel := StoreHealthEnforcement(l0SublevelsThresholdEnforce.Get(&a.StorePool.St.SV)) + enforcementLevel := StoreHealthEnforcement(l0SublevelsThresholdEnforce.Get(&a.st.SV)) return StoreHealthOptions{ EnforcementLevel: enforcementLevel, - L0SublevelThreshold: l0SublevelsThreshold.Get(&a.StorePool.St.SV), + L0SublevelThreshold: l0SublevelsThreshold.Get(&a.st.SV), } } @@ -1862,8 +1866,8 @@ func (a *Allocator) TransferLeaseTarget( storeDescMap, &QPSScorerOptions{ StoreHealthOptions: a.StoreHealthOptions(ctx), - QPSRebalanceThreshold: allocator.QPSRebalanceThreshold.Get(&a.StorePool.St.SV), - MinRequiredQPSDiff: allocator.MinQPSDifferenceForTransfers.Get(&a.StorePool.St.SV), + QPSRebalanceThreshold: allocator.QPSRebalanceThreshold.Get(&a.st.SV), + MinRequiredQPSDiff: allocator.MinQPSDifferenceForTransfers.Get(&a.st.SV), }, ) @@ -2066,7 +2070,7 @@ func (a Allocator) shouldTransferLeaseForAccessLocality( // stats and locality information to base our decision on. 
if statSummary == nil || statSummary.LocalityCounts == nil || - !EnableLoadBasedLeaseRebalancing.Get(&a.StorePool.St.SV) { + !EnableLoadBasedLeaseRebalancing.Get(&a.st.SV) { return decideWithoutStats, roachpb.ReplicaDescriptor{} } replicaLocalities := a.StorePool.GetLocalitiesByNode(existing) @@ -2128,7 +2132,7 @@ func (a Allocator) shouldTransferLeaseForAccessLocality( if !ok { continue } - addr, err := a.StorePool.Gossip.GetNodeIDAddress(repl.NodeID) + addr, err := a.StorePool.GossipNodeIDAddress(repl.NodeID) if err != nil { log.KvDistribution.Errorf(ctx, "missing address for n%d: %+v", repl.NodeID, err) continue @@ -2140,7 +2144,7 @@ func (a Allocator) shouldTransferLeaseForAccessLocality( remoteWeight := math.Max(minReplicaWeight, replicaWeights[repl.NodeID]) replScore, rebalanceAdjustment := loadBasedLeaseRebalanceScore( - ctx, a.StorePool.St, remoteWeight, remoteLatency, storeDesc, sourceWeight, source, candidateLeasesMean) + ctx, a.st, remoteWeight, remoteLatency, storeDesc, sourceWeight, source, candidateLeasesMean) if replScore > bestReplScore { bestReplScore = replScore bestRepl = repl diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go index 31576ed24cd1..df8207b30d2a 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go @@ -529,8 +529,8 @@ func mockStorePool( for _, storeID := range suspectedStoreIDs { liveNodeSet[roachpb.NodeID(storeID)] = livenesspb.NodeLivenessStatus_LIVE detail := storePool.GetStoreDetailLocked(storeID) - detail.LastAvailable = storePool.Clock.Now().GoTime() - detail.LastUnavailable = storePool.Clock.Now().GoTime() + detail.LastAvailable = storePool.Clock().Now().GoTime() + detail.LastUnavailable = storePool.Clock().Now().GoTime() detail.Desc = &roachpb.StoreDescriptor{ StoreID: storeID, Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(storeID)}, @@ -538,6 +538,7 @@ func mockStorePool( } // Set the node liveness function using the set we constructed. + // TODO(sarkesian): This override needs to be fixed to stop exporting this field. storePool.NodeLivenessFn = func(nodeID roachpb.NodeID, now time.Time, threshold time.Duration) livenesspb.NodeLivenessStatus { if status, ok := liveNodeSet[nodeID]; ok { @@ -689,7 +690,7 @@ func TestAllocatorReadAmpCheck(t *testing.T) { sg.GossipStores(test.stores, t) // Enable read disk health checking in candidate exclusion. - l0SublevelsThresholdEnforce.Override(ctx, &a.StorePool.St.SV, int64(test.enforcement)) + l0SublevelsThresholdEnforce.Override(ctx, &a.st.SV, int64(test.enforcement)) // Allocate a voter where all replicas are alive (e.g. up-replicating a valid range). 
add, _, err := a.AllocateVoter( @@ -1326,8 +1327,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) { stopper, g, _, a, _ := CreateTestAllocator(ctx, 1, true /* deterministic */) defer stopper.Stop(ctx) - st := a.StorePool.St - cluster := tc.cluster(st) + cluster := tc.cluster(a.st) // It doesn't make sense to test sets of stores containing fewer than 4 // stores, because 4 stores is the minimum number of stores needed to @@ -2029,11 +2029,12 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx, st, storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(storePool, func(string) (time.Duration, bool) { + a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) { return 0, true }, nil) defer stopper.Stop(ctx) @@ -2413,11 +2414,12 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx, st, storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(storePool, func(string) (time.Duration, bool) { + a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) { return 0, true }, nil) defer stopper.Stop(context.Background()) @@ -2479,11 +2481,12 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, clock, storePool, nl := storepool.CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, clock, storePool, nl := storepool.CreateTestStorePool(ctx, st, storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(storePool, func(string) (time.Duration, bool) { + a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) { return 0, true }, nil) defer stopper.Stop(context.Background()) @@ -2512,7 +2515,7 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) { ) require.Equal(t, expected, result) } - timeAfterStoreSuspect := storepool.TimeAfterStoreSuspect.Get(&storePool.St.SV) + timeAfterStoreSuspect := storepool.TimeAfterStoreSuspect.Get(&a.st.SV) // Based on capacity node 1 is desirable. assertShouldTransferLease(true) // Flip node 1 to unavailable, there should be no lease transfer now. 
@@ -3394,7 +3397,8 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { removalConstraintsChecker := voterConstraintsCheckerForRemoval(analyzed, constraint.EmptyAnalyzedConstraints) rebalanceConstraintsChecker := voterConstraintsCheckerForRebalance(analyzed, constraint.EmptyAnalyzedConstraints) - a.StorePool.IsStoreReadyForRoutineReplicaTransfer = func(_ context.Context, storeID roachpb.StoreID) bool { + storePool := a.StorePool.(*storepool.StorePool) + storePool.OverrideIsStoreReadyForRoutineReplicaTransferFn = func(_ context.Context, storeID roachpb.StoreID) bool { for _, s := range tc.excluded { if s == storeID { return false @@ -5262,7 +5266,8 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, storePool, _ := storepool.CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, storePool, _ := storepool.CreateTestStorePool(ctx, st, storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) @@ -5407,7 +5412,7 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { for _, c := range testCases { t.Run("", func(t *testing.T) { - a := MakeAllocator(storePool, func(addr string) (time.Duration, bool) { + a := MakeAllocator(st, storePool, func(addr string) (time.Duration, bool) { return c.latency[addr], true }, nil) target := a.TransferLeaseTarget( @@ -6979,11 +6984,12 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { var numNodes int ctx := context.Background() - stopper, _, _, sp, _ := storepool.CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, _, _, sp, _ := storepool.CreateTestStorePool(ctx, st, storepool.TestTimeUntilStoreDeadOff, false, /* deterministic */ func() int { return numNodes }, livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(sp, func(string) (time.Duration, bool) { + a := MakeAllocator(st, sp, func(string) (time.Duration, bool) { return 0, true }, nil) @@ -7089,7 +7095,7 @@ func TestAllocatorComputeActionNoStorePool(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - a := MakeAllocator(nil, nil, nil) + a := MakeAllocator(nil, nil, nil, nil) action, priority := a.ComputeAction(context.Background(), roachpb.SpanConfig{}, nil) if action != AllocatorNoop { t.Errorf("expected AllocatorNoop, but got %v", action) @@ -7730,7 +7736,7 @@ func TestAllocatorFullDisks(t *testing.T) { mockNodeLiveness.NodeLivenessFunc, false, /* deterministic */ ) - alloc := MakeAllocator(sp, func(string) (time.Duration, bool) { + alloc := MakeAllocator(st, sp, func(string) (time.Duration, bool) { return 0, false }, nil) @@ -8176,7 +8182,7 @@ func exampleRebalancing( storepool.NewMockNodeLiveness(livenesspb.NodeLivenessStatus_LIVE).NodeLivenessFunc, /* deterministic */ true, ) - alloc := MakeAllocator(sp, func(string) (time.Duration, bool) { + alloc := MakeAllocator(st, sp, func(string) (time.Duration, bool) { return 0, false }, nil) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go b/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go index 7f8e765ac8f9..8d9c88e84ee5 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go @@ -12,6 +12,7 @@ package allocatorimpl import ( "context" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "time" "github.com/cockroachdb/cockroach/pkg/gossip" @@ 
-36,11 +37,12 @@ func CreateTestAllocator( func CreateTestAllocatorWithKnobs( ctx context.Context, numNodes int, deterministic bool, knobs *allocator.TestingKnobs, ) (*stop.Stopper, *gossip.Gossip, *storepool.StorePool, Allocator, *timeutil.ManualTime) { - stopper, g, manual, storePool, _ := storepool.CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, manual, storePool, _ := storepool.CreateTestStorePool(ctx, st, storepool.TestTimeUntilStoreDeadOff, deterministic, func() int { return numNodes }, livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(storePool, func(string) (time.Duration, bool) { + a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) { return 0, true }, knobs) return stopper, g, storePool, a, manual diff --git a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel index 8dedef999c49..cf76d060eeed 100644 --- a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/rpc", "//pkg/settings", "//pkg/settings/cluster", + "//pkg/util", "//pkg/util/hlc", "//pkg/util/humanizeutil", "//pkg/util/log", @@ -41,6 +42,7 @@ go_test( "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb", "//pkg/testutils/gossiputil", + "//pkg/util", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index d6d7266cb41b..64da8c5d5d7c 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -330,24 +331,100 @@ type localityWithString struct { str string } +// AllocatorStorePool provides an interface for use by the allocator to a list +// of all known stores in the cluster and information on their health. +type AllocatorStorePool interface { + // ClusterNodeCount returns the number of nodes that are possible allocation + // targets. + // See comment on StorePool.ClusterNodeCount(). + ClusterNodeCount() int + + // IsDeterministic returns true iff the pool is configured to be deterministic. + IsDeterministic() bool + + // IsStoreReadyForRoutineReplicaTransfer returns true iff the store's node is + // live (as indicated by its `NodeLivenessStatus`) and thus a legal candidate + // to receive a replica. + // See comment on StorePool.IsStoreReadyForRoutineReplicaTransfer(..). + IsStoreReadyForRoutineReplicaTransfer(ctx context.Context, targetStoreID roachpb.StoreID) bool + + // Clock returns the store pool's clock. + // TODO(sarkesian): If possible, this should be removed. + Clock() *hlc.Clock + + // DecommissioningReplicas selects the replicas on decommissioning + // node/stores from the provided list. + DecommissioningReplicas(repls []roachpb.ReplicaDescriptor) []roachpb.ReplicaDescriptor + + // GetLocalitiesByNode returns the localities for the provided replicas by NodeID. + // See comment on StorePool.GetLocalitiesByNode(..). 
+ GetLocalitiesByNode(replicas []roachpb.ReplicaDescriptor) map[roachpb.NodeID]roachpb.Locality + + // GetLocalitiesByStore returns the localities for the provided replicas by StoreID. + // See comment on StorePool.GetLocalitiesByStore(..). + GetLocalitiesByStore(replicas []roachpb.ReplicaDescriptor) map[roachpb.StoreID]roachpb.Locality + + // GetStores returns information on all the stores with descriptor in the pool. + // See comment on StorePool.GetStores(). + GetStores() map[roachpb.StoreID]roachpb.StoreDescriptor + + // GetStoreDescriptor returns the latest store descriptor for the given + // storeID. + GetStoreDescriptor(storeID roachpb.StoreID) (roachpb.StoreDescriptor, bool) + + // GetStoreList returns a storeList of active stores based on a filter. + // See comment on StorePool.GetStoreList(..). + GetStoreList(filter StoreFilter) (StoreList, int, ThrottledStoreReasons) + + // GetStoreListFromIDs is the same function as GetStoreList but only returns stores + // from the subset of passed in store IDs. + GetStoreListFromIDs( + storeIDs roachpb.StoreIDSlice, + filter StoreFilter, + ) (StoreList, int, ThrottledStoreReasons) + + // GossipNodeIDAddress looks up the RPC address for the given node via gossip. + GossipNodeIDAddress(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error) + + // LiveAndDeadReplicas divides the provided repls slice into two slices: the + // first for live replicas, and the second for dead replicas. + // See comment on StorePool.LiveAndDeadReplicas(..). + LiveAndDeadReplicas( + repls []roachpb.ReplicaDescriptor, + includeSuspectAndDrainingStores bool, + ) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) + + // UpdateLocalStoreAfterRebalance is used to update the local copy of the + // target store immediately after a replica addition or removal. + UpdateLocalStoreAfterRebalance( + storeID roachpb.StoreID, + rangeUsageInfo allocator.RangeUsageInfo, + changeType roachpb.ReplicaChangeType, + ) + + // UpdateLocalStoresAfterLeaseTransfer is used to update the local copies of the + // involved store descriptors immediately after a lease transfer. + UpdateLocalStoresAfterLeaseTransfer(from roachpb.StoreID, to roachpb.StoreID, rangeQPS float64) +} + // StorePool maintains a list of all known stores in the cluster and // information on their health. -// -// TODO(irfansharif): Mediate access through a thin interface. type StorePool struct { log.AmbientContext - St *cluster.Settings // TODO(irfansharif): Shouldn't need to be exported. + st *cluster.Settings - Clock *hlc.Clock - Gossip *gossip.Gossip // TODO(irfansharif): Shouldn't need to be exported. + clock *hlc.Clock + gossip *gossip.Gossip nodeCountFn NodeCountFunc NodeLivenessFn NodeLivenessFunc startTime time.Time - Deterministic bool + deterministic bool + // We use separate mutexes for storeDetails and nodeLocalities because the // nodeLocalities map is used in the critical code path of Replica.Send() // and we'd rather not block that on something less important accessing // storeDetails. + // NB: Exported for use in tests and allocator simulator. DetailsMu struct { syncutil.RWMutex StoreDetails map[roachpb.StoreID]*StoreDetail @@ -357,22 +434,15 @@ type StorePool struct { nodeLocalities map[roachpb.NodeID]localityWithString } - // IsStoreReadyForRoutineReplicaTransfer returns true iff the store's node is - // live (as indicated by its `NodeLivenessStatus`) and thus a legal candidate - // to receive a replica. 
This is defined as a closure reference here instead + // OverrideIsStoreReadyForRoutineReplicaTransferFn, if set, is used in + // IsStoreReadyForRoutineReplicaTransfer. This is defined as a closure reference here instead // of a regular method so it can be overridden in tests. - // - // NB: What this method aims to capture is distinct from "dead" nodes. Nodes - // are classified as "dead" if they haven't successfully heartbeat their - // liveness record in the last `server.time_until_store_dead` seconds. - // - // Functionally, the distinction is that we simply avoid transferring replicas - // to "non-ready" nodes (i.e. nodes that _currently_ have a non-live - // `NodeLivenessStatus`), whereas we _actively move replicas off of "dead" - // nodes_. - IsStoreReadyForRoutineReplicaTransfer func(context.Context, roachpb.StoreID) bool + // TODO(sarkesian): Consider moving to a TestingKnobs struct. + OverrideIsStoreReadyForRoutineReplicaTransferFn func(context.Context, roachpb.StoreID) bool } +var _ AllocatorStorePool = &StorePool{} + // NewStorePool creates a StorePool and registers the store updating callback // with gossip. func NewStorePool( @@ -386,15 +456,14 @@ func NewStorePool( ) *StorePool { sp := &StorePool{ AmbientContext: ambient, - St: st, - Clock: clock, - Gossip: g, + st: st, + clock: clock, + gossip: g, nodeCountFn: nodeCountFn, NodeLivenessFn: nodeLivenessFn, startTime: clock.PhysicalTime(), - Deterministic: deterministic, + deterministic: deterministic, } - sp.IsStoreReadyForRoutineReplicaTransfer = sp.isStoreReadyForRoutineReplicaTransferInternal sp.DetailsMu.StoreDetails = make(map[roachpb.StoreID]*StoreDetail) sp.localitiesMu.nodeLocalities = make(map[roachpb.NodeID]localityWithString) @@ -418,9 +487,9 @@ func (sp *StorePool) String() string { sort.Sort(ids) var buf bytes.Buffer - now := sp.Clock.Now().GoTime() - timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.St.SV) - timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.St.SV) + now := sp.clock.Now().GoTime() + timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) for _, id := range ids { detail := sp.DetailsMu.StoreDetails[id] @@ -454,7 +523,7 @@ func (sp *StorePool) storeGossipUpdate(_ string, content roachpb.Value) { sp.DetailsMu.Lock() detail := sp.GetStoreDetailLocked(storeDesc.StoreID) detail.Desc = &storeDesc - detail.LastUpdatedTime = sp.Clock.PhysicalTime() + detail.LastUpdatedTime = sp.clock.PhysicalTime() sp.DetailsMu.Unlock() sp.localitiesMu.Lock() @@ -590,9 +659,9 @@ func (sp *StorePool) DecommissioningReplicas( // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() is order to // take clock signals from remote nodes into consideration. - now := sp.Clock.Now().GoTime() - timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.St.SV) - timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.St.SV) + now := sp.clock.Now().GoTime() + timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) for _, repl := range repls { detail := sp.GetStoreDetailLocked(repl.StoreID) @@ -611,6 +680,16 @@ func (sp *StorePool) ClusterNodeCount() int { return sp.nodeCountFn() } +// Clock returns the store pool's clock. +func (sp *StorePool) Clock() *hlc.Clock { + return sp.clock +} + +// IsDeterministic returns true iff the pool is configured to be deterministic. +func (sp *StorePool) IsDeterministic() bool { + return sp.deterministic +} + // IsDead determines if a store is dead. 
It will return an error if the store is // not found in the store pool or the status is unknown. If the store is not dead, // it returns the time to death. @@ -624,8 +703,8 @@ func (sp *StorePool) IsDead(storeID roachpb.StoreID) (bool, time.Duration, error } // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() is order to // take clock signals from remote nodes into consideration. - now := sp.Clock.Now().GoTime() - timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.St.SV) + now := sp.clock.Now().GoTime() + timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) deadAsOf := sd.LastUpdatedTime.Add(timeUntilStoreDead) if now.After(deadAsOf) { @@ -681,9 +760,9 @@ func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) { } // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() is order to // take clock signals from remote nodes into consideration. - now := sp.Clock.Now().GoTime() - timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.St.SV) - timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.St.SV) + now := sp.clock.Now().GoTime() + timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) return sd.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect), nil } @@ -705,9 +784,9 @@ func (sp *StorePool) LiveAndDeadReplicas( sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() - now := sp.Clock.Now().GoTime() - timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.St.SV) - timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.St.SV) + now := sp.clock.Now().GoTime() + timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) for _, repl := range repls { detail := sp.GetStoreDetailLocked(repl.StoreID) @@ -902,7 +981,7 @@ func (sp *StorePool) GetStoreListFromIDs( func (sp *StorePool) getStoreListFromIDsLocked( storeIDs roachpb.StoreIDSlice, filter StoreFilter, ) (StoreList, int, ThrottledStoreReasons) { - if sp.Deterministic { + if sp.deterministic { sort.Sort(storeIDs) } else { shuffle.Shuffle(storeIDs) @@ -912,9 +991,9 @@ func (sp *StorePool) getStoreListFromIDsLocked( var throttled ThrottledStoreReasons var storeDescriptors []roachpb.StoreDescriptor - now := sp.Clock.Now().GoTime() - timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.St.SV) - timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.St.SV) + now := sp.clock.Now().GoTime() + timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) for _, storeID := range storeIDs { detail, ok := sp.DetailsMu.StoreDetails[storeID] @@ -975,8 +1054,8 @@ func (sp *StorePool) Throttle(reason ThrottleReason, why string, storeID roachpb // configured timeout period has passed. switch reason { case ThrottleFailed: - timeout := FailedReservationsTimeout.Get(&sp.St.SV) - detail.ThrottledUntil = sp.Clock.PhysicalTime().Add(timeout) + timeout := FailedReservationsTimeout.Get(&sp.st.SV) + detail.ThrottledUntil = sp.clock.PhysicalTime().Add(timeout) if log.V(2) { ctx := sp.AnnotateCtx(context.TODO()) log.Infof(ctx, "snapshot failed (%s), s%d will be throttled for %s until %s", @@ -1030,6 +1109,11 @@ func (sp *StorePool) GetLocalitiesByNode( return localities } +// GossipNodeIDAddress looks up the RPC address for the given node via gossip. 
+func (sp *StorePool) GossipNodeIDAddress(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error) { + return sp.gossip.GetNodeIDAddress(nodeID) +} + // GetNodeLocalityString returns the locality information for the given node // in its string format. func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string { @@ -1042,6 +1126,25 @@ func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string { return locality.str } +// IsStoreReadyForRoutineReplicaTransfer returns true iff the store's node is +// live (as indicated by its `NodeLivenessStatus`) and thus a legal candidate +// to receive a replica. +// +// NB: What this method aims to capture is distinct from "dead" nodes. Nodes +// are classified as "dead" if they haven't successfully heartbeat their +// liveness record in the last `server.time_until_store_dead` seconds. +// +// Functionally, the distinction is that we simply avoid transferring replicas +// to "non-ready" nodes (i.e. nodes that _currently_ have a non-live +// `NodeLivenessStatus`), whereas we _actively move replicas off of "dead" +// nodes_. +func (sp *StorePool) IsStoreReadyForRoutineReplicaTransfer(ctx context.Context, targetStoreID roachpb.StoreID) bool { + if sp.OverrideIsStoreReadyForRoutineReplicaTransferFn != nil { + return sp.OverrideIsStoreReadyForRoutineReplicaTransferFn(ctx, targetStoreID) + } + return sp.isStoreReadyForRoutineReplicaTransferInternal(ctx, targetStoreID) +} + func (sp *StorePool) isStoreReadyForRoutineReplicaTransferInternal( ctx context.Context, targetStoreID roachpb.StoreID, ) bool { diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool_test.go b/pkg/kv/kvserver/allocator/storepool/store_pool_test.go index 2dababee9d9f..bdea163c45d1 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool_test.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -52,7 +53,8 @@ func TestStorePoolGossipUpdate(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, sp, _ := CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, _ := CreateTestStorePool(ctx, st, TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 0 }, /* NodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -120,8 +122,9 @@ func TestStorePoolGetStoreList(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() + st := cluster.MakeTestingClusterSettings() // We're going to manually mark stores dead in this test. - stopper, g, _, sp, mnl := CreateTestStorePool(ctx, + stopper, g, _, sp, mnl := CreateTestStorePool(ctx, st, TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -203,10 +206,10 @@ func TestStorePoolGetStoreList(t *testing.T) { mnl.SetNodeStatus(deadStore.Node.NodeID, livenesspb.NodeLivenessStatus_DEAD) sp.DetailsMu.Lock() // Set declinedStore as throttled. 
- sp.DetailsMu.StoreDetails[declinedStore.StoreID].ThrottledUntil = sp.Clock.Now().GoTime().Add(time.Hour) + sp.DetailsMu.StoreDetails[declinedStore.StoreID].ThrottledUntil = sp.clock.Now().GoTime().Add(time.Hour) // Set suspectedStore as suspected. - sp.DetailsMu.StoreDetails[suspectedStore.StoreID].LastAvailable = sp.Clock.Now().GoTime() - sp.DetailsMu.StoreDetails[suspectedStore.StoreID].LastUnavailable = sp.Clock.Now().GoTime() + sp.DetailsMu.StoreDetails[suspectedStore.StoreID].LastAvailable = sp.clock.Now().GoTime() + sp.DetailsMu.StoreDetails[suspectedStore.StoreID].LastUnavailable = sp.clock.Now().GoTime() sp.DetailsMu.Unlock() // No filter or limited set of store IDs. @@ -417,7 +420,8 @@ func TestStorePoolGetStoreDetails(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, sp, _ := CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, _ := CreateTestStorePool(ctx, st, TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -439,7 +443,8 @@ func TestStorePoolFindDeadReplicas(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, sp, mnl := CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, mnl := CreateTestStorePool(ctx, st, TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -544,7 +549,8 @@ func TestStorePoolDefaultState(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, _, _, sp, _ := CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, _, _, sp, _ := CreateTestStorePool(ctx, st, TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -574,7 +580,8 @@ func TestStorePoolThrottle(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, sp, _ := CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, _ := CreateTestStorePool(ctx, st, TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -583,7 +590,7 @@ func TestStorePoolThrottle(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(uniqueStore, t) - expected := sp.Clock.Now().GoTime().Add(FailedReservationsTimeout.Get(&sp.St.SV)) + expected := sp.clock.Now().GoTime().Add(FailedReservationsTimeout.Get(&sp.st.SV)) sp.Throttle(ThrottleFailed, "", 1) sp.DetailsMu.Lock() @@ -599,7 +606,8 @@ func TestStorePoolSuspected(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, sp, mnl := CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, mnl := CreateTestStorePool(ctx, st, TestTimeUntilStoreDeadOff, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -609,9 +617,9 @@ func TestStorePoolSuspected(t *testing.T) { sg.GossipStores(uniqueStore, t) store := uniqueStore[0] - now := sp.Clock.Now().GoTime() - timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.St.SV) - timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.St.SV) + now := sp.clock.Now().GoTime() + timeUntilStoreDead := 
TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) // See state transition diagram in storeDetail.status() for a visual // representation of what this test asserts. @@ -665,7 +673,8 @@ func TestGetLocalities(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, sp, _ := CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, _ := CreateTestStorePool(ctx, st, TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -746,7 +755,8 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, sp, mnl := CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, mnl := CreateTestStorePool(ctx, st, TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) diff --git a/pkg/kv/kvserver/allocator/storepool/test_helpers.go b/pkg/kv/kvserver/allocator/storepool/test_helpers.go index 805434aeb172..4effe9240eff 100644 --- a/pkg/kv/kvserver/allocator/storepool/test_helpers.go +++ b/pkg/kv/kvserver/allocator/storepool/test_helpers.go @@ -71,6 +71,7 @@ func (m *MockNodeLiveness) NodeLivenessFunc( // tests. Stopper must be stopped by the caller. func CreateTestStorePool( ctx context.Context, + st *cluster.Settings, timeUntilStoreDeadValue time.Duration, deterministic bool, nodeCount NodeCountFunc, @@ -79,7 +80,6 @@ func CreateTestStorePool( stopper := stop.NewStopper() mc := timeutil.NewManualTime(timeutil.Unix(0, 123)) clock := hlc.NewClock(mc, time.Nanosecond /* maxOffset */) - st := cluster.MakeTestingClusterSettings() ambientCtx := log.MakeTestingAmbientContext(stopper.Tracer()) rpcContext := rpc.NewContext(ctx, rpc.ContextOptions{ diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go index ba454a8bcf01..3f2a90729e84 100644 --- a/pkg/kv/kvserver/allocator_impl_test.go +++ b/pkg/kv/kvserver/allocator_impl_test.go @@ -269,13 +269,14 @@ func TestAllocatorThrottled(t *testing.T) { // Finally, set that store to be throttled and ensure we don't send the // replica to purgatory. - a.StorePool.DetailsMu.Lock() - storeDetail, ok := a.StorePool.DetailsMu.StoreDetails[singleStore[0].StoreID] + storePool := a.StorePool.(*storepool.StorePool) + storePool.DetailsMu.Lock() + storeDetail, ok := storePool.DetailsMu.StoreDetails[singleStore[0].StoreID] if !ok { t.Fatalf("store:%d was not found in the store pool", singleStore[0].StoreID) } storeDetail.ThrottledUntil = timeutil.Now().Add(24 * time.Hour) - a.StorePool.DetailsMu.Unlock() + storePool.DetailsMu.Unlock() _, _, err = a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead) if _, ok := IsPurgatoryError(err); ok { t.Fatalf("expected a non purgatory error, got: %+v", err) diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go index b0db9f026bcd..cf68277f0ff5 100644 --- a/pkg/kv/kvserver/asim/state/impl.go +++ b/pkg/kv/kvserver/asim/state/impl.go @@ -729,6 +729,7 @@ func (s *state) NodeCountFn() storepool.NodeCountFunc { // populates the storepool with the current state. 
func (s *state) MakeAllocator(storeID StoreID) allocatorimpl.Allocator { return allocatorimpl.MakeAllocator( + s.stores[storeID].storepool.st, s.stores[storeID].storepool, func(addr string) (time.Duration, bool) { return 0, true }, &allocator.TestingKnobs{ diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 333978381605..3524e6e40722 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1219,10 +1219,14 @@ func NewStore( ioThresholds: &iot, } s.ioThreshold.t = &admissionpb.IOThreshold{} + var allocatorStorePool storepool.AllocatorStorePool + if cfg.StorePool != nil { + allocatorStorePool = cfg.StorePool + } if cfg.RPCContext != nil { - s.allocator = allocatorimpl.MakeAllocator(cfg.StorePool, cfg.RPCContext.RemoteClocks.Latency, cfg.TestingKnobs.AllocatorKnobs) + s.allocator = allocatorimpl.MakeAllocator(cfg.Settings, allocatorStorePool, cfg.RPCContext.RemoteClocks.Latency, cfg.TestingKnobs.AllocatorKnobs) } else { - s.allocator = allocatorimpl.MakeAllocator(cfg.StorePool, func(string) (time.Duration, bool) { + s.allocator = allocatorimpl.MakeAllocator(cfg.Settings, allocatorStorePool, func(string) (time.Duration, bool) { return 0, false }, cfg.TestingKnobs.AllocatorKnobs) } diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go index 4d84d3de22b9..7267468361ba 100644 --- a/pkg/kv/kvserver/store_pool_test.go +++ b/pkg/kv/kvserver/store_pool_test.go @@ -19,12 +19,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -36,7 +38,8 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { clock := hlc.NewClock(manual, time.Nanosecond /* maxOffset */) ctx := context.Background() // We're going to manually mark stores dead in this test. 
- stopper, g, _, sp, _ := storepool.CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, _ := storepool.CreateTestStorePool(ctx, st, storepool.TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -164,7 +167,9 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 123)), time.Nanosecond /* maxOffset */) - stopper, _, _, sp, _ := storepool.CreateTestStorePool(ctx, + cfg := TestStoreConfig(clock) + var stopper *stop.Stopper + stopper, _, _, cfg.StorePool, _ = storepool.CreateTestStorePool(ctx, cfg.Settings, storepool.TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -174,7 +179,7 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) { node := roachpb.NodeDescriptor{NodeID: roachpb.NodeID(1)} eng := storage.NewDefaultInMemForTesting() stopper.AddCloser(eng) - cfg := TestStoreConfig(clock) + cfg.Transport = NewDummyRaftTransport(cfg.Settings, cfg.AmbientCtx.Tracer) store := NewStore(ctx, cfg, eng, &node) // Fake an ident because this test doesn't want to start the store @@ -203,11 +208,11 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) { // Update StorePool, which should be a no-op. storeID := roachpb.StoreID(1) - if _, ok := sp.GetStoreDescriptor(storeID); ok { + if _, ok := cfg.StorePool.GetStoreDescriptor(storeID); ok { t.Fatalf("StoreDescriptor not gossiped, should not be found") } - sp.UpdateLocalStoreAfterRebalance(storeID, rangeUsageInfo, roachpb.ADD_VOTER) - if _, ok := sp.GetStoreDescriptor(storeID); ok { + cfg.StorePool.UpdateLocalStoreAfterRebalance(storeID, rangeUsageInfo, roachpb.ADD_VOTER) + if _, ok := cfg.StorePool.GetStoreDescriptor(storeID); ok { t.Fatalf("StoreDescriptor still not gossiped, should not be found") } } diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index 5bcddc21bcb8..013a4d4af2bb 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -258,7 +258,7 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { func (sr *StoreRebalancer) scorerOptions(ctx context.Context) *allocatorimpl.QPSScorerOptions { return &allocatorimpl.QPSScorerOptions{ StoreHealthOptions: sr.allocator.StoreHealthOptions(ctx), - Deterministic: sr.allocator.StorePool.Deterministic, + Deterministic: sr.allocator.StorePool.IsDeterministic(), QPSRebalanceThreshold: allocator.QPSRebalanceThreshold.Get(&sr.st.SV), MinRequiredQPSDiff: allocator.MinQPSDifferenceForTransfers.Get(&sr.st.SV), } @@ -616,7 +616,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( ctx context.Context, rctx *RebalanceContext, ) (CandidateReplica, roachpb.ReplicaDescriptor, []CandidateReplica) { var considerForRebalance []CandidateReplica - now := sr.allocator.StorePool.Clock.NowAsClockTimestamp() + now := sr.allocator.StorePool.Clock().NowAsClockTimestamp() for { if len(rctx.hottestRanges) == 0 { return nil, roachpb.ReplicaDescriptor{}, considerForRebalance @@ -728,7 +728,7 @@ type rangeRebalanceContext struct { func (sr *StoreRebalancer) chooseRangeToRebalance( ctx context.Context, rctx *RebalanceContext, ) (candidateReplica CandidateReplica, voterTargets, nonVoterTargets []roachpb.ReplicationTarget) { - now := sr.allocator.StorePool.Clock.NowAsClockTimestamp() + now := 
sr.allocator.StorePool.Clock().NowAsClockTimestamp() if len(rctx.rebalanceCandidates) == 0 && len(rctx.hottestRanges) >= 0 { // NB: In practice, the rebalanceCandidates will be populated with // hottest ranges by the preceeding function call, rebalance leases. diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index a03456fc8184..444c152bc01d 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" @@ -766,7 +767,8 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) { sr.getRaftStatusFn = func(r CandidateReplica) *raft.Status { return TestingRaftStatusFn(r) } - a.StorePool.IsStoreReadyForRoutineReplicaTransfer = func(_ context.Context, this roachpb.StoreID) bool { + storePool := a.StorePool.(*storepool.StorePool) + storePool.OverrideIsStoreReadyForRoutineReplicaTransferFn = func(_ context.Context, this roachpb.StoreID) bool { for _, deadStore := range deadStores { // NodeID match StoreIDs here, so this comparison is valid. if deadStore.StoreID == this { @@ -1185,7 +1187,7 @@ func TestChooseRangeToRebalanceIgnoresRangeOnBestStores(t *testing.T) { localDesc := *noLocalityStores[len(noLocalityStores)-1] cfg := TestStoreConfig(nil) cfg.Gossip = g - cfg.StorePool = a.StorePool + cfg.StorePool = a.StorePool.(*storepool.StorePool) cfg.DefaultSpanConfig.NumVoters = 1 cfg.DefaultSpanConfig.NumReplicas = 1 s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) @@ -1421,7 +1423,7 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { localDesc := *noLocalityStores[0] cfg := TestStoreConfig(nil) cfg.Gossip = g - cfg.StorePool = a.StorePool + cfg.StorePool = a.StorePool.(*storepool.StorePool) s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) gossiputil.NewStoreGossiper(cfg.Gossip).GossipStores(noLocalityStores, t) s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} @@ -1601,7 +1603,7 @@ func TestStoreRebalancerReadAmpCheck(t *testing.T) { localDesc := *noLocalityStores[0] cfg := TestStoreConfig(nil) cfg.Gossip = g - cfg.StorePool = a.StorePool + cfg.StorePool = a.StorePool.(*storepool.StorePool) s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) gossiputil.NewStoreGossiper(cfg.Gossip).GossipStores(test.stores, t) s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} From 74a4d61aea4470cbfedf2b15de8bb4804b2979e3 Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Tue, 15 Nov 2022 20:25:43 -0500 Subject: [PATCH 2/4] allocator: add support for store pool liveness overrides While previously the allocator only evaluated using liveness obtained from gossip, this change introduces a new `OverrideStorePool` struct which can be used to override the liveness of a node for the purposes of evaluating allocator actions and targets. This `OverrideStorePool` is backed by an existing actual `StorePool`, which retains the majority of its logic. Depends on #91461. Part of #91570. 
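For illustration, a caller might construct the override pool roughly as
sketched below. This is a minimal sketch only: `realPool`, `realLivenessFn`,
and the choice to override n3 are placeholders for whatever the decommission
pre-flight check supplies, not identifiers introduced by this change.

// Hypothetical helper: wrap an existing StorePool so that n3 is treated as
// decommissioning regardless of its gossiped liveness. The returned value
// satisfies AllocatorStorePool and can be handed to the allocator in place
// of the backing StorePool.
func makeOverridePool(
	realPool *storepool.StorePool, realLivenessFn storepool.NodeLivenessFunc,
) storepool.AllocatorStorePool {
	overrides := map[roachpb.NodeID]livenesspb.NodeLivenessStatus{
		3: livenesspb.NodeLivenessStatus_DECOMMISSIONING,
	}
	return storepool.NewOverrideStorePool(realPool, func(
		nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration,
	) livenesspb.NodeLivenessStatus {
		// Prefer the override when present; otherwise fall back to the
		// backing pool's view of liveness.
		if status, ok := overrides[nid]; ok {
			return status
		}
		return realLivenessFn(nid, now, timeUntilStoreDead)
	})
}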
Release note: None --- .../kvserver/allocator/storepool/BUILD.bazel | 6 +- .../storepool/override_store_pool.go | 128 ++++++ .../storepool/override_store_pool_test.go | 387 ++++++++++++++++++ .../allocator/storepool/store_pool.go | 54 ++- .../allocator/storepool/store_pool_test.go | 2 +- 5 files changed, 560 insertions(+), 17 deletions(-) create mode 100644 pkg/kv/kvserver/allocator/storepool/override_store_pool.go create mode 100644 pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go diff --git a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel index cf76d060eeed..d4b251659b36 100644 --- a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel @@ -4,6 +4,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "storepool", srcs = [ + "override_store_pool.go", "store_pool.go", "test_helpers.go", ], @@ -35,7 +36,10 @@ go_library( go_test( name = "storepool_test", - srcs = ["store_pool_test.go"], + srcs = [ + "override_store_pool_test.go", + "store_pool_test.go", + ], args = ["-test.timeout=295s"], embed = [":storepool"], deps = [ diff --git a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go new file mode 100644 index 000000000000..6f029abe1208 --- /dev/null +++ b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go @@ -0,0 +1,128 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storepool + +import ( + "context" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) + +// OverrideStorePool is an implementation of AllocatorStorePool that allows +// the ability to override a node's liveness status for the purposes of +// evaluation for the allocator, otherwise delegating to an actual StorePool +// for all its logic, including management and lookup of store descriptors. +type OverrideStorePool struct { + sp *StorePool + + overrideNodeLivenessFn NodeLivenessFunc +} + +var _ AllocatorStorePool = &OverrideStorePool{} + +func NewOverrideStorePool( + storePool *StorePool, nl NodeLivenessFunc, +) *OverrideStorePool { + return &OverrideStorePool{ + sp: storePool, + overrideNodeLivenessFn: nl, + } +} + +func (o *OverrideStorePool) String() string { + return o.sp.statusString(o.overrideNodeLivenessFn) +} + +// IsStoreReadyForRoutineReplicaTransfer implements the AllocatorStorePool interface. +func (o *OverrideStorePool) IsStoreReadyForRoutineReplicaTransfer(ctx context.Context, targetStoreID roachpb.StoreID) bool { + return o.sp.isStoreReadyForRoutineReplicaTransferInternal(ctx, targetStoreID, o.overrideNodeLivenessFn) +} + +// DecommissioningReplicas implements the AllocatorStorePool interface. +func (o *OverrideStorePool) DecommissioningReplicas(repls []roachpb.ReplicaDescriptor) []roachpb.ReplicaDescriptor { + return o.sp.decommissioningReplicasWithLiveness(repls, o.overrideNodeLivenessFn) +} + +// GetStoreList implements the AllocatorStorePool interface. 
+func (o *OverrideStorePool) GetStoreList(filter StoreFilter) (StoreList, int, ThrottledStoreReasons) { + o.sp.DetailsMu.Lock() + defer o.sp.DetailsMu.Unlock() + + var storeIDs roachpb.StoreIDSlice + for storeID := range o.sp.DetailsMu.StoreDetails { + storeIDs = append(storeIDs, storeID) + } + return o.sp.getStoreListFromIDsLocked(storeIDs, o.overrideNodeLivenessFn, filter) +} + +// GetStoreListFromIDs implements the AllocatorStorePool interface. +func (o *OverrideStorePool) GetStoreListFromIDs(storeIDs roachpb.StoreIDSlice, filter StoreFilter) (StoreList, int, ThrottledStoreReasons) { + o.sp.DetailsMu.Lock() + defer o.sp.DetailsMu.Unlock() + return o.sp.getStoreListFromIDsLocked(storeIDs, o.overrideNodeLivenessFn, filter) +} + +// LiveAndDeadReplicas implements the AllocatorStorePool interface. +func (o *OverrideStorePool) LiveAndDeadReplicas(repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) { + return o.sp.liveAndDeadReplicasWithLiveness(repls, o.overrideNodeLivenessFn, includeSuspectAndDrainingStores) +} + +// ClusterNodeCount implements the AllocatorStorePool interface. +func (o *OverrideStorePool) ClusterNodeCount() int { + return o.sp.ClusterNodeCount() +} + +// IsDeterministic implements the AllocatorStorePool interface. +func (o *OverrideStorePool) IsDeterministic() bool { + return o.sp.deterministic +} + +// Clock implements the AllocatorStorePool interface. +func (o *OverrideStorePool) Clock() *hlc.Clock { + return o.sp.clock +} + +// GetLocalitiesByNode implements the AllocatorStorePool interface. +func (o *OverrideStorePool) GetLocalitiesByNode(replicas []roachpb.ReplicaDescriptor) map[roachpb.NodeID]roachpb.Locality { + return o.sp.GetLocalitiesByNode(replicas) +} + +// GetLocalitiesByStore implements the AllocatorStorePool interface. +func (o *OverrideStorePool) GetLocalitiesByStore(replicas []roachpb.ReplicaDescriptor) map[roachpb.StoreID]roachpb.Locality { + return o.sp.GetLocalitiesByStore(replicas) +} + +// GetStores implements the AllocatorStorePool interface. +func (o *OverrideStorePool) GetStores() map[roachpb.StoreID]roachpb.StoreDescriptor { + return o.sp.GetStores() +} + +// GetStoreDescriptor implements the AllocatorStorePool interface. +func (o *OverrideStorePool) GetStoreDescriptor(storeID roachpb.StoreID) (roachpb.StoreDescriptor, bool) { + return o.sp.GetStoreDescriptor(storeID) +} + +// GossipNodeIDAddress implements the AllocatorStorePool interface. +func (o *OverrideStorePool) GossipNodeIDAddress(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error) { + return o.sp.GossipNodeIDAddress(nodeID) +} + +// UpdateLocalStoreAfterRebalance implements the AllocatorStorePool interface. +func (o *OverrideStorePool) UpdateLocalStoreAfterRebalance(storeID roachpb.StoreID, rangeUsageInfo allocator.RangeUsageInfo, changeType roachpb.ReplicaChangeType) { + o.sp.UpdateLocalStoreAfterRebalance(storeID, rangeUsageInfo, changeType) +} + +// UpdateLocalStoresAfterLeaseTransfer implements the AllocatorStorePool interface. 
+func (o *OverrideStorePool) UpdateLocalStoresAfterLeaseTransfer(from roachpb.StoreID, to roachpb.StoreID, rangeQPS float64) { + o.sp.UpdateLocalStoresAfterLeaseTransfer(from, to, rangeQPS) +} diff --git a/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go b/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go new file mode 100644 index 000000000000..7ee067404850 --- /dev/null +++ b/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go @@ -0,0 +1,387 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storepool + +import ( + "context" + "reflect" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +// TestOverrideStorePoolStatusString tests the status string output of the +// store pool implementation, including any liveness overrides. +func TestOverrideStorePoolStatusString(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + stopper, g, _, testStorePool, mnl := CreateTestStorePool(ctx, st, + TestTimeUntilStoreDead, false, /* deterministic */ + func() int { return 10 }, /* nodeCount */ + livenesspb.NodeLivenessStatus_DEAD) + defer stopper.Stop(ctx) + sg := gossiputil.NewStoreGossiper(g) + + livenessOverrides := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus) + sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + if overriddenLiveness, ok := livenessOverrides[nid]; ok { + return overriddenLiveness + } + + return mnl.NodeLivenessFunc(nid, now, timeUntilStoreDead) + }) + + stores := []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{NodeID: 5}, + }, + } + + sg.GossipStores(stores, t) + for i := 1; i <= 5; i++ { + mnl.SetNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE) + } + + // Override node 2 as dead. + livenessOverrides[roachpb.NodeID(2)] = livenesspb.NodeLivenessStatus_DEAD + + // Override node 3 as decommissioning. + livenessOverrides[roachpb.NodeID(3)] = livenesspb.NodeLivenessStatus_DECOMMISSIONING + + // Mark node 5 as draining. 
+ mnl.SetNodeStatus(5, livenesspb.NodeLivenessStatus_DRAINING) + + require.Equal(t, "1: range-count=0 fraction-used=0.00\n"+ + "2 (status=1): range-count=0 fraction-used=0.00\n"+ + "3 (status=5): range-count=0 fraction-used=0.00\n"+ + "4: range-count=0 fraction-used=0.00\n"+ + "5 (status=7): range-count=0 fraction-used=0.00\n", + sp.String(), + ) +} + +// TestOverrideStorePoolDecommissioningReplicas validates the ability to use +// both liveness and liveness overrides in determining the set of +// decommissioning replicas. +func TestOverrideStorePoolDecommissioningReplicas(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + stopper, g, _, testStorePool, mnl := CreateTestStorePool(ctx, st, + TestTimeUntilStoreDead, false, /* deterministic */ + func() int { return 10 }, /* nodeCount */ + livenesspb.NodeLivenessStatus_DEAD) + defer stopper.Stop(ctx) + sg := gossiputil.NewStoreGossiper(g) + + livenessOverrides := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus) + sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + if overriddenLiveness, ok := livenessOverrides[nid]; ok { + return overriddenLiveness + } + + return mnl.NodeLivenessFunc(nid, now, timeUntilStoreDead) + }) + + stores := []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{NodeID: 5}, + }, + } + + replicas := []roachpb.ReplicaDescriptor{ + { + NodeID: 1, + StoreID: 1, + ReplicaID: 1, + }, + { + NodeID: 2, + StoreID: 2, + ReplicaID: 2, + }, + { + NodeID: 3, + StoreID: 3, + ReplicaID: 4, + }, + { + NodeID: 4, + StoreID: 4, + ReplicaID: 4, + }, + { + NodeID: 5, + StoreID: 5, + ReplicaID: 5, + }, + } + + sg.GossipStores(stores, t) + for i := 1; i <= 5; i++ { + mnl.SetNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE) + } + + liveReplicas, deadReplicas := sp.LiveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */) + if len(liveReplicas) != 5 { + t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas) + } + if len(deadReplicas) > 0 { + t.Fatalf("expected no dead replicas initially, found %d (%v)", len(deadReplicas), deadReplicas) + } + // Mark node 4 as decommissioning. + mnl.SetNodeStatus(4, livenesspb.NodeLivenessStatus_DECOMMISSIONING) + // Mark node 5 as dead. + mnl.SetNodeStatus(5, livenesspb.NodeLivenessStatus_DEAD) + + // Override node 3 as decommissioning. + livenessOverrides[roachpb.NodeID(3)] = livenesspb.NodeLivenessStatus_DECOMMISSIONING + + liveReplicas, deadReplicas = sp.LiveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */) + // Decommissioning replicas are considered live.
+ if a, e := liveReplicas, replicas[:4]; !reflect.DeepEqual(a, e) { + t.Fatalf("expected live replicas %+v; got %+v", e, a) + } + if a, e := deadReplicas, replicas[4:]; !reflect.DeepEqual(a, e) { + t.Fatalf("expected dead replicas %+v; got %+v", e, a) + } + + decommissioningReplicas := sp.DecommissioningReplicas(replicas) + if a, e := decommissioningReplicas, replicas[2:4]; !reflect.DeepEqual(a, e) { + t.Fatalf("expected decommissioning replicas %+v; got %+v", e, a) + } +} + +// TestOverrideStorePoolGetStoreList tests the functionality of the store list +// with and without liveness overrides. +func TestOverrideStorePoolGetStoreList(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + // We're going to manually mark stores dead in this test. + stopper, g, _, testStorePool, mnl := CreateTestStorePool(ctx, st, + TestTimeUntilStoreDead, false, /* deterministic */ + func() int { return 10 }, /* nodeCount */ + livenesspb.NodeLivenessStatus_DEAD) + defer stopper.Stop(ctx) + sg := gossiputil.NewStoreGossiper(g) + + livenessOverrides := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus) + sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + if overriddenLiveness, ok := livenessOverrides[nid]; ok { + return overriddenLiveness + } + + return mnl.NodeLivenessFunc(nid, now, timeUntilStoreDead) + }) + + constraints := []roachpb.ConstraintsConjunction{ + { + Constraints: []roachpb.Constraint{ + {Type: roachpb.Constraint_REQUIRED, Value: "ssd"}, + {Type: roachpb.Constraint_REQUIRED, Value: "dc"}, + }, + }, + } + required := []string{"ssd", "dc"} + // Nothing yet. + sl, _, _ := sp.GetStoreList(StoreFilterNone) + sl = sl.ExcludeInvalid(constraints) + if len(sl.Stores) != 0 { + t.Errorf("expected no stores, instead %+v", sl.Stores) + } + + matchingStore := roachpb.StoreDescriptor{ + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Attrs: roachpb.Attributes{Attrs: required}, + } + supersetStore := roachpb.StoreDescriptor{ + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Attrs: roachpb.Attributes{Attrs: append(required, "db")}, + } + unmatchingStore := roachpb.StoreDescriptor{ + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Attrs: roachpb.Attributes{Attrs: []string{"ssd", "otherdc"}}, + } + emptyStore := roachpb.StoreDescriptor{ + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + Attrs: roachpb.Attributes{}, + } + deadStore := roachpb.StoreDescriptor{ + StoreID: 5, + Node: roachpb.NodeDescriptor{NodeID: 5}, + Attrs: roachpb.Attributes{Attrs: required}, + } + decommissioningStore := roachpb.StoreDescriptor{ + StoreID: 6, + Node: roachpb.NodeDescriptor{NodeID: 6}, + Attrs: roachpb.Attributes{Attrs: required}, + } + absentStore := roachpb.StoreDescriptor{ + StoreID: 7, + Node: roachpb.NodeDescriptor{NodeID: 7}, + Attrs: roachpb.Attributes{Attrs: required}, + } + suspectedStore := roachpb.StoreDescriptor{ + StoreID: 8, + Node: roachpb.NodeDescriptor{NodeID: 8}, + Attrs: roachpb.Attributes{Attrs: required}, + } + + // Gossip and mark all alive initially. + sg.GossipStores([]*roachpb.StoreDescriptor{ + &matchingStore, + &supersetStore, + &unmatchingStore, + &emptyStore, + &deadStore, + &decommissioningStore, + &suspectedStore, + // absentStore is purposefully not gossiped. 
+ }, t) + for i := 1; i <= 8; i++ { + mnl.SetNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE) + } + + // Set deadStore as dead. + livenessOverrides[deadStore.Node.NodeID] = livenesspb.NodeLivenessStatus_DEAD + + // Set decommissioningStore as decommissioning. + livenessOverrides[decommissioningStore.Node.NodeID] = livenesspb.NodeLivenessStatus_DECOMMISSIONING + + // Set suspectedStore as suspected. + testStorePool.DetailsMu.Lock() + testStorePool.DetailsMu.StoreDetails[suspectedStore.StoreID].LastAvailable = testStorePool.clock.Now().GoTime() + testStorePool.DetailsMu.StoreDetails[suspectedStore.StoreID].LastUnavailable = testStorePool.clock.Now().GoTime() + testStorePool.DetailsMu.Unlock() + + // No filter or limited set of store IDs. + if err := verifyStoreList( + sp, + constraints, + nil, /* storeIDs */ + StoreFilterNone, + []int{ + int(matchingStore.StoreID), + int(supersetStore.StoreID), + int(suspectedStore.StoreID), + }, + /* expectedAliveStoreCount */ 5, + /* expectedThrottledStoreCount */ 1, + ); err != nil { + t.Error(err) + } + + // Filter out suspected stores but don't limit the set of store IDs. + if err := verifyStoreList( + sp, + constraints, + nil, /* storeIDs */ + StoreFilterSuspect, + []int{ + int(matchingStore.StoreID), + int(supersetStore.StoreID), + }, + /* expectedAliveStoreCount */ 5, + /* expectedThrottledStoreCount */ 1, + ); err != nil { + t.Error(err) + } + + limitToStoreIDs := roachpb.StoreIDSlice{ + matchingStore.StoreID, + decommissioningStore.StoreID, + absentStore.StoreID, + suspectedStore.StoreID, + } + + // No filter but limited to limitToStoreIDs. + // Note that supersetStore is not included. + if err := verifyStoreList( + sp, + constraints, + limitToStoreIDs, + StoreFilterNone, + []int{ + int(matchingStore.StoreID), + int(suspectedStore.StoreID), + }, + /* expectedAliveStoreCount */ 2, + /* expectedThrottledStoreCount */ 1, + ); err != nil { + t.Error(err) + } + + // Filter out suspected stores and limit to limitToStoreIDs. + // Note that suspectedStore is not included. + if err := verifyStoreList( + sp, + constraints, + limitToStoreIDs, + StoreFilterSuspect, + []int{ + int(matchingStore.StoreID), + }, + /* expectedAliveStoreCount */ 2, + /* expectedThrottledStoreCount */ 1, + ); err != nil { + t.Error(err) + } +} diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index 64da8c5d5d7c..707cf8973344 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -334,6 +334,8 @@ type localityWithString struct { // AllocatorStorePool provides an interface for use by the allocator to a list // of all known stores in the cluster and information on their health. type AllocatorStorePool interface { + fmt.Stringer + // ClusterNodeCount returns the number of nodes that are possible allocation // targets. // See comment on StorePool.ClusterNodeCount(). 
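The override tests above and the AllocatorStorePool interface being introduced here rely on the same pattern: a set of per-node liveness overrides is consulted first, and the real liveness function is used for every other node. A decommission pre-flight check would wire an OverrideStorePool up against the node's live *StorePool in the same way; the following is a minimal sketch of that wiring, where the package name preflight and the helper makeDecommissionOverride are illustrative and not part of this patch.

package preflight // hypothetical package, for illustration only

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// makeDecommissionOverride returns a NodeLivenessFunc that reports the given
// nodes as decommissioning and defers to the wrapped liveness function for all
// other nodes. Hypothetical helper, shown only to illustrate the wiring.
func makeDecommissionOverride(
	base storepool.NodeLivenessFunc, nodeIDs ...roachpb.NodeID,
) storepool.NodeLivenessFunc {
	overrides := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus, len(nodeIDs))
	for _, id := range nodeIDs {
		overrides[id] = livenesspb.NodeLivenessStatus_DECOMMISSIONING
	}
	return func(
		nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration,
	) livenesspb.NodeLivenessStatus {
		if status, ok := overrides[nid]; ok {
			return status
		}
		return base(nid, now, timeUntilStoreDead)
	}
}

An override pool for a hypothetical decommission of node 3 is then storepool.NewOverrideStorePool(pool, makeDecommissionOverride(pool.NodeLivenessFn, 3)); it satisfies the same AllocatorStorePool interface as the pool it wraps, so it can be handed to any allocator code that accepts the interface.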
@@ -477,6 +479,10 @@ func NewStorePool( } func (sp *StorePool) String() string { + return sp.statusString(sp.NodeLivenessFn) +} + +func (sp *StorePool) statusString(nl NodeLivenessFunc) string { sp.DetailsMu.RLock() defer sp.DetailsMu.RUnlock() @@ -494,7 +500,7 @@ func (sp *StorePool) String() string { for _, id := range ids { detail := sp.DetailsMu.StoreDetails[id] fmt.Fprintf(&buf, "%d", id) - status := detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) + status := detail.status(now, timeUntilStoreDead, nl, timeAfterStoreSuspect) if status != storeStatusAvailable { fmt.Fprintf(&buf, " (status=%d)", status) } @@ -653,6 +659,14 @@ func (sp *StorePool) GetStoreDescriptor(storeID roachpb.StoreID) (roachpb.StoreD // from the provided repls and returns them in a slice. func (sp *StorePool) DecommissioningReplicas( repls []roachpb.ReplicaDescriptor, +) (decommissioningReplicas []roachpb.ReplicaDescriptor) { + return sp.decommissioningReplicasWithLiveness(repls, sp.NodeLivenessFn) +} + +// decommissioningReplicasWithLiveness filters out replicas on decommissioning node/store +// from the provided repls and returns them in a slice, using the provided NodeLivenessFunc. +func (sp *StorePool) decommissioningReplicasWithLiveness( + repls []roachpb.ReplicaDescriptor, nl NodeLivenessFunc, ) (decommissioningReplicas []roachpb.ReplicaDescriptor) { sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() @@ -665,7 +679,7 @@ func (sp *StorePool) DecommissioningReplicas( for _, repl := range repls { detail := sp.GetStoreDetailLocked(repl.StoreID) - switch detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) { + switch detail.status(now, timeUntilStoreDead, nl, timeAfterStoreSuspect) { case storeStatusDecommissioning: decommissioningReplicas = append(decommissioningReplicas, repl) } @@ -723,7 +737,7 @@ func (sp *StorePool) IsDead(storeID roachpb.StoreID) (bool, time.Duration, error // liveness or deadness at the moment) or an error if the store is not found in // the pool. func (sp *StorePool) IsUnknown(storeID roachpb.StoreID) (bool, error) { - status, err := sp.storeStatus(storeID) + status, err := sp.storeStatus(storeID, sp.NodeLivenessFn) if err != nil { return false, err } @@ -733,7 +747,7 @@ func (sp *StorePool) IsUnknown(storeID roachpb.StoreID) (bool, error) { // IsDraining returns true if the given store's status is `storeStatusDraining` // or an error if the store is not found in the pool. func (sp *StorePool) IsDraining(storeID roachpb.StoreID) (bool, error) { - status, err := sp.storeStatus(storeID) + status, err := sp.storeStatus(storeID, sp.NodeLivenessFn) if err != nil { return false, err } @@ -743,14 +757,14 @@ func (sp *StorePool) IsDraining(storeID roachpb.StoreID) (bool, error) { // IsLive returns true if the node is considered alive by the store pool or an error // if the store is not found in the pool. 
func (sp *StorePool) IsLive(storeID roachpb.StoreID) (bool, error) { - status, err := sp.storeStatus(storeID) + status, err := sp.storeStatus(storeID, sp.NodeLivenessFn) if err != nil { return false, err } return status == storeStatusAvailable, nil } -func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) { +func (sp *StorePool) storeStatus(storeID roachpb.StoreID, nl NodeLivenessFunc) (storeStatus, error) { sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() @@ -763,7 +777,7 @@ func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) { now := sp.clock.Now().GoTime() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) - return sd.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect), nil + return sd.status(now, timeUntilStoreDead, nl, timeAfterStoreSuspect), nil } // LiveAndDeadReplicas divides the provided repls slice into two slices: the @@ -780,6 +794,16 @@ func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) { // they are excluded from the returned slices. func (sp *StorePool) LiveAndDeadReplicas( repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool, +) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) { + return sp.liveAndDeadReplicasWithLiveness(repls, sp.NodeLivenessFn, includeSuspectAndDrainingStores) +} + +// liveAndDeadReplicasWithLiveness divides the provided repls slice into two slices: the +// first for live replicas, and the second for dead replicas, using the +// provided NodeLivenessFunc. +// See comment on StorePool.LiveAndDeadReplicas(..). +func (sp *StorePool) liveAndDeadReplicasWithLiveness( + repls []roachpb.ReplicaDescriptor, nl NodeLivenessFunc, includeSuspectAndDrainingStores bool, ) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) { sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() @@ -791,7 +815,7 @@ func (sp *StorePool) LiveAndDeadReplicas( for _, repl := range repls { detail := sp.GetStoreDetailLocked(repl.StoreID) // Mark replica as dead if store is dead. - status := detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) + status := detail.status(now, timeUntilStoreDead, nl, timeAfterStoreSuspect) switch status { case storeStatusDead: deadReplicas = append(deadReplicas, repl) @@ -963,7 +987,7 @@ func (sp *StorePool) GetStoreList(filter StoreFilter) (StoreList, int, Throttled for storeID := range sp.DetailsMu.StoreDetails { storeIDs = append(storeIDs, storeID) } - return sp.getStoreListFromIDsLocked(storeIDs, filter) + return sp.getStoreListFromIDsLocked(storeIDs, sp.NodeLivenessFn, filter) } // GetStoreListFromIDs is the same function as GetStoreList but only returns stores @@ -973,13 +997,13 @@ func (sp *StorePool) GetStoreListFromIDs( ) (StoreList, int, ThrottledStoreReasons) { sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() - return sp.getStoreListFromIDsLocked(storeIDs, filter) + return sp.getStoreListFromIDsLocked(storeIDs, sp.NodeLivenessFn, filter) } // getStoreListFromIDsRLocked is the same function as GetStoreList but requires // that the detailsMU read lock is held. func (sp *StorePool) getStoreListFromIDsLocked( - storeIDs roachpb.StoreIDSlice, filter StoreFilter, + storeIDs roachpb.StoreIDSlice, nl NodeLivenessFunc, filter StoreFilter, ) (StoreList, int, ThrottledStoreReasons) { if sp.deterministic { sort.Sort(storeIDs) @@ -1001,7 +1025,7 @@ func (sp *StorePool) getStoreListFromIDsLocked( // Do nothing; this store is not in the StorePool. 
continue } - switch s := detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect); s { + switch s := detail.status(now, timeUntilStoreDead, nl, timeAfterStoreSuspect); s { case storeStatusThrottled: aliveStoreCount++ throttled = append(throttled, detail.throttledBecause) @@ -1142,13 +1166,13 @@ func (sp *StorePool) IsStoreReadyForRoutineReplicaTransfer(ctx context.Context, if sp.OverrideIsStoreReadyForRoutineReplicaTransferFn != nil { return sp.OverrideIsStoreReadyForRoutineReplicaTransferFn(ctx, targetStoreID) } - return sp.isStoreReadyForRoutineReplicaTransferInternal(ctx, targetStoreID) + return sp.isStoreReadyForRoutineReplicaTransferInternal(ctx, targetStoreID, sp.NodeLivenessFn) } func (sp *StorePool) isStoreReadyForRoutineReplicaTransferInternal( - ctx context.Context, targetStoreID roachpb.StoreID, + ctx context.Context, targetStoreID roachpb.StoreID, nl NodeLivenessFunc, ) bool { - status, err := sp.storeStatus(targetStoreID) + status, err := sp.storeStatus(targetStoreID, nl) if err != nil { return false } diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool_test.go b/pkg/kv/kvserver/allocator/storepool/store_pool_test.go index bdea163c45d1..51c456d5390e 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool_test.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool_test.go @@ -78,7 +78,7 @@ func TestStorePoolGossipUpdate(t *testing.T) { // verifyStoreList ensures that the returned list of stores is correct. func verifyStoreList( - sp *StorePool, + sp AllocatorStorePool, constraints []roachpb.ConstraintsConjunction, storeIDs roachpb.StoreIDSlice, // optional filter StoreFilter, From 874eae862401d1e63c2f681557ff6f235bf6c1c2 Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Tue, 15 Nov 2022 16:04:57 -0500 Subject: [PATCH 3/4] allocator: refactor allocator to accept StorePool arguments This change adds methods to be able to evaluate allocator actions and targets utilizing a passed-in `StorePool` object, allowing the allocator to consider potential scenarios rather than only those based on the current liveness. Depends on #91461, #91965. Part of #91570. Release note: None --- .../allocator/allocatorimpl/allocator.go | 85 +++- .../allocator/allocatorimpl/allocator_test.go | 452 ++++++++++++++++++ 2 files changed, 522 insertions(+), 15 deletions(-) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index ddd6881f3f53..f262acf62f4d 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -599,7 +599,18 @@ func GetNeededNonVoters(numVoters, zoneConfigNonVoterCount, clusterNodes int) in func (a *Allocator) ComputeAction( ctx context.Context, conf roachpb.SpanConfig, desc *roachpb.RangeDescriptor, ) (action AllocatorAction, priority float64) { - if a.StorePool == nil { + return a.ComputeActionWithStorePool(ctx, a.StorePool, conf, desc) +} + +// ComputeActionWithStorePool determines the exact operation needed to repair the +// supplied range using the provided StorePool, as governed by the supplied zone +// configuration. It returns the required action that should be taken and a +// priority. +func (a *Allocator) ComputeActionWithStorePool( + ctx context.Context, storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, + desc *roachpb.RangeDescriptor, +) (action AllocatorAction, priority float64) { + if storePool == nil { + // Do nothing if storePool is nil for some unittests.
action = AllocatorNoop return action, action.Priority() @@ -657,13 +668,14 @@ func (a *Allocator) ComputeAction( return action, action.Priority() } - return a.computeAction(ctx, conf, desc.Replicas().VoterDescriptors(), + return a.computeAction(ctx, storePool, conf, desc.Replicas().VoterDescriptors(), desc.Replicas().NonVoterDescriptors()) } func (a *Allocator) computeAction( ctx context.Context, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, voterReplicas []roachpb.ReplicaDescriptor, nonVoterReplicas []roachpb.ReplicaDescriptor, @@ -681,10 +693,10 @@ func (a *Allocator) computeAction( // After that we handle rebalancing related actions, followed by removal // actions. haveVoters := len(voterReplicas) - decommissioningVoters := a.StorePool.DecommissioningReplicas(voterReplicas) + decommissioningVoters := storePool.DecommissioningReplicas(voterReplicas) // Node count including dead nodes but excluding // decommissioning/decommissioned nodes. - clusterNodes := a.StorePool.ClusterNodeCount() + clusterNodes := storePool.ClusterNodeCount() neededVoters := GetNeededVoters(conf.GetNumVoters(), clusterNodes) desiredQuorum := computeQuorum(neededVoters) quorum := computeQuorum(haveVoters) @@ -710,7 +722,7 @@ func (a *Allocator) computeAction( // heartbeat in the recent past. This means we won't move those replicas // elsewhere (for a regular rebalance or for decommissioning). const includeSuspectAndDrainingStores = true - liveVoters, deadVoters := a.StorePool.LiveAndDeadReplicas(voterReplicas, includeSuspectAndDrainingStores) + liveVoters, deadVoters := storePool.LiveAndDeadReplicas(voterReplicas, includeSuspectAndDrainingStores) if len(liveVoters) < quorum { // Do not take any replacement/removal action if we do not have a quorum of @@ -789,7 +801,7 @@ func (a *Allocator) computeAction( return action, action.Priority() } - liveNonVoters, deadNonVoters := a.StorePool.LiveAndDeadReplicas( + liveNonVoters, deadNonVoters := storePool.LiveAndDeadReplicas( nonVoterReplicas, includeSuspectAndDrainingStores, ) if haveNonVoters == neededNonVoters && len(deadNonVoters) > 0 { @@ -800,7 +812,7 @@ func (a *Allocator) computeAction( return action, action.Priority() } - decommissioningNonVoters := a.StorePool.DecommissioningReplicas(nonVoterReplicas) + decommissioningNonVoters := storePool.DecommissioningReplicas(nonVoterReplicas) if haveNonVoters == neededNonVoters && len(decommissioningNonVoters) > 0 { // The range has non-voter(s) on a decommissioning node that we should // replace. @@ -922,12 +934,13 @@ func (s *GoodCandidateSelector) selectOne(cl candidateList) *candidate { func (a *Allocator) allocateTarget( ctx context.Context, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, replicaStatus ReplicaStatus, targetType TargetReplicaType, ) (roachpb.ReplicationTarget, string, error) { - candidateStoreList, aliveStoreCount, throttled := a.StorePool.GetStoreList(storepool.StoreFilterThrottled) + candidateStoreList, aliveStoreCount, throttled := storePool.GetStoreList(storepool.StoreFilterThrottled) // If the replica is alive we are upreplicating, and in that case we want to // allocate new replicas on the best possible store. 
Otherwise, the replica is @@ -941,8 +954,9 @@ func (a *Allocator) allocateTarget( selector = a.NewGoodCandidateSelector() } - target, details := a.AllocateTargetFromList( + target, details := a.allocateTargetFromList( ctx, + storePool, candidateStoreList, conf, existingVoters, @@ -988,7 +1002,20 @@ func (a *Allocator) AllocateVoter( existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, replicaStatus ReplicaStatus, ) (roachpb.ReplicationTarget, string, error) { - return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget) + return a.allocateTarget(ctx, a.StorePool, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget) +} + +// AllocateVoterWithStorePool returns a suitable store for a new allocation of a voting +// replica with the required attributes using the given storePool. Nodes already +// accommodating existing voting replicas are ruled out as targets. +func (a *Allocator) AllocateVoterWithStorePool( + ctx context.Context, + storePool storepool.AllocatorStorePool, + conf roachpb.SpanConfig, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + replicaStatus ReplicaStatus, +) (roachpb.ReplicationTarget, string, error) { + return a.allocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget) } // AllocateNonVoter returns a suitable store for a new allocation of a @@ -1000,7 +1027,20 @@ func (a *Allocator) AllocateNonVoter( existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, replicaStatus ReplicaStatus, ) (roachpb.ReplicationTarget, string, error) { - return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget) + return a.allocateTarget(ctx, a.StorePool, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget) +} + +// AllocateNonVoterWithStorePool returns a suitable store for a new allocation of a +// non-voting replica with the required attributes using the given storePool. Nodes already +// accommodating _any_ existing replicas are ruled out as targets. +func (a *Allocator) AllocateNonVoterWithStorePool( + ctx context.Context, + storePool storepool.AllocatorStorePool, + conf roachpb.SpanConfig, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + replicaStatus ReplicaStatus, +) (roachpb.ReplicationTarget, string, error) { + return a.allocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget) } // AllocateTargetFromList returns a suitable store for a new allocation of a @@ -1015,11 +1055,26 @@ func (a *Allocator) AllocateTargetFromList( selector CandidateSelector, allowMultipleReplsPerNode bool, targetType TargetReplicaType, +) (roachpb.ReplicationTarget, string) { + return a.allocateTargetFromList(ctx, a.StorePool, candidateStores, conf, existingVoters, + existingNonVoters, options, selector, allowMultipleReplsPerNode, targetType) +} + +func (a *Allocator) allocateTargetFromList( + ctx context.Context, + storePool storepool.AllocatorStorePool, + candidateStores storepool.StoreList, + conf roachpb.SpanConfig, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + options ScorerOptions, + selector CandidateSelector, + allowMultipleReplsPerNode bool, + targetType TargetReplicaType, ) (roachpb.ReplicationTarget, string) { existingReplicas := append(existingVoters, existingNonVoters...) 
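The entry points above all follow the same shape: the exported method keeps its existing signature and forwards a.StorePool, the new *WithStorePool variant accepts an explicit AllocatorStorePool, and both funnel into an unexported helper that threads the pool through. That makes it possible to ask the allocator the same question under two different liveness views and compare the answers. A rough sketch of such a comparison, in the same hypothetical preflight package as the earlier sketch, with the helper name repairActionWithOverride being illustrative:

package preflight // hypothetical package, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// repairActionWithOverride computes the allocator's repair action for a range
// twice: once against the allocator's own store pool and once against the
// supplied override pool. Hypothetical helper, not part of this patch.
func repairActionWithOverride(
	ctx context.Context,
	a *allocatorimpl.Allocator,
	override storepool.AllocatorStorePool,
	conf roachpb.SpanConfig,
	desc *roachpb.RangeDescriptor,
) (current, overridden allocatorimpl.AllocatorAction) {
	current, _ = a.ComputeAction(ctx, conf, desc)
	overridden, _ = a.ComputeActionWithStorePool(ctx, override, conf, desc)
	return current, overridden
}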
- analyzedOverallConstraints := constraint.AnalyzeConstraints(ctx, a.StorePool.GetStoreDescriptor, + analyzedOverallConstraints := constraint.AnalyzeConstraints(ctx, storePool.GetStoreDescriptor, existingReplicas, conf.NumReplicas, conf.Constraints) - analyzedVoterConstraints := constraint.AnalyzeConstraints(ctx, a.StorePool.GetStoreDescriptor, + analyzedVoterConstraints := constraint.AnalyzeConstraints(ctx, storePool.GetStoreDescriptor, existingVoters, conf.GetNumVoters(), conf.VoterConstraints) var constraintsChecker constraintsCheckFn @@ -1048,8 +1103,8 @@ func (a *Allocator) AllocateTargetFromList( constraintsChecker, existingReplicaSet, existingNonVoters, - a.StorePool.GetLocalitiesByStore(existingReplicaSet), - a.StorePool.IsStoreReadyForRoutineReplicaTransfer, + storePool.GetLocalitiesByStore(existingReplicaSet), + storePool.IsStoreReadyForRoutineReplicaTransfer, allowMultipleReplsPerNode, options, targetType, diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go index df8207b30d2a..d6a4bb635e80 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go @@ -821,6 +821,59 @@ func TestAllocatorExistingReplica(t *testing.T) { } } +func TestAllocatorReplaceDecommissioningReplica(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper, g, sp, a, _ := CreateTestAllocator(ctx, 1, false /* deterministic */) + defer stopper.Stop(ctx) + gossiputil.NewStoreGossiper(g).GossipStores(sameDCStores, t) + + // Override liveness of n3 to decommissioning so the only available target is s4. + oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + if nid == roachpb.NodeID(3) { + return livenesspb.NodeLivenessStatus_DECOMMISSIONING + } + + return sp.NodeLivenessFn(nid, now, timeUntilStoreDead) + }) + + result, _, err := a.AllocateVoterWithStorePool( + ctx, + oSp, + roachpb.SpanConfig{ + NumReplicas: 3, + Constraints: []roachpb.ConstraintsConjunction{ + { + Constraints: []roachpb.Constraint{ + {Value: "mem", Type: roachpb.Constraint_PROHIBITED}, + }, + }, + }, + }, + []roachpb.ReplicaDescriptor{ + { + NodeID: 1, + StoreID: 1, + ReplicaID: 1, + }, + { + NodeID: 2, + StoreID: 2, + ReplicaID: 2, + }, + }, nil, /* existingNonVoters */ + Decommissioning, + ) + if err != nil { + t.Fatalf("Unable to perform allocation: %+v", err) + } + if !(result.StoreID == 4) { + t.Errorf("expected result to have store ID 4: %+v", result) + } +} + func TestAllocatorMultipleStoresPerNode(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -6424,6 +6477,108 @@ func TestAllocatorComputeActionRemoveDead(t *testing.T) { } } +func TestAllocatorComputeActionWithStorePoolRemoveDead(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + conf := roachpb.SpanConfig{NumReplicas: 3} + threeReplDesc := roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + }, + { + StoreID: 3, + NodeID: 3, + ReplicaID: 3, + }, + }, + } + fourReplDesc := threeReplDesc + fourReplDesc.InternalReplicas = append(fourReplDesc.InternalReplicas, roachpb.ReplicaDescriptor{ + StoreID: 4, + NodeID: 4, + ReplicaID: 4, + }) + + // Each test case should describe a repair situation which has a 
lower + // priority than the previous test case. + testCases := []struct { + desc roachpb.RangeDescriptor + live []roachpb.StoreID + dead []roachpb.StoreID + expectedAction AllocatorAction + }{ + // Needs three replicas, one is dead, and there's no replacement. Since + // there's no replacement we can't do anything, but an action is still + // emitted. + { + desc: threeReplDesc, + live: []roachpb.StoreID{1, 2}, + dead: []roachpb.StoreID{3}, + expectedAction: AllocatorReplaceDeadVoter, + }, + // Needs three replicas, one is dead, but there is a replacement. + { + desc: threeReplDesc, + live: []roachpb.StoreID{1, 2, 4}, + dead: []roachpb.StoreID{3}, + expectedAction: AllocatorReplaceDeadVoter, + }, + // Needs three replicas, two are dead (i.e. the range lacks a quorum). + { + desc: threeReplDesc, + live: []roachpb.StoreID{1, 4}, + dead: []roachpb.StoreID{2, 3}, + expectedAction: AllocatorRangeUnavailable, + }, + // Needs three replicas, has four, one is dead. + { + desc: fourReplDesc, + live: []roachpb.StoreID{1, 2, 4}, + dead: []roachpb.StoreID{3}, + expectedAction: AllocatorRemoveDeadVoter, + }, + // Needs three replicas, has four, two are dead (i.e. the range lacks a quorum). + { + desc: fourReplDesc, + live: []roachpb.StoreID{1, 4}, + dead: []roachpb.StoreID{2, 3}, + expectedAction: AllocatorRangeUnavailable, + }, + } + + ctx := context.Background() + stopper, _, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */) + defer stopper.Stop(ctx) + + for i, tcase := range testCases { + // Mark all dead nodes as alive, so we can override later. + all := append(tcase.live, tcase.dead...) + mockStorePool(sp, all, nil, nil, nil, nil, nil) + oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + for _, deadStoreID := range tcase.dead { + if nid == roachpb.NodeID(deadStoreID) { + return livenesspb.NodeLivenessStatus_DEAD + } + } + + return sp.NodeLivenessFn(nid, now, timeUntilStoreDead) + }) + action, _ := a.ComputeActionWithStorePool(ctx, oSp, conf, &tcase.desc) + if tcase.expectedAction != action { + t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action) + } + } +} + func TestAllocatorComputeActionSuspect(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -6777,6 +6932,303 @@ func TestAllocatorComputeActionDecommission(t *testing.T) { } } +func TestAllocatorComputeActionWithStorePoolDecommission(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testCases := []struct { + conf roachpb.SpanConfig + desc roachpb.RangeDescriptor + expectedAction AllocatorAction + live []roachpb.StoreID + dead []roachpb.StoreID + decommissioning []roachpb.StoreID + decommissioned []roachpb.StoreID + }{ + // Has three replicas, but one is in decommissioning status. We can't + // replace it (nor add a new replica) since there isn't a live target, + // but that's still the action being emitted. + { + conf: roachpb.SpanConfig{NumReplicas: 3}, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + }, + { + StoreID: 3, + NodeID: 3, + ReplicaID: 3, + }, + }, + }, + expectedAction: AllocatorReplaceDecommissioningVoter, + live: []roachpb.StoreID{1, 2}, + dead: nil, + decommissioning: []roachpb.StoreID{3}, + }, + // Has three replicas, one is in decommissioning status, and one is on a + // dead node. 
Replacing the dead replica is more important. + { + conf: roachpb.SpanConfig{NumReplicas: 3}, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + }, + { + StoreID: 3, + NodeID: 3, + ReplicaID: 3, + }, + }, + }, + expectedAction: AllocatorReplaceDeadVoter, + live: []roachpb.StoreID{1}, + dead: []roachpb.StoreID{2}, + decommissioning: []roachpb.StoreID{3}, + }, + // Needs three replicas, has four, where one is decommissioning and one is + // dead. + { + conf: roachpb.SpanConfig{NumReplicas: 3}, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + }, + { + StoreID: 3, + NodeID: 3, + ReplicaID: 3, + }, + { + StoreID: 4, + NodeID: 4, + ReplicaID: 4, + }, + }, + }, + expectedAction: AllocatorRemoveDeadVoter, + live: []roachpb.StoreID{1, 4}, + dead: []roachpb.StoreID{2}, + decommissioning: []roachpb.StoreID{3}, + }, + // Needs three replicas, has four, where one is decommissioning and one is + // decommissioned. + { + conf: roachpb.SpanConfig{NumReplicas: 3}, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + }, + { + StoreID: 3, + NodeID: 3, + ReplicaID: 3, + }, + { + StoreID: 4, + NodeID: 4, + ReplicaID: 4, + }, + }, + }, + expectedAction: AllocatorRemoveDeadVoter, + live: []roachpb.StoreID{1, 4}, + dead: nil, + decommissioning: []roachpb.StoreID{3}, + decommissioned: []roachpb.StoreID{2}, + }, + // Needs three replicas, has three, all decommissioning + { + conf: roachpb.SpanConfig{NumReplicas: 3}, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + }, + { + StoreID: 3, + NodeID: 3, + ReplicaID: 3, + }, + }, + }, + expectedAction: AllocatorReplaceDecommissioningVoter, + live: nil, + dead: nil, + decommissioning: []roachpb.StoreID{1, 2, 3}, + }, + // Needs 3. Has 1 live, 3 decommissioning. 
+ { + conf: roachpb.SpanConfig{NumReplicas: 3}, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + }, + { + StoreID: 3, + NodeID: 3, + ReplicaID: 3, + }, + { + StoreID: 4, + NodeID: 4, + ReplicaID: 4, + }, + }, + }, + expectedAction: AllocatorRemoveDecommissioningVoter, + live: []roachpb.StoreID{4}, + dead: nil, + decommissioning: []roachpb.StoreID{1, 2, 3}, + }, + { + conf: roachpb.SpanConfig{ + NumVoters: 1, + NumReplicas: 3, + }, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 4, + NodeID: 4, + ReplicaID: 4, + Type: roachpb.NON_VOTER, + }, + { + StoreID: 6, + NodeID: 6, + ReplicaID: 6, + Type: roachpb.NON_VOTER, + }, + { + StoreID: 7, + NodeID: 7, + ReplicaID: 7, + Type: roachpb.NON_VOTER, + }, + }, + }, + expectedAction: AllocatorRemoveDecommissioningNonVoter, + live: []roachpb.StoreID{1, 4, 6}, + dead: nil, + decommissioning: []roachpb.StoreID{7}, + }, + { + conf: roachpb.SpanConfig{ + NumVoters: 1, + NumReplicas: 3, + }, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 4, + NodeID: 4, + ReplicaID: 4, + Type: roachpb.NON_VOTER, + }, + { + StoreID: 6, + NodeID: 6, + ReplicaID: 6, + Type: roachpb.NON_VOTER, + }, + }, + }, + expectedAction: AllocatorReplaceDecommissioningNonVoter, + live: []roachpb.StoreID{1, 2, 3, 4, 6}, + dead: nil, + decommissioning: []roachpb.StoreID{4}, + }, + } + + ctx := context.Background() + stopper, _, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */) + defer stopper.Stop(ctx) + + for i, tcase := range testCases { + // Mark all decommissioning and decommissioned nodes as alive, so we can override later. + all := append(tcase.live, tcase.decommissioning...) + all = append(all, tcase.decommissioned...) 
+ overrideLivenessMap := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus) + for _, sID := range tcase.decommissioned { + overrideLivenessMap[roachpb.NodeID(sID)] = livenesspb.NodeLivenessStatus_DECOMMISSIONED + } + for _, sID := range tcase.decommissioning { + overrideLivenessMap[roachpb.NodeID(sID)] = livenesspb.NodeLivenessStatus_DECOMMISSIONING + } + mockStorePool(sp, all, nil, tcase.dead, nil, nil, nil) + oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + if liveness, ok := overrideLivenessMap[nid]; ok { + return liveness + } + + return sp.NodeLivenessFn(nid, now, timeUntilStoreDead) + }) + action, _ := a.ComputeActionWithStorePool(ctx, oSp, tcase.conf, &tcase.desc) + if tcase.expectedAction != action { + t.Errorf("Test case %d expected action %s, got action %s", i, tcase.expectedAction, action) + continue + } + } +} + func TestAllocatorRemoveLearner(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) From 2d4e0b28102b51091dceb77842c1c22ab73b4fdd Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Tue, 15 Nov 2022 17:03:18 -0500 Subject: [PATCH 4/4] allocator: refactor to accept StorePool on rebalance WIP Release note: None --- .../allocator/allocatorimpl/allocator.go | 169 +++++++++++++++--- 1 file changed, 147 insertions(+), 22 deletions(-) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index f262acf62f4d..c62b1e7340a8 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -1128,6 +1128,7 @@ func (a *Allocator) allocateTargetFromList( func (a Allocator) simulateRemoveTarget( ctx context.Context, + storePool storepool.AllocatorStorePool, targetStore roachpb.StoreID, conf roachpb.SpanConfig, candidates []roachpb.ReplicaDescriptor, @@ -1150,8 +1151,8 @@ func (a Allocator) simulateRemoveTarget( // Update statistics first switch t := targetType; t { case VoterTarget: - a.StorePool.UpdateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.ADD_VOTER) - defer a.StorePool.UpdateLocalStoreAfterRebalance( + storePool.UpdateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.ADD_VOTER) + defer storePool.UpdateLocalStoreAfterRebalance( targetStore, rangeUsageInfo, roachpb.REMOVE_VOTER, @@ -1159,21 +1160,21 @@ func (a Allocator) simulateRemoveTarget( log.KvDistribution.VEventf(ctx, 3, "simulating which voter would be removed after adding s%d", targetStore) - return a.RemoveTarget( - ctx, conf, storepool.MakeStoreList(candidateStores), + return a.removeTarget( + ctx, storePool, conf, storepool.MakeStoreList(candidateStores), existingVoters, existingNonVoters, VoterTarget, options, ) case NonVoterTarget: - a.StorePool.UpdateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.ADD_NON_VOTER) - defer a.StorePool.UpdateLocalStoreAfterRebalance( + storePool.UpdateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.ADD_NON_VOTER) + defer storePool.UpdateLocalStoreAfterRebalance( targetStore, rangeUsageInfo, roachpb.REMOVE_NON_VOTER, ) log.KvDistribution.VEventf(ctx, 3, "simulating which non-voter would be removed after adding s%d", targetStore) - return a.RemoveTarget( - ctx, conf, storepool.MakeStoreList(candidateStores), + return a.removeTarget( + ctx, storePool, conf, storepool.MakeStoreList(candidateStores), existingVoters, existingNonVoters, NonVoterTarget, options, ) default: @@ -1205,6 
+1206,20 @@ func (a Allocator) RemoveTarget( existingNonVoters []roachpb.ReplicaDescriptor, targetType TargetReplicaType, options ScorerOptions, +) (roachpb.ReplicationTarget, string, error) { + return a.removeTarget(ctx, a.StorePool, conf, candidateStoreList, existingVoters, + existingNonVoters, targetType, options) +} + +func (a Allocator) removeTarget( + ctx context.Context, + storePool storepool.AllocatorStorePool, + conf roachpb.SpanConfig, + candidateStoreList storepool.StoreList, + existingVoters []roachpb.ReplicaDescriptor, + existingNonVoters []roachpb.ReplicaDescriptor, + targetType TargetReplicaType, + options ScorerOptions, ) (roachpb.ReplicationTarget, string, error) { if len(candidateStoreList.Stores) == 0 { return roachpb.ReplicationTarget{}, "", errors.Errorf( @@ -1214,9 +1229,9 @@ func (a Allocator) RemoveTarget( } existingReplicas := append(existingVoters, existingNonVoters...) - analyzedOverallConstraints := constraint.AnalyzeConstraints(ctx, a.StorePool.GetStoreDescriptor, + analyzedOverallConstraints := constraint.AnalyzeConstraints(ctx, storePool.GetStoreDescriptor, existingReplicas, conf.NumReplicas, conf.Constraints) - analyzedVoterConstraints := constraint.AnalyzeConstraints(ctx, a.StorePool.GetStoreDescriptor, + analyzedVoterConstraints := constraint.AnalyzeConstraints(ctx, storePool.GetStoreDescriptor, existingVoters, conf.GetNumVoters(), conf.VoterConstraints) var constraintsChecker constraintsCheckFn @@ -1240,7 +1255,7 @@ func (a Allocator) RemoveTarget( ctx, candidateStoreList, constraintsChecker, - a.StorePool.GetLocalitiesByStore(replicaSetForDiversityCalc), + storePool.GetLocalitiesByStore(replicaSetForDiversityCalc), options, ) @@ -1275,16 +1290,35 @@ func (a Allocator) RemoveVoter( existingVoters []roachpb.ReplicaDescriptor, existingNonVoters []roachpb.ReplicaDescriptor, options ScorerOptions, +) (roachpb.ReplicationTarget, string, error) { + return a.RemoveVoterWithStorePool(ctx, a.StorePool, conf, voterCandidates, existingVoters, + existingNonVoters, options) +} + +// RemoveVoterWithStorePool returns a suitable replica to remove from the +// provided replica set using the given storePool. It first attempts to randomly +// select a target from the set of stores that have greater than the average +// number of replicas. Failing that, it falls back to selecting a random target +// from any of the existing voting replicas. +func (a Allocator) RemoveVoterWithStorePool( + ctx context.Context, + storePool storepool.AllocatorStorePool, + conf roachpb.SpanConfig, + voterCandidates []roachpb.ReplicaDescriptor, + existingVoters []roachpb.ReplicaDescriptor, + existingNonVoters []roachpb.ReplicaDescriptor, + options ScorerOptions, ) (roachpb.ReplicationTarget, string, error) { // Retrieve store descriptors for the provided candidates from the StorePool. 
candidateStoreIDs := make(roachpb.StoreIDSlice, len(voterCandidates)) for i, exist := range voterCandidates { candidateStoreIDs[i] = exist.StoreID } - candidateStoreList, _, _ := a.StorePool.GetStoreListFromIDs(candidateStoreIDs, storepool.StoreFilterNone) + candidateStoreList, _, _ := storePool.GetStoreListFromIDs(candidateStoreIDs, storepool.StoreFilterNone) - return a.RemoveTarget( + return a.removeTarget( ctx, + storePool, conf, candidateStoreList, existingVoters, @@ -1306,16 +1340,35 @@ func (a Allocator) RemoveNonVoter( existingVoters []roachpb.ReplicaDescriptor, existingNonVoters []roachpb.ReplicaDescriptor, options ScorerOptions, +) (roachpb.ReplicationTarget, string, error) { + return a.RemoveNonVoterWithStorePool(ctx, a.StorePool, conf, nonVoterCandidates, existingVoters, + existingNonVoters, options) +} + +// RemoveNonVoterWithStorePool returns a suitable non-voting replica to remove +// from the provided set using the given storePool. It first attempts to randomly +// select a target from the set of stores that have greater than the average +// number of replicas. Failing that, it falls back to selecting a random target +// from any of the existing non-voting replicas. +func (a Allocator) RemoveNonVoterWithStorePool( + ctx context.Context, + storePool storepool.AllocatorStorePool, + conf roachpb.SpanConfig, + nonVoterCandidates []roachpb.ReplicaDescriptor, + existingVoters []roachpb.ReplicaDescriptor, + existingNonVoters []roachpb.ReplicaDescriptor, + options ScorerOptions, ) (roachpb.ReplicationTarget, string, error) { // Retrieve store descriptors for the provided candidates from the StorePool. candidateStoreIDs := make(roachpb.StoreIDSlice, len(nonVoterCandidates)) for i, exist := range nonVoterCandidates { candidateStoreIDs[i] = exist.StoreID } - candidateStoreList, _, _ := a.StorePool.GetStoreListFromIDs(candidateStoreIDs, storepool.StoreFilterNone) + candidateStoreList, _, _ := storePool.GetStoreListFromIDs(candidateStoreIDs, storepool.StoreFilterNone) - return a.RemoveTarget( + return a.removeTarget( ctx, + storePool, conf, candidateStoreList, existingVoters, @@ -1337,7 +1390,22 @@ func (a Allocator) RebalanceTarget( targetType TargetReplicaType, options ScorerOptions, ) (add, remove roachpb.ReplicationTarget, details string, ok bool) { - sl, _, _ := a.StorePool.GetStoreList(filter) + return a.rebalanceTarget(ctx, a.StorePool, conf, raftStatus, existingVoters, existingNonVoters, + rangeUsageInfo, filter, targetType, options) +} + +func (a Allocator) rebalanceTarget( + ctx context.Context, + storePool storepool.AllocatorStorePool, + conf roachpb.SpanConfig, + raftStatus *raft.Status, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + rangeUsageInfo allocator.RangeUsageInfo, + filter storepool.StoreFilter, + targetType TargetReplicaType, + options ScorerOptions, +) (add, remove roachpb.ReplicationTarget, details string, ok bool) { + sl, _, _ := storePool.GetStoreList(filter) // If we're considering a rebalance due to an `AdminScatterRequest`, we'd like // to ensure that we're returning a random rebalance target to a new store @@ -1349,9 +1417,9 @@ func (a Allocator) RebalanceTarget( zero := roachpb.ReplicationTarget{} analyzedOverallConstraints := constraint.AnalyzeConstraints( - ctx, a.StorePool.GetStoreDescriptor, existingReplicas, conf.NumReplicas, conf.Constraints) + ctx, storePool.GetStoreDescriptor, existingReplicas, conf.NumReplicas, conf.Constraints) analyzedVoterConstraints := constraint.AnalyzeConstraints( - ctx, a.StorePool.GetStoreDescriptor, 
existingVoters, conf.GetNumVoters(), conf.VoterConstraints) + ctx, storePool.GetStoreDescriptor, existingVoters, conf.GetNumVoters(), conf.VoterConstraints) var removalConstraintsChecker constraintsCheckFn var rebalanceConstraintsChecker rebalanceConstraintsCheckFn var replicaSetToRebalance, replicasWithExcludedStores []roachpb.ReplicaDescriptor @@ -1391,8 +1459,8 @@ func (a Allocator) RebalanceTarget( rebalanceConstraintsChecker, replicaSetToRebalance, replicasWithExcludedStores, - a.StorePool.GetLocalitiesByStore(replicaSetForDiversityCalc), - a.StorePool.IsStoreReadyForRoutineReplicaTransfer, + storePool.GetLocalitiesByStore(replicaSetForDiversityCalc), + storePool.IsStoreReadyForRoutineReplicaTransfer, options, a.Metrics, ) @@ -1442,6 +1510,7 @@ func (a Allocator) RebalanceTarget( var err error removeReplica, removeDetails, err = a.simulateRemoveTarget( ctx, + storePool, target.store.StoreID, conf, replicaCandidates, @@ -1522,8 +1591,36 @@ func (a Allocator) RebalanceVoter( filter storepool.StoreFilter, options ScorerOptions, ) (add, remove roachpb.ReplicationTarget, details string, ok bool) { - return a.RebalanceTarget( + return a.rebalanceTarget( + ctx, + a.StorePool, + conf, + raftStatus, + existingVoters, + existingNonVoters, + rangeUsageInfo, + filter, + VoterTarget, + options, + ) +} + +// RebalanceVoterWithStorePool returns a suitable store for a rebalance target +// with required attributes using the given storePool. +// See comment on Allocator.RebalanceVoter(..). +func (a Allocator) RebalanceVoterWithStorePool( + ctx context.Context, + storePool storepool.AllocatorStorePool, + conf roachpb.SpanConfig, + raftStatus *raft.Status, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + rangeUsageInfo allocator.RangeUsageInfo, + filter storepool.StoreFilter, + options ScorerOptions, +) (add, remove roachpb.ReplicationTarget, details string, ok bool) { + return a.rebalanceTarget( ctx, + storePool, conf, raftStatus, existingVoters, @@ -1556,8 +1653,36 @@ func (a Allocator) RebalanceNonVoter( filter storepool.StoreFilter, options ScorerOptions, ) (add, remove roachpb.ReplicationTarget, details string, ok bool) { - return a.RebalanceTarget( + return a.rebalanceTarget( ctx, + a.StorePool, + conf, + raftStatus, + existingVoters, + existingNonVoters, + rangeUsageInfo, + filter, + NonVoterTarget, + options, + ) +} + +// RebalanceNonVoterWithStorePool returns a suitable pair of rebalance +// candidates for a non-voting replica using the given storePool. +// See comment on Allocator.RebalanceNonVoter(..). +func (a Allocator) RebalanceNonVoterWithStorePool( + ctx context.Context, + storePool storepool.AllocatorStorePool, + conf roachpb.SpanConfig, + raftStatus *raft.Status, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + rangeUsageInfo allocator.RangeUsageInfo, + filter storepool.StoreFilter, + options ScorerOptions, +) (add, remove roachpb.ReplicationTarget, details string, ok bool) { + return a.rebalanceTarget( + ctx, + storePool, conf, raftStatus, existingVoters,
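Taken together, the *WithStorePool variants introduced across these patches are the hooks a decommission pre-flight check can build on: wrap the live store pool in an OverrideStorePool that reports the candidate nodes as decommissioning, then ask, per range, whether the allocator could still find a replacement target. A rough sketch of that composition, in the same hypothetical preflight package so it can reuse makeDecommissionOverride from the first sketch; the function name is illustrative and the eventual check for #91570 may differ.

package preflight // hypothetical package, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// decommissionPreflightTarget asks where the allocator would place a
// replacement voter for the given range if the listed nodes were treated as
// decommissioning. Hypothetical helper, for illustration only.
func decommissionPreflightTarget(
	ctx context.Context,
	a *allocatorimpl.Allocator,
	pool *storepool.StorePool,
	conf roachpb.SpanConfig,
	desc *roachpb.RangeDescriptor,
	decommissioning ...roachpb.NodeID,
) (roachpb.ReplicationTarget, error) {
	// Pretend the listed nodes are decommissioning without touching real
	// liveness, reusing the makeDecommissionOverride sketch from above.
	override := storepool.NewOverrideStorePool(
		pool, makeDecommissionOverride(pool.NodeLivenessFn, decommissioning...))
	// Existing voters and non-voters are ruled out as targets, so a successful
	// return means the range would still have somewhere to put a replacement.
	target, _, err := a.AllocateVoterWithStorePool(
		ctx, override, conf,
		desc.Replicas().VoterDescriptors(), desc.Replicas().NonVoterDescriptors(),
		allocatorimpl.Decommissioning,
	)
	return target, err
}

A caller would treat an error here as a signal that this range would be left without a valid replacement target if the decommission went ahead.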