Skip to content

Commit

Permalink
kvserver: support checking allocator action and target by range
Browse files Browse the repository at this point in the history
Adds support for using the allocator to compute the action, if any,
needed to "repair" the range and, if the action requires a replica
addition (including replacements), the target of that addition. This can
be checked by using the new `CheckRangeAction(..)` function as part of a
store's replicate queue, and can use the default store pool or an
override store pool, so that potential states can be evaluated prior to
transition to those states. As such, this feature adds support for
allocator action and target validation prior to decommission, in order
to support decommission pre-checks.

While this is similar to the replicate queue's `PlanOneChange(..)`, this
new check supports evaluation based on a range descriptor, rather than
an individual replica.

Depends on cockroachdb#91941

Part of cockroachdb#91570.

Release note: None
  • Loading branch information
AlexTalks committed Dec 15, 2022
1 parent 758b541 commit 46f13fb
Show file tree
Hide file tree
Showing 2 changed files with 506 additions and 0 deletions.
293 changes: 293 additions & 0 deletions pkg/kv/kvserver/allocator_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils/gossiputil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/tracker"
)
Expand Down Expand Up @@ -59,6 +61,100 @@ var singleStore = []*roachpb.StoreDescriptor{
},
}

var twoDCStores = []*roachpb.StoreDescriptor{
{
StoreID: 1,
Attrs: roachpb.Attributes{Attrs: []string{"ssd"}},
Node: roachpb.NodeDescriptor{
NodeID: 1,
Attrs: roachpb.Attributes{Attrs: []string{"a"}},
},
Capacity: roachpb.StoreCapacity{
Capacity: 200,
Available: 100,
LogicalBytes: 100,
},
},
{
StoreID: 2,
Attrs: roachpb.Attributes{Attrs: []string{"ssd"}},
Node: roachpb.NodeDescriptor{
NodeID: 2,
Attrs: roachpb.Attributes{Attrs: []string{"a"}},
},
Capacity: roachpb.StoreCapacity{
Capacity: 200,
Available: 100,
LogicalBytes: 100,
},
},
{
StoreID: 3,
Attrs: roachpb.Attributes{Attrs: []string{"ssd"}},
Node: roachpb.NodeDescriptor{
NodeID: 3,
Attrs: roachpb.Attributes{Attrs: []string{"a"}},
},
Capacity: roachpb.StoreCapacity{
Capacity: 200,
Available: 100,
LogicalBytes: 100,
},
},
{
StoreID: 4,
Attrs: roachpb.Attributes{Attrs: []string{"ssd"}},
Node: roachpb.NodeDescriptor{
NodeID: 4,
Attrs: roachpb.Attributes{Attrs: []string{"b"}},
},
Capacity: roachpb.StoreCapacity{
Capacity: 200,
Available: 100,
LogicalBytes: 100,
},
},
{
StoreID: 5,
Attrs: roachpb.Attributes{Attrs: []string{"ssd"}},
Node: roachpb.NodeDescriptor{
NodeID: 5,
Attrs: roachpb.Attributes{Attrs: []string{"b"}},
},
Capacity: roachpb.StoreCapacity{
Capacity: 200,
Available: 100,
LogicalBytes: 100,
},
},
{
StoreID: 6,
Attrs: roachpb.Attributes{Attrs: []string{"ssd"}},
Node: roachpb.NodeDescriptor{
NodeID: 6,
Attrs: roachpb.Attributes{Attrs: []string{"b"}},
},
Capacity: roachpb.StoreCapacity{
Capacity: 200,
Available: 100,
LogicalBytes: 100,
},
},
}

func constrainTo(numReplicas int, attr string) roachpb.SpanConfig {
return roachpb.SpanConfig{
NumReplicas: int32(numReplicas),
Constraints: []roachpb.ConstraintsConjunction{
{
Constraints: []roachpb.Constraint{
{Value: attr, Type: roachpb.Constraint_REQUIRED},
},
},
},
}
}

// TestAllocatorRebalanceTarget could help us to verify whether we'll rebalance
// to a target that we'll immediately remove.
func TestAllocatorRebalanceTarget(t *testing.T) {
Expand Down Expand Up @@ -241,6 +337,203 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
}
}

// TestAllocatorCheckRangeActionUprelicate validates the allocator's action and
// target for a range in a basic upreplication case using the replicate queue's
// `CheckRangeAction(..)`.
func TestAllocatorCheckRangeActionUprelicate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper, g, sp, a, _ := allocatorimpl.CreateTestAllocator(ctx, 10, false /* deterministic */)
defer stopper.Stop(context.Background())

gossiputil.NewStoreGossiper(g).GossipStores(twoDCStores, t)
cfg := TestStoreConfig(nil)
cfg.Gossip = g

// Ensure that there are no usages of the underlying store pool.
cfg.StorePool = nil

firstStore := *twoDCStores[0]
s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg)
s.Ident = &roachpb.StoreIdent{StoreID: firstStore.StoreID}
rq := newReplicateQueue(s, a)

firstRange := &roachpb.RangeDescriptor{
RangeID: 1,
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 2, StoreID: 2},
},
}

storeIDsInB := []roachpb.StoreID{4, 5, 6}

constrainToB3X := constrainTo(3, "b")

// Validate that we need to upreplicate r1 to a node in "b".
action, target, err := rq.CheckRangeAction(ctx, sp, firstRange, constrainToB3X)

require.NoError(t, err)
require.Equal(t, allocatorimpl.AllocatorAddVoter, action)
require.Contains(t, storeIDsInB, target.StoreID)

newReplica := roachpb.ReplicaDescriptor{NodeID: target.NodeID, StoreID: target.StoreID}
firstRange.InternalReplicas = append(firstRange.InternalReplicas, newReplica)

// Validate that we need to upreplicate r1 to another node in "b".
action, target, err = rq.CheckRangeAction(ctx, sp, firstRange, constrainToB3X)

require.NoError(t, err)
require.Equal(t, allocatorimpl.AllocatorAddVoter, action)
require.Contains(t, storeIDsInB, target.StoreID)

newReplica = roachpb.ReplicaDescriptor{NodeID: target.NodeID, StoreID: target.StoreID}
firstRange.InternalReplicas = append(firstRange.InternalReplicas, newReplica)

// Determine the remaining node in "b".
var remainingStoreID roachpb.StoreID
for _, storeID := range storeIDsInB {
if !firstRange.Replicas().HasReplicaOnNode(roachpb.NodeID(storeID)) {
remainingStoreID = storeID
break
}
}

// Validate that we need to rebalance r1 from n2 to the final node in "b".
action, target, err = rq.CheckRangeAction(ctx, sp, firstRange, constrainToB3X)

require.NoError(t, err)
require.Equal(t, allocatorimpl.AllocatorConsiderRebalance, action)
// NB: For rebalance actions, the target is currently undetermined, but
// should be the remaining node.
require.Equal(t, roachpb.ReplicationTarget{}, target)

// Simulate adding a replica on the remaining node in "b", without removing.
newReplica = roachpb.ReplicaDescriptor{NodeID: roachpb.NodeID(remainingStoreID), StoreID: remainingStoreID}
firstRange.InternalReplicas = append(firstRange.InternalReplicas, newReplica)

// Validate that we need to remove r1 from the node in "a".
action, target, err = rq.CheckRangeAction(ctx, sp, firstRange, constrainToB3X)

require.NoError(t, err)
require.Equal(t, allocatorimpl.AllocatorRemoveVoter, action)
// NB: For removal actions, the target is currently undetermined, but
// should be n2.
require.Equal(t, roachpb.ReplicationTarget{}, target)

removeIdx := getRemoveIdx(firstRange.InternalReplicas, roachpb.ReplicaDescriptor{StoreID: 2})
firstRange.InternalReplicas = append(firstRange.InternalReplicas[:removeIdx:removeIdx],
firstRange.InternalReplicas[removeIdx+1:]...)

// Validate that we have no more actions on r1, except to consider rebalance.
action, target, err = rq.CheckRangeAction(ctx, sp, firstRange, constrainToB3X)

require.NoError(t, err)
require.Equal(t, allocatorimpl.AllocatorConsiderRebalance, action)
}

// TestAllocatorCheckRangeActionProposedDecommissionSelf validates the allocator's action and
// target for a range during a proposed (but not current) decommission using the
// replicate queue's `CheckRangeAction(..)`.
func TestAllocatorCheckRangeActionProposedDecommissionSelf(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper, g, sp, a, _ := allocatorimpl.CreateTestAllocator(ctx, 10, false /* deterministic */)
defer stopper.Stop(context.Background())

gossiputil.NewStoreGossiper(g).GossipStores(twoDCStores, t)
cfg := TestStoreConfig(nil)
cfg.Gossip = g

// Ensure that there are no usages of the underlying store pool.
cfg.StorePool = nil

firstStore := *twoDCStores[0]
s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg)
s.Ident = &roachpb.StoreIdent{StoreID: firstStore.StoreID}
rq := newReplicateQueue(s, a)

firstRange := &roachpb.RangeDescriptor{
RangeID: 1,
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 2, StoreID: 2},
{NodeID: 3, StoreID: 3},
{NodeID: 4, StoreID: 4},
},
}

remainingStores := []roachpb.StoreID{5, 6}

// Simulate n2 as decommissioning and n1 as down.
override := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
if nid == roachpb.NodeID(2) {
return livenesspb.NodeLivenessStatus_DECOMMISSIONING
} else if nid == roachpb.NodeID(1) {
return livenesspb.NodeLivenessStatus_DEAD
} else {
return livenesspb.NodeLivenessStatus_LIVE
}
})

// Validate that we need to do a decommissioning voter replacement for r1 to
// a node in "b".
action, target, err := rq.CheckRangeAction(ctx, override, firstRange, roachpb.SpanConfig{NumReplicas: 3})

require.NoError(t, err)
require.Equal(t, allocatorimpl.AllocatorReplaceDecommissioningVoter, action)
require.Contains(t, remainingStores, target.StoreID)

// Validate that we'd just need to remove n2's replica if we only need one
// replica.
action, target, err = rq.CheckRangeAction(ctx, override, firstRange, roachpb.SpanConfig{NumReplicas: 1})

require.NoError(t, err)
require.Equal(t, allocatorimpl.AllocatorRemoveDecommissioningVoter, action)
// NB: For removal actions, the target is currently undetermined, but
// should be n2.
require.Equal(t, roachpb.ReplicationTarget{}, target)

// Validate that we would get an error finding a target if we restrict r1 to
// only "a" nodes, since n1 is down.
constrainToA3X := constrainTo(3, "a")
action, target, err = rq.CheckRangeAction(ctx, override, firstRange, constrainToA3X)

require.Error(t, err)
require.Equal(t, allocatorimpl.AllocatorReplaceDecommissioningVoter, action)
require.Equal(t, roachpb.ReplicationTarget{}, target)

// Validate that any other type of replica other than voter or non-voter on
// n2 indicates that we must complete the atomic replication change prior to
// handling the decommissioning replica.
inChangeReplicaTypes := []roachpb.ReplicaType{
roachpb.VOTER_INCOMING, roachpb.VOTER_OUTGOING,
roachpb.VOTER_DEMOTING_LEARNER, roachpb.VOTER_DEMOTING_NON_VOTER,
}
for _, replicaType := range inChangeReplicaTypes {
firstRange.InternalReplicas[0].Type = replicaType

action, target, err = rq.CheckRangeAction(ctx, override, firstRange, roachpb.SpanConfig{NumReplicas: 3})
require.NoError(t, err)
require.Equal(t, allocatorimpl.AllocatorFinalizeAtomicReplicationChange, action)
require.Equal(t, roachpb.ReplicationTarget{}, target)
}

// Simulate n2's and n3's replicas of r1 as a non-voter replicas.
firstRange.InternalReplicas[0].Type = roachpb.NON_VOTER
firstRange.InternalReplicas[1].Type = roachpb.NON_VOTER

// Validate that we'd need to replace the n2's non-voting replica if we need
// 3 replicas but only 1 voter.
action, target, err = rq.CheckRangeAction(ctx, override, firstRange, roachpb.SpanConfig{NumReplicas: 3, NumVoters: 1})

require.NoError(t, err)
require.Equal(t, allocatorimpl.AllocatorReplaceDecommissioningNonVoter, action)
require.Contains(t, remainingStores, target.StoreID)
}

// TestAllocatorThrottled ensures that when a store is throttled, the replica
// will not be sent to purgatory.
func TestAllocatorThrottled(t *testing.T) {
Expand Down
Loading

0 comments on commit 46f13fb

Please sign in to comment.