Skip to content

Commit

Permalink
raft: update LeadEpoch in response to MsgFortify
Browse files Browse the repository at this point in the history
This patch partially implements the handling of MsgFortify on peers.
In particular, peers (including the leader) now check whether they
support the leader when handling a fortification request. They persist
the leadEpoch from the StoreLiveness fabric if they do. For now, the
fortification request is trivially rejected regardless; this will change
shortly.

Most of this patch is building out some of the test infra we'll use to
set up the underlying StoreLiveness fabric.

Informs cockroachdb#125261

Release note: None
  • Loading branch information
arulajmani committed Aug 13, 2024
1 parent 1543e27 commit 57e7247
Show file tree
Hide file tree
Showing 36 changed files with 986 additions and 196 deletions.
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica_store_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (r *replicaRLockedStoreLiveness) SupportFor(replicaID raftpb.PeerID) (raftp
if !ok {
return 0, false
}
// TODO(arul): we can remove this once we start to assign storeLiveness in the
// Store constructor.
if r.store.storeLiveness == nil {
return 0, false
}
epoch, ok := r.store.storeLiveness.SupportFor(storeID)
if !ok {
return 0, false
Expand Down
4 changes: 2 additions & 2 deletions pkg/raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,13 +453,13 @@ func TestNodeStart(t *testing.T) {
MustSync: true,
},
{
HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1, Lead: 1},
HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1, Lead: 1, LeadEpoch: 1},
Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 2, Data: nil}},
MustSync: true,
},
{
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1, Lead: 1},
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1, Lead: 1, LeadEpoch: 1},
Entries: nil,
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
MustSync: false,
Expand Down
49 changes: 39 additions & 10 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,19 +518,22 @@ func (r *raft) send(m pb.Message) {
m.Term = r.Term
}
}
if m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVoteResp {
if m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp ||
m.Type == pb.MsgPreVoteResp || m.Type == pb.MsgFortifyLeaderResp {
// If async storage writes are enabled, messages added to the msgs slice
// are allowed to be sent out before unstable state (e.g. log entry
// writes and election votes) have been durably synced to the local
// disk.
//
// For most message types, this is not an issue. However, response
// messages that relate to "voting" on either leader election or log
// appends require durability before they can be sent. It would be
// incorrect to publish a vote in an election before that vote has been
// synced to stable storage locally. Similarly, it would be incorrect to
// acknowledge a log append to the leader before that entry has been
// synced to stable storage locally.
// For most message types, this is not an issue. However, response messages
// that relate to "voting" on either leader election or log appends or
// messages that relate to leader fortification require durability before
// they can be sent. It would be incorrect to publish a vote in an election
// before that vote has been synced to stable storage locally. Similarly, it
// would be incorrect to acknowledge a log append to the leader before that
// entry has been synced to stable storage locally. Similarly, it would also
// be incorrect to promise fortification support to a leader without durably
// persisting the leader's epoch being supported.
//
// Per the Raft thesis, section 3.8 Persisted state and server restarts:
//
Expand Down Expand Up @@ -683,7 +686,16 @@ func (r *raft) sendFortify(to pb.PeerID) {
if to == r.id {
// We handle the case where the leader is trying to fortify itself specially.
// Doing so avoids a self-addressed message.
// TODO(arul): do this handling.
epoch, live := r.storeLiveness.SupportFor(r.lead)
if live {
r.leadEpoch = epoch
// TODO(arul): For now, we're not recording any support on the leader. Do
// this once we implement handleFortifyResp correctly.
} else {
r.logger.Infof(
"%x leader at term %d does not support itself in the liveness fabric", r.id, r.Term,
)
}
return
}
r.send(pb.Message{To: to, Type: pb.MsgFortifyLeader})
Expand Down Expand Up @@ -1701,6 +1713,8 @@ func stepFollower(r *raft, m pb.Message) error {
r.lead = m.From
r.handleSnapshot(m)
case pb.MsgFortifyLeader:
r.electionElapsed = 0
r.lead = m.From
r.handleFortify(m)
case pb.MsgTransferLeader:
if r.lead == None {
Expand Down Expand Up @@ -1868,7 +1882,22 @@ func (r *raft) handleSnapshot(m pb.Message) {
}

func (r *raft) handleFortify(m pb.Message) {
// TODO(arul): currently a no-op; implement.
assertTrue(r.state == StateFollower, "leaders should locally fortify without sending a message")

epoch, live := r.storeLiveness.SupportFor(r.lead)
if !live {
// The leader isn't supported by this peer in its liveness fabric. Reject
// the fortification request.
r.send(pb.Message{
To: m.From,
Type: pb.MsgFortifyLeaderResp,
Reject: true,
})
return
}
r.leadEpoch = epoch
// TODO(arul): for now, we reject the fortification request because the leader
// hasn't been taught how to handle it.
r.send(pb.Message{
To: m.From,
Type: pb.MsgFortifyLeaderResp,
Expand Down
3 changes: 3 additions & 0 deletions pkg/raft/rafttest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"interaction_env_handler_set_randomized_election_timeout.go",
"interaction_env_handler_stabilize.go",
"interaction_env_handler_status.go",
"interaction_env_handler_store_liveness.go",
"interaction_env_handler_tick.go",
"interaction_env_handler_transfer_leadership.go",
"interaction_env_logger.go",
Expand All @@ -37,7 +38,9 @@ go_library(
"//pkg/raft/raftpb",
"//pkg/raft/raftstoreliveness",
"//pkg/raft/tracker",
"//pkg/util/hlc",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/raft/rafttest/interaction_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/raft"
pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness"
)

// InteractionOpts groups the options for an InteractionEnv.
Expand Down Expand Up @@ -54,6 +53,7 @@ type InteractionEnv struct {
Options *InteractionOpts
Nodes []Node
Messages []pb.Message // in-flight messages
Fabric *livenessFabric

Output *RedirectLogger
}
Expand All @@ -65,6 +65,7 @@ func NewInteractionEnv(opts *InteractionOpts) *InteractionEnv {
}
return &InteractionEnv{
Options: opts,
Fabric: newLivenessFabric(),
Output: &RedirectLogger{
Builder: &strings.Builder{},
},
Expand Down Expand Up @@ -103,7 +104,6 @@ func raftConfigStub() raft.Config {
HeartbeatTick: 1,
MaxSizePerMsg: math.MaxUint64,
MaxInflightMsgs: math.MaxInt32,
StoreLiveness: raftstoreliveness.AlwaysLive{},
}
}

Expand Down
68 changes: 66 additions & 2 deletions pkg/raft/rafttest/interaction_env_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
)

// Handle is the entrypoint for data-driven interaction testing. Commands and
Expand Down Expand Up @@ -192,6 +193,65 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
// Example:
// report-unreachable 1 2
err = env.handleReportUnreachable(t, d)
case "store-liveness":
// Prints the global store liveness state.
//
// Example:
// store-liveness
if env.Fabric == nil {
err = errors.Newf("empty liveness fabric")
break
}
_, err = env.Output.WriteString(env.Fabric.String())
case "bump-epoch":
// Bumps the epoch of a store. As a result, the store stops seeking support
// from all remote stores at the prior epoch. It instead (successfully)
// seeks support for the newer epoch.
//
// bump-epoch id
// Arguments are:
// id - id of the store whose epoch is being bumped.
//
// Example:
// bump-epoch 1
err = env.handleBumpEpoch(t, d)

case "withdraw-support":
// Withdraw support for another store (for_store) from a store (from_store).
//
// Note that after invoking "withdraw-support", a test may establish support
// from from_store for for_store at a higher epoch by calling
// "grant-support".
//
// withdraw-support from_id for_id
// Arguments are:
// from_id - id of the store who is withdrawing support.
// for_id - id of the store for which support is being withdrawn.
//
// Example:
// withdraw-support 1 2
// Explanation:
// 1 (from_store) withdraws support for 2's (for_store) current epoch.
err = env.handleWithdrawSupport(t, d)

case "grant-support":
// Grant support for another store (for_store) by forcing for_store to bump
// its epoch and using it to seek support from from_store at this new epoch.
//
// Note that From_store should not be supporting for_store already; if it
// is, an error will be returned.
//
// grant-support from_id for_id
// Arguments are:
// from_id - id of the store who is withdrawing support.
// for_id - id of the store for which support is being withdrawn.
//
// Example:
// grant-support 1 2
// Explanation:
// 1 (from_store) grants support for 2 (for_store) at a higher epoch.
err = env.handleGrantSupport(t, d)

default:
err = fmt.Errorf("unknown command")
}
Expand All @@ -211,12 +271,16 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
}

func firstAsInt(t *testing.T, d datadriven.TestData) int {
return nthAsInt(t, d, 0)
}

func nthAsInt(t *testing.T, d datadriven.TestData, n int) int {
t.Helper()
n, err := strconv.Atoi(d.CmdArgs[0].Key)
ret, err := strconv.Atoi(d.CmdArgs[n].Key)
if err != nil {
t.Fatal(err)
}
return n
return ret
}

func firstAsNodeIdx(t *testing.T, d datadriven.TestData) int {
Expand Down
4 changes: 4 additions & 0 deletions pkg/raft/rafttest/interaction_env_handler_add_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) er
}
cfg := cfg // fork the config stub
cfg.ID, cfg.Storage = id, s

env.Fabric.addNode()
cfg.StoreLiveness = newStoreLiveness(env.Fabric, id)

if env.Options.OnConfig != nil {
env.Options.OnConfig(&cfg)
if cfg.ID != id {
Expand Down
Loading

0 comments on commit 57e7247

Please sign in to comment.