Split apart and test appendConfigurationEntry #132

Merged
12 changes: 12 additions & 0 deletions configuration.go
@@ -20,6 +20,18 @@ const (
Staging
)

func (s ServerSuffrage) String() string {
switch s {
case Voter:
return "Voter"
case Nonvoter:
return "Nonvoter"
case Staging:
return "Staging"
}
return "ServerSuffrage"
}

// ServerID is a unique string identifying a server for all time.
type ServerID string

16 changes: 16 additions & 0 deletions future.go
@@ -92,6 +92,22 @@ const (
Promote
)

func (c ConfigurationChangeCommand) String() string {
switch c {
case AddStaging:
return "AddStaging"
case AddNonvoter:
return "AddNonvoter"
case DemoteVoter:
return "DemoteVoter"
case RemoveServer:
return "RemoveServer"
case Promote:
return "Promote"
}
return "ConfigurationChangeCommand"
}

// There are several types of requests that cause a configuration entry to
// be appended to the log. These are encoded here for leaderLoop() to process.
// This is internal to a single server.
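These Stringers are what let log lines print command and suffrage names instead of bare integers; appendConfigurationEntry below relies on this when it formats future.command with %s. A minimal illustration (not part of this PR), assuming it sits in a _test.go file inside the raft package:

```go
package raft

import "fmt"

// Illustration only: the Stringer methods render names, and out-of-range
// values fall back to the type name instead of printing a bare integer.
func ExampleConfigurationChangeCommand_String() {
	fmt.Println(AddStaging)
	fmt.Println(Voter)
	fmt.Println(ConfigurationChangeCommand(42))
	// Output:
	// AddStaging
	// Voter
	// ConfigurationChangeCommand
}
```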
129 changes: 64 additions & 65 deletions raft.go
@@ -956,9 +956,7 @@ func (r *Raft) runLeader() {
}()

// Start a replication routine for each peer
for _, server := range r.configurations.latest.Servers {
r.startReplication(server)
}
r.startStopReplication()

// Dispatch a no-op log entry first. This gets this leader up to the latest
// possible commit index, even in the absence of client commands. This used
@@ -977,41 +975,49 @@ func (r *Raft) runLeader() {
r.leaderLoop()
}

// startReplication is a helper to setup state and start async replication to a peer.
// It's safe to call this with the local server (in that case, it does nothing.)
func (r *Raft) startReplication(server Server) {
if server.ID == r.localID {
return
}
// startStopReplication will set up state and start asynchronous replication to
// new peers, and stop replication to removed peers. Before removing a peer,
// it'll instruct the replication routines to try to replicate to the current
// index.
func (r *Raft) startStopReplication() {
inConfig := make(map[ServerID]bool, len(r.leaderState.replState))

Contributor: This should probably be len(r.configurations.latest.Servers). This'll be 0 at this time.

Contributor: Nevermind - not for the stop case.

Author: Even then, you're still right: there'll be one entry in the map per len(r.configurations.latest.Servers), perhaps except for the local server.

Contributor: Yeah it can't hurt to use that as the size for sure.
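A sketch of the allocation this thread lands on (illustrative, not the committed line):

```go
// Size the map from the latest configuration; replState is empty when the
// leader first calls this, so it says nothing about how many peers follow.
inConfig := make(map[ServerID]bool, len(r.configurations.latest.Servers))
```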

lastIdx := r.getLastIndex()
s := &followerReplication{
peer: server,
commitment: r.leaderState.commitment,
stopCh: make(chan uint64, 1),
triggerCh: make(chan struct{}, 1),
currentTerm: r.getCurrentTerm(),
nextIndex: lastIdx + 1,
lastContact: time.Now(),
notifyCh: make(chan struct{}, 1),
stepDown: r.leaderState.stepDown,
}
r.leaderState.replState[server.ID] = s
r.goFunc(func() { r.replicate(s) })
asyncNotifyCh(s.triggerCh)
}

// stopReplication is a helper to stop the replication goroutine for the given
// peer, after it's tried to replicate up through the given index.
// It's safe to call this with the local server (in that case, it does nothing.)
func (r *Raft) stopReplication(server Server, index uint64) {
if server.ID == r.localID {
return
// Start replication goroutines that need starting
for _, server := range r.configurations.latest.Servers {
if server.ID == r.localID {
continue
}
inConfig[server.ID] = true
r.logger.Printf("[INFO] raft: Added peer %v, starting replication", server)

Contributor: This would be good to put down after the !present check so it only logs if we are starting replication. Also, might be good to log the ID here since we log that when removing a server below.

Author: Good point. Logging the entire server will contain the ID, right? Cool?

Contributor: Yeah this is a superset so it will have the ID. I was worried more about consistency but we probably need to make a sweep over the logging related to IDs and can do that separately.

Author: Ah, got it. I'll switch this one to ID for now, but agreed--I haven't been watching the log messages at all.

if _, present := r.leaderState.replState[server.ID]; !present {

Contributor: I was initially worried that this wouldn't create a new replication task if there was one in effect that was draining (if you added and removed a peer quickly), because the old code didn't have the !present check, but we delete from replState whenever we stop replication so I think we are good.

Contributor: For idiomatic go-ness we should rename present to ok.

Author: Not for this PR, but are we ok with two replication routines for the same peer? Might be ok, might be confusing.

Contributor: Agree - we should think that through - I'll add it to the list.
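Putting the suggestions from these threads together (rename present to ok, log only when a new replication routine actually starts, and log the ID so it matches the removal message), the start loop might read roughly as follows. This is a sketch of the proposed shape, not the committed diff, which continues below:

```go
// Start replication goroutines that need starting.
for _, server := range r.configurations.latest.Servers {
	if server.ID == r.localID {
		continue
	}
	inConfig[server.ID] = true
	if _, ok := r.leaderState.replState[server.ID]; !ok {
		// Log only when we really start replication, and log the ID so the
		// message lines up with the "Removed peer" message below.
		r.logger.Printf("[INFO] raft: Added peer %v, starting replication", server.ID)
		s := &followerReplication{
			peer:        server,
			commitment:  r.leaderState.commitment,
			stopCh:      make(chan uint64, 1),
			triggerCh:   make(chan struct{}, 1),
			currentTerm: r.getCurrentTerm(),
			nextIndex:   lastIdx + 1,
			lastContact: time.Now(),
			notifyCh:    make(chan struct{}, 1),
			stepDown:    r.leaderState.stepDown,
		}
		r.leaderState.replState[server.ID] = s
		r.goFunc(func() { r.replicate(s) })
		asyncNotifyCh(s.triggerCh)
	}
}
```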

s := &followerReplication{
peer: server,
commitment: r.leaderState.commitment,
stopCh: make(chan uint64, 1),
triggerCh: make(chan struct{}, 1),
currentTerm: r.getCurrentTerm(),
nextIndex: lastIdx + 1,
lastContact: time.Now(),
notifyCh: make(chan struct{}, 1),
stepDown: r.leaderState.stepDown,
}
r.leaderState.replState[server.ID] = s
r.goFunc(func() { r.replicate(s) })
asyncNotifyCh(s.triggerCh)
}
}
if repl, ok := r.leaderState.replState[server.ID]; ok {
// Replicate up to this index and stop
repl.stopCh <- index

// Stop replication goroutines that need stopping
for serverID, repl := range r.leaderState.replState {
if inConfig[serverID] {
continue
}
// Replicate up to lastIdx and stop
r.logger.Printf("[INFO] raft: Removed peer %v, stopping replication after %v", serverID, lastIdx)
repl.stopCh <- lastIdx
close(repl.stopCh)
delete(r.leaderState.replState, server.ID)
delete(r.leaderState.replState, serverID)
}
}

@@ -1232,23 +1238,22 @@ func (r *Raft) quorumSize() int {
return voters/2 + 1
}

// appendConfigurationEntry changes the configuration and adds a new
// configuration entry to the log
func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
configuration := cloneConfiguration(r.configurations.latest) // same as committed

if future.prevIndex > 0 && future.prevIndex != r.configurations.latestIndex {
// nextConfiguration generates a new Configuration from the current one and a
// configuration change request. It's split from appendConfigurationEntry so
// that it can be unit tested easily. This function reports errors directly to
// the future; it returns an empty configuration if and only if
// future.responded is true.
func nextConfiguration(current Configuration, currentIndex uint64, future *configurationChangeFuture) Configuration {

Contributor: I <3 non-member functions for things like this!

Author: I guess. The signature is uglier than I hoped. Maybe the future should have a nested struct called req, and this should take a req and return a (Configuration, error)?

The other ugly thing is that we can't log in this function. But overall it's better than pulling in all the test setup of raft_test.go.

Contributor: Yeah agree the future at this level is a bit ugly. I'd be in favor of a nested struct in the future as the argument to this with (Configuration, error) as the return so it's more natural to call this.

Author: Cool, will do.

Author: That cleaned things up nicely.
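A sketch of the shape this thread converges on: a small request struct nested in the future and a (Configuration, error) return, so the pure function never touches the future or the logger. The names here (configurationChangeRequest, and ServerAddress as the address string type) are illustrative assumptions, not necessarily what landed:

```go
// configurationChangeRequest carries just the inputs nextConfiguration needs.
type configurationChangeRequest struct {
	command       ConfigurationChangeCommand
	serverID      ServerID
	serverAddress ServerAddress
	prevIndex     uint64 // 0 means no check against the current index
}

// nextConfiguration computes the next configuration, or an error; the caller
// (appendConfigurationEntry) responds to the future and does the logging.
func nextConfiguration(current Configuration, currentIndex uint64,
	request configurationChangeRequest) (Configuration, error) {
	if request.prevIndex > 0 && request.prevIndex != currentIndex {
		return Configuration{}, fmt.Errorf("Configuration changed since %v (latest is %v)",
			request.prevIndex, currentIndex)
	}
	configuration := cloneConfiguration(current)
	switch request.command {
	// ... same cases as in the diff below, minus the future plumbing ...
	}
	if err := checkConfiguration(configuration); err != nil {
		return Configuration{}, err
	}
	return configuration, nil
}
```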

if future.prevIndex > 0 && future.prevIndex != currentIndex {
future.respond(fmt.Errorf("Configuration changed since %v (latest is %v)",
future.prevIndex, r.configurations.latestIndex))
future.prevIndex, currentIndex))
return Configuration{}
}

var start []Server
var stop []Server
var commandStr string
configuration := cloneConfiguration(current)
switch future.command {
case AddStaging:
// TODO: barf on new address?
commandStr = "AddStaging"
newServer := Server{
// TODO: This should add the server as Staging, to be automatically
// promoted to Voter later. However, the promotion to Voter is not yet
@@ -1274,10 +1279,8 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
}
if !found {
configuration.Servers = append(configuration.Servers, newServer)
start = append(start, newServer)
}
case AddNonvoter:
commandStr = "AddNonvoter"
newServer := Server{
Suffrage: Nonvoter,
ID: future.serverID,
Expand All @@ -1286,7 +1289,7 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
found := false
for i, server := range configuration.Servers {
if server.ID == future.serverID {
if server.Suffrage == Voter {
if server.Suffrage != Nonvoter {
configuration.Servers[i].Address = future.serverAddress
} else {
configuration.Servers[i] = newServer
@@ -1297,27 +1300,22 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
}
if !found {
configuration.Servers = append(configuration.Servers, newServer)
start = append(start, newServer)
}
case DemoteVoter:
commandStr = "DemoteVoter"
for i, server := range configuration.Servers {
if server.ID == future.serverID {
configuration.Servers[i].Suffrage = Nonvoter
break
}
}
case RemoveServer:
commandStr = "RemoveServer"
for i, server := range configuration.Servers {
if server.ID == future.serverID {
configuration.Servers = append(configuration.Servers[:i], configuration.Servers[i+1:]...)
stop = append(stop, server)
break
}
}
case Promote:
commandStr = "Promote"
for i, server := range configuration.Servers {
if server.ID == future.serverID && server.Suffrage == Staging {
configuration.Servers[i].Suffrage = Voter
@@ -1329,12 +1327,21 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
// Make sure we didn't do something bad like remove the last voter
if err := checkConfiguration(configuration); err != nil {
future.respond(err)
return
return Configuration{}
}

r.logger.Printf("[INFO] raft: Updating configuration with %v (%v, %v) to %v",
commandStr, future.serverAddress, future.serverID, configuration)
return configuration
}

// appendConfigurationEntry changes the configuration and adds a new
// configuration entry to the log
func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
configuration := nextConfiguration(r.configurations.latest, r.configurations.latestIndex, future)
if future.responded {
return
}
r.logger.Printf("[INFO] raft: Updating configuration with %s (%v, %v) to %v",
future.command, future.serverAddress, future.serverID, configuration)
future.log = Log{
Type: LogConfiguration,
Data: encodeConfiguration(configuration),
@@ -1344,15 +1351,7 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
r.configurations.latest = configuration
r.configurations.latestIndex = index
r.leaderState.commitment.setConfiguration(configuration)

for _, server := range start {
r.logger.Printf("[INFO] raft: Added peer %v, starting replication", server)
r.startReplication(server)
}
for _, server := range stop {
r.logger.Printf("[INFO] raft: Removed peer %v, stopping replication after %v", server, index)
r.stopReplication(server, index)
}
r.startStopReplication()
}
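The payoff of the split is that nextConfiguration can be unit tested without spinning up a Raft instance. A minimal test sketch, assuming the (Configuration, error) signature and the hypothetical configurationChangeRequest struct from the thread above (the tests that actually landed in this PR may look different):

```go
package raft

import "testing"

func TestNextConfiguration_AddNonvoter(t *testing.T) {
	current := Configuration{Servers: []Server{
		{Suffrage: Voter, ID: ServerID("id1"), Address: ServerAddress("addr1")},
	}}
	request := configurationChangeRequest{
		command:       AddNonvoter,
		serverID:      ServerID("id2"),
		serverAddress: ServerAddress("addr2"),
	}

	next, err := nextConfiguration(current, 1, request)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(next.Servers) != 2 || next.Servers[1].Suffrage != Nonvoter {
		t.Fatalf("unexpected configuration: %v", next)
	}
	// nextConfiguration clones before editing, so the input stays untouched.
	if len(current.Servers) != 1 {
		t.Fatalf("input configuration was modified: %v", current)
	}
}
```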

// dispatchLog is called on the leader to push a log to disk, mark it