diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 36f172c014a9..36f8fa444a0f 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -842,6 +842,13 @@ func (r *Replica) raftStatusRLocked() *raft.Status { return nil } +func (r *Replica) raftBasicStatusRLocked() raft.BasicStatus { + if rg := r.mu.internalRaftGroup; rg != nil { + return rg.BasicStatus() + } + return raft.BasicStatus{} +} + // State returns a copy of the internal state of the Replica, along with some // auxiliary information. func (r *Replica) State() storagepb.RangeInfo { diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index 0053ed91f175..0e13e3810d49 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -346,7 +346,8 @@ func (b *propBuf) flushRLocked() error { func (b *propBuf) flushLocked() error { return b.p.withGroupLocked(func(raftGroup *raft.RawNode) error { - return b.FlushLockedWithRaftGroup(raftGroup) + _, err := b.FlushLockedWithRaftGroup(raftGroup) + return err }) } @@ -356,7 +357,9 @@ func (b *propBuf) flushLocked() error { // // If raftGroup is non-nil (the common case) then the commands will also be // proposed to the RawNode. This initiates Raft replication of the commands. -func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error { +// +// Returns the number of proposals handed to the RawNode. +func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) (int, error) { // Before returning, make sure to forward the lease index base to at least // the proposer's currently applied lease index. This ensures that if the // lease applied index advances outside of this proposer's control (i.e. @@ -374,7 +377,7 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error { defer b.arr.adjustSize(used) if used == 0 { // The buffer is empty. Nothing to do. - return nil + return 0, nil } else if used > b.arr.len() { // The buffer is full and at least one writer has tried to allocate // on top of the full buffer, so notify them to try again. @@ -481,9 +484,9 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error { } } if firstErr != nil { - return firstErr + return 0, firstErr } - return proposeBatch(raftGroup, b.p.replicaID(), ents) + return used, proposeBatch(raftGroup, b.p.replicaID(), ents) } func (b *propBuf) forwardLeaseIndexBase(v uint64) { @@ -521,7 +524,7 @@ func proposeBatch(raftGroup *raft.RawNode, replID roachpb.ReplicaID, ents []raft // The representative example of this is a caller that wants to flush the buffer // into the proposals map before canceling all proposals. func (b *propBuf) FlushLockedWithoutProposing() { - if err := b.FlushLockedWithRaftGroup(nil /* raftGroup */); err != nil { + if _, err := b.FlushLockedWithRaftGroup(nil /* raftGroup */); err != nil { log.Fatalf(context.Background(), "unexpected error: %+v", err) } } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 58ea2a94e002..c413d1d1a15c 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -411,13 +411,27 @@ func (r *Replica) handleRaftReadyRaftMuLocked( leaderID := r.mu.leaderID lastLeaderID := leaderID err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { - if err := r.mu.proposalBuf.FlushLockedWithRaftGroup(raftGroup); err != nil { + numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(raftGroup) + if err != nil { return false, err } if hasReady = raftGroup.HasReady(); hasReady { rd = raftGroup.Ready() } - return hasReady /* unquiesceAndWakeLeader */, nil + // We unquiesce if we have a Ready (= there's work to do). We also have + // to unquiesce if we just flushed some proposals but there isn't a + // Ready, which can happen if the proposals got dropped (raft does this + // if it doesn't know who the leader is). And, for extra defense in depth, + // we also unquiesce if there are outstanding proposals. + // + // NB: if we had the invariant that the group can only be in quiesced + // state if it knows the leader (state.Lead) AND we knew that raft would + // never give us an empty ready here (i.e. the only reason to drop a + // proposal is not knowing the leader) then numFlushed would not be + // necessary. The latter is likely true but we don't want to rely on + // it. The former is maybe true, but there's no easy way to enforce it. + unquiesceAndWakeLeader := hasReady || numFlushed > 0 || len(r.mu.proposals) > 0 + return unquiesceAndWakeLeader, nil }) r.mu.Unlock() if err == errRemoved { @@ -1336,6 +1350,26 @@ func (r *Replica) withRaftGroupLocked( unquiesce, err := func(rangeID roachpb.RangeID, raftGroup *raft.RawNode) (bool, error) { return f(raftGroup) }(r.RangeID, r.mu.internalRaftGroup) + if r.mu.internalRaftGroup.BasicStatus().Lead == 0 { + // If we don't know the leader, unquiesce unconditionally. As a + // follower, we can't wake up the leader if we don't know who that is, + // so we should find out now before someone needs us to unquiesce. + // + // This situation should occur rarely or never (ever since we got + // stricter about validating incoming Quiesce requests) but it's good + // defense-in-depth. + // + // Note that unquiesceAndWakeLeaderLocked won't manage to wake up the + // leader since it's unknown to this replica, and at the time of writing + // the heuristics for campaigning are defensive (won't campaign if there + // is a live leaseholder). But if we are trying to unquiesce because + // this follower was asked to propose something, then this means that a + // request is going to have to wait until the leader next contacts us, + // or, in the worst case, an election timeout. This is not ideal - if a + // node holds a live lease, we should direct the client to it + // immediately. + unquiesce = true + } if unquiesce { r.unquiesceAndWakeLeaderLocked() } diff --git a/pkg/kv/kvserver/replica_raft_quiesce.go b/pkg/kv/kvserver/replica_raft_quiesce.go index 6774da29573b..410f69ba9600 100644 --- a/pkg/kv/kvserver/replica_raft_quiesce.go +++ b/pkg/kv/kvserver/replica_raft_quiesce.go @@ -21,14 +21,6 @@ import ( "go.etcd.io/etcd/raft/raftpb" ) -// mark the replica as quiesced. Returns true if the Replica is successfully -// quiesced and false otherwise. -func (r *Replica) quiesce() bool { - r.mu.Lock() - defer r.mu.Unlock() - return r.quiesceLocked() -} - func (r *Replica) quiesceLocked() bool { ctx := r.AnnotateCtx(context.TODO()) if r.hasPendingProposalsRLocked() { diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 4703f7cae740..04dfab60600a 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -218,11 +218,23 @@ func (s *Store) processRaftRequestWithReplica( if req.Message.Type != raftpb.MsgHeartbeat { log.Fatalf(ctx, "unexpected quiesce: %+v", req) } - status := r.RaftStatus() - if status != nil && status.Term == req.Message.Term && status.Commit == req.Message.Commit { - if r.quiesce() { - return nil - } + // If another replica tells us to quiesce, we verify that according to + // it, we are fully caught up, and that we believe it to be the leader. + // If we didn't do this, this replica could only unquiesce by means of + // an election, which means that the request prompting the unquiesce + // would end up with latency on the order of an election timeout. + // + // There are additional checks in quiesceLocked() that prevent us from + // quiescing if there's outstanding work. + r.mu.Lock() + status := r.raftBasicStatusRLocked() + ok := status.Term == req.Message.Term && + status.Commit == req.Message.Commit && + status.Lead == req.Message.From && + r.quiesceLocked() + r.mu.Unlock() + if ok { + return nil } if log.V(4) { log.Infof(ctx, "not quiescing: local raft status is %+v, incoming quiesce message is %+v", status, req.Message)