Skip to content

Commit

Permalink
Merge #128845
Browse files Browse the repository at this point in the history
128845: raft: update LeadEpoch in response to MsgFortify  r=nvanbenschoten a=arulajmani

This patch partially implements the handling of MsgFortify on peers.
In particular, peers (including the leader) now check whether they
support the leader when handling a fortification request. They persist
the leadEpoch from the StoreLiveness fabric if they do. For now, the
fortification request is trivially rejected regardless; this will change
shortly.

Most of this patch is building out some of the test infra we'll use to
set up the underlying StoreLiveness fabric.

Informs #125261

Release note: None

Co-authored-by: Arul Ajmani <[email protected]>
  • Loading branch information
craig[bot] and arulajmani committed Aug 14, 2024
2 parents 667785c + 3e07f42 commit abe3d27
Show file tree
Hide file tree
Showing 40 changed files with 1,088 additions and 296 deletions.
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica_store_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ func (r *replicaRLockedStoreLiveness) SupportFor(replicaID raftpb.PeerID) (raftp
if !ok {
return 0, false
}
// TODO(arul): we can remove this once we start to assign storeLiveness in the
// Store constructor.
if r.store.storeLiveness == nil {
return 0, false
}
epoch, ok := r.store.storeLiveness.SupportFor(storeID)
if !ok {
return 0, false
Expand Down
2 changes: 1 addition & 1 deletion pkg/raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type Ready struct {
}

// isHardStateEqual reports whether the two HardStates are identical.
//
// The structs are compared as a whole rather than field by field so that
// every field participates in the comparison, including LeadEpoch, and so
// that fields added to HardState later cannot be silently left out.
func isHardStateEqual(a, b pb.HardState) bool {
	return a == b
}

// IsEmptyHardState returns true if the given HardState is empty.
Expand Down
4 changes: 2 additions & 2 deletions pkg/raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,13 +453,13 @@ func TestNodeStart(t *testing.T) {
MustSync: true,
},
{
HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1, Lead: 1},
HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1, Lead: 1, LeadEpoch: 1},
Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 2, Data: nil}},
MustSync: true,
},
{
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1, Lead: 1},
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1, Lead: 1, LeadEpoch: 1},
Entries: nil,
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
MustSync: false,
Expand Down
47 changes: 38 additions & 9 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,19 +518,22 @@ func (r *raft) send(m pb.Message) {
m.Term = r.Term
}
}
if m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVoteResp {
if m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp ||
m.Type == pb.MsgPreVoteResp || m.Type == pb.MsgFortifyLeaderResp {
// If async storage writes are enabled, messages added to the msgs slice
// are allowed to be sent out before unstable state (e.g. log entry
// writes and election votes) have been durably synced to the local
// disk.
//
// For most message types, this is not an issue. However, response
// messages that relate to "voting" on either leader election or log
// appends require durability before they can be sent. It would be
// incorrect to publish a vote in an election before that vote has been
// For most message types, this is not an issue. However, response messages
// that relate to "voting" on leader election, log appends, or leader
// fortification require durability before they can be sent. It would be
// incorrect to publish a vote in an election before that vote has been
// synced to stable storage locally. Similarly, it would be incorrect to
// acknowledge a log append to the leader before that entry has been
// synced to stable storage locally.
// acknowledge a log append to the leader before that entry has been synced
// to stable storage locally. Likewise, it would be incorrect to promise
// fortification support to a leader without first durably persisting the
// leader epoch being supported.
//
// Per the Raft thesis, section 3.8 Persisted state and server restarts:
//
Expand Down Expand Up @@ -683,7 +686,16 @@ func (r *raft) sendFortify(to pb.PeerID) {
if to == r.id {
// We handle the case where the leader is trying to fortify itself specially.
// Doing so avoids a self-addressed message.
// TODO(arul): do this handling.
epoch, live := r.storeLiveness.SupportFor(r.lead)
if live {
r.leadEpoch = epoch
// TODO(arul): For now, we're not recording any support on the leader. Do
// this once we implement handleFortifyResp correctly.
} else {
r.logger.Infof(
"%x leader at term %d does not support itself in the liveness fabric", r.id, r.Term,
)
}
return
}
r.send(pb.Message{To: to, Type: pb.MsgFortifyLeader})
Expand Down Expand Up @@ -1701,6 +1713,8 @@ func stepFollower(r *raft, m pb.Message) error {
r.lead = m.From
r.handleSnapshot(m)
case pb.MsgFortifyLeader:
r.electionElapsed = 0
r.lead = m.From
r.handleFortify(m)
case pb.MsgTransferLeader:
if r.lead == None {
Expand Down Expand Up @@ -1868,7 +1882,22 @@ func (r *raft) handleSnapshot(m pb.Message) {
}

func (r *raft) handleFortify(m pb.Message) {
// TODO(arul): currently a no-op; implement.
assertTrue(r.state == StateFollower, "leaders should locally fortify without sending a message")

epoch, live := r.storeLiveness.SupportFor(r.lead)
if !live {
// The leader isn't supported by this peer in its liveness fabric. Reject
// the fortification request.
r.send(pb.Message{
To: m.From,
Type: pb.MsgFortifyLeaderResp,
Reject: true,
})
return
}
r.leadEpoch = epoch
// TODO(arul): for now, we reject the fortification request because the leader
// hasn't been taught how to handle it.
r.send(pb.Message{
To: m.From,
Type: pb.MsgFortifyLeaderResp,
Expand Down
3 changes: 3 additions & 0 deletions pkg/raft/rafttest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"interaction_env_handler_set_randomized_election_timeout.go",
"interaction_env_handler_stabilize.go",
"interaction_env_handler_status.go",
"interaction_env_handler_store_liveness.go",
"interaction_env_handler_tick.go",
"interaction_env_handler_transfer_leadership.go",
"interaction_env_logger.go",
Expand All @@ -37,7 +38,9 @@ go_library(
"//pkg/raft/raftpb",
"//pkg/raft/raftstoreliveness",
"//pkg/raft/tracker",
"//pkg/util/hlc",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/raft/rafttest/interaction_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/raft"
pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness"
)

// InteractionOpts groups the options for an InteractionEnv.
Expand Down Expand Up @@ -54,6 +53,7 @@ type InteractionEnv struct {
Options *InteractionOpts
Nodes []Node
Messages []pb.Message // in-flight messages
Fabric *livenessFabric

Output *RedirectLogger
}
Expand All @@ -65,6 +65,7 @@ func NewInteractionEnv(opts *InteractionOpts) *InteractionEnv {
}
return &InteractionEnv{
Options: opts,
Fabric: newLivenessFabric(),
Output: &RedirectLogger{
Builder: &strings.Builder{},
},
Expand Down Expand Up @@ -103,7 +104,6 @@ func raftConfigStub() raft.Config {
HeartbeatTick: 1,
MaxSizePerMsg: math.MaxUint64,
MaxInflightMsgs: math.MaxInt32,
StoreLiveness: raftstoreliveness.AlwaysLive{},
}
}

Expand Down
71 changes: 69 additions & 2 deletions pkg/raft/rafttest/interaction_env_handler.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// This code has been modified from its original form by Cockroach Labs, Inc.
// All modifications are Copyright 2024 Cockroach Labs, Inc.
//
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -20,6 +23,7 @@ import (
"testing"

"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
)

// Handle is the entrypoint for data-driven interaction testing. Commands and
Expand Down Expand Up @@ -192,6 +196,65 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
// Example:
// report-unreachable 1 2
err = env.handleReportUnreachable(t, d)
case "store-liveness":
// Prints the global store liveness state.
//
// Example:
// store-liveness
if env.Fabric == nil {
err = errors.Newf("empty liveness fabric")
break
}
_, err = env.Output.WriteString(env.Fabric.String())
case "bump-epoch":
// Bumps the epoch of a store. As a result, the store stops seeking support
// from all remote stores at the prior epoch. It instead (successfully)
// seeks support for the newer epoch.
//
// bump-epoch id
// Arguments are:
// id - id of the store whose epoch is being bumped.
//
// Example:
// bump-epoch 1
err = env.handleBumpEpoch(t, d)

case "withdraw-support":
// Withdraw support for another store (for_store) from a store (from_store).
//
// Note that after invoking "withdraw-support", a test may establish support
// from from_store for for_store at a higher epoch by calling
// "grant-support".
//
// withdraw-support from_id for_id
// Arguments are:
// from_id - id of the store who is withdrawing support.
// for_id - id of the store for which support is being withdrawn.
//
// Example:
// withdraw-support 1 2
// Explanation:
// 1 (from_store) withdraws support for 2's (for_store) current epoch.
err = env.handleWithdrawSupport(t, d)

case "grant-support":
// Grant support for another store (for_store) by forcing for_store to bump
// its epoch and using it to seek support from from_store at this new epoch.
//
// Note that from_store should not be supporting for_store already; if it
// is, an error will be returned.
//
// grant-support from_id for_id
// Arguments are:
// from_id - id of the store who is granting support.
// for_id - id of the store for which support is being granted.
//
// Example:
// grant-support 1 2
// Explanation:
// 1 (from_store) grants support for 2 (for_store) at a higher epoch.
err = env.handleGrantSupport(t, d)

default:
err = fmt.Errorf("unknown command")
}
Expand All @@ -211,12 +274,16 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
}

// firstAsInt parses the key of the first command argument of d as an integer
// and returns it. It is shorthand for nthAsInt with index 0 and fails the test
// under the same conditions.
func firstAsInt(t *testing.T, d datadriven.TestData) int {
	// Mark this wrapper as a helper so that a failure raised inside nthAsInt is
	// attributed to the caller of firstAsInt rather than to this line.
	t.Helper()
	return nthAsInt(t, d, 0)
}

// nthAsInt parses the key of the n-th (zero-indexed) command argument of d as
// a base-10 integer and returns it.
//
// The test is failed immediately if n does not index an existing argument or
// if the argument's key is not a valid integer.
func nthAsInt(t *testing.T, d datadriven.TestData, n int) int {
	t.Helper()
	// Fail with a readable message instead of panicking on a missing argument.
	if n < 0 || n >= len(d.CmdArgs) {
		t.Fatalf("argument index %d out of range: command has %d argument(s)", n, len(d.CmdArgs))
	}
	ret, err := strconv.Atoi(d.CmdArgs[n].Key)
	if err != nil {
		t.Fatal(err)
	}
	return ret
}

func firstAsNodeIdx(t *testing.T, d datadriven.TestData) int {
Expand Down
4 changes: 4 additions & 0 deletions pkg/raft/rafttest/interaction_env_handler_add_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) er
}
cfg := cfg // fork the config stub
cfg.ID, cfg.Storage = id, s

env.Fabric.addNode()
cfg.StoreLiveness = newStoreLiveness(env.Fabric, id)

if env.Options.OnConfig != nil {
env.Options.OnConfig(&cfg)
if cfg.ID != id {
Expand Down
Loading

0 comments on commit abe3d27

Please sign in to comment.