kvserver: refactor replicate queue to enable allocator code reuse #94114

Merged · 2 commits · Jan 7, 2023

274 changes: 271 additions & 3 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -133,6 +133,73 @@ const (
AllocatorFinalizeAtomicReplicationChange
)

// Add indicates an action adding a replica.
func (a AllocatorAction) Add() bool {
return a == AllocatorAddVoter || a == AllocatorAddNonVoter
}

// Replace indicates an action replacing a dead or decommissioning replica.
func (a AllocatorAction) Replace() bool {
return a == AllocatorReplaceDeadVoter ||
a == AllocatorReplaceDeadNonVoter ||
a == AllocatorReplaceDecommissioningVoter ||
a == AllocatorReplaceDecommissioningNonVoter
}

// Remove indicates an action removing a replica, e.g. in overreplication
// cases or when removing dead or decommissioning replicas.
func (a AllocatorAction) Remove() bool {
return a == AllocatorRemoveVoter ||
a == AllocatorRemoveNonVoter ||
a == AllocatorRemoveDeadVoter ||
a == AllocatorRemoveDeadNonVoter ||
a == AllocatorRemoveDecommissioningVoter ||
a == AllocatorRemoveDecommissioningNonVoter
}

// TargetReplicaType returns whether the action targets a voter or non-voter replica.
func (a AllocatorAction) TargetReplicaType() TargetReplicaType {
var t TargetReplicaType
if a == AllocatorRemoveVoter ||
a == AllocatorAddVoter ||
a == AllocatorReplaceDeadVoter ||
a == AllocatorRemoveDeadVoter ||
a == AllocatorReplaceDecommissioningVoter ||
a == AllocatorRemoveDecommissioningVoter {
t = VoterTarget
} else if a == AllocatorRemoveNonVoter ||
a == AllocatorAddNonVoter ||
a == AllocatorReplaceDeadNonVoter ||
a == AllocatorRemoveDeadNonVoter ||
a == AllocatorReplaceDecommissioningNonVoter ||
a == AllocatorRemoveDecommissioningNonVoter {
t = NonVoterTarget
}
return t
}

// ReplicaStatus returns whether the action is due to a live, dead, or
// decommissioning replica.
func (a AllocatorAction) ReplicaStatus() ReplicaStatus {
var s ReplicaStatus
if a == AllocatorRemoveVoter ||
a == AllocatorRemoveNonVoter ||
a == AllocatorAddVoter ||
a == AllocatorAddNonVoter {
s = Alive
} else if a == AllocatorReplaceDeadVoter ||
a == AllocatorReplaceDeadNonVoter ||
a == AllocatorRemoveDeadVoter ||
a == AllocatorRemoveDeadNonVoter {
s = Dead
} else if a == AllocatorReplaceDecommissioningVoter ||
a == AllocatorReplaceDecommissioningNonVoter ||
a == AllocatorRemoveDecommissioningVoter ||
a == AllocatorRemoveDecommissioningNonVoter {
s = Decommissioning
}
return s
}
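
// Hypothetical usage sketch for the helpers above: any AllocatorAction can be
// classified along three independent axes instead of enumerating every
// constant:
//
//	action := AllocatorReplaceDeadVoter
//	_ = action.Replace()           // true: a replacement action
//	_ = action.TargetReplicaType() // VoterTarget: operates on a voter
//	_ = action.ReplicaStatus()     // Dead: triggered by a dead replica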

var allocatorActionNames = map[AllocatorAction]string{
AllocatorNoop: "noop",
AllocatorRemoveVoter: "remove voter",
@@ -268,6 +335,19 @@ func (t TargetReplicaType) String() string {
}
}

func (s ReplicaStatus) String() string {
switch s {
case Alive:
return "live"
case Dead:
return "dead"
case Decommissioning:
return "decommissioning"
default:
panic(fmt.Sprintf("unknown replicaStatus %d", s))
}
}

type transferDecision int

const (
@@ -595,6 +675,146 @@ func GetNeededNonVoters(numVoters, zoneConfigNonVoterCount, clusterNodes int) int
return need
}

// WillHaveFragileQuorum determines, based on the number of existing voters,
// incoming voters, and needed voters, whether we would be up-replicating to a
// state in which we still have fewer voters than needed yet an even number of
// voter replicas, i.e. a fragile quorum.
func WillHaveFragileQuorum(
numExistingVoters, numNewVoters, zoneConfigVoterCount, clusterNodes int,
) bool {
neededVoters := GetNeededVoters(int32(zoneConfigVoterCount), clusterNodes)
willHave := numExistingVoters + numNewVoters
// NB: If willHave >= neededVoters, then always allow up-replicating as that
// will be the case when up-replicating a range with a decommissioning
// replica.
return numNewVoters > 0 && willHave < neededVoters && willHave%2 == 0
}
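
// Hypothetical worked example: with 3 existing voters, 1 incoming voter, a
// zone config asking for 5 voters, and 5 cluster nodes, GetNeededVoters
// yields 5, so willHave = 3 + 1 = 4 is even and below the needed count:
//
//	fragile := WillHaveFragileQuorum(3, 1, 5 /* zoneConfigVoterCount */, 5 /* clusterNodes */)
//	// fragile == true: four voters tolerate only one failure, no more than
//	// three voters do, so we should only pass through this state if an odd
//	// count is reachable beyond it.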

// LiveAndDeadVoterAndNonVoterReplicas splits up the replicas in the given
// range descriptor into voters vs. non-voters and live vs. dead replicas.
func LiveAndDeadVoterAndNonVoterReplicas(
storePool storepool.AllocatorStorePool, desc *roachpb.RangeDescriptor,
) (
voterReplicas, nonVoterReplicas, liveVoterReplicas, deadVoterReplicas, liveNonVoterReplicas, deadNonVoterReplicas []roachpb.ReplicaDescriptor,
) {
voterReplicas = desc.Replicas().VoterDescriptors()
nonVoterReplicas = desc.Replicas().NonVoterDescriptors()
liveVoterReplicas, deadVoterReplicas = storePool.LiveAndDeadReplicas(
voterReplicas, true, /* includeSuspectAndDrainingStores */
)
liveNonVoterReplicas, deadNonVoterReplicas = storePool.LiveAndDeadReplicas(
nonVoterReplicas, true, /* includeSuspectAndDrainingStores */
)
return
}
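
// Hypothetical usage sketch, destructuring all six results at once. Note that
// includeSuspectAndDrainingStores is passed as true above, so replicas on
// suspect or draining stores land in the live slices:
//
//	voters, nonVoters, liveVoters, deadVoters, liveNonVoters, deadNonVoters :=
//		LiveAndDeadVoterAndNonVoterReplicas(storePool, desc)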

// DetermineReplicaToReplaceAndFilter is used on add or replace allocator
// actions to filter the set of live voter and non-voter replicas to use in
// determining a new allocation target. It identifies a dead or decommissioning
// replica to replace from the list of voters or non-voters, depending on the
// replica status and target type, and returns the filtered live voters and
// non-voters along with the list of existing replicas and the index of the
// removal candidate within it. For an add action, no replica is removed and a
// removeIdx of -1 is returned; if no replacement candidate can be found during
// a replace action, the returned nothingToDo flag is set to true.
func DetermineReplicaToReplaceAndFilter(
storePool storepool.AllocatorStorePool,
action AllocatorAction,
voters, nonVoters []roachpb.ReplicaDescriptor,
liveVoterReplicas, deadVoterReplicas []roachpb.ReplicaDescriptor,
liveNonVoterReplicas, deadNonVoterReplicas []roachpb.ReplicaDescriptor,
) (
existing, remainingLiveVoters, remainingLiveNonVoters []roachpb.ReplicaDescriptor,
removeIdx int,
nothingToDo bool,
err error,
) {
removeIdx = -1
remainingLiveVoters = liveVoterReplicas
remainingLiveNonVoters = liveNonVoterReplicas
var deadReplicas, removalCandidates []roachpb.ReplicaDescriptor

if !(action.Add() || action.Replace()) {
err = errors.AssertionFailedf(
"unexpected attempt to filter replicas on non-add/non-replacement action %s",
action,
)
return
}

replicaType := action.TargetReplicaType()
replicaStatus := action.ReplicaStatus()

switch replicaType {
case VoterTarget:
existing = voters
deadReplicas = deadVoterReplicas
case NonVoterTarget:
existing = nonVoters
deadReplicas = deadNonVoterReplicas
default:
panic(fmt.Sprintf("unknown targetReplicaType: %s", replicaType))
}
switch replicaStatus {
case Alive:
// NB: Live replicas are not candidates for replacement.
return
case Dead:
removalCandidates = deadReplicas
case Decommissioning:
removalCandidates = storePool.DecommissioningReplicas(existing)
default:
panic(fmt.Sprintf("unknown replicaStatus: %s", replicaStatus))
}
if len(removalCandidates) == 0 {
nothingToDo = true
return
}

removeIdx = getRemoveIdx(existing, removalCandidates[0])
if removeIdx < 0 {
err = errors.AssertionFailedf(
"%s %s %v unexpectedly not found in %v",
replicaStatus, replicaType, removalCandidates[0], existing,
)
return
}

// TODO(sarkesian): Add comment on why this filtering only happens for voters.
if replicaType == VoterTarget {
if len(existing) == 1 {
// If only one replica remains, that replica is the leaseholder and
// we won't be able to swap it out. Ignore the removal and simply add
// a replica.
removeIdx = -1
}

if removeIdx >= 0 {
replToRemove := existing[removeIdx]
for i, r := range liveVoterReplicas {
if r.ReplicaID == replToRemove.ReplicaID {
remainingLiveVoters = append(liveVoterReplicas[:i:i], liveVoterReplicas[i+1:]...)
break
}
}
}
}
return
}
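
// Hypothetical calling pattern, with placeholder variable names:
//
//	existing, liveVoters, liveNonVoters, removeIdx, nothingToDo, err :=
//		DetermineReplicaToReplaceAndFilter(
//			storePool, action,
//			voters, nonVoters,
//			liveVoterReplicas, deadVoterReplicas,
//			liveNonVoterReplicas, deadNonVoterReplicas,
//		)
//	// removeIdx == -1 indicates a pure addition; otherwise existing[removeIdx]
//	// is the dead or decommissioning replica being replaced, and, for voter
//	// targets, liveVoters has been filtered to exclude it.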

func getRemoveIdx(
repls []roachpb.ReplicaDescriptor, deadOrDecommissioningRepl roachpb.ReplicaDescriptor,
) (removeIdx int) {
removeIdx = -1
for i, rDesc := range repls {
if rDesc.StoreID == deadOrDecommissioningRepl.StoreID {
removeIdx = i
break
}
}
return removeIdx
}

// ComputeAction determines the exact operation needed to repair the
// supplied range, as governed by the supplied zone configuration. It
// returns the required action that should be taken and a priority.
@@ -925,7 +1145,11 @@ func (s *GoodCandidateSelector) selectOne(cl candidateList) *candidate {
return cl.selectGood(s.randGen)
}

func (a *Allocator) allocateTarget(
// AllocateTarget returns a suitable store for a new allocation of a voting or
// non-voting replica with the required attributes. Nodes already accommodating
// voting replicas are ruled out in the voter case, and nodes accommodating
// _any_ replicas are ruled out in the non-voter case.
func (a *Allocator) AllocateTarget(
ctx context.Context,
storePool storepool.AllocatorStorePool,
conf roachpb.SpanConfig,
@@ -986,6 +1210,50 @@
}
}

// CheckAvoidsFragileQuorum ensures that if allocating a new voter would
// result in an even number of voters, we can still allocate a further voter
// target beyond it, so that we avoid getting stuck in a fragile quorum state.
// This check should be performed whenever we plan or test the allocation of a
// new voter.
//
// We can skip this check if we're swapping a replica or allocating a
// non-voter, since neither changes the quorum size.
func (a *Allocator) CheckAvoidsFragileQuorum(
ctx context.Context,
storePool storepool.AllocatorStorePool,
conf roachpb.SpanConfig,
existingVoters, remainingLiveNonVoters []roachpb.ReplicaDescriptor,
replicaStatus ReplicaStatus,
replicaType TargetReplicaType,
newTarget roachpb.ReplicationTarget,
isReplacement bool,
) error {
// Validation is only applicable when allocating new voters.
if replicaType != VoterTarget {
return nil
}
newVoters := 0
if !isReplacement {
newVoters = 1
}
clusterNodes := storePool.ClusterNodeCount()
neededVoters := GetNeededVoters(conf.GetNumVoters(), clusterNodes)

if WillHaveFragileQuorum(len(existingVoters), newVoters, neededVoters, clusterNodes) {
// This means we are going to up-replicate to an even replica state.
// Check if it is possible to go to an odd replica state beyond it.
oldPlusNewReplicas := existingVoters
oldPlusNewReplicas = append(
oldPlusNewReplicas,
roachpb.ReplicaDescriptor{NodeID: newTarget.NodeID, StoreID: newTarget.StoreID},
)

_, _, err := a.AllocateVoter(ctx, storePool, conf, oldPlusNewReplicas, remainingLiveNonVoters, replicaStatus)
return err
}

return nil
}
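
// Hypothetical sketch of combining AllocateVoter with the check above when
// planning a net-new voter (surrounding variables assumed):
//
//	target, _, err := a.AllocateVoter(
//		ctx, storePool, conf, existingVoters, liveNonVoters, Alive)
//	if err != nil {
//		return err
//	}
//	// isReplacement is false: the new target increases the voter count.
//	err = a.CheckAvoidsFragileQuorum(ctx, storePool, conf, existingVoters,
//		liveNonVoters, Alive, VoterTarget, target, false /* isReplacement */)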

// AllocateVoter returns a suitable store for a new allocation of a voting
// replica with the required attributes. Nodes already accommodating existing
// voting replicas are ruled out as targets.
@@ -996,7 +1264,7 @@ func (a *Allocator) AllocateVoter(
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replicaStatus ReplicaStatus,
) (roachpb.ReplicationTarget, string, error) {
return a.allocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget)
return a.AllocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget)
}

// AllocateNonVoter returns a suitable store for a new allocation of a
@@ -1009,7 +1277,7 @@ func (a *Allocator) AllocateNonVoter(
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replicaStatus ReplicaStatus,
) (roachpb.ReplicationTarget, string, error) {
return a.allocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget)
return a.AllocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget)
}

// AllocateTargetFromList returns a suitable store for a new allocation of a