Skip to content

Commit

Permalink
Read latest configuration independently from main loop (#379)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanshasselberg authored Jan 8, 2020
1 parent 9ecdba6 commit 51329a7
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 39 deletions.
37 changes: 18 additions & 19 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"time"

metrics "github.com/armon/go-metrics"
Expand Down Expand Up @@ -138,6 +139,10 @@ type Raft struct {
// the log/snapshot.
configurations configurations

// Holds a copy of the latest configuration which can be read
// independently from main loop.
latestConfiguration atomic.Value

// RPC chan comes from the transport layer
rpcCh <-chan RPC

Expand Down Expand Up @@ -603,18 +608,17 @@ func (r *Raft) restoreSnapshot() error {
r.setLastSnapshot(snapshot.Index, snapshot.Term)

// Update the configuration
var conf Configuration
var index uint64
if snapshot.Version > 0 {
r.configurations.committed = snapshot.Configuration
r.configurations.committedIndex = snapshot.ConfigurationIndex
r.configurations.latest = snapshot.Configuration
r.configurations.latestIndex = snapshot.ConfigurationIndex
conf = snapshot.Configuration
index = snapshot.ConfigurationIndex
} else {
configuration := decodePeers(snapshot.Peers, r.trans)
r.configurations.committed = configuration
r.configurations.committedIndex = snapshot.Index
r.configurations.latest = configuration
r.configurations.latestIndex = snapshot.Index
conf = decodePeers(snapshot.Peers, r.trans)
index = snapshot.Index
}
r.setCommittedConfiguration(conf, index)
r.setLatestConfiguration(conf, index)

// Success!
return nil
Expand Down Expand Up @@ -746,19 +750,14 @@ func (r *Raft) VerifyLeader() Future {
}
}

// GetConfiguration returns the latest configuration and its associated index
// currently in use. This may not yet be committed. This must not be called on
// the main thread (which can access the information directly).
// GetConfiguration returns the latest configuration. This may not yet be
// committed. The main loop can access this directly.
func (r *Raft) GetConfiguration() ConfigurationFuture {
configReq := &configurationsFuture{}
configReq.init()
select {
case <-r.shutdownCh:
configReq.respond(ErrRaftShutdown)
return configReq
case r.configurationsCh <- configReq:
return configReq
}
configReq.configurations = configurations{latest: r.getLatestConfiguration()}
configReq.respond(nil)
return configReq
}

// AddPeer (deprecated) is used to add a new peer into the cluster. This must be
Expand Down
57 changes: 37 additions & 20 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,8 +623,7 @@ func (r *Raft) leaderLoop() {
// value.
if r.configurations.latestIndex > oldCommitIndex &&
r.configurations.latestIndex <= commitIndex {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
if !hasVote(r.configurations.committed, r.localID) {
stepDown = true
}
Expand Down Expand Up @@ -1043,8 +1042,7 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {

r.dispatchLogs([]*logFuture{&future.logFuture})
index := future.Index()
r.configurations.latest = configuration
r.configurations.latestIndex = index
r.setLatestConfiguration(configuration, index)
r.leaderState.commitment.setConfiguration(configuration)
r.startStopReplication()
}
Expand Down Expand Up @@ -1329,8 +1327,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
return
}
if entry.Index <= r.configurations.latestIndex {
r.configurations.latest = r.configurations.committed
r.configurations.latestIndex = r.configurations.committedIndex
r.setLatestConfiguration(r.configurations.committed, r.configurations.committedIndex)
}
newEntries = a.Entries[i:]
break
Expand Down Expand Up @@ -1365,8 +1362,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
idx := min(a.LeaderCommitIndex, r.getLastIndex())
r.setCommitIndex(idx)
if r.configurations.latestIndex <= idx {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
}
r.processLogs(idx, nil)
metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "processLogs"}, start)
Expand All @@ -1383,15 +1379,11 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
// called from the main thread, or from NewRaft() before any threads have begun.
func (r *Raft) processConfigurationLogEntry(entry *Log) {
if entry.Type == LogConfiguration {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.configurations.latest = DecodeConfiguration(entry.Data)
r.configurations.latestIndex = entry.Index
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
r.setLatestConfiguration(DecodeConfiguration(entry.Data), entry.Index)
} else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.configurations.latest = decodePeers(entry.Data, r.trans)
r.configurations.latestIndex = entry.Index
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
r.setLatestConfiguration(decodePeers(entry.Data, r.trans), entry.Index)
}
}

Expand Down Expand Up @@ -1606,10 +1598,8 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
r.setLastSnapshot(req.LastLogIndex, req.LastLogTerm)

// Restore the peer set
r.configurations.latest = reqConfiguration
r.configurations.latestIndex = reqConfigurationIndex
r.configurations.committed = reqConfiguration
r.configurations.committedIndex = reqConfigurationIndex
r.setLatestConfiguration(reqConfiguration, reqConfigurationIndex)
r.setCommittedConfiguration(reqConfiguration, reqConfigurationIndex)

// Compact logs, continue even if this fails
if err := r.compactLogs(req.LastLogIndex); err != nil {
Expand Down Expand Up @@ -1796,3 +1786,30 @@ func (r *Raft) timeoutNow(rpc RPC, req *TimeoutNowRequest) {
r.candidateFromLeadershipTransfer = true
rpc.Respond(&TimeoutNowResponse{}, nil)
}

// setLatestConfiguration stores the latest configuration and updates a copy of it.
func (r *Raft) setLatestConfiguration(c Configuration, i uint64) {
r.configurations.latest = c
r.configurations.latestIndex = i
r.latestConfiguration.Store(c.Clone())
}

// setCommittedConfiguration stores the committed configuration.
func (r *Raft) setCommittedConfiguration(c Configuration, i uint64) {
r.configurations.committed = c
r.configurations.committedIndex = i
}

// getLatestConfiguration reads the configuration from a copy of the main
// configuration, which means it can be accessed independently from the main
// loop.
func (r *Raft) getLatestConfiguration() Configuration {
// this switch catches the case where this is called without having set
// a configuration previously.
switch c := r.latestConfiguration.Load().(type) {
case Configuration:
return c
default:
return Configuration{}
}
}

0 comments on commit 51329a7

Please sign in to comment.