From 1f9586007a242bd783dfcd0ae62da13cf62f6bed Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Mon, 16 Mar 2020 23:14:38 +0100 Subject: [PATCH] kvserver: avoid hanging proposal after leader goes down There was a bug in range quiescence due to which commands would hang in raft for minutes before actually getting replicated. This would occur whenever a range was quiesced but a follower replica which didn't know the (Raft) leader would receive a request. This request would be evaluated and put into the Raft proposal buffer, and a ready check would be enqueued. However, no ready would be produced (since the proposal got dropped by raft; leader unknown) and so the replica would not unquiesce. This commit prevents this by always waking up the group if the proposal buffer was initially nonempty, even if an empty Ready is produced. It goes further than that by trying to ensure that a leader is always known while quiesced. Previously, on an incoming request to quiesce, we did not verify that the raft group had learned the leader's identity. One shortcoming here is that in the situation in which the proposal would originally hang "forever", it will now hang for one heartbeat timeout where ideally it would be proposed more reactively. Since this is so rare I didn't try to address this. Instead, refer to the ideas in https://github.com/cockroachdb/cockroach/issues/37906#issuecomment-529041377 and https://github.com/cockroachdb/cockroach/issues/21849 for future changes that could mitigate this. Without this PR, the test would fail around 10% of the time. With this change, it passed 40 iterations in a row without a hitch, via: ./bin/roachtest run -u tobias --count 40 --parallelism 10 --cpu-quota 1280 gossip/chaos/nodes=9 Release justification: bug fix Release note (bug fix): a rare case in which requests to a quiesced range could hang in the KV replication layer was fixed. This would manifest as a message saying "have been waiting ... for proposing" even though no loss of quorum occurred. --- pkg/kv/kvserver/replica.go | 7 +++++ pkg/kv/kvserver/replica_proposal_buf.go | 15 ++++++---- pkg/kv/kvserver/replica_raft.go | 38 +++++++++++++++++++++++-- pkg/kv/kvserver/replica_raft_quiesce.go | 8 ------ pkg/kv/kvserver/store_raft.go | 22 ++++++++++---- 5 files changed, 69 insertions(+), 21 deletions(-) diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 36f172c014a9..36f8fa444a0f 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -842,6 +842,13 @@ func (r *Replica) raftStatusRLocked() *raft.Status { return nil } +func (r *Replica) raftBasicStatusRLocked() raft.BasicStatus { + if rg := r.mu.internalRaftGroup; rg != nil { + return rg.BasicStatus() + } + return raft.BasicStatus{} +} + // State returns a copy of the internal state of the Replica, along with some // auxiliary information. func (r *Replica) State() storagepb.RangeInfo { diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index 0053ed91f175..0e13e3810d49 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -346,7 +346,8 @@ func (b *propBuf) flushRLocked() error { func (b *propBuf) flushLocked() error { return b.p.withGroupLocked(func(raftGroup *raft.RawNode) error { - return b.FlushLockedWithRaftGroup(raftGroup) + _, err := b.FlushLockedWithRaftGroup(raftGroup) + return err }) } @@ -356,7 +357,9 @@ func (b *propBuf) flushLocked() error { // // If raftGroup is non-nil (the common case) then the commands will also be // proposed to the RawNode. This initiates Raft replication of the commands. -func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error { +// +// Returns the number of proposals handed to the RawNode. +func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) (int, error) { // Before returning, make sure to forward the lease index base to at least // the proposer's currently applied lease index. This ensures that if the // lease applied index advances outside of this proposer's control (i.e. @@ -374,7 +377,7 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error { defer b.arr.adjustSize(used) if used == 0 { // The buffer is empty. Nothing to do. - return nil + return 0, nil } else if used > b.arr.len() { // The buffer is full and at least one writer has tried to allocate // on top of the full buffer, so notify them to try again. @@ -481,9 +484,9 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error { } } if firstErr != nil { - return firstErr + return 0, firstErr } - return proposeBatch(raftGroup, b.p.replicaID(), ents) + return used, proposeBatch(raftGroup, b.p.replicaID(), ents) } func (b *propBuf) forwardLeaseIndexBase(v uint64) { @@ -521,7 +524,7 @@ func proposeBatch(raftGroup *raft.RawNode, replID roachpb.ReplicaID, ents []raft // The representative example of this is a caller that wants to flush the buffer // into the proposals map before canceling all proposals. func (b *propBuf) FlushLockedWithoutProposing() { - if err := b.FlushLockedWithRaftGroup(nil /* raftGroup */); err != nil { + if _, err := b.FlushLockedWithRaftGroup(nil /* raftGroup */); err != nil { log.Fatalf(context.Background(), "unexpected error: %+v", err) } } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 58ea2a94e002..c413d1d1a15c 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -411,13 +411,27 @@ func (r *Replica) handleRaftReadyRaftMuLocked( leaderID := r.mu.leaderID lastLeaderID := leaderID err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { - if err := r.mu.proposalBuf.FlushLockedWithRaftGroup(raftGroup); err != nil { + numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(raftGroup) + if err != nil { return false, err } if hasReady = raftGroup.HasReady(); hasReady { rd = raftGroup.Ready() } - return hasReady /* unquiesceAndWakeLeader */, nil + // We unquiesce if we have a Ready (= there's work to do). We also have + // to unquiesce if we just flushed some proposals but there isn't a + // Ready, which can happen if the proposals got dropped (raft does this + // if it doesn't know who the leader is). And, for extra defense in depth, + // we also unquiesce if there are outstanding proposals. + // + // NB: if we had the invariant that the group can only be in quiesced + // state if it knows the leader (state.Lead) AND we knew that raft would + // never give us an empty ready here (i.e. the only reason to drop a + // proposal is not knowing the leader) then numFlushed would not be + // necessary. The latter is likely true but we don't want to rely on + // it. The former is maybe true, but there's no easy way to enforce it. + unquiesceAndWakeLeader := hasReady || numFlushed > 0 || len(r.mu.proposals) > 0 + return unquiesceAndWakeLeader, nil }) r.mu.Unlock() if err == errRemoved { @@ -1336,6 +1350,26 @@ func (r *Replica) withRaftGroupLocked( unquiesce, err := func(rangeID roachpb.RangeID, raftGroup *raft.RawNode) (bool, error) { return f(raftGroup) }(r.RangeID, r.mu.internalRaftGroup) + if r.mu.internalRaftGroup.BasicStatus().Lead == 0 { + // If we don't know the leader, unquiesce unconditionally. As a + // follower, we can't wake up the leader if we don't know who that is, + // so we should find out now before someone needs us to unquiesce. + // + // This situation should occur rarely or never (ever since we got + // stricter about validating incoming Quiesce requests) but it's good + // defense-in-depth. + // + // Note that unquiesceAndWakeLeaderLocked won't manage to wake up the + // leader since it's unknown to this replica, and at the time of writing + // the heuristics for campaigning are defensive (won't campaign if there + // is a live leaseholder). But if we are trying to unquiesce because + // this follower was asked to propose something, then this means that a + // request is going to have to wait until the leader next contacts us, + // or, in the worst case, an election timeout. This is not ideal - if a + // node holds a live lease, we should direct the client to it + // immediately. + unquiesce = true + } if unquiesce { r.unquiesceAndWakeLeaderLocked() } diff --git a/pkg/kv/kvserver/replica_raft_quiesce.go b/pkg/kv/kvserver/replica_raft_quiesce.go index 6774da29573b..410f69ba9600 100644 --- a/pkg/kv/kvserver/replica_raft_quiesce.go +++ b/pkg/kv/kvserver/replica_raft_quiesce.go @@ -21,14 +21,6 @@ import ( "go.etcd.io/etcd/raft/raftpb" ) -// mark the replica as quiesced. Returns true if the Replica is successfully -// quiesced and false otherwise. -func (r *Replica) quiesce() bool { - r.mu.Lock() - defer r.mu.Unlock() - return r.quiesceLocked() -} - func (r *Replica) quiesceLocked() bool { ctx := r.AnnotateCtx(context.TODO()) if r.hasPendingProposalsRLocked() { diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 4703f7cae740..04dfab60600a 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -218,11 +218,23 @@ func (s *Store) processRaftRequestWithReplica( if req.Message.Type != raftpb.MsgHeartbeat { log.Fatalf(ctx, "unexpected quiesce: %+v", req) } - status := r.RaftStatus() - if status != nil && status.Term == req.Message.Term && status.Commit == req.Message.Commit { - if r.quiesce() { - return nil - } + // If another replica tells us to quiesce, we verify that according to + // it, we are fully caught up, and that we believe it to be the leader. + // If we didn't do this, this replica could only unquiesce by means of + // an election, which means that the request prompting the unquiesce + // would end up with latency on the order of an election timeout. + // + // There are additional checks in quiesceLocked() that prevent us from + // quiescing if there's outstanding work. + r.mu.Lock() + status := r.raftBasicStatusRLocked() + ok := status.Term == req.Message.Term && + status.Commit == req.Message.Commit && + status.Lead == req.Message.From && + r.quiesceLocked() + r.mu.Unlock() + if ok { + return nil } if log.V(4) { log.Infof(ctx, "not quiescing: local raft status is %+v, incoming quiesce message is %+v", status, req.Message)