diff --git a/api.go b/api.go index d8623716f..a9e7e9026 100644 --- a/api.go +++ b/api.go @@ -7,6 +7,7 @@ import ( "os" "strconv" "sync" + "sync/atomic" "time" metrics "github.com/armon/go-metrics" @@ -138,6 +139,10 @@ type Raft struct { // the log/snapshot. configurations configurations + // Holds a copy of the latest configuration which can be read + // independently from main loop. + latestConfiguration atomic.Value + // RPC chan comes from the transport layer rpcCh <-chan RPC @@ -603,18 +608,17 @@ func (r *Raft) restoreSnapshot() error { r.setLastSnapshot(snapshot.Index, snapshot.Term) // Update the configuration + var conf Configuration + var index uint64 if snapshot.Version > 0 { - r.configurations.committed = snapshot.Configuration - r.configurations.committedIndex = snapshot.ConfigurationIndex - r.configurations.latest = snapshot.Configuration - r.configurations.latestIndex = snapshot.ConfigurationIndex + conf = snapshot.Configuration + index = snapshot.ConfigurationIndex } else { - configuration := decodePeers(snapshot.Peers, r.trans) - r.configurations.committed = configuration - r.configurations.committedIndex = snapshot.Index - r.configurations.latest = configuration - r.configurations.latestIndex = snapshot.Index + conf = decodePeers(snapshot.Peers, r.trans) + index = snapshot.Index } + r.setCommittedConfiguration(conf, index) + r.setLatestConfiguration(conf, index) // Success! return nil @@ -746,19 +750,14 @@ func (r *Raft) VerifyLeader() Future { } } -// GetConfiguration returns the latest configuration and its associated index -// currently in use. This may not yet be committed. This must not be called on -// the main thread (which can access the information directly). +// GetConfiguration returns the latest configuration. This may not yet be +// committed. The main loop can access this directly. func (r *Raft) GetConfiguration() ConfigurationFuture { configReq := &configurationsFuture{} configReq.init() - select { - case <-r.shutdownCh: - configReq.respond(ErrRaftShutdown) - return configReq - case r.configurationsCh <- configReq: - return configReq - } + configReq.configurations = configurations{latest: r.getLatestConfiguration()} + configReq.respond(nil) + return configReq } // AddPeer (deprecated) is used to add a new peer into the cluster. This must be diff --git a/raft.go b/raft.go index bdac6e149..81ed3a87a 100644 --- a/raft.go +++ b/raft.go @@ -623,8 +623,7 @@ func (r *Raft) leaderLoop() { // value. if r.configurations.latestIndex > oldCommitIndex && r.configurations.latestIndex <= commitIndex { - r.configurations.committed = r.configurations.latest - r.configurations.committedIndex = r.configurations.latestIndex + r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex) if !hasVote(r.configurations.committed, r.localID) { stepDown = true } @@ -1043,8 +1042,7 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) { r.dispatchLogs([]*logFuture{&future.logFuture}) index := future.Index() - r.configurations.latest = configuration - r.configurations.latestIndex = index + r.setLatestConfiguration(configuration, index) r.leaderState.commitment.setConfiguration(configuration) r.startStopReplication() } @@ -1329,8 +1327,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { return } if entry.Index <= r.configurations.latestIndex { - r.configurations.latest = r.configurations.committed - r.configurations.latestIndex = r.configurations.committedIndex + r.setLatestConfiguration(r.configurations.committed, r.configurations.committedIndex) } newEntries = a.Entries[i:] break @@ -1365,8 +1362,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { idx := min(a.LeaderCommitIndex, r.getLastIndex()) r.setCommitIndex(idx) if r.configurations.latestIndex <= idx { - r.configurations.committed = r.configurations.latest - r.configurations.committedIndex = r.configurations.latestIndex + r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex) } r.processLogs(idx, nil) metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "processLogs"}, start) @@ -1383,15 +1379,11 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { // called from the main thread, or from NewRaft() before any threads have begun. func (r *Raft) processConfigurationLogEntry(entry *Log) { if entry.Type == LogConfiguration { - r.configurations.committed = r.configurations.latest - r.configurations.committedIndex = r.configurations.latestIndex - r.configurations.latest = DecodeConfiguration(entry.Data) - r.configurations.latestIndex = entry.Index + r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex) + r.setLatestConfiguration(DecodeConfiguration(entry.Data), entry.Index) } else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated { - r.configurations.committed = r.configurations.latest - r.configurations.committedIndex = r.configurations.latestIndex - r.configurations.latest = decodePeers(entry.Data, r.trans) - r.configurations.latestIndex = entry.Index + r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex) + r.setLatestConfiguration(decodePeers(entry.Data, r.trans), entry.Index) } } @@ -1606,10 +1598,8 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { r.setLastSnapshot(req.LastLogIndex, req.LastLogTerm) // Restore the peer set - r.configurations.latest = reqConfiguration - r.configurations.latestIndex = reqConfigurationIndex - r.configurations.committed = reqConfiguration - r.configurations.committedIndex = reqConfigurationIndex + r.setLatestConfiguration(reqConfiguration, reqConfigurationIndex) + r.setCommittedConfiguration(reqConfiguration, reqConfigurationIndex) // Compact logs, continue even if this fails if err := r.compactLogs(req.LastLogIndex); err != nil { @@ -1796,3 +1786,30 @@ func (r *Raft) timeoutNow(rpc RPC, req *TimeoutNowRequest) { r.candidateFromLeadershipTransfer = true rpc.Respond(&TimeoutNowResponse{}, nil) } + +// setLatestConfiguration stores the latest configuration and updates a copy of it. +func (r *Raft) setLatestConfiguration(c Configuration, i uint64) { + r.configurations.latest = c + r.configurations.latestIndex = i + r.latestConfiguration.Store(c.Clone()) +} + +// setCommittedConfiguration stores the committed configuration. +func (r *Raft) setCommittedConfiguration(c Configuration, i uint64) { + r.configurations.committed = c + r.configurations.committedIndex = i +} + +// getLatestConfiguration reads the configuration from a copy of the main +// configuration, which means it can be accessed independently from the main +// loop. +func (r *Raft) getLatestConfiguration() Configuration { + // this switch catches the case where this is called without having set + // a configuration previously. + switch c := r.latestConfiguration.Load().(type) { + case Configuration: + return c + default: + return Configuration{} + } +}