Commit

Changes to an "edit the log" override approach.
James Phillips committed Jul 19, 2016
1 parent 10778ed commit f75a881
Showing 6 changed files with 57 additions and 162 deletions.
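In short: the recoveryCh field and the runRecovery background goroutine go away. NewRaft now applies a recovery override by appending the recovered configuration entry directly to the log store and disarming the recovery manager synchronously before it returns, so followers pick up the new configuration through normal replication once a leader steps up. Below is a minimal sketch of what a Recovery implementation looks like under the Override/Disarm contract shown in recovery.go further down; the StaticRecovery type and its fields are illustrative only and are not part of this commit.

// StaticRecovery is a hypothetical Recovery implementation, shown only to
// illustrate the Override/Disarm contract; it forces a fixed configuration
// once and then reports itself disarmed.
type StaticRecovery struct {
	configuration Configuration
	disarmed      bool
}

// Override returns the forced configuration until the manager is disarmed.
func (s *StaticRecovery) Override(latest Configuration, latestIndex uint64) (Configuration, bool) {
	if s.disarmed {
		return Configuration{}, false
	}
	return s.configuration, true
}

// Disarm marks the override as consumed. With this commit, NewRaft calls
// Disarm synchronously right after storing the recovered configuration entry.
func (s *StaticRecovery) Disarm() error {
	s.disarmed = true
	return nil
}

Such a value would be handed to NewRaft as its final argument, for example NewRaft(conf, fsm, logs, stable, snaps, trans, &StaticRecovery{configuration: recovered}).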
42 changes: 26 additions & 16 deletions api.go
@@ -113,9 +113,6 @@ type Raft struct {
// RPC chan comes from the transport layer
rpcCh <-chan RPC

// recoveryCh is used to signal a recovery manager that it should disarm.
recoveryCh chan struct{}

// Shutdown channel to exit, protected to prevent concurrent exits
shutdown bool
shutdownCh chan struct{}
@@ -291,7 +288,6 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
rpcCh: trans.Consumer(),
snapshots: snaps,
snapshotCh: make(chan *snapshotFuture),
recoveryCh: make(chan struct{}, 1),
shutdownCh: make(chan struct{}),
stable: stable,
trans: trans,
@@ -330,25 +326,37 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
r.checkAndProcessConfigurationLog(&entry)
}

// Start the FSM and snapshot handlers here in case we need a recovery
// snapshot.
r.goFunc(r.runFSM)
r.goFunc(r.runSnapshots)

// Allow the recovery manager to intervene, if one was supplied. We force
// the current configuration based on the recovery override and then
// arrange for a no-op configuration change to happen as soon as possible
// in order to have the leader flush this out to the followers.
// the current configuration based on the recovery override and add it
// to the log so that followers will get it once a leader steps up.
if recovery != nil {
latest := r.configurations.latest
latestIndex := r.configurations.latestIndex
if configuration, ok := recovery.Override(latest, latestIndex); ok {
r.logger.Printf("[INFO] Recovering configuration: %+v", configuration)
fakeIndex := lastLog.Index + 1
r.configurations.latest = configuration
r.configurations.latestIndex = latestIndex
r.configurations.latestIndex = fakeIndex
r.configurations.committed = configuration
r.configurations.committedIndex = latestIndex
r.goFunc(func() { runRecovery(r.logger, recovery, r.recoveryCh, r.shutdownCh) })
r.configurations.committedIndex = fakeIndex
entry := &Log{
Index: fakeIndex,
Term: lastLog.Term,
}
if protocolVersion < 1 {
entry.Type = LogRemovePeerDeprecated
entry.Data = encodePeers(configuration, trans)
} else {
entry.Type = LogConfiguration
entry.Data = encodeConfiguration(configuration)
}
if err := logs.StoreLog(entry); err != nil {
return nil, fmt.Errorf("failed to append configuration entry to log: %v", err)
}
r.setLastLog(fakeIndex, lastLog.Term)
if err := recovery.Disarm(); err != nil {
return nil, fmt.Errorf("failed to disarm recovery manager: %v", err)
}
}
}

@@ -359,8 +367,10 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
// to be called concurrently with a blocking RPC.
trans.SetHeartbeatHandler(r.processHeartbeat)

// Start the rest of the background work.
// Start the background work.
r.goFunc(r.run)
r.goFunc(r.runFSM)
r.goFunc(r.runSnapshots)
return r, nil
}

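The heart of the api.go change above: instead of waiting for a later no-op configuration change to flush the override out, NewRaft stores the recovered configuration as a log entry at lastLog.Index + 1, choosing the encoding by protocol version so that old followers can still decode it. The same branch restated as a standalone helper for clarity; the makeRecoveryEntry name is hypothetical, but the types and encode functions are the ones used in the diff above.

// makeRecoveryEntry (hypothetical helper) builds the log entry that NewRaft
// appends during recovery: the recovered configuration, encoded to match the
// protocol version so that existing followers can decode it.
func makeRecoveryEntry(configuration Configuration, index, term uint64,
	protocolVersion int, trans Transport) *Log {
	entry := &Log{
		Index: index, // lastLog.Index + 1 in NewRaft
		Term:  term,  // reuses the last log's term
	}
	if protocolVersion < 1 {
		// Version-0 peers only understand the old peer-list encoding.
		entry.Type = LogRemovePeerDeprecated
		entry.Data = encodePeers(configuration, trans)
	} else {
		entry.Type = LogConfiguration
		entry.Data = encodeConfiguration(configuration)
	}
	return entry
}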
17 changes: 2 additions & 15 deletions raft.go
@@ -654,10 +654,6 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
r.configurations.latestIndex = index
r.leaderState.commitment.setConfiguration(configuration)
r.startStopReplication()

// Since we've added the recovery configuration to our log, we can
// disarm the recovery manager, if any.
asyncNotifyCh(r.recoveryCh)
}

// dispatchLog is called on the leader to push a log to disk, mark it
@@ -914,12 +910,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {

// Handle any new configuration changes
for _, newEntry := range newEntries {
if ok := r.checkAndProcessConfigurationLog(newEntry); ok {
// Since we've added the recovery
// configuration to our log, we can
// disarm the recovery manager, if any.
asyncNotifyCh(r.recoveryCh)
}
r.checkAndProcessConfigurationLog(newEntry)
}

// Update the lastLog
@@ -952,22 +943,18 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
// checkAndProcessConfigurationLog takes a log entry and updates the latest
// configuration if the entry results in a new configuration. This must only be
// called from the main thread, or from NewRaft() before any threads have begun.
// Returns true if a configuration log entry was found.
func (r *Raft) checkAndProcessConfigurationLog(entry *Log) bool {
func (r *Raft) checkAndProcessConfigurationLog(entry *Log) {
if entry.Type == LogConfiguration {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.configurations.latest = decodeConfiguration(entry.Data)
r.configurations.latestIndex = entry.Index
return true
} else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.configurations.latest = decodePeers(entry.Data, r.trans)
r.configurations.latestIndex = entry.Index
return true
}
return false
}

// requestVote is invoked when we get a request vote RPC call.
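For context on what is removed here: asyncNotifyCh is the package's best-effort, non-blocking channel notification, which is why the old runRecovery loop had to keep retrying until a disarm attempt actually succeeded. A sketch of that helper, consistent with how the removed lines above use it (this commit does not touch it):

// asyncNotifyCh performs a non-blocking send on the channel: if no receiver
// or buffer slot is ready, the notification is simply dropped.
func asyncNotifyCh(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

With the recovery entry written to the log inside NewRaft itself, there is nothing left to notify, so both call sites disappear.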
65 changes: 29 additions & 36 deletions raft_test.go
@@ -1337,16 +1337,18 @@ func TestRaft_SnapshotRestore(t *testing.T) {
// TODO: Need a test that has a previous format Snapshot and check that it can
// be read/installed on the new code.

// TODO: Need a test to process old-style entries when starting up.
// TODO: Need a test to process old-style entries in the Raft log when starting
// up.

func TestRaft_SnapshotRestore_PeerChange(t *testing.T) {
// Make the cluster
func testRecover(t *testing.T, protocolVersion int) {
// Make the cluster.
conf := inmemConfig(t)
conf.ProtocolVersion = protocolVersion
conf.TrailingLogs = 10
c := MakeCluster(3, t, conf)
defer c.Close()

// Commit a lot of things
// Commit a lot of things.
leader := c.Leader()
var future Future
for i := 0; i < 100; i++ {
@@ -1358,38 +1360,38 @@ func TestRaft_SnapshotRestore_PeerChange(t *testing.T) {
c.FailNowf("[ERR] err: %v", err)
}

// Take a snapshot
// Take a snapshot.
snapFuture := leader.Snapshot()
if err := snapFuture.Error(); err != nil {
c.FailNowf("[ERR] err: %v", err)
}

// Shutdown
// Shutdown.
shutdown := leader.Shutdown()
if err := shutdown.Error(); err != nil {
c.FailNowf("[ERR] err: %v", err)
}

// Make a separate cluster
// Make a separate cluster.
c2 := MakeClusterNoBootstrap(2, t, conf)
defer c2.Close()

// Kill the old cluster
// Kill the old cluster.
for _, sec := range c.rafts {
if sec != leader {
sec.Shutdown()
}
}

// Change the peer addresses
// Gather the new peer address list.
var peers []string
peers = append(peers, fmt.Sprintf("%q", leader.trans.LocalAddr()))
for _, sec := range c2.rafts {
peers = append(peers, fmt.Sprintf("%q", sec.trans.LocalAddr()))
}
content := []byte(fmt.Sprintf("[%s]", strings.Join(peers, ",")))

// Set up a recovery manager
// Set up a recovery manager.
base, err := ioutil.TempDir("", "")
if err != nil {
c.FailNowf("[ERR] err: %v", err)
@@ -1404,10 +1406,10 @@ func TestRaft_SnapshotRestore_PeerChange(t *testing.T) {
c.FailNowf("[ERR] err: %v", err)
}

// Restart the Raft with new peers
// Restart the Raft with new peers.
r := leader

// Can't just reuse the old transport as it will be closed
// Can't just reuse the old transport as it will be closed.
_, trans2 := NewInmemTransport(r.trans.LocalAddr())
r, err = NewRaft(r.conf, r.fsm, r.logs, r.stable, r.snapshots, trans2, recovery)
if err != nil {
@@ -1419,45 +1421,36 @@ func TestRaft_SnapshotRestore_PeerChange(t *testing.T) {
c2.fsms = append(c2.fsms, r.fsm.(*MockFSM))
c2.FullyConnect()

// Wait a while
// Wait a while.
time.Sleep(c.propagateTimeout)

// Ensure we elect a leader, and that we replicate
// to our new followers
// Ensure we elect a leader, and that we replicate to our new followers.
c2.EnsureSame(t)

// We should have restored from the snapshot! Note that there's one more
// index bump from the noop the leader applies when taking over.
if last := r.getLastApplied(); last != 103 {
// We should have restored from the snapshot! Note that there's one
// index bump from the log entry added during recovery, and another
// bump from the noop the leader tees up when it takes over.
if last := r.getLastApplied(); last != 104 {
c.FailNowf("[ERR] bad last: %v", last)
}

// TODO (slackpad) - This isn't ideal because it shifts the burden of
// finalizing the config override onto the application. Until they do
// this, or a snapshot occurs, leader elections won't work if the other
// servers have installed an old snapshot and don't have the right peers.
// We used to re-assert the configuration whenever a leader was elected
// which papers over this. Can we go ahead and append to the log during
// recovery?

// The followers will have the old configuration from the snapshot so we
// have to kick out one config change in order to get them the overridden
// configuration.
peerFuture := c2.Leader().AddPeer(c2.Leader().localAddr)
if err := peerFuture.Error(); err != nil {
c.FailNowf("[ERR] failed to make peer change: %v", err)
}

// Check the peers
// Check the peers.
c2.EnsureSamePeers(t)

// Make sure the recovery disarm step ran.
// Make sure the recovery disarm step ran, otherwise we could revert
// some configuration change that happens later.
_, err = os.Stat(peersFile)
if !os.IsNotExist(err) {
c.FailNowf("[ERR] peers.json file should be deleted: %v", err)
}
}

func TestRaft_SnapshotRestore_PeerChange(t *testing.T) {
for v := ProtocolVersionMin; v <= ProtocolVersionMax; v++ {
testRecover(t, v)
}
}

func TestRaft_AutoSnapshot(t *testing.T) {
// Make the cluster
conf := inmemConfig(t)
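The content the test assembles above is the old-style peers.json payload: a JSON array of quoted network addresses, with no server IDs. A small standalone sketch of producing such a file; the addresses, the directory, and the ioutil.WriteFile call are illustrative assumptions, since the test's own write step is elided in this excerpt.

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"strings"
)

func main() {
	// Placeholder addresses; a real cluster would use its transport addresses.
	peers := []string{"10.0.1.1:8300", "10.0.1.2:8300", "10.0.1.3:8300"}

	// Quote each address and join them into a JSON array, mirroring the
	// fmt.Sprintf("%q", ...) / strings.Join pattern in the test above.
	var quoted []string
	for _, p := range peers {
		quoted = append(quoted, fmt.Sprintf("%q", p))
	}
	content := []byte(fmt.Sprintf("[%s]", strings.Join(quoted, ",")))

	// "peers.json" is the file the recovery manager reads and then deletes
	// when it disarms, which the test asserts with os.Stat / os.IsNotExist.
	base := filepath.Join(os.TempDir(), "raft-recovery-example")
	if err := os.MkdirAll(base, 0755); err != nil {
		log.Fatalf("failed to create dir: %v", err)
	}
	if err := ioutil.WriteFile(filepath.Join(base, "peers.json"), content, 0644); err != nil {
		log.Fatalf("failed to write peers.json: %v", err)
	}
}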
21 changes: 0 additions & 21 deletions recovery.go
@@ -4,7 +4,6 @@ import (
"bytes"
"encoding/json"
"io/ioutil"
"log"
"os"
"path/filepath"
)
@@ -29,26 +28,6 @@ type Recovery interface {
Disarm() error
}

// runRecovery is a goroutine that handles notifying a recovery manager that it
// has been disarmed. It'll keep trying until it has disarmed the recovery
// manager or it has been shut down.
func runRecovery(logger *log.Logger, recovery Recovery, recoveryCh chan struct{}, shutdownCh chan struct{}) {
for {
select {
case <-recoveryCh:
if err := recovery.Disarm(); err != nil {
logger.Printf("[ERR] raft: Recovery manager failed to disarm: %v", err)
} else {
logger.Printf("[INFO] raft: Recovery manager disarmed successfully")
return
}

case <-shutdownCh:
return
}
}
}

// PeersJSONRecovery is a recovery manager that reads the old-style peers.json
// file. It does not support server IDs, so it should only be used along with
// ProtocolVersion 0. Upon disarming, it'll delete the peers.json file.
70 changes: 0 additions & 70 deletions recovery_test.go
@@ -1,84 +1,14 @@
package raft

import (
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"reflect"
"strings"
"testing"
"time"
)

type MockRecovery struct {
errors int
eventCh chan string
}

func (m *MockRecovery) Override(latest Configuration, latestIndex uint64) (Configuration, bool) {
return Configuration{}, false
}

func (m *MockRecovery) Disarm() error {
if m.errors > 0 {
m.errors--
m.eventCh <- "error"
return fmt.Errorf("error")
}

m.eventCh <- "disarm"
return nil
}

func TestRecovery_Goroutine(t *testing.T) {
eventCh := make(chan string, 10)
mock := &MockRecovery{
errors: 1,
eventCh: eventCh,
}

logger := log.New(os.Stderr, "", log.LstdFlags)
recoveryCh := make(chan struct{})
shutdownCh := make(chan struct{})
waitForEvent := func(expected string) {
deadline := time.Now().Add(1 * time.Second)
for {
select {
case event := <-eventCh:
if event != expected {
t.Fatalf("bad: expected %s, got %s", expected, event)
}
return

case <-time.After(10 * time.Millisecond):
if time.Now().After(deadline) {
t.Fatalf("bad: timed out waiting for %s", expected)
}
asyncNotifyCh(recoveryCh)
}
}
}

// Try a case where the first disarm fails, then finally works.
go func() {
runRecovery(logger, mock, recoveryCh, shutdownCh)
eventCh <- "exit"
}()
waitForEvent("error")
waitForEvent("disarm")
waitForEvent("exit")

// Try the shutdown channel.
go func() {
runRecovery(logger, mock, recoveryCh, shutdownCh)
eventCh <- "exit"
}()
close(shutdownCh)
waitForEvent("exit")
}

func TestRecovery_PeersJSON_BadConfiguration(t *testing.T) {
base, err := ioutil.TempDir("", "")
if err != nil {