diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go index 3f2a90729e84..351a063d60de 100644 --- a/pkg/kv/kvserver/allocator_impl_test.go +++ b/pkg/kv/kvserver/allocator_impl_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" @@ -25,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/tracker" ) @@ -59,6 +61,100 @@ var singleStore = []*roachpb.StoreDescriptor{ }, } +var twoDCStores = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 1, + Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 2, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 2, + Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 3, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 3, + Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 4, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 4, + Attrs: roachpb.Attributes{Attrs: []string{"b"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 5, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 5, + Attrs: roachpb.Attributes{Attrs: []string{"b"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 6, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 6, + Attrs: roachpb.Attributes{Attrs: []string{"b"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, +} + +func constrainTo(numReplicas int, attr string) roachpb.SpanConfig { + return roachpb.SpanConfig{ + NumReplicas: int32(numReplicas), + Constraints: []roachpb.ConstraintsConjunction{ + { + Constraints: []roachpb.Constraint{ + {Value: attr, Type: roachpb.Constraint_REQUIRED}, + }, + }, + }, + } +} + // TestAllocatorRebalanceTarget could help us to verify whether we'll rebalance // to a target that we'll immediately remove. 
func TestAllocatorRebalanceTarget(t *testing.T) { @@ -241,6 +337,203 @@ func TestAllocatorRebalanceTarget(t *testing.T) { } } +// TestAllocatorCheckRangeActionUpreplicate validates the allocator's action and +// target for a range in a basic upreplication case using the replicate queue's +// `CheckRangeAction(..)`. +func TestAllocatorCheckRangeActionUpreplicate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper, g, sp, a, _ := allocatorimpl.CreateTestAllocator(ctx, 10, false /* deterministic */) + defer stopper.Stop(context.Background()) + + gossiputil.NewStoreGossiper(g).GossipStores(twoDCStores, t) + cfg := TestStoreConfig(nil) + cfg.Gossip = g + + // Ensure that there are no usages of the underlying store pool. + cfg.StorePool = nil + + firstStore := *twoDCStores[0] + s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) + s.Ident = &roachpb.StoreIdent{StoreID: firstStore.StoreID} + rq := newReplicateQueue(s, a) + + firstRange := &roachpb.RangeDescriptor{ + RangeID: 1, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 2, StoreID: 2}, + }, + } + + storeIDsInB := []roachpb.StoreID{4, 5, 6} + + constrainToB3X := constrainTo(3, "b") + + // Validate that we need to upreplicate r1 to a node in "b". + action, target, err := rq.CheckRangeAction(ctx, sp, firstRange, constrainToB3X) + + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorAddVoter, action) + require.Contains(t, storeIDsInB, target.StoreID) + + newReplica := roachpb.ReplicaDescriptor{NodeID: target.NodeID, StoreID: target.StoreID} + firstRange.InternalReplicas = append(firstRange.InternalReplicas, newReplica) + + // Validate that we need to upreplicate r1 to another node in "b". + action, target, err = rq.CheckRangeAction(ctx, sp, firstRange, constrainToB3X) + + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorAddVoter, action) + require.Contains(t, storeIDsInB, target.StoreID) + + newReplica = roachpb.ReplicaDescriptor{NodeID: target.NodeID, StoreID: target.StoreID} + firstRange.InternalReplicas = append(firstRange.InternalReplicas, newReplica) + + // Determine the remaining node in "b". + var remainingStoreID roachpb.StoreID + for _, storeID := range storeIDsInB { + if !firstRange.Replicas().HasReplicaOnNode(roachpb.NodeID(storeID)) { + remainingStoreID = storeID + break + } + } + + // Validate that we need to rebalance r1 from n2 to the final node in "b". + action, target, err = rq.CheckRangeAction(ctx, sp, firstRange, constrainToB3X) + + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorConsiderRebalance, action) + // NB: For rebalance actions, the target is currently undetermined, but + // should be the remaining node. + require.Equal(t, roachpb.ReplicationTarget{}, target) + + // Simulate adding a replica on the remaining node in "b", without removing. + newReplica = roachpb.ReplicaDescriptor{NodeID: roachpb.NodeID(remainingStoreID), StoreID: remainingStoreID} + firstRange.InternalReplicas = append(firstRange.InternalReplicas, newReplica) + + // Validate that we need to remove r1's replica from the node in "a". + action, target, err = rq.CheckRangeAction(ctx, sp, firstRange, constrainToB3X) + + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorRemoveVoter, action) + // NB: For removal actions, the target is currently undetermined, but + // should be n2.
+ require.Equal(t, roachpb.ReplicationTarget{}, target) + + removeIdx := getRemoveIdx(firstRange.InternalReplicas, roachpb.ReplicaDescriptor{StoreID: 2}) + firstRange.InternalReplicas = append(firstRange.InternalReplicas[:removeIdx:removeIdx], + firstRange.InternalReplicas[removeIdx+1:]...) + + // Validate that we have no more actions on r1, except to consider rebalance. + action, target, err = rq.CheckRangeAction(ctx, sp, firstRange, constrainToB3X) + + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorConsiderRebalance, action) +} + +// TestAllocatorCheckRangeActionProposedDecommissionSelf validates the allocator's action and +// target for a range during a proposed (but not current) decommission using the +// replicate queue's `CheckRangeAction(..)`. +func TestAllocatorCheckRangeActionProposedDecommissionSelf(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper, g, sp, a, _ := allocatorimpl.CreateTestAllocator(ctx, 10, false /* deterministic */) + defer stopper.Stop(context.Background()) + + gossiputil.NewStoreGossiper(g).GossipStores(twoDCStores, t) + cfg := TestStoreConfig(nil) + cfg.Gossip = g + + // Ensure that there are no usages of the underlying store pool. + cfg.StorePool = nil + + firstStore := *twoDCStores[0] + s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) + s.Ident = &roachpb.StoreIdent{StoreID: firstStore.StoreID} + rq := newReplicateQueue(s, a) + + firstRange := &roachpb.RangeDescriptor{ + RangeID: 1, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 2, StoreID: 2}, + {NodeID: 3, StoreID: 3}, + {NodeID: 4, StoreID: 4}, + }, + } + + remainingStores := []roachpb.StoreID{5, 6} + + // Simulate n2 as decommissioning and n1 as down. + override := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + if nid == roachpb.NodeID(2) { + return livenesspb.NodeLivenessStatus_DECOMMISSIONING + } else if nid == roachpb.NodeID(1) { + return livenesspb.NodeLivenessStatus_DEAD + } else { + return livenesspb.NodeLivenessStatus_LIVE + } + }) + + // Validate that we need to do a decommissioning voter replacement for r1 to + // a node in "b". + action, target, err := rq.CheckRangeAction(ctx, override, firstRange, roachpb.SpanConfig{NumReplicas: 3}) + + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorReplaceDecommissioningVoter, action) + require.Contains(t, remainingStores, target.StoreID) + + // Validate that we'd just need to remove n2's replica if we only need one + // replica. + action, target, err = rq.CheckRangeAction(ctx, override, firstRange, roachpb.SpanConfig{NumReplicas: 1}) + + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorRemoveDecommissioningVoter, action) + // NB: For removal actions, the target is currently undetermined, but + // should be n2. + require.Equal(t, roachpb.ReplicationTarget{}, target) + + // Validate that we would get an error finding a target if we restrict r1 to + // only "a" nodes, since n1 is down. 
+ constrainToA3X := constrainTo(3, "a") + action, target, err = rq.CheckRangeAction(ctx, override, firstRange, constrainToA3X) + + require.Error(t, err) + require.Equal(t, allocatorimpl.AllocatorReplaceDecommissioningVoter, action) + require.Equal(t, roachpb.ReplicationTarget{}, target) + + // Validate that any replica type other than voter or non-voter on + // n2 indicates that we must complete the atomic replication change prior to + // handling the decommissioning replica. + inChangeReplicaTypes := []roachpb.ReplicaType{ + roachpb.VOTER_INCOMING, roachpb.VOTER_OUTGOING, + roachpb.VOTER_DEMOTING_LEARNER, roachpb.VOTER_DEMOTING_NON_VOTER, + } + for _, replicaType := range inChangeReplicaTypes { + firstRange.InternalReplicas[0].Type = replicaType + + action, target, err = rq.CheckRangeAction(ctx, override, firstRange, roachpb.SpanConfig{NumReplicas: 3}) + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorFinalizeAtomicReplicationChange, action) + require.Equal(t, roachpb.ReplicationTarget{}, target) + } + + // Simulate n2's and n3's replicas of r1 as non-voter replicas. + firstRange.InternalReplicas[0].Type = roachpb.NON_VOTER + firstRange.InternalReplicas[1].Type = roachpb.NON_VOTER + + // Validate that we'd need to replace n2's non-voting replica if we need + // 3 replicas but only 1 voter. + action, target, err = rq.CheckRangeAction(ctx, override, firstRange, roachpb.SpanConfig{NumReplicas: 3, NumVoters: 1}) + + require.NoError(t, err) + require.Equal(t, allocatorimpl.AllocatorReplaceDecommissioningNonVoter, action) + require.Contains(t, remainingStores, target.StoreID) +} + // TestAllocatorThrottled ensures that when a store is throttled, the replica // will not be sent to purgatory. func TestAllocatorThrottled(t *testing.T) { diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 5bdca9305eca..b9be6853971e 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -986,6 +986,217 @@ func (rq *replicateQueue) processOneChange( return rq.ShouldRequeue(ctx, change), nil } +// CheckRangeAction takes a RangeDescriptor and SpanConfig and uses the allocator, +// along with a provided StorePool (which may be distinct from the allocator's), +// to return the operation that would be required to repair the range, the +// upreplication target (if applicable), and any allocator errors encountered. +// It is similar to PlanOneChange, but range-scoped rather than replica-scoped, +// and as such cannot be used to plan a change, but can evaluate action viability. +// +// As this function does not take a replica (and in particular, a leaseholder +// replica), it cannot determine the target for actions that require state from +// a leaseholder and/or Raft leader. This includes replica rebalance, removal, +// and lease transfer. Actions that do not require the allocator to determine +// a target are treated as noops, and do not return errors. +func (rq *replicateQueue) CheckRangeAction( + ctx context.Context, + storePool storepool.AllocatorStorePool, + desc *roachpb.RangeDescriptor, + conf roachpb.SpanConfig, +) (allocatorimpl.AllocatorAction, roachpb.ReplicationTarget, error) { + // Use the store pool to evaluate the allocator action given the provided + // state (which may be overridden from the actual state).
+ action, _ := rq.allocator.ComputeActionWithStorePool(ctx, storePool, conf, desc) + + var err error + var noop, add, remove, replace, rebalance bool + var replicaStatus allocatorimpl.ReplicaStatus + var targetReplType allocatorimpl.TargetReplicaType + switch action { + // Noop replica actions. + case allocatorimpl.AllocatorNoop: + noop = true + case allocatorimpl.AllocatorRangeUnavailable: + noop = true + + // Add replicas. + case allocatorimpl.AllocatorAddVoter: + add = true + replicaStatus = allocatorimpl.Alive + targetReplType = allocatorimpl.VoterTarget + case allocatorimpl.AllocatorAddNonVoter: + add = true + replicaStatus = allocatorimpl.Alive + targetReplType = allocatorimpl.NonVoterTarget + + // Remove replicas. + case allocatorimpl.AllocatorRemoveVoter: + remove = true + replicaStatus = allocatorimpl.Alive + targetReplType = allocatorimpl.VoterTarget + case allocatorimpl.AllocatorRemoveNonVoter: + remove = true + replicaStatus = allocatorimpl.Alive + targetReplType = allocatorimpl.NonVoterTarget + + // Replace dead replicas. + case allocatorimpl.AllocatorReplaceDeadVoter: + replace = true + replicaStatus = allocatorimpl.Dead + targetReplType = allocatorimpl.VoterTarget + case allocatorimpl.AllocatorReplaceDeadNonVoter: + replace = true + replicaStatus = allocatorimpl.Dead + targetReplType = allocatorimpl.NonVoterTarget + + // Replace decommissioning replicas. + case allocatorimpl.AllocatorReplaceDecommissioningVoter: + replace = true + replicaStatus = allocatorimpl.Decommissioning + targetReplType = allocatorimpl.VoterTarget + case allocatorimpl.AllocatorReplaceDecommissioningNonVoter: + replace = true + replicaStatus = allocatorimpl.Decommissioning + targetReplType = allocatorimpl.NonVoterTarget + + // Remove decommissioning replicas. + case allocatorimpl.AllocatorRemoveDecommissioningVoter: + remove = true + replicaStatus = allocatorimpl.Decommissioning + targetReplType = allocatorimpl.VoterTarget + case allocatorimpl.AllocatorRemoveDecommissioningNonVoter: + remove = true + replicaStatus = allocatorimpl.Decommissioning + targetReplType = allocatorimpl.NonVoterTarget + + // Remove dead replicas. + case allocatorimpl.AllocatorRemoveDeadVoter: + remove = true + replicaStatus = allocatorimpl.Dead + targetReplType = allocatorimpl.VoterTarget + case allocatorimpl.AllocatorRemoveDeadNonVoter: + remove = true + replicaStatus = allocatorimpl.Dead + targetReplType = allocatorimpl.NonVoterTarget + + // Rebalance replicas. + case allocatorimpl.AllocatorConsiderRebalance: + rebalance = true + + // Finalize and cleanup replicas. + case allocatorimpl.AllocatorFinalizeAtomicReplicationChange, allocatorimpl.AllocatorRemoveLearner: + noop = true + default: + err = errors.Errorf("unknown allocator action %v", action) + } + + if err != nil { + return action, roachpb.ReplicationTarget{}, err + } + + if noop || remove || rebalance { + // NB: Currently, remove and rebalance actions require a leaseholder + // replica and/or Raft leader, and as such we cannot use the allocator to + // find a target using simply a range descriptor and a span config. + // + // E.g. When removing a voter, we need Raft leader state to determine + // a removal target (assuming we aren't removing dead or decommissioning + // replicas, which are self-evident), and we also need the lease to + // determine a new leaseholder, if a new leaseholder is needed after + // removal. When rebalancing, we similarly need to check leaseholder state. 
+ // + // This means we unfortunately cannot validate that we have a valid target + // for removal, rebalance, or lease transfer (if needed). + return action, roachpb.ReplicationTarget{}, err + } + + voterReplicas := desc.Replicas().VoterDescriptors() + nonVoterReplicas := desc.Replicas().NonVoterDescriptors() + liveVoterReplicas, deadVoterReplicas := storePool.LiveAndDeadReplicas( + voterReplicas, true, /* includeSuspectAndDrainingStores */ + ) + liveNonVoterReplicas, deadNonVoterReplicas := storePool.LiveAndDeadReplicas( + nonVoterReplicas, true, /* includeSuspectAndDrainingStores */ + ) + remainingLiveVoters := liveVoterReplicas + + var target roachpb.ReplicationTarget + if replace && targetReplType == allocatorimpl.VoterTarget && len(voterReplicas) == 1 { + // If only one voter replica remains, that replica is the leaseholder and + // we won't be able to swap it out. Ignore the removal and simply add + // a replica. + replace = false + add = true + } + + if replace { + var replicas []roachpb.ReplicaDescriptor + var deadReplicas []roachpb.ReplicaDescriptor + var deadOrDecommissioningReplicas []roachpb.ReplicaDescriptor + + if targetReplType == allocatorimpl.VoterTarget { + replicas = voterReplicas + deadReplicas = deadVoterReplicas + } else if targetReplType == allocatorimpl.NonVoterTarget { + replicas = nonVoterReplicas + deadReplicas = deadNonVoterReplicas + } else { + return action, target, errors.AssertionFailedf("unexpected replica type: %v", + targetReplType) + } + + if replicaStatus == allocatorimpl.Decommissioning { + deadOrDecommissioningReplicas = storePool.DecommissioningReplicas(replicas) + } else if replicaStatus == allocatorimpl.Dead { + deadOrDecommissioningReplicas = deadReplicas + } else { + return action, target, errors.AssertionFailedf("unexpected replica status: %v", + replicaStatus) + } + + if len(deadOrDecommissioningReplicas) == 0 { + // Nothing to do. + return allocatorimpl.AllocatorNoop, target, err + } + + if targetReplType == allocatorimpl.VoterTarget { + // NB: See addOrReplaceVoters(..) vs addOrReplaceNonVoters(..). + removeIdx := getRemoveIdx(replicas, deadOrDecommissioningReplicas[0]) + replToRemove := replicas[removeIdx] + for i, r := range liveVoterReplicas { + if r.ReplicaID == replToRemove.ReplicaID { + remainingLiveVoters = append(liveVoterReplicas[:i:i], liveVoterReplicas[i+1:]...) + break + } + } + } + } else if !add { + return action, target, errors.AssertionFailedf( + "attempting to allocate target for unsupported action: %v", action, + ) + } + + if targetReplType == allocatorimpl.VoterTarget { + // NB: For the purposes of checking the range action, we do not check to + // ensure we are avoiding upreplication to fragile quorum. + // See addOrReplaceVoters(..). + target, _, err = rq.allocator.AllocateVoterWithStorePool(ctx, storePool, conf, + remainingLiveVoters, liveNonVoterReplicas, replicaStatus) + } else if targetReplType == allocatorimpl.NonVoterTarget { + target, _, err = rq.allocator.AllocateNonVoterWithStorePool(ctx, storePool, conf, + liveVoterReplicas, liveNonVoterReplicas, replicaStatus) + } else { + err = errors.AssertionFailedf("unexpected replica type: %v", targetReplType) + } + + if err == nil && target.Equal(roachpb.ReplicationTarget{}) { + return action, target, errors.AssertionFailedf( + "no replication target found for action: %v", action) + } + + return action, target, err +} + // PlanOneChange calls the allocator to determine an action to be taken upon a // range. 
The fn then calls back into the allocator to get the changes // necessary to act upon the action and returns them as a ReplicateQueueChange. @@ -1162,6 +1373,8 @@ func (rq *replicateQueue) PlanOneChange( canTransferLeaseFrom, scatter, ) + + // Finalize and cleanup replicas. case allocatorimpl.AllocatorFinalizeAtomicReplicationChange, allocatorimpl.AllocatorRemoveLearner: op = AllocationFinalizeAtomicReplicationOp{} default:
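A note on the test fixture and helper added in the first file: twoDCStores models two localities purely through node attributes, with n1-n3 (s1-s3) tagged "a" and n4-n6 (s4-s6) tagged "b", one store per node and identical capacities. The constrainTo helper builds the span configs the tests hand to CheckRangeAction; for reference only (this snippet is not part of the patch), constrainTo(3, "b") expands to the literal below, which corresponds loosely to the SQL zone configuration num_replicas = 3, constraints = '[+b]'.

```go
// Expanded form of constrainTo(3, "b") from the test file above, shown for
// reference: three replicas, all required to land on nodes carrying the "b"
// attribute.
conf := roachpb.SpanConfig{
	NumReplicas: 3,
	Constraints: []roachpb.ConstraintsConjunction{
		{
			Constraints: []roachpb.Constraint{
				{Value: "b", Type: roachpb.Constraint_REQUIRED},
			},
		},
	},
}
```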
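The second test exercises CheckRangeAction through an OverrideStorePool, which is the intended "what-if" usage: a caller can ask what the allocator would do if certain nodes were decommissioning, without touching real liveness. Below is a minimal sketch of such a caller. The helper name, its signature, and the parameter types are illustrative assumptions inferred from how the test wires these values together; the body only combines NewOverrideStorePool and CheckRangeAction exactly as the test does.

```go
// checkDecommissionAction is a hypothetical caller-side helper (not part of
// this patch): it reports the action, and for add/replace actions the target,
// that the allocator would choose for a single range if the given nodes were
// marked as decommissioning.
func checkDecommissionAction(
	ctx context.Context,
	rq *replicateQueue,
	sp storepool.AllocatorStorePool,
	desc *roachpb.RangeDescriptor,
	conf roachpb.SpanConfig,
	decommissioning map[roachpb.NodeID]bool,
) (allocatorimpl.AllocatorAction, roachpb.ReplicationTarget, error) {
	// Overlay the proposed decommission on top of the real store pool state.
	override := storepool.NewOverrideStorePool(sp,
		func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
			if decommissioning[nid] {
				return livenesspb.NodeLivenessStatus_DECOMMISSIONING
			}
			return livenesspb.NodeLivenessStatus_LIVE
		})
	// NB: for remove/rebalance/noop actions the returned target is
	// intentionally empty, since those decisions need leaseholder or Raft
	// leader state; an error here means no valid target exists under the
	// proposed decommission.
	return rq.CheckRangeAction(ctx, override, desc, conf)
}
```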
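Both the tests and CheckRangeAction itself drop a replica from a slice with the full slice expression append(s[:i:i], s[i+1:]...). The standalone sketch below, using made-up types rather than anything from the patch, shows why the three-index form matters: capping the prefix's capacity forces append to allocate, so the original descriptor slice is left intact.

```go
package main

import "fmt"

type replica struct{ storeID int }

// removeAt returns a copy of replicas without the element at idx. The
// three-index slice expression replicas[:idx:idx] caps the prefix's capacity
// at idx, so the append must allocate a new backing array rather than
// overwrite replicas[idx] in the caller's slice.
func removeAt(replicas []replica, idx int) []replica {
	return append(replicas[:idx:idx], replicas[idx+1:]...)
}

func main() {
	voters := []replica{{1}, {2}, {3}}
	remaining := removeAt(voters, 1)
	fmt.Println(voters)    // [{1} {2} {3}] -- original slice unchanged
	fmt.Println(remaining) // [{1} {3}]
}
```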