Skip to content

Commit

Permalink
raft: update LeadEpoch in response to MsgFortify
Browse files Browse the repository at this point in the history
This patch partially implements the handling of MsgFortify on peers.
In particular, peers (including the leader) now check whether they
support the leader when handling a fortification request. They persist
the leadEpoch from the StoreLiveness fabric if they do. For now, the
fortification request is trivially rejected regardless; this will change
shortly.

Most of this patch is building out some of the test infra we'll use to
set up the underlying StoreLiveness fabric.

Informs cockroachdb#125261

Release note: None
  • Loading branch information
arulajmani committed Aug 14, 2024
1 parent 1543e27 commit 5892efb
Show file tree
Hide file tree
Showing 36 changed files with 988 additions and 195 deletions.
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica_store_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (r *replicaRLockedStoreLiveness) SupportFor(replicaID raftpb.PeerID) (raftp
if !ok {
return 0, false
}
// TODO(arul): we can remove this once we start to assign storeLiveness in the
// Store constructor.
if r.store.storeLiveness == nil {
return 0, false
}
epoch, ok := r.store.storeLiveness.SupportFor(storeID)
if !ok {
return 0, false
Expand Down
4 changes: 2 additions & 2 deletions pkg/raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,13 +453,13 @@ func TestNodeStart(t *testing.T) {
MustSync: true,
},
{
HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1, Lead: 1},
HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1, Lead: 1, LeadEpoch: 1},
Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 2, Data: nil}},
MustSync: true,
},
{
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1, Lead: 1},
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1, Lead: 1, LeadEpoch: 1},
Entries: nil,
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
MustSync: false,
Expand Down
47 changes: 38 additions & 9 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,19 +518,22 @@ func (r *raft) send(m pb.Message) {
m.Term = r.Term
}
}
if m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVoteResp {
if m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp ||
m.Type == pb.MsgPreVoteResp || m.Type == pb.MsgFortifyLeaderResp {
// If async storage writes are enabled, messages added to the msgs slice
// are allowed to be sent out before unstable state (e.g. log entry
// writes and election votes) have been durably synced to the local
// disk.
//
// For most message types, this is not an issue. However, response
// messages that relate to "voting" on either leader election or log
// appends require durability before they can be sent. It would be
// incorrect to publish a vote in an election before that vote has been
// For most message types, this is not an issue. However, response messages
// that relate to "voting" on leader election, log appends, or leader
// fortification require durability before they can be sent. It would
// be incorrect to publish a vote in an election before that vote has been
// synced to stable storage locally. Similarly, it would be incorrect to
// acknowledge a log append to the leader before that entry has been
// synced to stable storage locally.
// acknowledge a log append to the leader before that entry has been synced
// to stable storage locally. Similarly, it would also be incorrect to
// promise fortification support to a leader without durably persisting the
// leader's epoch being supported.
//
// Per the Raft thesis, section 3.8 Persisted state and server restarts:
//
Expand Down Expand Up @@ -683,7 +686,16 @@ func (r *raft) sendFortify(to pb.PeerID) {
if to == r.id {
// We handle the case where the leader is trying to fortify itself specially.
// Doing so avoids a self-addressed message.
// TODO(arul): do this handling.
epoch, live := r.storeLiveness.SupportFor(r.lead)
if live {
r.leadEpoch = epoch
// TODO(arul): For now, we're not recording any support on the leader. Do
// this once we implement handleFortifyResp correctly.
} else {
r.logger.Infof(
"%x leader at term %d does not support itself in the liveness fabric", r.id, r.Term,
)
}
return
}
r.send(pb.Message{To: to, Type: pb.MsgFortifyLeader})
Expand Down Expand Up @@ -1701,6 +1713,8 @@ func stepFollower(r *raft, m pb.Message) error {
r.lead = m.From
r.handleSnapshot(m)
case pb.MsgFortifyLeader:
r.electionElapsed = 0
r.lead = m.From
r.handleFortify(m)
case pb.MsgTransferLeader:
if r.lead == None {
Expand Down Expand Up @@ -1868,7 +1882,22 @@ func (r *raft) handleSnapshot(m pb.Message) {
}

func (r *raft) handleFortify(m pb.Message) {
// TODO(arul): currently a no-op; implement.
assertTrue(r.state == StateFollower, "leaders should locally fortify without sending a message")

epoch, live := r.storeLiveness.SupportFor(r.lead)
if !live {
// The leader isn't supported by this peer in its liveness fabric. Reject
// the fortification request.
r.send(pb.Message{
To: m.From,
Type: pb.MsgFortifyLeaderResp,
Reject: true,
})
return
}
r.leadEpoch = epoch
// TODO(arul): for now, we reject the fortification request because the leader
// hasn't been taught how to handle it.
r.send(pb.Message{
To: m.From,
Type: pb.MsgFortifyLeaderResp,
Expand Down
3 changes: 3 additions & 0 deletions pkg/raft/rafttest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"interaction_env_handler_set_randomized_election_timeout.go",
"interaction_env_handler_stabilize.go",
"interaction_env_handler_status.go",
"interaction_env_handler_store_liveness.go",
"interaction_env_handler_tick.go",
"interaction_env_handler_transfer_leadership.go",
"interaction_env_logger.go",
Expand All @@ -37,7 +38,9 @@ go_library(
"//pkg/raft/raftpb",
"//pkg/raft/raftstoreliveness",
"//pkg/raft/tracker",
"//pkg/util/hlc",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/raft/rafttest/interaction_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/raft"
pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness"
)

// InteractionOpts groups the options for an InteractionEnv.
Expand Down Expand Up @@ -54,6 +53,7 @@ type InteractionEnv struct {
Options *InteractionOpts
Nodes []Node
Messages []pb.Message // in-flight messages
Fabric *livenessFabric

Output *RedirectLogger
}
Expand All @@ -65,6 +65,7 @@ func NewInteractionEnv(opts *InteractionOpts) *InteractionEnv {
}
return &InteractionEnv{
Options: opts,
Fabric: newLivenessFabric(),
Output: &RedirectLogger{
Builder: &strings.Builder{},
},
Expand Down Expand Up @@ -103,7 +104,6 @@ func raftConfigStub() raft.Config {
HeartbeatTick: 1,
MaxSizePerMsg: math.MaxUint64,
MaxInflightMsgs: math.MaxInt32,
StoreLiveness: raftstoreliveness.AlwaysLive{},
}
}

Expand Down
71 changes: 69 additions & 2 deletions pkg/raft/rafttest/interaction_env_handler.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// This code has been modified from its original form by Cockroach Labs, Inc.
// All modifications are Copyright 2024 Cockroach Labs, Inc.
//
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -20,6 +23,7 @@ import (
"testing"

"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
)

// Handle is the entrypoint for data-driven interaction testing. Commands and
Expand Down Expand Up @@ -192,6 +196,65 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
// Example:
// report-unreachable 1 2
err = env.handleReportUnreachable(t, d)
case "store-liveness":
// Prints the global store liveness state.
//
// Example:
// store-liveness
if env.Fabric == nil {
err = errors.Newf("empty liveness fabric")
break
}
_, err = env.Output.WriteString(env.Fabric.String())
case "bump-epoch":
// Bumps the epoch of a store. As a result, the store stops seeking support
// from all remote stores at the prior epoch. It instead (successfully)
// seeks support for the newer epoch.
//
// bump-epoch id
// Arguments are:
// id - id of the store whose epoch is being bumped.
//
// Example:
// bump-epoch 1
err = env.handleBumpEpoch(t, d)

case "withdraw-support":
// Makes one store (from_store) withdraw its support for another store (for_store).
//
// Note that after invoking "withdraw-support", a test may establish support
// from from_store for for_store at a higher epoch by calling
// "grant-support".
//
// withdraw-support from_id for_id
// Arguments are:
// from_id - id of the store who is withdrawing support.
// for_id - id of the store for which support is being withdrawn.
//
// Example:
// withdraw-support 1 2
// Explanation:
// 1 (from_store) withdraws support for 2's (for_store) current epoch.
err = env.handleWithdrawSupport(t, d)

case "grant-support":
// Makes from_store grant support for another store (for_store) by forcing
// for_store to bump its epoch and then seek support from from_store at this new epoch.
//
// Note that from_store should not be supporting for_store already; if it
// is, an error will be returned.
//
// grant-support from_id for_id
// Arguments are:
// from_id - id of the store who is granting support.
// for_id - id of the store for which support is being granted.
//
// Example:
// grant-support 1 2
// Explanation:
// 1 (from_store) grants support for 2 (for_store) at a higher epoch.
err = env.handleGrantSupport(t, d)

default:
err = fmt.Errorf("unknown command")
}
Expand All @@ -211,12 +274,16 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
}

// firstAsInt returns the key of the first command argument parsed as an
// integer; it delegates to nthAsInt with index 0.
func firstAsInt(t *testing.T, d datadriven.TestData) int {
return nthAsInt(t, d, 0)
}

func nthAsInt(t *testing.T, d datadriven.TestData, n int) int {
t.Helper()
n, err := strconv.Atoi(d.CmdArgs[0].Key)
ret, err := strconv.Atoi(d.CmdArgs[n].Key)
if err != nil {
t.Fatal(err)
}
return n
return ret
}

func firstAsNodeIdx(t *testing.T, d datadriven.TestData) int {
Expand Down
4 changes: 4 additions & 0 deletions pkg/raft/rafttest/interaction_env_handler_add_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) er
}
cfg := cfg // fork the config stub
cfg.ID, cfg.Storage = id, s

env.Fabric.addNode()
cfg.StoreLiveness = newStoreLiveness(env.Fabric, id)

if env.Options.OnConfig != nil {
env.Options.OnConfig(&cfg)
if cfg.ID != id {
Expand Down
Loading

0 comments on commit 5892efb

Please sign in to comment.