Merge #46045
46045: kvserver: avoid hanging proposal after leader goes down  r=nvanbenschoten,petermattis a=tbg

Deflakes gossip/chaos/nodes=9, i.e.

Fixes #38829.

There was a bug in range quiescence that could cause commands to hang in
Raft for minutes before actually getting replicated. This would occur
whenever a range was quiesced and a follower replica that didn't know
the (Raft) leader received a request. The request would be evaluated
and put into the Raft proposal buffer, and a ready check would be
enqueued. However, no Ready would be produced (since the proposal got
dropped by raft; leader unknown), and so the replica would not unquiesce.

This commit prevents this by always waking up the group if the proposal
buffer was initially nonempty, even if an empty Ready is produced.
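
For illustration, a minimal, self-contained sketch of that unquiesce decision
follows. The names (`rawNode`, `replicaState`, `shouldUnquiesce`) are invented
stand-ins for this example, not the actual kvserver/raft APIs; the real
condition lives in `handleRaftReadyRaftMuLocked` in the diff below.

```go
package main

import "fmt"

// rawNode is a stand-in for raft.RawNode; only the part relevant to the
// unquiesce decision is modeled here.
type rawNode struct {
	hasReady bool
}

func (rn *rawNode) HasReady() bool { return rn.hasReady }

// replicaState is a simplified stand-in for the Replica fields consulted
// when handling a raft ready check.
type replicaState struct {
	numFlushed   int // proposals just handed to the RawNode by the proposal buffer
	numProposals int // in-flight proposals that originated at this replica
}

// shouldUnquiesce mirrors the condition introduced by this change: wake up
// the group if raft produced a Ready, if we just flushed proposals (raft may
// have dropped them because the leader is unknown), or if proposals are
// still outstanding.
func shouldUnquiesce(rn *rawNode, s replicaState) bool {
	return rn.HasReady() || s.numFlushed > 0 || s.numProposals > 0
}

func main() {
	// The previously-hanging case: a proposal was flushed but raft dropped it
	// (leader unknown), so no Ready was produced. Before this change the
	// replica stayed quiesced; now it unquiesces and can learn the leader.
	fmt.Println(shouldUnquiesce(&rawNode{hasReady: false}, replicaState{numFlushed: 1}))
}
```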

It goes further by trying to ensure that a leader is always known while
the range is quiesced. Previously, on an incoming request to quiesce, we
did not verify that the Raft group had learned the leader's identity.
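
As a sketch of that stricter check (again with invented stand-in types rather
than the real `raft.BasicStatus` handling; the actual code additionally calls
`quiesceLocked()`, which refuses to quiesce if there is outstanding work):

```go
package main

import "fmt"

// basicStatus and quiesceMsg are simplified stand-ins for the raft status and
// the incoming quiesce heartbeat; they are not the real CockroachDB/etcd types.
type basicStatus struct {
	Term   uint64
	Commit uint64
	Lead   uint64 // 0 means this replica doesn't know the leader
}

type quiesceMsg struct {
	Term   uint64
	Commit uint64
	From   uint64 // replica ID of the sender, which claims to be the leader
}

// acceptQuiesce mirrors the stricter validation: only quiesce if we are fully
// caught up according to the sender and we believe the sender to be the leader.
func acceptQuiesce(local basicStatus, msg quiesceMsg) bool {
	return local.Term == msg.Term &&
		local.Commit == msg.Commit &&
		local.Lead == msg.From
}

func main() {
	// Leader unknown locally: refuse to quiesce, so a later proposal on this
	// follower doesn't have to wait out a heartbeat or election timeout.
	fmt.Println(acceptQuiesce(
		basicStatus{Term: 5, Commit: 100, Lead: 0},
		quiesceMsg{Term: 5, Commit: 100, From: 2},
	))
}
```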

One shortcoming here is that in the situation in which the proposal
would previously hang "forever", it will now hang for up to one heartbeat
or election timeout, where ideally it would be proposed more reactively.
Since this is so rare, I didn't try to address it here. Instead, refer to
the ideas in

#37906 (comment)

and

#21849

for future changes that could mitigate this.

Without this PR, the test would fail around 10% of the time. With this
change, it passed 40 iterations in a row without a hitch, via:

    ./bin/roachtest run -u tobias --count 40 --parallelism 10 --cpu-quota 1280 gossip/chaos/nodes=9

Release justification: bug fix
Release note (bug fix): a rare case in which requests to a quiesced
range could hang in the KV replication layer was fixed. This would
manifest as a message saying "have been waiting ... for proposing" even
though no loss of quorum occurred.


Co-authored-by: Peter Mattis <[email protected]>
Co-authored-by: Tobias Schottdorf <[email protected]>
3 people committed Mar 17, 2020
2 parents 0022abe + 1f95860 commit 021781a
Showing 7 changed files with 83 additions and 25 deletions.
9 changes: 6 additions & 3 deletions pkg/cmd/roachtest/gossip.go
@@ -32,8 +32,10 @@ import (

func registerGossip(r *testRegistry) {
runGossipChaos := func(ctx context.Context, t *test, c *cluster) {
args := startArgs("--args=--vmodule=*=1")
c.Put(ctx, cockroach, "./cockroach", c.All())
c.Start(ctx, t, c.All())
c.Start(ctx, t, c.All(), args)
waitForFullReplication(t, c.Conn(ctx, 1))

gossipNetwork := func(node int) string {
const query = `
@@ -65,6 +67,7 @@ SELECT string_agg(source_id::TEXT || ':' || target_id::TEXT, ',')
if i == deadNode {
continue
}
c.l.Printf("%d: checking gossip\n", i)
s := gossipNetwork(i)
if !initialized {
deadNodeStr := fmt.Sprint(deadNode)
@@ -88,7 +91,7 @@ SELECT string_agg(source_id::TEXT || ':' || target_id::TEXT, ',')
return false
}
}
fmt.Printf("gossip ok: %s (%0.0fs)\n", expected, timeutil.Since(start).Seconds())
c.l.Printf("gossip ok: %s (%0.0fs)\n", expected, timeutil.Since(start).Seconds())
return true
}

@@ -109,7 +112,7 @@ SELECT string_agg(source_id::TEXT || ':' || target_id::TEXT, ',')
deadNode = nodes.randNode()[0]
c.Stop(ctx, c.Node(deadNode))
waitForGossip()
c.Start(ctx, t, c.Node(deadNode))
c.Start(ctx, t, c.Node(deadNode), args)
}
}

2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/test.go
@@ -196,7 +196,7 @@ func (t *test) GetStatus() string {
defer t.mu.Unlock()
status, ok := t.mu.status[t.runnerID]
if ok {
return fmt.Sprintf("%s (set %s ago)", status.msg, timeutil.Now().Sub(status.time))
return fmt.Sprintf("%s (set %s ago)", status.msg, timeutil.Now().Sub(status.time).Round(time.Second))
}
return "N/A"
}
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/replica.go
@@ -315,6 +315,10 @@ type Replica struct {
// evaluation and is consumed by the Raft processing thread. Once
// consumed, commands are proposed through Raft and moved to the
// proposals map.
//
// Access to proposalBuf must occur *without* holding the mutex.
// Instead, the buffer internally holds a reference to mu and will use
// it appropriately.
proposalBuf propBuf
// proposals stores the Raft in-flight commands which originated at
// this Replica, i.e. all commands for which propose has been called,
@@ -898,6 +902,13 @@ func (r *Replica) raftStatusRLocked() *raft.Status {
return nil
}

func (r *Replica) raftBasicStatusRLocked() raft.BasicStatus {
if rg := r.mu.internalRaftGroup; rg != nil {
return rg.BasicStatus()
}
return raft.BasicStatus{}
}

// State returns a copy of the internal state of the Replica, along with some
// auxiliary information.
func (r *Replica) State() storagepb.RangeInfo {
15 changes: 9 additions & 6 deletions pkg/kv/kvserver/replica_proposal_buf.go
@@ -346,7 +346,8 @@ func (b *propBuf) flushRLocked() error {

func (b *propBuf) flushLocked() error {
return b.p.withGroupLocked(func(raftGroup *raft.RawNode) error {
return b.FlushLockedWithRaftGroup(raftGroup)
_, err := b.FlushLockedWithRaftGroup(raftGroup)
return err
})
}

@@ -356,7 +357,9 @@ func (b *propBuf) flushLocked() error {
//
// If raftGroup is non-nil (the common case) then the commands will also be
// proposed to the RawNode. This initiates Raft replication of the commands.
func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
//
// Returns the number of proposals handed to the RawNode.
func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) (int, error) {
// Before returning, make sure to forward the lease index base to at least
// the proposer's currently applied lease index. This ensures that if the
// lease applied index advances outside of this proposer's control (i.e.
@@ -374,7 +377,7 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
defer b.arr.adjustSize(used)
if used == 0 {
// The buffer is empty. Nothing to do.
return nil
return 0, nil
} else if used > b.arr.len() {
// The buffer is full and at least one writer has tried to allocate
// on top of the full buffer, so notify them to try again.
@@ -481,9 +484,9 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
}
}
if firstErr != nil {
return firstErr
return 0, firstErr
}
return proposeBatch(raftGroup, b.p.replicaID(), ents)
return used, proposeBatch(raftGroup, b.p.replicaID(), ents)
}

func (b *propBuf) forwardLeaseIndexBase(v uint64) {
@@ -521,7 +524,7 @@ func proposeBatch(raftGroup *raft.RawNode, replID roachpb.ReplicaID, ents []raft
// The representative example of this is a caller that wants to flush the buffer
// into the proposals map before canceling all proposals.
func (b *propBuf) FlushLockedWithoutProposing() {
if err := b.FlushLockedWithRaftGroup(nil /* raftGroup */); err != nil {
if _, err := b.FlushLockedWithRaftGroup(nil /* raftGroup */); err != nil {
log.Fatalf(context.Background(), "unexpected error: %+v", err)
}
}
41 changes: 39 additions & 2 deletions pkg/kv/kvserver/replica_raft.go
@@ -319,6 +319,9 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (index int64, pE
// Insert into the proposal buffer, which passes the command to Raft to be
// proposed. The proposal buffer assigns the command a maximum lease index
// when it sequences it.
//
// NB: we must not hold r.mu while using the proposal buffer, see comment
// on the field.
maxLeaseIndex, err := r.mu.proposalBuf.Insert(p, data)
if err != nil {
return 0, roachpb.NewError(err)
@@ -408,13 +411,27 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
leaderID := r.mu.leaderID
lastLeaderID := leaderID
err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
if err := r.mu.proposalBuf.FlushLockedWithRaftGroup(raftGroup); err != nil {
numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(raftGroup)
if err != nil {
return false, err
}
if hasReady = raftGroup.HasReady(); hasReady {
rd = raftGroup.Ready()
}
return hasReady /* unquiesceAndWakeLeader */, nil
// We unquiesce if we have a Ready (= there's work to do). We also have
// to unquiesce if we just flushed some proposals but there isn't a
// Ready, which can happen if the proposals got dropped (raft does this
// if it doesn't know who the leader is). And, for extra defense in depth,
// we also unquiesce if there are outstanding proposals.
//
// NB: if we had the invariant that the group can only be in quiesced
// state if it knows the leader (state.Lead) AND we knew that raft would
// never give us an empty ready here (i.e. the only reason to drop a
// proposal is not knowing the leader) then numFlushed would not be
// necessary. The latter is likely true but we don't want to rely on
// it. The former is maybe true, but there's no easy way to enforce it.
unquiesceAndWakeLeader := hasReady || numFlushed > 0 || len(r.mu.proposals) > 0
return unquiesceAndWakeLeader, nil
})
r.mu.Unlock()
if err == errRemoved {
@@ -1333,6 +1350,26 @@ func (r *Replica) withRaftGroupLocked(
unquiesce, err := func(rangeID roachpb.RangeID, raftGroup *raft.RawNode) (bool, error) {
return f(raftGroup)
}(r.RangeID, r.mu.internalRaftGroup)
if r.mu.internalRaftGroup.BasicStatus().Lead == 0 {
// If we don't know the leader, unquiesce unconditionally. As a
// follower, we can't wake up the leader if we don't know who that is,
// so we should find out now before someone needs us to unquiesce.
//
// This situation should occur rarely or never (ever since we got
// stricter about validating incoming Quiesce requests) but it's good
// defense-in-depth.
//
// Note that unquiesceAndWakeLeaderLocked won't manage to wake up the
// leader since it's unknown to this replica, and at the time of writing
// the heuristics for campaigning are defensive (won't campaign if there
// is a live leaseholder). But if we are trying to unquiesce because
// this follower was asked to propose something, then this means that a
// request is going to have to wait until the leader next contacts us,
// or, in the worst case, an election timeout. This is not ideal - if a
// node holds a live lease, we should direct the client to it
// immediately.
unquiesce = true
}
if unquiesce {
r.unquiesceAndWakeLeaderLocked()
}
8 changes: 0 additions & 8 deletions pkg/kv/kvserver/replica_raft_quiesce.go
@@ -21,14 +21,6 @@ import (
"go.etcd.io/etcd/raft/raftpb"
)

// mark the replica as quiesced. Returns true if the Replica is successfully
// quiesced and false otherwise.
func (r *Replica) quiesce() bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.quiesceLocked()
}

func (r *Replica) quiesceLocked() bool {
ctx := r.AnnotateCtx(context.TODO())
if r.hasPendingProposalsRLocked() {
22 changes: 17 additions & 5 deletions pkg/kv/kvserver/store_raft.go
@@ -218,11 +218,23 @@ func (s *Store) processRaftRequestWithReplica(
if req.Message.Type != raftpb.MsgHeartbeat {
log.Fatalf(ctx, "unexpected quiesce: %+v", req)
}
status := r.RaftStatus()
if status != nil && status.Term == req.Message.Term && status.Commit == req.Message.Commit {
if r.quiesce() {
return nil
}
// If another replica tells us to quiesce, we verify that according to
// it, we are fully caught up, and that we believe it to be the leader.
// If we didn't do this, this replica could only unquiesce by means of
// an election, which means that the request prompting the unquiesce
// would end up with latency on the order of an election timeout.
//
// There are additional checks in quiesceLocked() that prevent us from
// quiescing if there's outstanding work.
r.mu.Lock()
status := r.raftBasicStatusRLocked()
ok := status.Term == req.Message.Term &&
status.Commit == req.Message.Commit &&
status.Lead == req.Message.From &&
r.quiesceLocked()
r.mu.Unlock()
if ok {
return nil
}
if log.V(4) {
log.Infof(ctx, "not quiescing: local raft status is %+v, incoming quiesce message is %+v", status, req.Message)
