diff --git a/api.go b/api.go
index 76275fd14..e024e58ba 100644
--- a/api.go
+++ b/api.go
@@ -81,8 +81,15 @@ type Raft struct {
     // be committed and applied to the FSM.
     applyCh chan *logFuture

-    // Configuration provided at Raft initialization
-    conf Config
+    // conf stores the current configuration to use. This is the most recent one
+    // provided. All reads of config values should use the config() helper method
+    // to read this safely.
+    conf atomic.Value
+
+    // confReloadMu ensures that only one thread can reload config at once since
+    // we need to read-modify-write the atomic. It is NOT necessary to hold this
+    // for any other operation e.g. reading config using config().
+    confReloadMu sync.Mutex

     // FSM is the client state machine to apply commands to
     fsm FSM
@@ -385,9 +392,9 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
     return nil
 }

-// GetConfiguration returns the configuration of the Raft cluster without
-// starting a Raft instance or connecting to the cluster
-// This function has identical behavior to Raft.GetConfiguration
+// GetConfiguration returns the persisted configuration of the Raft cluster
+// without starting a Raft instance or connecting to the cluster. This function
+// has identical behavior to Raft.GetConfiguration.
 func GetConfiguration(conf *Config, fsm FSM, logs LogStore, stable StableStore,
     snaps SnapshotStore, trans Transport) (Configuration, error) {
     conf.skipStartup = true
@@ -505,7 +512,6 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
     r := &Raft{
         protocolVersion: protocolVersion,
         applyCh:         applyCh,
-        conf:            *conf,
         fsm:             fsm,
         fsmMutateCh:     make(chan interface{}, 128),
         fsmSnapshotCh:   make(chan *reqSnapshotFuture),
@@ -530,6 +536,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
         leadershipTransferCh: make(chan *leadershipTransferFuture, 1),
     }

+    r.conf.Store(*conf)
+
     // Initialize as a follower.
     r.setState(Follower)

@@ -583,7 +591,7 @@ func (r *Raft) restoreSnapshot() error {

     // Try to load in order of newest to oldest
     for _, snapshot := range snapshots {
-        if !r.conf.NoSnapshotRestoreOnStart {
+        if !r.config().NoSnapshotRestoreOnStart {
             _, source, err := r.snapshots.Open(snapshot.ID)
             if err != nil {
                 r.logger.Error("failed to open snapshot", "id", snapshot.ID, "error", err)
@@ -630,6 +638,31 @@ func (r *Raft) restoreSnapshot() error {
     return nil
 }

+func (r *Raft) config() Config {
+    return r.conf.Load().(Config)
+}
+
+// ReloadConfig updates the configuration of a running raft node. If the new
+// configuration is invalid an error is returned and no changes are made to the
+// instance.
+func (r *Raft) ReloadConfig(rc *ReloadableConfig) error {
+    r.confReloadMu.Lock()
+    defer r.confReloadMu.Unlock()
+
+    // Load the current config (note we are under a lock so it can't be changed
+    // between this read and a later Store).
+    oldCfg := r.config()
+
+    // Set the reloadable fields
+    newCfg := rc.apply(oldCfg)
+
+    if err := ValidateConfig(&newCfg); err != nil {
+        return err
+    }
+    r.conf.Store(newCfg)
+    return nil
+}
+
 // BootstrapCluster is equivalent to non-member BootstrapCluster but can be
 // called on an un-bootstrapped Raft instance after it has been created. This
 // should only be called at the beginning of time for the cluster with an
diff --git a/config.go b/config.go
index 7abbf5d97..7ca0a049a 100644
--- a/config.go
+++ b/config.go
@@ -164,19 +164,23 @@ type Config struct {
     // we can become a leader of a cluster containing only this node.
     ShutdownOnRemove bool

-    // TrailingLogs controls how many logs we leave after a snapshot. This is
-    // used so that we can quickly replay logs on a follower instead of being
-    // forced to send an entire snapshot.
+    // TrailingLogs controls how many logs we leave after a snapshot. This is used
+    // so that we can quickly replay logs on a follower instead of being forced to
+    // send an entire snapshot. The value passed here is the initial setting used.
+    // This can be tuned during operation using ReloadConfig.
     TrailingLogs uint64

-    // SnapshotInterval controls how often we check if we should perform a snapshot.
-    // We randomly stagger between this value and 2x this value to avoid the entire
-    // cluster from performing a snapshot at once.
+    // SnapshotInterval controls how often we check if we should perform a
+    // snapshot. We randomly stagger between this value and 2x this value to avoid
+    // the entire cluster from performing a snapshot at once. The value passed
+    // here is the initial setting used. This can be tuned during operation using
+    // ReloadConfig.
     SnapshotInterval time.Duration

     // SnapshotThreshold controls how many outstanding logs there must be before
     // we perform a snapshot. This is to prevent excessive snapshots when we can
-    // just replay a small set of logs.
+    // just replay a small set of logs. The value passed here is the initial
+    // setting used. This can be tuned during operation using ReloadConfig.
     SnapshotThreshold uint64

     // LeaderLeaseTimeout is used to control how long the "lease" lasts
@@ -218,6 +222,40 @@ type Config struct {
     skipStartup bool
 }

+// ReloadableConfig is the subset of Config that may be reconfigured during
+// runtime using raft.ReloadConfig. We choose to duplicate fields over embedding
+// or accepting a Config but only using specific fields to keep the API clear.
+// Reconfiguring some fields is potentially dangerous so we should only
+// selectively enable it for fields where that is allowed.
+type ReloadableConfig struct {
+    // TrailingLogs controls how many logs we leave after a snapshot. This is used
+    // so that we can quickly replay logs on a follower instead of being forced to
+    // send an entire snapshot. The value passed here updates the setting at runtime
+    // which will take effect as soon as the next snapshot completes and truncation
+    // occurs.
+    TrailingLogs uint64
+
+    // SnapshotInterval controls how often we check if we should perform a snapshot.
+    // We randomly stagger between this value and 2x this value to avoid the entire
+    // cluster from performing a snapshot at once.
+    SnapshotInterval time.Duration
+
+    // SnapshotThreshold controls how many outstanding logs there must be before
+    // we perform a snapshot. This is to prevent excessive snapshots when we can
+    // just replay a small set of logs.
+    SnapshotThreshold uint64
+}
+
+// apply sets the reloadable fields on the passed Config to the values in
+// `ReloadableConfig`. It returns a copy of Config with the fields from this
+// ReloadableConfig set.
+func (rc *ReloadableConfig) apply(to Config) Config {
+    to.TrailingLogs = rc.TrailingLogs
+    to.SnapshotInterval = rc.SnapshotInterval
+    to.SnapshotThreshold = rc.SnapshotThreshold
+    return to
+}
+
 // DefaultConfig returns a Config with usable defaults.
 func DefaultConfig() *Config {
     return &Config{
diff --git a/raft.go b/raft.go
index e12768fac..5a42cc093 100644
--- a/raft.go
+++ b/raft.go
@@ -29,7 +29,7 @@ var (
 // responses.
 func (r *Raft) getRPCHeader() RPCHeader {
     return RPCHeader{
-        ProtocolVersion: r.conf.ProtocolVersion,
+        ProtocolVersion: r.config().ProtocolVersion,
     }
 }

@@ -56,7 +56,7 @@ func (r *Raft) checkRPCHeader(rpc RPC) error {
     // currently what we want, and in general support one version back. We
     // may need to revisit this policy depending on how future protocol
     // changes evolve.
-    if header.ProtocolVersion < r.conf.ProtocolVersion-1 {
+    if header.ProtocolVersion < r.config().ProtocolVersion-1 {
         return ErrUnsupportedProtocol
     }

@@ -151,7 +151,7 @@ func (r *Raft) runFollower() {
     didWarn := false
     r.logger.Info("entering follower state", "follower", r, "leader", r.Leader())
     metrics.IncrCounter([]string{"raft", "state", "follower"}, 1)
-    heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)
+    heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout)

     for r.getState() == Follower {
         select {
@@ -187,11 +187,12 @@ func (r *Raft) runFollower() {

         case <-heartbeatTimer:
             // Restart the heartbeat timer
-            heartbeatTimer = randomTimeout(r.conf.HeartbeatTimeout)
+            hbTimeout := r.config().HeartbeatTimeout
+            heartbeatTimer = randomTimeout(hbTimeout)

             // Check if we have had a successful contact
             lastContact := r.LastContact()
-            if time.Now().Sub(lastContact) < r.conf.HeartbeatTimeout {
+            if time.Now().Sub(lastContact) < hbTimeout {
                 continue
             }

@@ -228,7 +229,8 @@ func (r *Raft) runFollower() {
 // called on the main thread, and only makes sense in the follower state.
 func (r *Raft) liveBootstrap(configuration Configuration) error {
     // Use the pre-init API to make the static updates.
-    err := BootstrapCluster(&r.conf, r.logs, r.stable, r.snapshots,
+    cfg := r.config()
+    err := BootstrapCluster(&cfg, r.logs, r.stable, r.snapshots,
         r.trans, configuration)
     if err != nil {
         return err
@@ -260,7 +262,7 @@ func (r *Raft) runCandidate() {
     // otherwise.
     defer func() { r.candidateFromLeadershipTransfer = false }()

-    electionTimer := randomTimeout(r.conf.ElectionTimeout)
+    electionTimer := randomTimeout(r.config().ElectionTimeout)

     // Tally the votes, need a simple majority
     grantedVotes := 0
@@ -370,8 +372,13 @@ func (r *Raft) runLeader() {
     // Notify that we are the leader
     overrideNotifyBool(r.leaderCh, true)

+    // Store the notify chan. It's not reloadable so shouldn't change before the
+    // defer below runs, but this makes sure we always notify the same chan, if
+    // any, for both gaining and losing leadership.
+    notify := r.config().NotifyCh
+
     // Push to the notify channel if given
-    if notify := r.conf.NotifyCh; notify != nil {
+    if notify != nil {
         select {
         case notify <- true:
         case <-r.shutdownCh:
@@ -427,7 +434,7 @@ func (r *Raft) runLeader() {
         overrideNotifyBool(r.leaderCh, false)

         // Push to the notify channel if given
-        if notify := r.conf.NotifyCh; notify != nil {
+        if notify != nil {
             select {
             case notify <- false:
             case <-r.shutdownCh:
@@ -548,7 +555,9 @@ func (r *Raft) leaderLoop() {
     // only a single peer (ourself) and replicating to an undefined set
     // of peers.
     stepDown := false
-    lease := time.After(r.conf.LeaderLeaseTimeout)
+    // This is only used for the first lease check; we reload lease below
+    // based on the current config value.
+    lease := time.After(r.config().LeaderLeaseTimeout)

     for r.getState() == Leader {
         select {
@@ -583,7 +592,7 @@ func (r *Raft) leaderLoop() {
             // the stopCh and doneCh.
             go func() {
                 select {
-                case <-time.After(r.conf.ElectionTimeout):
+                case <-time.After(r.config().ElectionTimeout):
                     close(stopCh)
                     err := fmt.Errorf("leadership transfer timeout")
                     r.logger.Debug(err.Error())
@@ -680,7 +689,7 @@ func (r *Raft) leaderLoop() {
             metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(len(groupReady)))

             if stepDown {
-                if r.conf.ShutdownOnRemove {
+                if r.config().ShutdownOnRemove {
                     r.logger.Info("removed ourself, shutting down")
                     r.Shutdown()
                 } else {
@@ -751,7 +760,7 @@ func (r *Raft) leaderLoop() {
             // Group commit, gather all the ready commits
             ready := []*logFuture{newLog}
         GROUP_COMMIT_LOOP:
-            for i := 0; i < r.conf.MaxAppendEntries; i++ {
+            for i := 0; i < r.config().MaxAppendEntries; i++ {
                 select {
                 case newLog := <-r.applyCh:
                     ready = append(ready, newLog)
@@ -776,7 +785,7 @@ func (r *Raft) leaderLoop() {

             // Next check interval should adjust for the last node we've
             // contacted, without going negative
-            checkInterval := r.conf.LeaderLeaseTimeout - maxDiff
+            checkInterval := r.config().LeaderLeaseTimeout - maxDiff
             if checkInterval < minCheckInterval {
                 checkInterval = minCheckInterval
             }
@@ -872,6 +881,11 @@ func (r *Raft) checkLeaderLease() time.Duration {
     // Track contacted nodes, we can always contact ourself
     contacted := 0

+    // Store lease timeout for this one check invocation as we need to refer to it
+    // in the loop, and it would be confusing if it ever becomes reloadable and
+    // changes between iterations below.
+    leaseTimeout := r.config().LeaderLeaseTimeout
+
     // Check each follower
     var maxDiff time.Duration
     now := time.Now()
@@ -883,14 +897,14 @@ func (r *Raft) checkLeaderLease() time.Duration {
         }
         f := r.leaderState.replState[server.ID]
         diff := now.Sub(f.LastContact())
-        if diff <= r.conf.LeaderLeaseTimeout {
+        if diff <= leaseTimeout {
             contacted++
             if diff > maxDiff {
                 maxDiff = diff
             }
         } else {
             // Log at least once at high value, then debug. Otherwise it gets very verbose.
-            if diff <= 3*r.conf.LeaderLeaseTimeout {
+            if diff <= 3*leaseTimeout {
                 r.logger.Warn("failed to contact", "server-id", server.ID, "time", diff)
             } else {
                 r.logger.Debug("failed to contact", "server-id", server.ID, "time", diff)
@@ -1131,7 +1145,11 @@ func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
         }
     }

-    batch := make([]*commitTuple, 0, r.conf.MaxAppendEntries)
+    // Store maxAppendEntries for this call in case it ever becomes reloadable. We
+    // need to use the same value for all lines here to get the expected result.
+    maxAppendEntries := r.config().MaxAppendEntries
+
+    batch := make([]*commitTuple, 0, maxAppendEntries)

     // Apply all the preceding logs
     for idx := lastApplied + 1; idx <= index; idx++ {
@@ -1156,9 +1174,9 @@ func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
             batch = append(batch, preparedLog)

             // If we have filled up a batch, send it to the FSM
-            if len(batch) >= r.conf.MaxAppendEntries {
+            if len(batch) >= maxAppendEntries {
                 applyBatch(batch)
-                batch = make([]*commitTuple, 0, r.conf.MaxAppendEntries)
+                batch = make([]*commitTuple, 0, maxAppendEntries)
             }

         case futureOk:
diff --git a/raft_test.go b/raft_test.go
index e0086b085..d30266e2b 100644
--- a/raft_test.go
+++ b/raft_test.go
@@ -12,6 +12,8 @@ import (
     "sync/atomic"
     "testing"
     "time"
+
+    "github.com/stretchr/testify/require"
 )

 func TestRaft_StartStop(t *testing.T) {
@@ -108,7 +110,8 @@ func TestRaft_RecoverCluster_NoState(t *testing.T) {
             },
         },
     }
-    err := RecoverCluster(&r.conf, &MockFSM{}, r.logs, r.stable,
+    cfg := r.config()
+    err := RecoverCluster(&cfg, &MockFSM{}, r.logs, r.stable,
         r.snapshots, r.trans, configuration)
     if err == nil || !strings.Contains(err.Error(), "no initial state") {
         c.FailNowf("should have failed for no initial state: %v", err)
@@ -158,7 +161,8 @@ func TestRaft_RecoverCluster(t *testing.T) {
         if err != nil {
             c.FailNowf("snapshot list err: %v", err)
         }
-        if err = RecoverCluster(&r.conf, &MockFSM{}, r.logs, r.stable,
+        cfg := r.config()
+        if err = RecoverCluster(&cfg, &MockFSM{}, r.logs, r.stable,
             r.snapshots, r.trans, configuration); err != nil {
             c.FailNowf("recover err: %v", err)
         }
@@ -191,7 +195,7 @@ func TestRaft_RecoverCluster(t *testing.T) {
         // operation.
         _, trans := NewInmemTransport(r.localAddr)
         var r2 *Raft
-        r2, err = NewRaft(&r.conf, &MockFSM{}, r.logs, r.stable, r.snapshots, trans)
+        r2, err = NewRaft(&cfg, &MockFSM{}, r.logs, r.stable, r.snapshots, trans)
         if err != nil {
             c.FailNowf("new raft err: %v", err)
         }
@@ -963,7 +967,8 @@ func TestRaft_SnapshotRestore(t *testing.T) {
     r := leader
     // Can't just reuse the old transport as it will be closed
     _, trans2 := NewInmemTransport(r.trans.LocalAddr())
-    r, err := NewRaft(&r.conf, r.fsm, r.logs, r.stable, r.snapshots, trans2)
+    cfg := r.config()
+    r, err := NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2)
     if err != nil {
         c.FailNowf("err: %v", err)
     }
@@ -1013,7 +1018,8 @@ func TestRaft_NoRestoreOnStart(t *testing.T) {

     _, trans := NewInmemTransport(leader.localAddr)
     newFSM := &MockFSM{}
-    _, err := NewRaft(&leader.conf, newFSM, leader.logs, leader.stable, leader.snapshots, trans)
+    cfg := leader.config()
+    _, err := NewRaft(&cfg, newFSM, leader.logs, leader.stable, leader.snapshots, trans)
     if err != nil {
         c.FailNowf("err: %v", err)
     }
@@ -1094,7 +1100,8 @@ func TestRaft_SnapshotRestore_PeerChange(t *testing.T) {
     if err != nil {
         c.FailNowf("err: %v", err)
     }
-    if err = RecoverCluster(&r.conf, &MockFSM{}, r.logs, r.stable,
+    cfg := r.config()
+    if err = RecoverCluster(&cfg, &MockFSM{}, r.logs, r.stable,
         r.snapshots, r.trans, configuration); err != nil {
         c.FailNowf("err: %v", err)
     }
@@ -1102,7 +1109,7 @@ func TestRaft_SnapshotRestore_PeerChange(t *testing.T) {
     // Can't just reuse the old transport as it will be closed. We also start
     // with a fresh FSM for good measure so no state can carry over.
     _, trans := NewInmemTransport(r.localAddr)
-    r, err = NewRaft(&r.conf, &MockFSM{}, r.logs, r.stable, r.snapshots, trans)
+    r, err = NewRaft(&cfg, &MockFSM{}, r.logs, r.stable, r.snapshots, trans)
     if err != nil {
         c.FailNowf("err: %v", err)
     }
@@ -2080,7 +2087,9 @@ func TestRaft_LeadershipTransferLeaderReplicationTimeout(t *testing.T) {

     // set ElectionTimeout really short because this is used to determine
     // how long a transfer is allowed to take.
-    l.conf.ElectionTimeout = 1 * time.Nanosecond
+    cfg := l.config()
+    cfg.ElectionTimeout = 1 * time.Nanosecond
+    l.conf.Store(cfg)

     future := l.LeadershipTransferToServer(behind.localID, behind.localAddr)
     if future.Error() == nil {
@@ -2167,6 +2176,58 @@ func TestRaft_GetConfigurationNoBootstrap(t *testing.T) {
     }
 }

+func TestRaft_ReloadConfig(t *testing.T) {
+    conf := inmemConfig(t)
+    c := MakeCluster(1, t, conf)
+    defer c.Close()
+    raft := c.rafts[0]
+
+    // Make sure the reloadable values are as expected before
+    require.Equal(t, uint64(10240), raft.config().TrailingLogs)
+    require.Equal(t, 120*time.Second, raft.config().SnapshotInterval)
+    require.Equal(t, uint64(8192), raft.config().SnapshotThreshold)
+
+    // Reload with different values
+    newCfg := &ReloadableConfig{
+        TrailingLogs:      12345,
+        SnapshotInterval:  234 * time.Second,
+        SnapshotThreshold: 6789,
+    }
+
+    require.NoError(t, raft.ReloadConfig(newCfg))
+
+    // Now we should have new values
+    require.Equal(t, newCfg.TrailingLogs, raft.config().TrailingLogs)
+    require.Equal(t, newCfg.SnapshotInterval, raft.config().SnapshotInterval)
+    require.Equal(t, newCfg.SnapshotThreshold, raft.config().SnapshotThreshold)
+}
+
+func TestRaft_ReloadConfigValidates(t *testing.T) {
+    conf := inmemConfig(t)
+    c := MakeCluster(1, t, conf)
+    defer c.Close()
+    raft := c.rafts[0]
+
+    // Make sure the reloadable values are as expected before
+    require.Equal(t, uint64(10240), raft.config().TrailingLogs)
+    require.Equal(t, 120*time.Second, raft.config().SnapshotInterval)
+    require.Equal(t, uint64(8192), raft.config().SnapshotThreshold)
+
+    // Reload with different values that are invalid per ValidateConfig
+    newCfg := &ReloadableConfig{
+        TrailingLogs:      12345,
+        SnapshotInterval:  1 * time.Millisecond, // must be >= 5 millisecond
+        SnapshotThreshold: 6789,
+    }
+
+    require.Error(t, raft.ReloadConfig(newCfg))
+
+    // Now we should have same values
+    require.Equal(t, uint64(10240), raft.config().TrailingLogs)
+    require.Equal(t, 120*time.Second, raft.config().SnapshotInterval)
+    require.Equal(t, uint64(8192), raft.config().SnapshotThreshold)
+}
+
 // TODO: These are test cases we'd like to write for appendEntries().
 // Unfortunately, it's difficult to do so with the current way this file is
 // tested.
diff --git a/replication.go b/replication.go
index 30258a007..3154fda39 100644
--- a/replication.go
+++ b/replication.go
@@ -161,7 +161,7 @@ RPC:
             // raft commits stop flowing naturally. The actual heartbeats
             // can't do this to keep them unblocked by disk IO on the
             // follower. See https://github.com/hashicorp/raft/issues/282.
-        case <-randomTimeout(r.conf.CommitTimeout):
+        case <-randomTimeout(r.config().CommitTimeout):
             lastLogIdx, _ := r.getLastLog()
             shouldStop = r.replicateTo(s, lastLogIdx)
         }
@@ -373,7 +373,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
         // Wait for the next heartbeat interval or forced notify
         select {
         case <-s.notifyCh:
-        case <-randomTimeout(r.conf.HeartbeatTimeout / 10):
+        case <-randomTimeout(r.config().HeartbeatTimeout / 10):
         case <-stopCh:
             return
         }
@@ -447,7 +447,7 @@ SEND:
         case <-s.triggerCh:
             lastLogIdx, _ := r.getLastLog()
             shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx)
-        case <-randomTimeout(r.conf.CommitTimeout):
+        case <-randomTimeout(r.config().CommitTimeout):
             lastLogIdx, _ := r.getLastLog()
             shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx)
         }
@@ -562,9 +562,12 @@ func (r *Raft) setPreviousLog(req *AppendEntriesRequest, nextIndex uint64) error

 // setNewLogs is used to setup the logs which should be appended for a request.
 func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64) error {
-    // Append up to MaxAppendEntries or up to the lastIndex
-    req.Entries = make([]*Log, 0, r.conf.MaxAppendEntries)
-    maxIndex := min(nextIndex+uint64(r.conf.MaxAppendEntries)-1, lastIndex)
+    // Append up to MaxAppendEntries or up to the lastIndex. We need to use a
+    // consistent value for maxAppendEntries in the lines below in case it ever
+    // becomes reloadable.
+    maxAppendEntries := r.config().MaxAppendEntries
+    req.Entries = make([]*Log, 0, maxAppendEntries)
+    maxIndex := min(nextIndex+uint64(maxAppendEntries)-1, lastIndex)
     for i := nextIndex; i <= maxIndex; i++ {
         oldLog := new(Log)
         if err := r.logs.GetLog(i, oldLog); err != nil {
diff --git a/snapshot.go b/snapshot.go
index 805a09d70..d6b267963 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -69,7 +69,7 @@ type SnapshotSink interface {
 func (r *Raft) runSnapshots() {
     for {
         select {
-        case <-randomTimeout(r.conf.SnapshotInterval):
+        case <-randomTimeout(r.config().SnapshotInterval):
             // Check if we should snapshot
             if !r.shouldSnapshot() {
                 continue
@@ -113,7 +113,7 @@ func (r *Raft) shouldSnapshot() bool {

     // Compare the delta to the threshold
     delta := lastIdx - lastSnap
-    return delta >= r.conf.SnapshotThreshold
+    return delta >= r.config().SnapshotThreshold
 }

 // takeSnapshot is used to take a new snapshot. This must only be called from
@@ -219,7 +219,11 @@ func (r *Raft) compactLogs(snapIdx uint64) error {

     // Check if we have enough logs to truncate
     lastLogIdx, _ := r.getLastLog()
-    if lastLogIdx <= r.conf.TrailingLogs {
+
+    // Use a consistent value for trailingLogs for the duration of this method
+    // call to avoid surprising behaviour.
+    trailingLogs := r.config().TrailingLogs
+    if lastLogIdx <= trailingLogs {
         return nil
     }

@@ -227,7 +231,7 @@ func (r *Raft) compactLogs(snapIdx uint64) error {
     // back from the head, which ever is further back. This ensures
     // at least `TrailingLogs` entries, but does not allow logs
     // after the snapshot to be removed.
-    maxLog := min(snapIdx, lastLogIdx-r.conf.TrailingLogs)
+    maxLog := min(snapIdx, lastLogIdx-trailingLogs)

     if minLog > maxLog {
         r.logger.Info("no logs to truncate")
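
Not part of the patch above: a minimal usage sketch of the new API, assuming an application already holds a running *raft.Raft built with NewRaft. The helper name tuneSnapshots and the specific numbers are illustrative only; the ReloadableConfig fields and ReloadConfig signature are the ones introduced by this diff.

package main

import (
    "log"
    "time"

    "github.com/hashicorp/raft"
)

// tuneSnapshots adjusts only the reloadable fields on an already-running
// node. Every other Config field keeps the value it was given at NewRaft time.
func tuneSnapshots(r *raft.Raft) {
    rc := &raft.ReloadableConfig{
        TrailingLogs:      20480,            // keep more logs around after each snapshot
        SnapshotInterval:  60 * time.Second, // check more often whether to snapshot
        SnapshotThreshold: 4096,             // snapshot after fewer outstanding logs
    }
    // ReloadConfig validates the merged configuration first; if validation
    // fails the running node keeps its previous settings.
    if err := r.ReloadConfig(rc); err != nil {
        log.Printf("config reload rejected: %v", err)
    }
}

// main omitted from this sketch: constructing the FSM, stores, transport and
// the *raft.Raft instance itself is unchanged by this patch.
func main() {}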