diff --git a/pkg/cmd/roachtest/gossip.go b/pkg/cmd/roachtest/gossip.go
index e097a6ce5593..aaca4c4f6d5c 100644
--- a/pkg/cmd/roachtest/gossip.go
+++ b/pkg/cmd/roachtest/gossip.go
@@ -32,8 +32,10 @@ import (
 
 func registerGossip(r *testRegistry) {
 	runGossipChaos := func(ctx context.Context, t *test, c *cluster) {
+		args := startArgs("--args=--vmodule=*=1")
 		c.Put(ctx, cockroach, "./cockroach", c.All())
-		c.Start(ctx, t, c.All())
+		c.Start(ctx, t, c.All(), args)
+		waitForFullReplication(t, c.Conn(ctx, 1))
 
 		gossipNetwork := func(node int) string {
 			const query = `
@@ -65,6 +67,7 @@ SELECT string_agg(source_id::TEXT || ':' || target_id::TEXT, ',')
 				if i == deadNode {
 					continue
 				}
+				c.l.Printf("%d: checking gossip\n", i)
 				s := gossipNetwork(i)
 				if !initialized {
 					deadNodeStr := fmt.Sprint(deadNode)
@@ -88,7 +91,7 @@ SELECT string_agg(source_id::TEXT || ':' || target_id::TEXT, ',')
 					return false
 				}
 			}
-			fmt.Printf("gossip ok: %s (%0.0fs)\n", expected, timeutil.Since(start).Seconds())
+			c.l.Printf("gossip ok: %s (%0.0fs)\n", expected, timeutil.Since(start).Seconds())
 			return true
 		}
 
@@ -109,7 +112,7 @@ SELECT string_agg(source_id::TEXT || ':' || target_id::TEXT, ',')
 			deadNode = nodes.randNode()[0]
 			c.Stop(ctx, c.Node(deadNode))
 			waitForGossip()
-			c.Start(ctx, t, c.Node(deadNode))
+			c.Start(ctx, t, c.Node(deadNode), args)
 		}
 	}
 
diff --git a/pkg/cmd/roachtest/test.go b/pkg/cmd/roachtest/test.go
index 30b63a804e55..688974c3c5bf 100644
--- a/pkg/cmd/roachtest/test.go
+++ b/pkg/cmd/roachtest/test.go
@@ -196,7 +196,7 @@ func (t *test) GetStatus() string {
 	defer t.mu.Unlock()
 	status, ok := t.mu.status[t.runnerID]
 	if ok {
-		return fmt.Sprintf("%s (set %s ago)", status.msg, timeutil.Now().Sub(status.time))
+		return fmt.Sprintf("%s (set %s ago)", status.msg, timeutil.Now().Sub(status.time).Round(time.Second))
 	}
 	return "N/A"
 }
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 644cbb0dd2c2..52a1a6247a72 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -315,6 +315,10 @@ type Replica struct {
 		// evaluation and is consumed by the Raft processing thread. Once
 		// consumed, commands are proposed through Raft and moved to the
 		// proposals map.
+		//
+		// Access to proposalBuf must occur *without* holding the mutex.
+		// Instead, the buffer internally holds a reference to mu and will use
+		// it appropriately.
 		proposalBuf propBuf
 		// proposals stores the Raft in-flight commands which originated at
 		// this Replica, i.e. all commands for which propose has been called,
@@ -898,6 +902,13 @@ func (r *Replica) raftStatusRLocked() *raft.Status {
 	return nil
 }
 
+func (r *Replica) raftBasicStatusRLocked() raft.BasicStatus {
+	if rg := r.mu.internalRaftGroup; rg != nil {
+		return rg.BasicStatus()
+	}
+	return raft.BasicStatus{}
+}
+
 // State returns a copy of the internal state of the Replica, along with some
 // auxiliary information.
 func (r *Replica) State() storagepb.RangeInfo {
diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go
index 0053ed91f175..0e13e3810d49 100644
--- a/pkg/kv/kvserver/replica_proposal_buf.go
+++ b/pkg/kv/kvserver/replica_proposal_buf.go
@@ -346,7 +346,8 @@ func (b *propBuf) flushRLocked() error {
 
 func (b *propBuf) flushLocked() error {
 	return b.p.withGroupLocked(func(raftGroup *raft.RawNode) error {
-		return b.FlushLockedWithRaftGroup(raftGroup)
+		_, err := b.FlushLockedWithRaftGroup(raftGroup)
+		return err
 	})
 }
 
@@ -356,7 +357,9 @@ func (b *propBuf) flushLocked() error {
 //
 // If raftGroup is non-nil (the common case) then the commands will also be
 // proposed to the RawNode. This initiates Raft replication of the commands.
-func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
+//
+// Returns the number of proposals handed to the RawNode.
+func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) (int, error) {
 	// Before returning, make sure to forward the lease index base to at least
 	// the proposer's currently applied lease index. This ensures that if the
 	// lease applied index advances outside of this proposer's control (i.e.
@@ -374,7 +377,7 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
 	defer b.arr.adjustSize(used)
 	if used == 0 {
 		// The buffer is empty. Nothing to do.
-		return nil
+		return 0, nil
 	} else if used > b.arr.len() {
 		// The buffer is full and at least one writer has tried to allocate
 		// on top of the full buffer, so notify them to try again.
@@ -481,9 +484,9 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
 		}
 	}
 	if firstErr != nil {
-		return firstErr
+		return 0, firstErr
 	}
-	return proposeBatch(raftGroup, b.p.replicaID(), ents)
+	return used, proposeBatch(raftGroup, b.p.replicaID(), ents)
 }
 
 func (b *propBuf) forwardLeaseIndexBase(v uint64) {
@@ -521,7 +524,7 @@ func proposeBatch(raftGroup *raft.RawNode, replID roachpb.ReplicaID, ents []raft
 // The representative example of this is a caller that wants to flush the buffer
 // into the proposals map before canceling all proposals.
 func (b *propBuf) FlushLockedWithoutProposing() {
-	if err := b.FlushLockedWithRaftGroup(nil /* raftGroup */); err != nil {
+	if _, err := b.FlushLockedWithRaftGroup(nil /* raftGroup */); err != nil {
 		log.Fatalf(context.Background(), "unexpected error: %+v", err)
 	}
 }
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 53a3909de3ea..7570d70cb543 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -319,6 +319,9 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (index int64, pE
 	// Insert into the proposal buffer, which passes the command to Raft to be
 	// proposed. The proposal buffer assigns the command a maximum lease index
 	// when it sequences it.
+	//
+	// NB: we must not hold r.mu while using the proposal buffer, see comment
+	// on the field.
 	maxLeaseIndex, err := r.mu.proposalBuf.Insert(p, data)
 	if err != nil {
 		return 0, roachpb.NewError(err)
 	}
@@ -408,13 +411,27 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 	leaderID := r.mu.leaderID
 	lastLeaderID := leaderID
 	err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
-		if err := r.mu.proposalBuf.FlushLockedWithRaftGroup(raftGroup); err != nil {
+		numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(raftGroup)
+		if err != nil {
 			return false, err
 		}
 		if hasReady = raftGroup.HasReady(); hasReady {
 			rd = raftGroup.Ready()
 		}
-		return hasReady /* unquiesceAndWakeLeader */, nil
+		// We unquiesce if we have a Ready (= there's work to do). We also have
+		// to unquiesce if we just flushed some proposals but there isn't a
+		// Ready, which can happen if the proposals got dropped (raft does this
+		// if it doesn't know who the leader is). And, for extra defense in depth,
+		// we also unquiesce if there are outstanding proposals.
+		//
+		// NB: if we had the invariant that the group can only be in quiesced
+		// state if it knows the leader (state.Lead) AND we knew that raft would
+		// never give us an empty ready here (i.e. the only reason to drop a
+		// proposal is not knowing the leader) then numFlushed would not be
+		// necessary. The latter is likely true but we don't want to rely on
+		// it. The former is maybe true, but there's no easy way to enforce it.
+		unquiesceAndWakeLeader := hasReady || numFlushed > 0 || len(r.mu.proposals) > 0
+		return unquiesceAndWakeLeader, nil
 	})
 	r.mu.Unlock()
 	if err == errRemoved {
@@ -1333,6 +1350,26 @@ func (r *Replica) withRaftGroupLocked(
 	unquiesce, err := func(rangeID roachpb.RangeID, raftGroup *raft.RawNode) (bool, error) {
 		return f(raftGroup)
 	}(r.RangeID, r.mu.internalRaftGroup)
+	if r.mu.internalRaftGroup.BasicStatus().Lead == 0 {
+		// If we don't know the leader, unquiesce unconditionally. As a
+		// follower, we can't wake up the leader if we don't know who that is,
+		// so we should find out now before someone needs us to unquiesce.
+		//
+		// This situation should occur rarely or never (ever since we got
+		// stricter about validating incoming Quiesce requests) but it's good
+		// defense-in-depth.
+		//
+		// Note that unquiesceAndWakeLeaderLocked won't manage to wake up the
+		// leader since it's unknown to this replica, and at the time of writing
+		// the heuristics for campaigning are defensive (won't campaign if there
+		// is a live leaseholder). But if we are trying to unquiesce because
+		// this follower was asked to propose something, then this means that a
+		// request is going to have to wait until the leader next contacts us,
+		// or, in the worst case, an election timeout. This is not ideal - if a
+		// node holds a live lease, we should direct the client to it
+		// immediately.
+		unquiesce = true
+	}
 	if unquiesce {
 		r.unquiesceAndWakeLeaderLocked()
 	}
diff --git a/pkg/kv/kvserver/replica_raft_quiesce.go b/pkg/kv/kvserver/replica_raft_quiesce.go
index 6774da29573b..410f69ba9600 100644
--- a/pkg/kv/kvserver/replica_raft_quiesce.go
+++ b/pkg/kv/kvserver/replica_raft_quiesce.go
@@ -21,14 +21,6 @@ import (
 	"go.etcd.io/etcd/raft/raftpb"
 )
 
-// mark the replica as quiesced. Returns true if the Replica is successfully
-// quiesced and false otherwise.
-func (r *Replica) quiesce() bool {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	return r.quiesceLocked()
-}
-
 func (r *Replica) quiesceLocked() bool {
 	ctx := r.AnnotateCtx(context.TODO())
 	if r.hasPendingProposalsRLocked() {
diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go
index 4703f7cae740..04dfab60600a 100644
--- a/pkg/kv/kvserver/store_raft.go
+++ b/pkg/kv/kvserver/store_raft.go
@@ -218,11 +218,23 @@ func (s *Store) processRaftRequestWithReplica(
 		if req.Message.Type != raftpb.MsgHeartbeat {
 			log.Fatalf(ctx, "unexpected quiesce: %+v", req)
 		}
-		status := r.RaftStatus()
-		if status != nil && status.Term == req.Message.Term && status.Commit == req.Message.Commit {
-			if r.quiesce() {
-				return nil
-			}
+		// If another replica tells us to quiesce, we verify that according to
+		// it, we are fully caught up, and that we believe it to be the leader.
+		// If we didn't do this, this replica could only unquiesce by means of
+		// an election, which means that the request prompting the unquiesce
+		// would end up with latency on the order of an election timeout.
+		//
+		// There are additional checks in quiesceLocked() that prevent us from
+		// quiescing if there's outstanding work.
+		r.mu.Lock()
+		status := r.raftBasicStatusRLocked()
+		ok := status.Term == req.Message.Term &&
+			status.Commit == req.Message.Commit &&
+			status.Lead == req.Message.From &&
+			r.quiesceLocked()
+		r.mu.Unlock()
+		if ok {
+			return nil
 		}
 		if log.V(4) {
 			log.Infof(ctx, "not quiescing: local raft status is %+v, incoming quiesce message is %+v", status, req.Message)
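A note for readers skimming the patch: the new acceptance check for an incoming quiesce heartbeat (the store_raft.go hunk above) can be read on its own. The sketch below is a simplified standalone model rather than the kvserver code; basicStatus, quiesceMsg, and shouldQuiesce are hypothetical stand-ins for raft.BasicStatus, the heartbeat's Message fields, and the condition the patch now evaluates under r.mu.

package main

import "fmt"

// basicStatus is a hypothetical stand-in for the raft.BasicStatus fields the
// check consults (Term and Commit come from the HardState, Lead from the
// SoftState).
type basicStatus struct {
	Term   uint64
	Commit uint64
	Lead   uint64
}

// quiesceMsg is a hypothetical stand-in for the quiesce heartbeat
// (req.Message in the patch).
type quiesceMsg struct {
	Term   uint64
	Commit uint64
	From   uint64
}

// shouldQuiesce mirrors the new condition in processRaftRequestWithReplica: a
// follower only quiesces if, according to the sender, it is fully caught up
// (same term and commit index) and it already believes the sender to be the
// raft leader. The leader check is the new part; without it, a quiesced
// follower that doesn't know the leader could only unquiesce via an election,
// adding an election timeout of latency to the request that woke it up.
func shouldQuiesce(local basicStatus, msg quiesceMsg) bool {
	return local.Term == msg.Term &&
		local.Commit == msg.Commit &&
		local.Lead == msg.From
}

func main() {
	msg := quiesceMsg{Term: 5, Commit: 100, From: 2}
	fmt.Println(shouldQuiesce(basicStatus{Term: 5, Commit: 100, Lead: 0}, msg)) // false: leader unknown
	fmt.Println(shouldQuiesce(basicStatus{Term: 5, Commit: 100, Lead: 2}, msg)) // true: caught up, leader matches
}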
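Likewise, the unquiesce decision in the handleRaftReadyRaftMuLocked hunk boils down to a small predicate. The sketch below is illustrative only; shouldUnquiesce and its parameters are hypothetical names for the values the patch derives from raftGroup.HasReady(), the new FlushLockedWithRaftGroup return value, and len(r.mu.proposals).

package main

import "fmt"

// shouldUnquiesce restates the closure's new return value: wake the raft
// group if it produced a Ready (there is work to do), if we just handed it
// proposals (raft may have silently dropped them because it doesn't know the
// leader, in which case there is no Ready), or, as defense in depth, if any
// proposals are still outstanding.
func shouldUnquiesce(hasReady bool, numFlushed, numPendingProposals int) bool {
	return hasReady || numFlushed > 0 || numPendingProposals > 0
}

func main() {
	// A proposal was flushed but dropped (no Ready): we still need to
	// unquiesce so the group can find a leader and retry the proposal.
	fmt.Println(shouldUnquiesce(false, 1, 1)) // true
	// Nothing flushed, nothing pending, no Ready: the range stays quiesced.
	fmt.Println(shouldUnquiesce(false, 0, 0)) // false
}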