From 46f13fb510253c63a203ab789696d8310073e576 Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Fri, 18 Nov 2022 23:35:22 -0500 Subject: [PATCH] kvserver: support checking allocator action and target by range Adds support for using the allocator to compute the action, if any, needed to "repair" the range and, if the action requires a replica addition (including replacements), the target of that addition. This can be checked by using the new `CheckRangeAction(..)` function as part of a store's replicate queue, and can use the default store pool or an override store pool, so that potential states can be evaluated prior to transition to those states. As such, this feature adds support for allocator action and target validation prior to decommission, in order to support decommission pre-checks. While this is similar to the replicate queue's `PlanOneChange(..)`, this new check supports evaluation based on a range descriptor, rather than an individual replica. Depends on #91941 Part of #91570. Release note: None --- pkg/kv/kvserver/allocator_impl_test.go | 293 +++++++++++++++++++++++++ pkg/kv/kvserver/replicate_queue.go | 213 ++++++++++++++++++ 2 files changed, 506 insertions(+) diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go index 3f2a90729e84..351a063d60de 100644 --- a/pkg/kv/kvserver/allocator_impl_test.go +++ b/pkg/kv/kvserver/allocator_impl_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" @@ -25,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/tracker" ) @@ -59,6 +61,100 @@ var singleStore = []*roachpb.StoreDescriptor{ }, } +var twoDCStores = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 1, + Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 2, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 2, + Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 3, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 3, + Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 4, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 4, + Attrs: roachpb.Attributes{Attrs: []string{"b"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 5, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: 
roachpb.NodeDescriptor{ + NodeID: 5, + Attrs: roachpb.Attributes{Attrs: []string{"b"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 6, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 6, + Attrs: roachpb.Attributes{Attrs: []string{"b"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, +} + +func constrainTo(numReplicas int, attr string) roachpb.SpanConfig { + return roachpb.SpanConfig{ + NumReplicas: int32(numReplicas), + Constraints: []roachpb.ConstraintsConjunction{ + { + Constraints: []roachpb.Constraint{ + {Value: attr, Type: roachpb.Constraint_REQUIRED}, + }, + }, + }, + } +} + // TestAllocatorRebalanceTarget could help us to verify whether we'll rebalance // to a target that we'll immediately remove. func TestAllocatorRebalanceTarget(t *testing.T) { @@ -241,6 +337,203 @@ func TestAllocatorRebalanceTarget(t *testing.T) { } } +// TestAllocatorCheckRangeActionUpreplicate validates the allocator's action and +// target for a range in a basic upreplication case using the replicate queue's +// `CheckRangeAction(..)`. +func TestAllocatorCheckRangeActionUpreplicate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper, g, sp, a, _ := allocatorimpl.CreateTestAllocator(ctx, 10, false /* deterministic */) + defer stopper.Stop(context.Background()) + + gossiputil.NewStoreGossiper(g).GossipStores(twoDCStores, t) + cfg := TestStoreConfig(nil) + cfg.Gossip = g + + // Ensure that there are no usages of the underlying store pool. + cfg.StorePool = nil + + firstStore := *twoDCStores[0] + s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) + s.Ident = &roachpb.StoreIdent{StoreID: firstStore.StoreID} + rq := newReplicateQueue(s, a) + + firstRange := &roachpb.RangeDescriptor{ + RangeID: 1, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 2, StoreID: 2}, + }, + } + + storeIDsInB := []roachpb.StoreID{4, 5, 6} + + constrainToB3X := constrainTo(3, "b") + + // Validate that we need to upreplicate r1 to a node in "b". + action, target, err := rq.CheckRangeAction(ctx, sp, firstRange, constrainToB3X) + + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorAddVoter, action) + require.Contains(t, storeIDsInB, target.StoreID) + + newReplica := roachpb.ReplicaDescriptor{NodeID: target.NodeID, StoreID: target.StoreID} + firstRange.InternalReplicas = append(firstRange.InternalReplicas, newReplica) + + // Validate that we need to upreplicate r1 to another node in "b". + action, target, err = rq.CheckRangeAction(ctx, sp, firstRange, constrainToB3X) + + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorAddVoter, action) + require.Contains(t, storeIDsInB, target.StoreID) + + newReplica = roachpb.ReplicaDescriptor{NodeID: target.NodeID, StoreID: target.StoreID} + firstRange.InternalReplicas = append(firstRange.InternalReplicas, newReplica) + + // Determine the remaining node in "b". + var remainingStoreID roachpb.StoreID + for _, storeID := range storeIDsInB { + if !firstRange.Replicas().HasReplicaOnNode(roachpb.NodeID(storeID)) { + remainingStoreID = storeID + break + } + } + + // Validate that we need to rebalance r1 from n2 to the final node in "b".
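For reference, the constrainToB3X config exercised throughout this test is simply the constrainTo(3, "b") helper defined above; spelled out as a literal it is the span config below (a restatement only, with a hypothetical variable name, assuming the roachpb import already present in this file):

    var constrainToB3XLiteral = roachpb.SpanConfig{
        NumReplicas: 3,
        Constraints: []roachpb.ConstraintsConjunction{{
            Constraints: []roachpb.Constraint{
                {Value: "b", Type: roachpb.Constraint_REQUIRED},
            },
        }},
    }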
+ action, target, err = rq.CheckRangeAction(ctx, sp, firstRange, constrainToB3X) + + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorConsiderRebalance, action) + // NB: For rebalance actions, the target is currently undetermined, but + // should be the remaining node. + require.Equal(t, roachpb.ReplicationTarget{}, target) + + // Simulate adding a replica on the remaining node in "b", without removing. + newReplica = roachpb.ReplicaDescriptor{NodeID: roachpb.NodeID(remainingStoreID), StoreID: remainingStoreID} + firstRange.InternalReplicas = append(firstRange.InternalReplicas, newReplica) + + // Validate that we need to remove r1 from the node in "a". + action, target, err = rq.CheckRangeAction(ctx, sp, firstRange, constrainToB3X) + + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorRemoveVoter, action) + // NB: For removal actions, the target is currently undetermined, but + // should be n2. + require.Equal(t, roachpb.ReplicationTarget{}, target) + + removeIdx := getRemoveIdx(firstRange.InternalReplicas, roachpb.ReplicaDescriptor{StoreID: 2}) + firstRange.InternalReplicas = append(firstRange.InternalReplicas[:removeIdx:removeIdx], + firstRange.InternalReplicas[removeIdx+1:]...) + + // Validate that we have no more actions on r1, except to consider rebalance. + action, target, err = rq.CheckRangeAction(ctx, sp, firstRange, constrainToB3X) + + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorConsiderRebalance, action) +} + +// TestAllocatorCheckRangeActionProposedDecommissionSelf validates the allocator's action and +// target for a range during a proposed (but not current) decommission using the +// replicate queue's `CheckRangeAction(..)`. +func TestAllocatorCheckRangeActionProposedDecommissionSelf(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper, g, sp, a, _ := allocatorimpl.CreateTestAllocator(ctx, 10, false /* deterministic */) + defer stopper.Stop(context.Background()) + + gossiputil.NewStoreGossiper(g).GossipStores(twoDCStores, t) + cfg := TestStoreConfig(nil) + cfg.Gossip = g + + // Ensure that there are no usages of the underlying store pool. + cfg.StorePool = nil + + firstStore := *twoDCStores[0] + s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) + s.Ident = &roachpb.StoreIdent{StoreID: firstStore.StoreID} + rq := newReplicateQueue(s, a) + + firstRange := &roachpb.RangeDescriptor{ + RangeID: 1, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 2, StoreID: 2}, + {NodeID: 3, StoreID: 3}, + {NodeID: 4, StoreID: 4}, + }, + } + + remainingStores := []roachpb.StoreID{5, 6} + + // Simulate n2 as decommissioning and n1 as down. + override := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + if nid == roachpb.NodeID(2) { + return livenesspb.NodeLivenessStatus_DECOMMISSIONING + } else if nid == roachpb.NodeID(1) { + return livenesspb.NodeLivenessStatus_DEAD + } else { + return livenesspb.NodeLivenessStatus_LIVE + } + }) + + // Validate that we need to do a decommissioning voter replacement for r1 to + // a node in "b". 
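The closure passed to storepool.NewOverrideStorePool above is the liveness-override hook this change relies on to evaluate proposed states. A minimal sketch of a reusable builder for such overrides, assuming only the closure signature shown in this test (the helper name livenessOverride is illustrative, not part of the patch):

    func livenessOverride(
        statuses map[roachpb.NodeID]livenesspb.NodeLivenessStatus,
    ) func(roachpb.NodeID, time.Time, time.Duration) livenesspb.NodeLivenessStatus {
        return func(nid roachpb.NodeID, _ time.Time, _ time.Duration) livenesspb.NodeLivenessStatus {
            if status, ok := statuses[nid]; ok {
                return status
            }
            // Nodes without an explicit override are treated as live.
            return livenesspb.NodeLivenessStatus_LIVE
        }
    }

With such a helper, the override above could equivalently be built as storepool.NewOverrideStorePool(sp, livenessOverride(map[roachpb.NodeID]livenesspb.NodeLivenessStatus{1: livenesspb.NodeLivenessStatus_DEAD, 2: livenesspb.NodeLivenessStatus_DECOMMISSIONING})).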
+ action, target, err := rq.CheckRangeAction(ctx, override, firstRange, roachpb.SpanConfig{NumReplicas: 3}) + + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorReplaceDecommissioningVoter, action) + require.Contains(t, remainingStores, target.StoreID) + + // Validate that we'd just need to remove n2's replica if we only need one + // replica. + action, target, err = rq.CheckRangeAction(ctx, override, firstRange, roachpb.SpanConfig{NumReplicas: 1}) + + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorRemoveDecommissioningVoter, action) + // NB: For removal actions, the target is currently undetermined, but + // should be n2. + require.Equal(t, roachpb.ReplicationTarget{}, target) + + // Validate that we would get an error finding a target if we restrict r1 to + // only "a" nodes, since n1 is down. + constrainToA3X := constrainTo(3, "a") + action, target, err = rq.CheckRangeAction(ctx, override, firstRange, constrainToA3X) + + require.Error(t, err) + require.Equal(t, allocatorimpl.AllocatorReplaceDecommissioningVoter, action) + require.Equal(t, roachpb.ReplicationTarget{}, target) + + // Validate that any replica type other than voter or non-voter on + // n2 indicates that we must complete the atomic replication change prior to + // handling the decommissioning replica. + inChangeReplicaTypes := []roachpb.ReplicaType{ + roachpb.VOTER_INCOMING, roachpb.VOTER_OUTGOING, + roachpb.VOTER_DEMOTING_LEARNER, roachpb.VOTER_DEMOTING_NON_VOTER, + } + for _, replicaType := range inChangeReplicaTypes { + firstRange.InternalReplicas[0].Type = replicaType + + action, target, err = rq.CheckRangeAction(ctx, override, firstRange, roachpb.SpanConfig{NumReplicas: 3}) + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorFinalizeAtomicReplicationChange, action) + require.Equal(t, roachpb.ReplicationTarget{}, target) + } + + // Simulate n2's and n3's replicas of r1 as non-voter replicas. + firstRange.InternalReplicas[0].Type = roachpb.NON_VOTER + firstRange.InternalReplicas[1].Type = roachpb.NON_VOTER + + // Validate that we'd need to replace n2's non-voting replica if we need + // 3 replicas but only 1 voter. + action, target, err = rq.CheckRangeAction(ctx, override, firstRange, roachpb.SpanConfig{NumReplicas: 3, NumVoters: 1}) + + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorReplaceDecommissioningNonVoter, action) + require.Contains(t, remainingStores, target.StoreID) +} + // TestAllocatorThrottled ensures that when a store is throttled, the replica // will not be sent to purgatory. func TestAllocatorThrottled(t *testing.T) { diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 5bdca9305eca..b9be6853971e 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -986,6 +986,217 @@ func (rq *replicateQueue) processOneChange( return rq.ShouldRequeue(ctx, change), nil } +// CheckRangeAction takes a RangeDescriptor and SpanConfig and uses the allocator, +// along with a provided StorePool (which may be distinct from the allocator's), +// to return the operation that would be required to repair the range, the +// upreplication target (if applicable), and any allocator errors encountered. +// It is similar to PlanOneChange, but range-scoped rather than replica-scoped, +// and as such cannot be used to plan a change, but can evaluate action viability.
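Tying the assertions above back to the commit message: a decommission pre-check would treat an error from CheckRangeAction as "not viable" and a returned target as confirmation that the range can be repaired under the proposed state. A rough caller-side sketch, assuming hypothetical plumbing (the function name checkRangeViability and its parameters are illustrative only, not part of this change):

    func checkRangeViability(
        ctx context.Context,
        rq *replicateQueue,
        overridePool storepool.AllocatorStorePool,
        desc *roachpb.RangeDescriptor,
        conf roachpb.SpanConfig,
    ) error {
        action, target, err := rq.CheckRangeAction(ctx, overridePool, desc, conf)
        if err != nil {
            // The needed action is known, but no valid allocation target exists
            // under the proposed state; a pre-check would report "not viable".
            return err
        }
        switch action {
        case allocatorimpl.AllocatorReplaceDecommissioningVoter,
            allocatorimpl.AllocatorReplaceDecommissioningNonVoter,
            allocatorimpl.AllocatorReplaceDeadVoter,
            allocatorimpl.AllocatorReplaceDeadNonVoter,
            allocatorimpl.AllocatorAddVoter,
            allocatorimpl.AllocatorAddNonVoter:
            _ = target // a concrete allocation target was found for the repair
        default:
            // Noop, removal, and rebalance actions come back with an empty
            // target, since they either need no new replica or require
            // leaseholder/Raft state to evaluate.
        }
        return nil
    }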
+// +// As this function does not take a replica (and in particular, a leaseholder +// replica), it cannot determine the target for actions that require state from +// a leaseholder and/or Raft leader. This includes replica rebalance, removal, +// and lease transfer. Actions that do not require the allocator to determine +// a target are treated as noops, and do not return errors. +func (rq *replicateQueue) CheckRangeAction( + ctx context.Context, + storePool storepool.AllocatorStorePool, + desc *roachpb.RangeDescriptor, + conf roachpb.SpanConfig, +) (allocatorimpl.AllocatorAction, roachpb.ReplicationTarget, error) { + // Use the store pool to evaluate the allocator action given the provided + // state (which may be overridden from the actual state). + action, _ := rq.allocator.ComputeActionWithStorePool(ctx, storePool, conf, desc) + + var err error + var noop, add, remove, replace, rebalance bool + var replicaStatus allocatorimpl.ReplicaStatus + var targetReplType allocatorimpl.TargetReplicaType + switch action { + // Noop replica actions. + case allocatorimpl.AllocatorNoop: + noop = true + case allocatorimpl.AllocatorRangeUnavailable: + noop = true + + // Add replicas. + case allocatorimpl.AllocatorAddVoter: + add = true + replicaStatus = allocatorimpl.Alive + targetReplType = allocatorimpl.VoterTarget + case allocatorimpl.AllocatorAddNonVoter: + add = true + replicaStatus = allocatorimpl.Alive + targetReplType = allocatorimpl.NonVoterTarget + + // Remove replicas. + case allocatorimpl.AllocatorRemoveVoter: + remove = true + replicaStatus = allocatorimpl.Alive + targetReplType = allocatorimpl.VoterTarget + case allocatorimpl.AllocatorRemoveNonVoter: + remove = true + replicaStatus = allocatorimpl.Alive + targetReplType = allocatorimpl.NonVoterTarget + + // Replace dead replicas. + case allocatorimpl.AllocatorReplaceDeadVoter: + replace = true + replicaStatus = allocatorimpl.Dead + targetReplType = allocatorimpl.VoterTarget + case allocatorimpl.AllocatorReplaceDeadNonVoter: + replace = true + replicaStatus = allocatorimpl.Dead + targetReplType = allocatorimpl.NonVoterTarget + + // Replace decommissioning replicas. + case allocatorimpl.AllocatorReplaceDecommissioningVoter: + replace = true + replicaStatus = allocatorimpl.Decommissioning + targetReplType = allocatorimpl.VoterTarget + case allocatorimpl.AllocatorReplaceDecommissioningNonVoter: + replace = true + replicaStatus = allocatorimpl.Decommissioning + targetReplType = allocatorimpl.NonVoterTarget + + // Remove decommissioning replicas. + case allocatorimpl.AllocatorRemoveDecommissioningVoter: + remove = true + replicaStatus = allocatorimpl.Decommissioning + targetReplType = allocatorimpl.VoterTarget + case allocatorimpl.AllocatorRemoveDecommissioningNonVoter: + remove = true + replicaStatus = allocatorimpl.Decommissioning + targetReplType = allocatorimpl.NonVoterTarget + + // Remove dead replicas. + case allocatorimpl.AllocatorRemoveDeadVoter: + remove = true + replicaStatus = allocatorimpl.Dead + targetReplType = allocatorimpl.VoterTarget + case allocatorimpl.AllocatorRemoveDeadNonVoter: + remove = true + replicaStatus = allocatorimpl.Dead + targetReplType = allocatorimpl.NonVoterTarget + + // Rebalance replicas. + case allocatorimpl.AllocatorConsiderRebalance: + rebalance = true + + // Finalize and cleanup replicas. 
+ case allocatorimpl.AllocatorFinalizeAtomicReplicationChange, allocatorimpl.AllocatorRemoveLearner: + noop = true + default: + err = errors.Errorf("unknown allocator action %v", action) + } + + if err != nil { + return action, roachpb.ReplicationTarget{}, err + } + + if noop || remove || rebalance { + // NB: Currently, remove and rebalance actions require a leaseholder + // replica and/or Raft leader, and as such we cannot use the allocator to + // find a target using simply a range descriptor and a span config. + // + // E.g. When removing a voter, we need Raft leader state to determine + // a removal target (assuming we aren't removing dead or decommissioning + // replicas, which are self-evident), and we also need the lease to + // determine a new leaseholder, if a new leaseholder is needed after + // removal. When rebalancing, we similarly need to check leaseholder state. + // + // This means we unfortunately cannot validate that we have a valid target + // for removal, rebalance, or lease transfer (if needed). + return action, roachpb.ReplicationTarget{}, err + } + + voterReplicas := desc.Replicas().VoterDescriptors() + nonVoterReplicas := desc.Replicas().NonVoterDescriptors() + liveVoterReplicas, deadVoterReplicas := storePool.LiveAndDeadReplicas( + voterReplicas, true, /* includeSuspectAndDrainingStores */ + ) + liveNonVoterReplicas, deadNonVoterReplicas := storePool.LiveAndDeadReplicas( + nonVoterReplicas, true, /* includeSuspectAndDrainingStores */ + ) + remainingLiveVoters := liveVoterReplicas + + var target roachpb.ReplicationTarget + if replace && targetReplType == allocatorimpl.VoterTarget && len(voterReplicas) == 1 { + // If only one voter replica remains, that replica is the leaseholder and + // we won't be able to swap it out. Ignore the removal and simply add + // a replica. + replace = false + add = true + } + + if replace { + var replicas []roachpb.ReplicaDescriptor + var deadReplicas []roachpb.ReplicaDescriptor + var deadOrDecommissioningReplicas []roachpb.ReplicaDescriptor + + if targetReplType == allocatorimpl.VoterTarget { + replicas = voterReplicas + deadReplicas = deadVoterReplicas + } else if targetReplType == allocatorimpl.NonVoterTarget { + replicas = nonVoterReplicas + deadReplicas = deadNonVoterReplicas + } else { + return action, target, errors.AssertionFailedf("unexpected replica type: %v", + targetReplType) + } + + if replicaStatus == allocatorimpl.Decommissioning { + deadOrDecommissioningReplicas = storePool.DecommissioningReplicas(replicas) + } else if replicaStatus == allocatorimpl.Dead { + deadOrDecommissioningReplicas = deadReplicas + } else { + return action, target, errors.AssertionFailedf("unexpected replica status: %v", + replicaStatus) + } + + if len(deadOrDecommissioningReplicas) == 0 { + // Nothing to do. + return allocatorimpl.AllocatorNoop, target, err + } + + if targetReplType == allocatorimpl.VoterTarget { + // NB: See addOrReplaceVoters(..) vs addOrReplaceNonVoters(..). + removeIdx := getRemoveIdx(replicas, deadOrDecommissioningReplicas[0]) + replToRemove := replicas[removeIdx] + for i, r := range liveVoterReplicas { + if r.ReplicaID == replToRemove.ReplicaID { + remainingLiveVoters = append(liveVoterReplicas[:i:i], liveVoterReplicas[i+1:]...) 
+ break + } + } + } + } else if !add { + return action, target, errors.AssertionFailedf( + "attempting to allocate target for unsupported action: %v", action, + ) + } + + if targetReplType == allocatorimpl.VoterTarget { + // NB: For the purposes of checking the range action, we do not check to + // ensure we are avoiding upreplication to fragile quorum. + // See addOrReplaceVoters(..). + target, _, err = rq.allocator.AllocateVoterWithStorePool(ctx, storePool, conf, + remainingLiveVoters, liveNonVoterReplicas, replicaStatus) + } else if targetReplType == allocatorimpl.NonVoterTarget { + target, _, err = rq.allocator.AllocateNonVoterWithStorePool(ctx, storePool, conf, + liveVoterReplicas, liveNonVoterReplicas, replicaStatus) + } else { + err = errors.AssertionFailedf("unexpected replica type: %v", targetReplType) + } + + if err == nil && target.Equal(roachpb.ReplicationTarget{}) { + return action, target, errors.AssertionFailedf( + "no replication target found for action: %v", action) + } + + return action, target, err +} + // PlanOneChange calls the allocator to determine an action to be taken upon a // range. The fn then calls back into the allocator to get the changes // necessary to act upon the action and returns them as a ReplicateQueueChange. @@ -1162,6 +1373,8 @@ func (rq *replicateQueue) PlanOneChange( canTransferLeaseFrom, scatter, ) + + // Finalize and cleanup replicas. case allocatorimpl.AllocatorFinalizeAtomicReplicationChange, allocatorimpl.AllocatorRemoveLearner: op = AllocationFinalizeAtomicReplicationOp{} default:
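One Go-level detail in the replacement path above: remainingLiveVoters is built with the full slice expression liveVoterReplicas[:i:i] (the tests use the same [:removeIdx:removeIdx] form), which caps the capacity so that the following append copies into a fresh backing array instead of overwriting the element after the removal point in the original slice. A self-contained illustration of that idiom, using plain ints in place of replica descriptors:

    package main

    import "fmt"

    func main() {
        live := []int{10, 20, 30, 40}
        i := 1 // index of the element being removed
        // live[:i:i] has len i and cap i, so append must allocate a new
        // backing array rather than clobbering live[i+1] in place.
        remaining := append(live[:i:i], live[i+1:]...)
        fmt.Println(live)      // [10 20 30 40] -- original left intact
        fmt.Println(remaining) // [10 30 40]
    }

The effect is that the slice handed back by LiveAndDeadReplicas (or the test's descriptor slice) stays unmodified while a shortened copy is produced for the allocation call.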