Skip to content

Commit

Permalink
kvserver: consider suspect stores "live" for computing quorum
Browse files Browse the repository at this point in the history
Previously, when making the determination of whether a range could achieve
quorum, the allocator ignored "suspect" stores. In other words, a range with 3
replicas would be considered unavailable for rebalancing decisions if it had 2
or more replicas on stores that are marked suspect.

This meant that if a given cluster had multiple nodes missing their liveness
heartbeats intermittently, operations like node decommissioning would never
make progress past a certain point (the replicate queue would never decide to
move replicas away because it would think their ranges are unavailable, even
though they're really not).

This patch fixes this by slightly altering the state transitions for how stores
go in and out of "suspect" and by having the replica rebalancing code
specifically ask for suspect stores to be included in the set of "live"
replicas when it makes the determination of whether a given range can achieve.

Release note (bug fix): A bug that was introduced in 21.1.5, which prevented
nodes from decommissioning in a cluster if it had multiple nodes intermittently
missing their liveness heartbeats has been fixed.
  • Loading branch information
aayushshah15 committed Aug 30, 2021
1 parent 71459e8 commit 5e17dbd
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 40 deletions.
23 changes: 16 additions & 7 deletions pkg/kv/kvserver/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,14 @@ func (a *Allocator) computeAction(
return action, adjustedPriority
}

liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas)
// NB: For the purposes of determining whether a range has quorum, we
// consider stores marked as "suspect" to be live. This is necessary because
// we would otherwise spuriously consider ranges with replicas on suspect
// stores to be unavailable, just because their nodes have failed a liveness
// heartbeat in the recent past. This means we won't move those replicas
// elsewhere (for a regular rebalance or for decommissioning).
const includeSuspectStores = true
liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas, includeSuspectStores)

if len(liveVoters) < quorum {
// Do not take any replacement/removal action if we do not have a quorum of
Expand Down Expand Up @@ -623,7 +630,9 @@ func (a *Allocator) computeAction(
return action, action.Priority()
}

liveNonVoters, deadNonVoters := a.storePool.liveAndDeadReplicas(nonVoterReplicas)
liveNonVoters, deadNonVoters := a.storePool.liveAndDeadReplicas(
nonVoterReplicas, includeSuspectStores,
)
if haveNonVoters == neededNonVoters && len(deadNonVoters) > 0 {
// The range has non-voter(s) on a dead node that we should replace.
action = AllocatorReplaceDeadNonVoter
Expand Down Expand Up @@ -1309,7 +1318,7 @@ func (a *Allocator) TransferLeaseTarget(
return roachpb.ReplicaDescriptor{}
}
// Verify that the preferred replica is eligible to receive the lease.
preferred, _ = a.storePool.liveAndDeadReplicas(preferred)
preferred, _ = a.storePool.liveAndDeadReplicas(preferred, false /* includeSuspectStores */)
if len(preferred) == 1 {
return preferred[0]
}
Expand All @@ -1323,8 +1332,8 @@ func (a *Allocator) TransferLeaseTarget(
}
}

// Only consider live, non-draining replicas.
existing, _ = a.storePool.liveAndDeadReplicas(existing)
// Only consider live, non-draining, non-suspect replicas.
existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectStores */)

// Short-circuit if there are no valid targets out there.
if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseStoreID) {
Expand Down Expand Up @@ -1427,8 +1436,8 @@ func (a *Allocator) ShouldTransferLease(
sl = sl.filter(zone.Constraints)
log.VEventf(ctx, 3, "ShouldTransferLease (lease-holder=%d):\n%s", leaseStoreID, sl)

// Only consider live, non-draining replicas.
existing, _ = a.storePool.liveAndDeadReplicas(existing)
// Only consider live, non-draining, non-suspect replicas.
existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectNodes */)

// Short-circuit if there are no valid targets out there.
if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == source.StoreID) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5612,12 +5612,14 @@ func TestAllocatorComputeActionSuspect(t *testing.T) {
suspect: []roachpb.StoreID{3},
expectedAction: AllocatorConsiderRebalance,
},
// Needs three replicas, two are suspect (i.e. the range lacks a quorum).
{
// When trying to determine whether a range can achieve quorum, we count
// suspect nodes as live because they _currently_ have a "live" node
// liveness record.
desc: threeReplDesc,
live: []roachpb.StoreID{1, 4},
suspect: []roachpb.StoreID{2, 3},
expectedAction: AllocatorRangeUnavailable,
expectedAction: AllocatorConsiderRebalance,
},
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,12 +627,12 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) {

testutils.SucceedsSoon(t, func() error {
for _, i := range []int{2, 3} {
suspect, err := tc.GetFirstStoreFromServer(t, i).GetStoreConfig().StorePool.IsSuspect(tc.Target(1).StoreID)
suspect, err := tc.GetFirstStoreFromServer(t, i).GetStoreConfig().StorePool.IsUnknown(tc.Target(1).StoreID)
if err != nil {
return err
}
if !suspect {
return errors.Errorf("Expected server 1 to be suspect on server %d", i)
return errors.Errorf("Expected server 1 to be in `storeStatusUnknown` on server %d", i)
}
}
return nil
Expand Down
26 changes: 20 additions & 6 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2035,12 +2035,18 @@ type changeReplicasTxnArgs struct {
db *kv.DB

// liveAndDeadReplicas divides the provided repls slice into two slices: the
// first for live replicas, and the second for dead replicas. Replicas for
// which liveness or deadness cannot be ascertained are excluded from the
// returned slices. Replicas on decommissioning node/store are considered
// live.
// first for live replicas, and the second for dead replicas.
//
// - Replicas for which liveness or deadness cannot be ascertained are
// excluded from the returned slices.
//
// - Replicas on decommissioning node/store are considered live.
//
// - If `includeSuspectStores` is true, stores that are marked suspect (i.e.
// stores that have failed a liveness heartbeat in the recent past) are
// considered live. Otherwise, they are excluded from the returned slices.
liveAndDeadReplicas func(
repls []roachpb.ReplicaDescriptor,
repls []roachpb.ReplicaDescriptor, includeSuspectStores bool,
) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor)

logChange logChangeFn
Expand Down Expand Up @@ -2127,7 +2133,15 @@ func execChangeReplicasTxn(
// See:
// https://github.com/cockroachdb/cockroach/issues/54444#issuecomment-707706553
replicas := crt.Desc.Replicas()
liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors())
// We consider stores marked as "suspect" to be alive for the purposes of
// determining whether the range can achieve quorum since these stores are
// known to be currently live but have failed a liveness heartbeat in the
// recent past.
//
// Note that the allocator will avoid rebalancing to stores that are
// currently marked suspect. See uses of StorePool.getStoreList() in
// allocator.go.
liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors(), true /* includeSuspectStores */)
if !replicas.CanMakeProgress(
func(rDesc roachpb.ReplicaDescriptor) bool {
for _, inner := range liveReplicas {
Expand Down
12 changes: 8 additions & 4 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,16 @@ func (rq *replicateQueue) processOneChange(
// range descriptor.
desc, zone := repl.DescAndZone()

// Avoid taking action if the range has too many dead replicas to make
// quorum.
// Avoid taking action if the range has too many dead replicas to make quorum.
// Consider stores marked suspect as live in order to make this determination.
voterReplicas := desc.Replicas().VoterDescriptors()
nonVoterReplicas := desc.Replicas().NonVoterDescriptors()
liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(voterReplicas)
liveNonVoterReplicas, deadNonVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(nonVoterReplicas)
liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(
voterReplicas, true, /* includeSuspectStores */
)
liveNonVoterReplicas, deadNonVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(
nonVoterReplicas, true, /* includeSuspectStores */
)

// NB: the replication layer ensures that the below operations don't cause
// unavailability; see:
Expand Down
54 changes: 42 additions & 12 deletions pkg/kv/kvserver/store_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,26 @@ const (
func (sd *storeDetail) status(
now time.Time, threshold time.Duration, nl NodeLivenessFunc, suspectDuration time.Duration,
) storeStatus {
// During normal operation, we expect the state transitions for stores to look like the following:
//
// Successful heartbeats
// throughout the suspect
// +-----------------------+ duration
// | storeStatusAvailable |<-+------------------------------------+
// +-----------------------+ | |
// | |
// | +--------------------+
// | | storeStatusSuspect |
// +---------------------------+ +--------------------+
// | Failed liveness ^
// | heartbeat |
// | |
// | |
// | +----------------------+ |
// +->| storeStatusUnknown |--------------------------------------+
// +----------------------+ Successful liveness
// heartbeat
//
// The store is considered dead if it hasn't been updated via gossip
// within the liveness threshold. Note that lastUpdatedTime is set
// when the store detail is created and will have a non-zero value
Expand All @@ -270,11 +290,9 @@ func (sd *storeDetail) status(
return storeStatusDecommissioning
case livenesspb.NodeLivenessStatus_UNAVAILABLE:
// We don't want to suspect a node on startup or when it's first added to a
// cluster, because we dont know it's liveness yet. A node is only considered
// suspect if it's been alive and fails to heartbeat liveness.
// cluster, because we dont know its liveness yet.
if !sd.lastAvailable.IsZero() {
sd.lastUnavailable = now
return storeStatusSuspect
}
return storeStatusUnknown
case livenesspb.NodeLivenessStatus_UNKNOWN:
Expand Down Expand Up @@ -577,14 +595,16 @@ func (sp *StorePool) ClusterNodeCount() int {
return sp.nodeCountFn()
}

// IsSuspect returns true if the node is suspected by the store pool or an error
// if the store is not found in the pool.
func (sp *StorePool) IsSuspect(storeID roachpb.StoreID) (bool, error) {
// IsUnknown returns true if the given store's status is `storeStatusUnknown`
// (i.e. it just failed a liveness heartbeat and we cannot ascertain its
// liveness or deadness at the moment) or an error if the store is not found in
// the pool.
func (sp *StorePool) IsUnknown(storeID roachpb.StoreID) (bool, error) {
status, err := sp.storeStatus(storeID)
if err != nil {
return false, err
}
return status == storeStatusSuspect, nil
return status == storeStatusUnknown, nil
}

// IsLive returns true if the node is considered alive by the store pool or an error
Expand Down Expand Up @@ -615,11 +635,17 @@ func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) {

// liveAndDeadReplicas divides the provided repls slice into two slices: the
// first for live replicas, and the second for dead replicas.
// Replicas for which liveness or deadness cannot be ascertained are excluded
// from the returned slices. Replicas on decommissioning node/store are
// considered live.
//
// - Replicas for which liveness or deadness cannot be ascertained
// (storeStatusUnknown) are excluded from the returned slices.
//
// - Replicas on decommissioning node/store are considered live.
//
// - If `includeSuspectStores` is true, stores that are marked suspect (i.e.
// stores that have failed a liveness heartbeat in the recent past) are
// considered live. Otherwise, they are excluded from the returned slices.
func (sp *StorePool) liveAndDeadReplicas(
repls []roachpb.ReplicaDescriptor,
repls []roachpb.ReplicaDescriptor, includeSuspectStores bool,
) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) {
sp.detailsMu.Lock()
defer sp.detailsMu.Unlock()
Expand All @@ -641,8 +667,12 @@ func (sp *StorePool) liveAndDeadReplicas(
// We count decommissioning replicas to be alive because they are readable
// and should be used for up-replication if necessary.
liveReplicas = append(liveReplicas, repl)
case storeStatusUnknown, storeStatusSuspect:
case storeStatusUnknown:
// No-op.
case storeStatusSuspect:
if includeSuspectStores {
liveReplicas = append(liveReplicas, repl)
}
default:
log.Fatalf(context.TODO(), "unknown store status %d", status)
}
Expand Down
24 changes: 17 additions & 7 deletions pkg/kv/kvserver/store_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE)
}

liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas)
liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
if len(liveReplicas) != 5 {
t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
}
Expand All @@ -766,7 +766,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
mnl.setNodeStatus(4, livenesspb.NodeLivenessStatus_DEAD)
mnl.setNodeStatus(5, livenesspb.NodeLivenessStatus_DEAD)

liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas)
liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectNodes */)
if a, e := liveReplicas, replicas[:3]; !reflect.DeepEqual(a, e) {
t.Fatalf("expected live replicas %+v; got %+v", e, a)
}
Expand All @@ -777,7 +777,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
// Mark node 4 as merely unavailable.
mnl.setNodeStatus(4, livenesspb.NodeLivenessStatus_UNAVAILABLE)

liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas)
liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
if a, e := liveReplicas, replicas[:3]; !reflect.DeepEqual(a, e) {
t.Fatalf("expected live replicas %+v; got %+v", e, a)
}
Expand All @@ -802,7 +802,10 @@ func TestStorePoolDefaultState(t *testing.T) {
livenesspb.NodeLivenessStatus_DEAD)
defer stopper.Stop(context.Background())

liveReplicas, deadReplicas := sp.liveAndDeadReplicas([]roachpb.ReplicaDescriptor{{StoreID: 1}})
liveReplicas, deadReplicas := sp.liveAndDeadReplicas(
[]roachpb.ReplicaDescriptor{{StoreID: 1}},
false, /* includeSuspectStores */
)
if len(liveReplicas) != 0 || len(deadReplicas) != 0 {
t.Errorf("expected 0 live and 0 dead replicas; got %v and %v", liveReplicas, deadReplicas)
}
Expand Down Expand Up @@ -875,6 +878,8 @@ func TestStorePoolSuspected(t *testing.T) {
timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)
timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV)

// See state transition diagram in storeDetail.status() for a visual
// representation of what this test asserts.
mnl.setNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE)
sp.detailsMu.Lock()
detail := sp.getStoreDetailLocked(store.StoreID)
Expand All @@ -888,11 +893,16 @@ func TestStorePoolSuspected(t *testing.T) {
sp.detailsMu.Lock()
s = detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect)
sp.detailsMu.Unlock()
require.Equal(t, s, storeStatusSuspect)
require.Equal(t, s, storeStatusUnknown)
require.False(t, detail.lastAvailable.IsZero())
require.False(t, detail.lastUnavailable.IsZero())

mnl.setNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE)
sp.detailsMu.Lock()
s = detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect)
sp.detailsMu.Unlock()
require.Equal(t, s, storeStatusSuspect)

sp.detailsMu.Lock()
s = detail.status(now.Add(timeAfterStoreSuspect).Add(time.Millisecond),
timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect)
Expand Down Expand Up @@ -1062,7 +1072,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE)
}

liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas)
liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
if len(liveReplicas) != 5 {
t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
}
Expand All @@ -1074,7 +1084,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
// Mark node 5 as dead.
mnl.setNodeStatus(5, livenesspb.NodeLivenessStatus_DEAD)

liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas)
liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
// Decommissioning replicas are considered live.
if a, e := liveReplicas, replicas[:4]; !reflect.DeepEqual(a, e) {
t.Fatalf("expected live replicas %+v; got %+v", e, a)
Expand Down

0 comments on commit 5e17dbd

Please sign in to comment.