Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make snapshot timing and trailing logs hot-reloadable in raft #444

Merged
merged 4 commits into from
Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 40 additions & 7 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,15 @@ type Raft struct {
// be committed and applied to the FSM.
applyCh chan *logFuture

// Configuration provided at Raft initialization
conf Config
// conf stores the current configuration to use. This is the most recent one
// provided. All reads of config values should use the config() helper method
// to read this safely.
conf atomic.Value

// confReloadMu ensures that only one thread can reload config at once since
// we need to read-modify-write the atomic. It is NOT necessary to hold this
// for any other operation e.g. reading config using config().
confReloadMu sync.Mutex

// FSM is the client state machine to apply commands to
fsm FSM
Expand Down Expand Up @@ -385,9 +392,9 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
return nil
}

// GetConfiguration returns the configuration of the Raft cluster without
// starting a Raft instance or connecting to the cluster
// This function has identical behavior to Raft.GetConfiguration
// GetConfiguration returns the persisted configuration of the Raft cluster
// without starting a Raft instance or connecting to the cluster. This function
// has identical behavior to Raft.GetConfiguration.
func GetConfiguration(conf *Config, fsm FSM, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport) (Configuration, error) {
conf.skipStartup = true
Expand Down Expand Up @@ -505,7 +512,6 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
r := &Raft{
protocolVersion: protocolVersion,
applyCh: applyCh,
conf: *conf,
fsm: fsm,
fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
Expand All @@ -530,6 +536,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
leadershipTransferCh: make(chan *leadershipTransferFuture, 1),
}

r.conf.Store(*conf)

// Initialize as a follower.
r.setState(Follower)

Expand Down Expand Up @@ -583,7 +591,7 @@ func (r *Raft) restoreSnapshot() error {

// Try to load in order of newest to oldest
for _, snapshot := range snapshots {
if !r.conf.NoSnapshotRestoreOnStart {
if !r.config().NoSnapshotRestoreOnStart {
_, source, err := r.snapshots.Open(snapshot.ID)
if err != nil {
r.logger.Error("failed to open snapshot", "id", snapshot.ID, "error", err)
Expand Down Expand Up @@ -630,6 +638,31 @@ func (r *Raft) restoreSnapshot() error {
return nil
}

func (r *Raft) config() Config {
return r.conf.Load().(Config)
}

// ReloadConfig updates the configuration of a running raft node. If the new
// configuration is invalid an error is returned and no changes made to the
// instance.
func (r *Raft) ReloadConfig(rc *ReloadableConfig) error {
r.confReloadMu.Lock()
defer r.confReloadMu.Unlock()

// Load the current config (note we are under a lock so it can't be changed
// between this read and a later Store).
oldCfg := r.config()

// Set the reloadable fields
newCfg := rc.apply(oldCfg)

if err := ValidateConfig(&newCfg); err != nil {
return err
}
r.conf.Store(newCfg)
return nil
}

// BootstrapCluster is equivalent to non-member BootstrapCluster but can be
// called on an un-bootstrapped Raft instance after it has been created. This
// should only be called at the beginning of time for the cluster with an
Expand Down
52 changes: 45 additions & 7 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,19 +164,23 @@ type Config struct {
// we can become a leader of a cluster containing only this node.
ShutdownOnRemove bool

// TrailingLogs controls how many logs we leave after a snapshot. This is
// used so that we can quickly replay logs on a follower instead of being
// forced to send an entire snapshot.
// TrailingLogs controls how many logs we leave after a snapshot. This is used
// so that we can quickly replay logs on a follower instead of being forced to
// send an entire snapshot. The value passed here is the initial setting used.
// This can be tuned during operation using ReloadConfig.
TrailingLogs uint64

// SnapshotInterval controls how often we check if we should perform a snapshot.
// We randomly stagger between this value and 2x this value to avoid the entire
// cluster from performing a snapshot at once.
// SnapshotInterval controls how often we check if we should perform a
// snapshot. We randomly stagger between this value and 2x this value to avoid
// the entire cluster from performing a snapshot at once. The value passed
// here is the initial setting used. This can be tuned during operation using
// ReloadConfig.
SnapshotInterval time.Duration

// SnapshotThreshold controls how many outstanding logs there must be before
// we perform a snapshot. This is to prevent excessive snapshots when we can
// just replay a small set of logs.
// just replay a small set of logs. The value passed here is the initial
// setting used. This can be tuned during operation using ReloadConfig.
SnapshotThreshold uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of duplicating these fields here and in ReloadableConfig can we embed ReloadableConfig?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stated below,
" We choose to duplicate fields over embedding or accepting a Config but only using specific fields to keep the API clear."
I'm not entirely sold on the idea of embedding the ReloadableConfig into the "regular" Config because the separation of "this changes, and this doesn't" is clear and "feels safer" (which is probably not a good reason, I admit)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was on the fence there - I wasn't sure if it was technically binary compatible to embed which tipped the balance although in practice most reasonable usages would probably have been unaffected. I tend to agree with Sarah though - the separation ends up being kind of OK with me in the end for keeping things clearly delineated?


// LeaderLeaseTimeout is used to control how long the "lease" lasts
Expand Down Expand Up @@ -218,6 +222,40 @@ type Config struct {
skipStartup bool
}

// ReloadableConfig is the subset of Config that may be reconfigured during
// runtime using raft.ReloadConfig. We choose to duplicate fields over embedding
// or accepting a Config but only using specific fields to keep the API clear.
// Reconfiguring some fields is potentially dangerous so we should only
// selectively enable it for fields where that is allowed.
type ReloadableConfig struct {
// TrailingLogs controls how many logs we leave after a snapshot. This is used
// so that we can quickly replay logs on a follower instead of being forced to
// send an entire snapshot. The value passed here updates the setting at runtime
// which will take effect as soon as the next snapshot completes and truncation
// occurs.
TrailingLogs uint64

// SnapshotInterval controls how often we check if we should perform a snapshot.
// We randomly stagger between this value and 2x this value to avoid the entire
// cluster from performing a snapshot at once.
SnapshotInterval time.Duration

// SnapshotThreshold controls how many outstanding logs there must be before
// we perform a snapshot. This is to prevent excessive snapshots when we can
// just replay a small set of logs.
SnapshotThreshold uint64
}

// apply sets the reloadable fields on the passed Config to the values in
// `ReloadableConfig`. It returns a copy of Config with the fields from this
// ReloadableConfig set.
func (rc *ReloadableConfig) apply(to Config) Config {
to.TrailingLogs = rc.TrailingLogs
to.SnapshotInterval = rc.SnapshotInterval
to.SnapshotThreshold = rc.SnapshotThreshold
return to
}

// DefaultConfig returns a Config with usable defaults.
func DefaultConfig() *Config {
return &Config{
Expand Down
56 changes: 37 additions & 19 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
// responses.
func (r *Raft) getRPCHeader() RPCHeader {
return RPCHeader{
ProtocolVersion: r.conf.ProtocolVersion,
ProtocolVersion: r.config().ProtocolVersion,
}
}

Expand All @@ -56,7 +56,7 @@ func (r *Raft) checkRPCHeader(rpc RPC) error {
// currently what we want, and in general support one version back. We
// may need to revisit this policy depending on how future protocol
// changes evolve.
if header.ProtocolVersion < r.conf.ProtocolVersion-1 {
if header.ProtocolVersion < r.config().ProtocolVersion-1 {
return ErrUnsupportedProtocol
}

Expand Down Expand Up @@ -151,7 +151,7 @@ func (r *Raft) runFollower() {
didWarn := false
r.logger.Info("entering follower state", "follower", r, "leader", r.Leader())
metrics.IncrCounter([]string{"raft", "state", "follower"}, 1)
heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)
heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout)

for r.getState() == Follower {
select {
Expand Down Expand Up @@ -187,11 +187,12 @@ func (r *Raft) runFollower() {

case <-heartbeatTimer:
// Restart the heartbeat timer
heartbeatTimer = randomTimeout(r.conf.HeartbeatTimeout)
hbTimeout := r.config().HeartbeatTimeout
heartbeatTimer = randomTimeout(hbTimeout)

// Check if we have had a successful contact
lastContact := r.LastContact()
if time.Now().Sub(lastContact) < r.conf.HeartbeatTimeout {
if time.Now().Sub(lastContact) < hbTimeout {
continue
}

Expand Down Expand Up @@ -228,7 +229,8 @@ func (r *Raft) runFollower() {
// called on the main thread, and only makes sense in the follower state.
func (r *Raft) liveBootstrap(configuration Configuration) error {
// Use the pre-init API to make the static updates.
err := BootstrapCluster(&r.conf, r.logs, r.stable, r.snapshots,
cfg := r.config()
err := BootstrapCluster(&cfg, r.logs, r.stable, r.snapshots,
r.trans, configuration)
if err != nil {
return err
Expand Down Expand Up @@ -260,7 +262,7 @@ func (r *Raft) runCandidate() {
// otherwise.
defer func() { r.candidateFromLeadershipTransfer = false }()

electionTimer := randomTimeout(r.conf.ElectionTimeout)
electionTimer := randomTimeout(r.config().ElectionTimeout)

// Tally the votes, need a simple majority
grantedVotes := 0
Expand Down Expand Up @@ -370,8 +372,13 @@ func (r *Raft) runLeader() {
// Notify that we are the leader
overrideNotifyBool(r.leaderCh, true)

// Store the notify chan. It's not reloadable so shouldn't change before the
// defer below runs, but this makes sure we always notify the same chan if
// ever for both gaining and loosing leadership.
notify := r.config().NotifyCh

// Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil {
if notify != nil {
select {
case notify <- true:
case <-r.shutdownCh:
Expand Down Expand Up @@ -427,7 +434,7 @@ func (r *Raft) runLeader() {
overrideNotifyBool(r.leaderCh, false)

// Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil {
if notify != nil {
select {
case notify <- false:
case <-r.shutdownCh:
Expand Down Expand Up @@ -548,7 +555,9 @@ func (r *Raft) leaderLoop() {
// only a single peer (ourself) and replicating to an undefined set
// of peers.
stepDown := false
lease := time.After(r.conf.LeaderLeaseTimeout)
// This is only used for the first lease check, we reload lease below
// based on the current config value.
lease := time.After(r.config().LeaderLeaseTimeout)

for r.getState() == Leader {
select {
Expand Down Expand Up @@ -583,7 +592,7 @@ func (r *Raft) leaderLoop() {
// the stopCh and doneCh.
go func() {
select {
case <-time.After(r.conf.ElectionTimeout):
case <-time.After(r.config().ElectionTimeout):
close(stopCh)
err := fmt.Errorf("leadership transfer timeout")
r.logger.Debug(err.Error())
Expand Down Expand Up @@ -680,7 +689,7 @@ func (r *Raft) leaderLoop() {
metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(len(groupReady)))

if stepDown {
if r.conf.ShutdownOnRemove {
if r.config().ShutdownOnRemove {
r.logger.Info("removed ourself, shutting down")
r.Shutdown()
} else {
Expand Down Expand Up @@ -751,7 +760,7 @@ func (r *Raft) leaderLoop() {
// Group commit, gather all the ready commits
ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP:
for i := 0; i < r.conf.MaxAppendEntries; i++ {
for i := 0; i < r.config().MaxAppendEntries; i++ {
select {
case newLog := <-r.applyCh:
ready = append(ready, newLog)
Expand All @@ -776,7 +785,7 @@ func (r *Raft) leaderLoop() {

// Next check interval should adjust for the last node we've
// contacted, without going negative
checkInterval := r.conf.LeaderLeaseTimeout - maxDiff
checkInterval := r.config().LeaderLeaseTimeout - maxDiff
if checkInterval < minCheckInterval {
checkInterval = minCheckInterval
}
Expand Down Expand Up @@ -872,6 +881,11 @@ func (r *Raft) checkLeaderLease() time.Duration {
// Track contacted nodes, we can always contact ourself
contacted := 0

// Store lease timeout for this one check invocation as we need to refer to it
// in the loop and would be confusing if it ever becomes reloadable and
// changes between iterations below.
leaseTimeout := r.config().LeaderLeaseTimeout

// Check each follower
var maxDiff time.Duration
now := time.Now()
Expand All @@ -883,14 +897,14 @@ func (r *Raft) checkLeaderLease() time.Duration {
}
f := r.leaderState.replState[server.ID]
diff := now.Sub(f.LastContact())
if diff <= r.conf.LeaderLeaseTimeout {
if diff <= leaseTimeout {
contacted++
if diff > maxDiff {
maxDiff = diff
}
} else {
// Log at least once at high value, then debug. Otherwise it gets very verbose.
if diff <= 3*r.conf.LeaderLeaseTimeout {
if diff <= 3*leaseTimeout {
r.logger.Warn("failed to contact", "server-id", server.ID, "time", diff)
} else {
r.logger.Debug("failed to contact", "server-id", server.ID, "time", diff)
Expand Down Expand Up @@ -1131,7 +1145,11 @@ func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
}
}

batch := make([]*commitTuple, 0, r.conf.MaxAppendEntries)
// Store maxAppendEntries for this call in case it ever becomes reloadable. We
// need to use the same value for all lines here to get the expected result.
maxAppendEntries := r.config().MaxAppendEntries

batch := make([]*commitTuple, 0, maxAppendEntries)

// Apply all the preceding logs
for idx := lastApplied + 1; idx <= index; idx++ {
Expand All @@ -1156,9 +1174,9 @@ func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
batch = append(batch, preparedLog)

// If we have filled up a batch, send it to the FSM
if len(batch) >= r.conf.MaxAppendEntries {
if len(batch) >= maxAppendEntries {
applyBatch(batch)
batch = make([]*commitTuple, 0, r.conf.MaxAppendEntries)
batch = make([]*commitTuple, 0, maxAppendEntries)
}

case futureOk:
Expand Down
Loading