Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: clean up replica unquiescence #105041

Merged
merged 8 commits into from
Jun 20, 2023
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ func mergeCheckingTimestampCaches(
// Make sure the LHS range is unquiesced so that it elects a new
// Raft leader after the partition is established.
for _, r := range lhsRepls {
r.MaybeUnquiesceAndWakeLeader()
r.MaybeUnquiesce()
}

// Issue an increment on the range. The leaseholder should evaluate
Expand Down
119 changes: 113 additions & 6 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ func TestRequestsOnLaggingReplica(t *testing.T) {
// Make sure this replica has not inadvertently quiesced. We need the
// replica ticking so that it campaigns.
if otherRepl.IsQuiescent() {
otherRepl.MaybeUnquiesceAndWakeLeader()
otherRepl.MaybeUnquiesce()
}
lead := otherRepl.RaftStatus().Lead
if lead == raft.None {
Expand Down Expand Up @@ -3841,7 +3841,7 @@ func TestReplicaTooOldGC(t *testing.T) {
} else if replica != nil {
// Make sure the replica is unquiesced so that it will tick and
// contact the leader to discover it's no longer part of the range.
replica.MaybeUnquiesceAndWakeLeader()
replica.MaybeUnquiesce()
}
return errors.Errorf("found %s, waiting for it to be GC'd", replica)
})
Expand Down Expand Up @@ -6415,11 +6415,12 @@ func TestRaftCheckQuorum(t *testing.T) {
require.Equal(t, raft.StateLeader, initialStatus.RaftState)
logStatus(initialStatus)

// Unquiesce the leader if necessary. We have to use AndWakeLeader to
// submit a proposal, otherwise it will immediately quiesce again without
// ticking.
// Unquiesce the leader if necessary. We have to do so by submitting an
// empty proposal, otherwise the leader will immediately quiesce again.
if quiesce {
require.True(t, repl1.MaybeUnquiesceAndWakeLeader())
ok, err := repl1.MaybeUnquiesceAndPropose()
require.NoError(t, err)
require.True(t, ok)
t.Logf("n1 unquiesced")
} else {
require.False(t, repl1.IsQuiescent())
Expand Down Expand Up @@ -6582,3 +6583,109 @@ func TestRaftLeaderRemovesItself(t *testing.T) {
return false
}, 10*time.Second, 500*time.Millisecond)
}

// TestRaftUnquiesceLeaderNoProposal verifies that unquiescing a quiesced Raft
// leader does not, by itself, add anything to the Raft log, since a proposal
// would be unnecessary and expensive. It lets a three-replica range quiesce,
// unquiesces the leader, waits for it to quiesce again, and then checks that
// the leader's state, term, and match index are unchanged.
func TestRaftUnquiesceLeaderNoProposal(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// The assertions below depend on timing, so don't run under the deadlock
	// detector or stressrace.
	skip.UnderDeadlock(t)
	skip.UnderStressRace(t)

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	// Lease extensions and expiration-based lease transfers write to the range
	// and would keep it from quiescing, so turn both off.
	settings := cluster.MakeTestingClusterSettings()
	kvserver.TransferExpirationLeasesFirstEnabled.Override(ctx, &settings.SV, false)
	kvserver.ExpirationLeasesOnly.Override(ctx, &settings.SV, false)

	// blockedRangeID is the ID of the range whose batches get rejected by the
	// request filter, or 0 while nothing is blocked. Rejecting requests keeps
	// spurious proposals (typically from txn record GC) out of the Raft log.
	var blockedRangeID atomic.Int64
	rejectBlocked := func(_ context.Context, ba *kvpb.BatchRequest) *kvpb.Error {
		rangeID := roachpb.RangeID(blockedRangeID.Load())
		if rangeID <= 0 || rangeID != ba.RangeID {
			return nil
		}
		t.Logf("r%d write rejected: %s", rangeID, ba)
		return kvpb.NewError(errors.New("rejected"))
	}

	serverArgs := base.TestServerArgs{
		Settings: settings,
		RaftConfig: base.RaftConfig{
			RaftTickInterval: 100 * time.Millisecond, // speed up test
		},
		Knobs: base.TestingKnobs{
			Store: &kvserver.StoreTestingKnobs{
				TestingRequestFilter: rejectBlocked,
			},
		},
	}
	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual,
		ServerArgs:      serverArgs,
	})
	defer tc.Stopper().Stop(ctx)

	// logStatus asserts that the given Raft status is non-nil and logs a
	// one-line summary of it.
	logStatus := func(rs *raft.Status) {
		t.Helper()
		require.NotNil(t, rs)
		t.Logf("n%d %s at term=%d commit=%d", rs.ID, rs.RaftState, rs.Term, rs.Commit)
	}

	n1Sender := tc.GetFirstStoreFromServer(t, 0).TestSender()

	// Set up a scratch range with voters on all three nodes, and make sure a
	// write has reached every replica.
	key := tc.ScratchRange(t)
	desc := tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
	_, pErr := kv.SendWrapped(ctx, n1Sender, incrementArgs(key, 1))
	require.NoError(t, pErr.GoError())
	tc.WaitForValues(t, key, []int64{1, 1, 1})

	// Collect the range's replica from each of the three servers, in server
	// order; the first one belongs to n1.
	replicas := make([]*kvserver.Replica, 0, 3)
	for serverIdx := 0; serverIdx < 3; serverIdx++ {
		repl, err := tc.GetFirstStoreFromServer(t, serverIdx).GetReplica(desc.RangeID)
		require.NoError(t, err)
		replicas = append(replicas, repl)
	}
	n1Repl := replicas[0]

	// From here on, reject requests to the range until the test returns.
	blockedRangeID.Store(int64(desc.RangeID))
	defer blockedRangeID.Store(0)

	// The range counts as quiesced only once every replica reports quiescence.
	allQuiescent := func() bool {
		for _, repl := range replicas {
			if !repl.IsQuiescent() {
				return false
			}
		}
		return true
	}
	require.Eventually(t, allQuiescent, 10*time.Second, 100*time.Millisecond)
	t.Logf("range quiesced")

	// n1 must still hold leadership before we poke it.
	before := n1Repl.RaftStatus()
	require.Equal(t, raft.StateLeader, before.RaftState)
	logStatus(before)
	t.Logf("n1 leader")

	// Unquiesce n1. It is fine for it to quiesce again right away, but doing so
	// must not involve a proposal to wake up the followers.
	require.True(t, n1Repl.MaybeUnquiesce())
	t.Logf("n1 unquiesced")

	require.Eventually(t, n1Repl.IsQuiescent, 10*time.Second, 100*time.Millisecond)
	t.Logf("n1 quiesced")

	// Leadership, term, and the match index recorded under Progress key 1
	// (presumably n1's own replica ID) must all be unchanged.
	after := n1Repl.RaftStatus()
	logStatus(after)
	require.Equal(t, raft.StateLeader, after.RaftState)
	require.Equal(t, before.Term, after.Term)
	require.Equal(t, before.Progress[1].Match, after.Progress[1].Match)
	t.Logf("n1 still leader with no new proposals at log index %d", after.Progress[1].Match)
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3463,7 +3463,7 @@ func TestReplicaTombstone(t *testing.T) {
})
tc.RemoveVotersOrFatal(t, key, tc.Target(2))
testutils.SucceedsSoon(t, func() error {
repl.MaybeUnquiesceAndWakeLeader()
repl.MaybeUnquiesce()
if len(sawTooOld) == 0 {
return errors.New("still haven't seen ReplicaTooOldError")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/flow_control_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1975,7 +1975,8 @@ ORDER BY name ASC;

h.comment(`-- (Unquiesce the range.)`)
testutils.SucceedsSoon(t, func() error {
tc.GetRaftLeader(t, roachpb.RKey(k)).MaybeUnquiesceAndWakeLeader()
_, err := tc.GetRaftLeader(t, roachpb.RKey(k)).MaybeUnquiesceAndPropose()
require.NoError(t, err)
return h.checkAllTokensReturned(ctx, 3)
})

Expand Down
19 changes: 14 additions & 5 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,15 +495,24 @@ func (r *Replica) GetQueueLastProcessed(ctx context.Context, queue string) (hlc.
}

func (r *Replica) MaybeUnquiesce() bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semantics of MaybeUnquiesce changed. Were there any callers who wanted the old behaviour (wakeLeader=mayCampaign=false)?

Copy link
Contributor Author

@erikgrinaker erikgrinaker Jun 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests don't care -- explained in the commit message:

A couple of tests have been changed to now wake the leader when unquiescing, but this has no bearing on the tests.

In case it wasn't clear, this is a test-only file, helpers_test.go.

r.mu.Lock()
defer r.mu.Unlock()
return r.maybeUnquiesceWithOptionsLocked(false /* campaignOnWake */)
return r.maybeUnquiesce(true /* wakeLeader */, true /* mayCampaign */)
}

func (r *Replica) MaybeUnquiesceAndWakeLeader() bool {
// MaybeUnquiesceAndPropose will unquiesce the range and submit a noop proposal.
// This is useful when unquiescing the leader and wanting to also unquiesce
// followers, since the leader may otherwise simply quiesce again immediately.
func (r *Replica) MaybeUnquiesceAndPropose() (bool, error) {
r.mu.Lock()
defer r.mu.Unlock()
return r.maybeUnquiesceAndWakeLeaderLocked()
if !r.canUnquiesceRLocked() {
return false, nil
}
return true, r.withRaftGroupLocked(false, func(r *raft.RawNode) (bool, error) {
if err := r.Propose(nil); err != nil {
return false, err
}
return true /* unquiesceAndWakeLeader */, nil
})
}

func (r *Replica) ReadCachedProtectedTS() (readAt, earliestProtectionTimestamp hlc.Timestamp) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2065,7 +2065,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
// orphaned followers would fail to queue themselves for GC.) Unquiesce the
// range in case it managed to quiesce between when the Subsume request
// arrived and now, which is rare but entirely legal.
r.maybeUnquiesceLocked()
r.maybeUnquiesceLocked(false /* wakeLeader */, true /* mayCampaign */)

taskCtx := r.AnnotateCtx(context.Background())
err = r.store.stopper.RunAsyncTask(taskCtx, "wait-for-merge", func(ctx context.Context) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -1253,7 +1253,7 @@ func (rp *replicaProposer) withGroupLocked(fn func(raftGroup proposerRaft) error
return (*Replica)(rp).withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
// We're proposing a command here so there is no need to wake the leader
// if we were quiesced. However, we should make sure we are unquiesced.
(*Replica)(rp).maybeUnquiesceLocked()
(*Replica)(rp).maybeUnquiesceLocked(false /* wakeLeader */, true /* mayCampaign */)
return false /* maybeUnquiesceLocked */, fn(raftGroup)
})
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,13 +668,14 @@ func (r *Replica) stepRaftGroup(req *kvserverpb.RaftMessageRequest) error {
st := r.raftBasicStatusRLocked()
hasLeader := st.RaftState == raft.StateFollower && st.Lead != 0
fromLeader := uint64(req.FromReplica.ReplicaID) == st.Lead

var wakeLeader, mayCampaign bool
if hasLeader && !fromLeader {
// TODO(erikgrinaker): This is likely to result in election ties, find
// some way to avoid that.
r.maybeUnquiesceAndWakeLeaderLocked()
} else {
r.maybeUnquiesceWithOptionsLocked(false /* campaignOnWake */)
wakeLeader, mayCampaign = true, true
}
r.maybeUnquiesceLocked(wakeLeader, mayCampaign)
}
r.mu.lastUpdateTimes.update(req.FromReplica.ReplicaID, timeutil.Now())
if req.Message.Type == raftpb.MsgSnap {
Expand Down Expand Up @@ -2117,7 +2118,7 @@ func (r *Replica) withRaftGroupLocked(
unquiesce = true
}
if unquiesce {
r.maybeUnquiesceAndWakeLeaderLocked()
r.maybeUnquiesceLocked(true /* wakeLeader */, true /* mayCampaign */)
}
return err
}
Expand Down
82 changes: 39 additions & 43 deletions pkg/kv/kvserver/replica_raft_quiesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,27 @@ func (r *Replica) quiesceLocked(ctx context.Context, lagging laggingReplicaSet)
}
}

func (r *Replica) maybeUnquiesce() bool {
// maybeUnquiesce unquiesces the replica if it is quiesced and can be
// unquiesced, returning true in that case. See maybeUnquiesceLocked() for
// details.
func (r *Replica) maybeUnquiesce(wakeLeader, mayCampaign bool) bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.maybeUnquiesceLocked()
return r.maybeUnquiesceLocked(wakeLeader, mayCampaign)
}

func (r *Replica) maybeUnquiesceLocked() bool {
return r.maybeUnquiesceWithOptionsLocked(true /* campaignOnWake */)
}

func (r *Replica) maybeUnquiesceWithOptionsLocked(campaignOnWake bool) bool {
// maybeUnquiesceLocked unquiesces the replica if it is quiesced and can be
// unquiesced, returning true in that case.
//
// If wakeLeader is true, wake the leader by proposing an empty command. Should
// typically be true, unless e.g. the caller is either about to propose a
// command anyway, or it knows the leader is awake because it received a message
// from it.
//
// If mayCampaign is true, the replica may campaign if appropriate. This will
// respect PreVote and CheckQuorum, and thus won't disrupt a current leader.
// Should typically be true, unless the caller wants to avoid election ties.
func (r *Replica) maybeUnquiesceLocked(wakeLeader, mayCampaign bool) bool {
if !r.canUnquiesceRLocked() {
return false
}
Expand All @@ -75,34 +85,29 @@ func (r *Replica) maybeUnquiesceWithOptionsLocked(campaignOnWake bool) bool {
r.store.unquiescedReplicas.Lock()
r.store.unquiescedReplicas.m[r.RangeID] = struct{}{}
r.store.unquiescedReplicas.Unlock()
if campaignOnWake {
r.maybeCampaignOnWakeLocked(ctx)
}
// NB: we know there's a non-nil RaftStatus because internalRaftGroup isn't nil.
r.mu.lastUpdateTimes.updateOnUnquiesce(
r.mu.state.Desc.Replicas().Descriptors(), r.raftSparseStatusRLocked().Progress, timeutil.Now(),
)
return true
}

func (r *Replica) maybeUnquiesceAndWakeLeaderLocked() bool {
if !r.canUnquiesceRLocked() {
return false
st := r.raftSparseStatusRLocked()
if st.RaftState == raft.StateLeader {
r.mu.lastUpdateTimes.updateOnUnquiesce(
r.mu.state.Desc.Replicas().Descriptors(), st.Progress, timeutil.Now())

} else if st.RaftState == raft.StateFollower && wakeLeader {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not in StateCandidate? Waking the leader doesn't make sense because the candidate "doesn't know" the leader?

Copy link
Contributor Author

@erikgrinaker erikgrinaker Jun 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, a candidate has no leader, and won't send proposals.

// Propose an empty command which will wake the leader.
if log.V(3) {
log.Infof(ctx, "waking %d leader", r.RangeID)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about "r%d" instead of "%d"? It's easier to search by "rNNN" in logs when investigating, and this avoids false positives.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was following the convention of the existing log messages here, didn't particularly feel like updating a bunch of log messages. But I can do a pass if there aren't that many to update.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tacked on a commit.

}
data := raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardWithoutAC, makeIDKey(), nil)
_ = r.mu.internalRaftGroup.Propose(data)
r.mu.lastProposalAtTicks = r.mu.ticks // delay imminent quiescence
}
ctx := r.AnnotateCtx(context.TODO())
if log.V(3) {
log.Infof(ctx, "unquiescing %d: waking leader", r.RangeID)

// NB: campaign after attempting to wake leader, since we won't send the
// proposal in candidate state. This gives it a chance to assert leadership if
// we're wrong about it being dead.
if mayCampaign {
r.maybeCampaignOnWakeLocked(ctx)
}
r.mu.quiescent = false
r.mu.laggingFollowersOnQuiesce = nil
r.store.unquiescedReplicas.Lock()
r.store.unquiescedReplicas.m[r.RangeID] = struct{}{}
r.store.unquiescedReplicas.Unlock()
r.maybeCampaignOnWakeLocked(ctx)
// Propose an empty command which will wake the leader.
data := raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardWithoutAC, makeIDKey(), nil)
_ = r.mu.internalRaftGroup.Propose(data)
r.mu.lastProposalAtTicks = r.mu.ticks // delay imminent quiescence

return true
}

Expand Down Expand Up @@ -184,14 +189,6 @@ func (r *Replica) canUnquiesceRLocked() bool {
// are behind, whether or not they are live. If any entry in the livenessMap is
// nil, then the missing node ID is treated as live and will prevent the range
// from quiescing.
//
// TODO(peter): There remains a scenario in which a follower is left unquiesced
// while the leader is quiesced: the follower's receive queue is full and the
// "quiesce" message is dropped. This seems very very unlikely because if the
// follower isn't keeping up with raft messages it is unlikely that the leader
// would quiesce. The fallout from this situation is undesirable raft
// elections, which will cause throughput hiccups for the range but no
// correctness issues.
func (r *Replica) maybeQuiesceRaftMuLockedReplicaMuLocked(
ctx context.Context, leaseStatus kvserverpb.LeaseStatus, livenessMap livenesspb.IsLiveMap,
) bool {
Expand Down Expand Up @@ -468,13 +465,12 @@ func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked(
if roachpb.ReplicaID(id) == r.replicaID {
continue
}
toReplica, toErr := r.getReplicaDescriptorByIDRLocked(
roachpb.ReplicaID(id), lastFromReplica)
toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(id), lastFromReplica)
if toErr != nil {
if log.V(4) {
log.Infof(ctx, "failed to quiesce: cannot find to replica (%d)", id)
}
r.maybeUnquiesceLocked()
r.maybeUnquiesceLocked(false /* wakeLeader */, false /* mayCampaign */) // already leader
return false
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) {
lagging := r.mu.laggingFollowersOnQuiesce
r.mu.RUnlock()
if quiescent && lagging.MemberStale(l) {
r.maybeUnquiesce()
r.maybeUnquiesce(false /* wakeLeader */, false /* mayCampaign */) // already leader
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we know we're already leader here?

Copy link
Contributor Author

@erikgrinaker erikgrinaker Jun 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because laggingFollowersOnQuiesce is only non-nil on the leader. Conceptually, only the leader keeps track of lagging followers, because only leaders have followers.

}
})
}
Expand Down