Skip to content

Commit

Permalink
Merge pull request #68552 from aayushshah15/backport21.1-67714
Browse files Browse the repository at this point in the history
  • Loading branch information
aayushshah15 authored Aug 30, 2021
2 parents a2725bd + 5e17dbd commit b4dab92
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 40 deletions.
23 changes: 16 additions & 7 deletions pkg/kv/kvserver/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,14 @@ func (a *Allocator) computeAction(
return action, adjustedPriority
}

liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas)
// NB: For the purposes of determining whether a range has quorum, we
// consider stores marked as "suspect" to be live. This is necessary because
// we would otherwise spuriously consider ranges with replicas on suspect
// stores to be unavailable, just because their nodes have failed a liveness
// heartbeat in the recent past. This means we won't move those replicas
// elsewhere (for a regular rebalance or for decommissioning).
const includeSuspectStores = true
liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas, includeSuspectStores)

if len(liveVoters) < quorum {
// Do not take any replacement/removal action if we do not have a quorum of
Expand Down Expand Up @@ -623,7 +630,9 @@ func (a *Allocator) computeAction(
return action, action.Priority()
}

liveNonVoters, deadNonVoters := a.storePool.liveAndDeadReplicas(nonVoterReplicas)
liveNonVoters, deadNonVoters := a.storePool.liveAndDeadReplicas(
nonVoterReplicas, includeSuspectStores,
)
if haveNonVoters == neededNonVoters && len(deadNonVoters) > 0 {
// The range has non-voter(s) on a dead node that we should replace.
action = AllocatorReplaceDeadNonVoter
Expand Down Expand Up @@ -1309,7 +1318,7 @@ func (a *Allocator) TransferLeaseTarget(
return roachpb.ReplicaDescriptor{}
}
// Verify that the preferred replica is eligible to receive the lease.
preferred, _ = a.storePool.liveAndDeadReplicas(preferred)
preferred, _ = a.storePool.liveAndDeadReplicas(preferred, false /* includeSuspectStores */)
if len(preferred) == 1 {
return preferred[0]
}
Expand All @@ -1323,8 +1332,8 @@ func (a *Allocator) TransferLeaseTarget(
}
}

// Only consider live, non-draining replicas.
existing, _ = a.storePool.liveAndDeadReplicas(existing)
// Only consider live, non-draining, non-suspect replicas.
existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectStores */)

// Short-circuit if there are no valid targets out there.
if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseStoreID) {
Expand Down Expand Up @@ -1427,8 +1436,8 @@ func (a *Allocator) ShouldTransferLease(
sl = sl.filter(zone.Constraints)
log.VEventf(ctx, 3, "ShouldTransferLease (lease-holder=%d):\n%s", leaseStoreID, sl)

// Only consider live, non-draining replicas.
existing, _ = a.storePool.liveAndDeadReplicas(existing)
// Only consider live, non-draining, non-suspect replicas.
existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectNodes */)

// Short-circuit if there are no valid targets out there.
if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == source.StoreID) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5612,12 +5612,14 @@ func TestAllocatorComputeActionSuspect(t *testing.T) {
suspect: []roachpb.StoreID{3},
expectedAction: AllocatorConsiderRebalance,
},
// Needs three replicas, two are suspect (i.e. the range lacks a quorum).
{
// When trying to determine whether a range can achieve quorum, we count
// suspect nodes as live because they _currently_ have a "live" node
// liveness record.
desc: threeReplDesc,
live: []roachpb.StoreID{1, 4},
suspect: []roachpb.StoreID{2, 3},
expectedAction: AllocatorRangeUnavailable,
expectedAction: AllocatorConsiderRebalance,
},
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,12 +627,12 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) {

testutils.SucceedsSoon(t, func() error {
for _, i := range []int{2, 3} {
suspect, err := tc.GetFirstStoreFromServer(t, i).GetStoreConfig().StorePool.IsSuspect(tc.Target(1).StoreID)
suspect, err := tc.GetFirstStoreFromServer(t, i).GetStoreConfig().StorePool.IsUnknown(tc.Target(1).StoreID)
if err != nil {
return err
}
if !suspect {
return errors.Errorf("Expected server 1 to be suspect on server %d", i)
return errors.Errorf("Expected server 1 to be in `storeStatusUnknown` on server %d", i)
}
}
return nil
Expand Down
26 changes: 20 additions & 6 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2035,12 +2035,18 @@ type changeReplicasTxnArgs struct {
db *kv.DB

// liveAndDeadReplicas divides the provided repls slice into two slices: the
// first for live replicas, and the second for dead replicas. Replicas for
// which liveness or deadness cannot be ascertained are excluded from the
// returned slices. Replicas on decommissioning node/store are considered
// live.
// first for live replicas, and the second for dead replicas.
//
// - Replicas for which liveness or deadness cannot be ascertained are
// excluded from the returned slices.
//
// - Replicas on decommissioning node/store are considered live.
//
// - If `includeSuspectStores` is true, stores that are marked suspect (i.e.
// stores that have failed a liveness heartbeat in the recent past) are
// considered live. Otherwise, they are excluded from the returned slices.
liveAndDeadReplicas func(
repls []roachpb.ReplicaDescriptor,
repls []roachpb.ReplicaDescriptor, includeSuspectStores bool,
) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor)

logChange logChangeFn
Expand Down Expand Up @@ -2127,7 +2133,15 @@ func execChangeReplicasTxn(
// See:
// https://github.com/cockroachdb/cockroach/issues/54444#issuecomment-707706553
replicas := crt.Desc.Replicas()
liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors())
// We consider stores marked as "suspect" to be alive for the purposes of
// determining whether the range can achieve quorum since these stores are
// known to be currently live but have failed a liveness heartbeat in the
// recent past.
//
// Note that the allocator will avoid rebalancing to stores that are
// currently marked suspect. See uses of StorePool.getStoreList() in
// allocator.go.
liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors(), true /* includeSuspectStores */)
if !replicas.CanMakeProgress(
func(rDesc roachpb.ReplicaDescriptor) bool {
for _, inner := range liveReplicas {
Expand Down
12 changes: 8 additions & 4 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,16 @@ func (rq *replicateQueue) processOneChange(
// range descriptor.
desc, zone := repl.DescAndZone()

// Avoid taking action if the range has too many dead replicas to make
// quorum.
// Avoid taking action if the range has too many dead replicas to make quorum.
// Consider stores marked suspect as live in order to make this determination.
voterReplicas := desc.Replicas().VoterDescriptors()
nonVoterReplicas := desc.Replicas().NonVoterDescriptors()
liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(voterReplicas)
liveNonVoterReplicas, deadNonVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(nonVoterReplicas)
liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(
voterReplicas, true, /* includeSuspectStores */
)
liveNonVoterReplicas, deadNonVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(
nonVoterReplicas, true, /* includeSuspectStores */
)

// NB: the replication layer ensures that the below operations don't cause
// unavailability; see:
Expand Down
54 changes: 42 additions & 12 deletions pkg/kv/kvserver/store_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,26 @@ const (
func (sd *storeDetail) status(
now time.Time, threshold time.Duration, nl NodeLivenessFunc, suspectDuration time.Duration,
) storeStatus {
// During normal operation, we expect the state transitions for stores to look like the following:
//
// Successful heartbeats
// throughout the suspect
// +-----------------------+ duration
// | storeStatusAvailable |<-+------------------------------------+
// +-----------------------+ | |
// | |
// | +--------------------+
// | | storeStatusSuspect |
// +---------------------------+ +--------------------+
// | Failed liveness ^
// | heartbeat |
// | |
// | |
// | +----------------------+ |
// +->| storeStatusUnknown |--------------------------------------+
// +----------------------+ Successful liveness
// heartbeat
//
// The store is considered dead if it hasn't been updated via gossip
// within the liveness threshold. Note that lastUpdatedTime is set
// when the store detail is created and will have a non-zero value
Expand All @@ -270,11 +290,9 @@ func (sd *storeDetail) status(
return storeStatusDecommissioning
case livenesspb.NodeLivenessStatus_UNAVAILABLE:
// We don't want to suspect a node on startup or when it's first added to a
// cluster, because we dont know it's liveness yet. A node is only considered
// suspect if it's been alive and fails to heartbeat liveness.
// cluster, because we dont know its liveness yet.
if !sd.lastAvailable.IsZero() {
sd.lastUnavailable = now
return storeStatusSuspect
}
return storeStatusUnknown
case livenesspb.NodeLivenessStatus_UNKNOWN:
Expand Down Expand Up @@ -577,14 +595,16 @@ func (sp *StorePool) ClusterNodeCount() int {
return sp.nodeCountFn()
}

// IsSuspect returns true if the node is suspected by the store pool or an error
// if the store is not found in the pool.
func (sp *StorePool) IsSuspect(storeID roachpb.StoreID) (bool, error) {
// IsUnknown returns true if the given store's status is `storeStatusUnknown`
// (i.e. it just failed a liveness heartbeat and we cannot ascertain its
// liveness or deadness at the moment) or an error if the store is not found in
// the pool.
func (sp *StorePool) IsUnknown(storeID roachpb.StoreID) (bool, error) {
status, err := sp.storeStatus(storeID)
if err != nil {
return false, err
}
return status == storeStatusSuspect, nil
return status == storeStatusUnknown, nil
}

// IsLive returns true if the node is considered alive by the store pool or an error
Expand Down Expand Up @@ -615,11 +635,17 @@ func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) {

// liveAndDeadReplicas divides the provided repls slice into two slices: the
// first for live replicas, and the second for dead replicas.
// Replicas for which liveness or deadness cannot be ascertained are excluded
// from the returned slices. Replicas on decommissioning node/store are
// considered live.
//
// - Replicas for which liveness or deadness cannot be ascertained
// (storeStatusUnknown) are excluded from the returned slices.
//
// - Replicas on decommissioning node/store are considered live.
//
// - If `includeSuspectStores` is true, stores that are marked suspect (i.e.
// stores that have failed a liveness heartbeat in the recent past) are
// considered live. Otherwise, they are excluded from the returned slices.
func (sp *StorePool) liveAndDeadReplicas(
repls []roachpb.ReplicaDescriptor,
repls []roachpb.ReplicaDescriptor, includeSuspectStores bool,
) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) {
sp.detailsMu.Lock()
defer sp.detailsMu.Unlock()
Expand All @@ -641,8 +667,12 @@ func (sp *StorePool) liveAndDeadReplicas(
// We count decommissioning replicas to be alive because they are readable
// and should be used for up-replication if necessary.
liveReplicas = append(liveReplicas, repl)
case storeStatusUnknown, storeStatusSuspect:
case storeStatusUnknown:
// No-op.
case storeStatusSuspect:
if includeSuspectStores {
liveReplicas = append(liveReplicas, repl)
}
default:
log.Fatalf(context.TODO(), "unknown store status %d", status)
}
Expand Down
24 changes: 17 additions & 7 deletions pkg/kv/kvserver/store_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE)
}

liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas)
liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
if len(liveReplicas) != 5 {
t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
}
Expand All @@ -766,7 +766,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
mnl.setNodeStatus(4, livenesspb.NodeLivenessStatus_DEAD)
mnl.setNodeStatus(5, livenesspb.NodeLivenessStatus_DEAD)

liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas)
liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectNodes */)
if a, e := liveReplicas, replicas[:3]; !reflect.DeepEqual(a, e) {
t.Fatalf("expected live replicas %+v; got %+v", e, a)
}
Expand All @@ -777,7 +777,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
// Mark node 4 as merely unavailable.
mnl.setNodeStatus(4, livenesspb.NodeLivenessStatus_UNAVAILABLE)

liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas)
liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
if a, e := liveReplicas, replicas[:3]; !reflect.DeepEqual(a, e) {
t.Fatalf("expected live replicas %+v; got %+v", e, a)
}
Expand All @@ -802,7 +802,10 @@ func TestStorePoolDefaultState(t *testing.T) {
livenesspb.NodeLivenessStatus_DEAD)
defer stopper.Stop(context.Background())

liveReplicas, deadReplicas := sp.liveAndDeadReplicas([]roachpb.ReplicaDescriptor{{StoreID: 1}})
liveReplicas, deadReplicas := sp.liveAndDeadReplicas(
[]roachpb.ReplicaDescriptor{{StoreID: 1}},
false, /* includeSuspectStores */
)
if len(liveReplicas) != 0 || len(deadReplicas) != 0 {
t.Errorf("expected 0 live and 0 dead replicas; got %v and %v", liveReplicas, deadReplicas)
}
Expand Down Expand Up @@ -875,6 +878,8 @@ func TestStorePoolSuspected(t *testing.T) {
timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)
timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV)

// See state transition diagram in storeDetail.status() for a visual
// representation of what this test asserts.
mnl.setNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE)
sp.detailsMu.Lock()
detail := sp.getStoreDetailLocked(store.StoreID)
Expand All @@ -888,11 +893,16 @@ func TestStorePoolSuspected(t *testing.T) {
sp.detailsMu.Lock()
s = detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect)
sp.detailsMu.Unlock()
require.Equal(t, s, storeStatusSuspect)
require.Equal(t, s, storeStatusUnknown)
require.False(t, detail.lastAvailable.IsZero())
require.False(t, detail.lastUnavailable.IsZero())

mnl.setNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE)
sp.detailsMu.Lock()
s = detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect)
sp.detailsMu.Unlock()
require.Equal(t, s, storeStatusSuspect)

sp.detailsMu.Lock()
s = detail.status(now.Add(timeAfterStoreSuspect).Add(time.Millisecond),
timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect)
Expand Down Expand Up @@ -1062,7 +1072,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE)
}

liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas)
liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
if len(liveReplicas) != 5 {
t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
}
Expand All @@ -1074,7 +1084,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
// Mark node 5 as dead.
mnl.setNodeStatus(5, livenesspb.NodeLivenessStatus_DEAD)

liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas)
liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
// Decommissioning replicas are considered live.
if a, e := liveReplicas, replicas[:4]; !reflect.DeepEqual(a, e) {
t.Fatalf("expected live replicas %+v; got %+v", e, a)
Expand Down

0 comments on commit b4dab92

Please sign in to comment.