Skip to content

Commit

Permalink
Moves recovery out into its own pre-NewRaft() call.
Browse files Browse the repository at this point in the history
  • Loading branch information
James Phillips committed Jul 19, 2016
1 parent 4f21678 commit 83c1730
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 62 deletions.
106 changes: 58 additions & 48 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,67 @@ func BootstrapCluster(config *Config, logs LogStore, stable StableStore, snaps S
return nil
}

// RecoverCluster is used to manually force a new configuration in order to
// recover from a loss of quorum. If this is used, ALL SERVERS in the cluster
// must be started with the same recovery settings. Otherwise, the configuration
// may be incorrect depending on which server is elected.
func RecoverCluster(conf *Config, logs LogStore, trans Transport, recovery Recovery) error {
// Validate the configuration.
if err := ValidateConfig(conf); err != nil {
return err
}

// Read the last log value.
lastIdx, err := logs.LastIndex()
if err != nil {
return fmt.Errorf("failed to find last log: %v", err)
}

// Get the log.
var lastLog Log
if lastIdx > 0 {
if err = logs.GetLog(lastIdx, &lastLog); err != nil {
return fmt.Errorf("failed to get last log: %v", err)
}
} else {
return fmt.Errorf("cannot run recovery on a cluster that's not bootstrapped")
}

// See if there's a configuration override.
if configuration, ok := recovery.Override(lastLog.Index); ok {
fakeIndex := lastLog.Index + 1

// Add a new log entry.
entry := &Log{
Index: fakeIndex,
Term: lastLog.Term,
}
if conf.ProtocolVersion < 1 {
entry.Type = LogRemovePeerDeprecated
entry.Data = encodePeers(configuration, trans)
} else {
entry.Type = LogConfiguration
entry.Data = encodeConfiguration(configuration)
}
if err := logs.StoreLog(entry); err != nil {
return fmt.Errorf("failed to append configuration entry to log: %v", err)
}

// Disarm the recovery manager so that we won't revert a
// subsequent configuration change.
if err := recovery.Disarm(); err != nil {
return fmt.Errorf("failed to disarm recovery manager: %v", err)
}
}

return nil
}

// NewRaft is used to construct a new Raft node. It takes a configuration, as well
// as implementations of various interfaces that are required. If we have any
// old state, such as snapshots, logs, peers, etc, all those will be restored
// when creating the Raft node. The recovery manager is optional (may be nil) and
// provides a hook for injecting an operator-specified configuration. If this is
// used, ALL NODES in the cluster must be started with the same recovery settings.
func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore,
trans Transport, recovery Recovery) (*Raft, error) {
// when creating the Raft node.
func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {
// Validate the configuration.
if err := ValidateConfig(conf); err != nil {
return nil, err
Expand Down Expand Up @@ -325,49 +378,6 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
}
r.checkAndProcessConfigurationLog(&entry)
}

// Allow the recovery manager to intervene, if one was supplied. We force
// the current configuration based on the recovery override and add it
// to the log so that followers will get it once a leader steps up.
if recovery != nil {
latest := r.configurations.latest
latestIndex := r.configurations.latestIndex
if configuration, ok := recovery.Override(latest, latestIndex); ok {
fakeIndex := lastLog.Index + 1
r.logger.Printf("[INFO] Recovering configuration by adding new log at index %d: %+v", fakeIndex, configuration)

// Make this is active configuration, and make it look like
// it was committed.
r.configurations.latest = configuration
r.configurations.latestIndex = fakeIndex
r.configurations.committed = configuration
r.configurations.committedIndex = fakeIndex

// Add a new log entry.
entry := &Log{
Index: fakeIndex,
Term: lastLog.Term,
}
if protocolVersion < 1 {
entry.Type = LogRemovePeerDeprecated
entry.Data = encodePeers(configuration, trans)
} else {
entry.Type = LogConfiguration
entry.Data = encodeConfiguration(configuration)
}
if err := logs.StoreLog(entry); err != nil {
return nil, fmt.Errorf("failed to append configuration entry to log: %v", err)
}
r.setLastLog(fakeIndex, lastLog.Term)

// Disarm the recovery manager so that we won't revert a
// subsequent configuration change.
if err := recovery.Disarm(); err != nil {
return nil, fmt.Errorf("failed to disarm recovery manager: %v", err)
}
}
}

r.logger.Printf("[INFO] NewRaft configurations: %+v", r.configurations)

// Setup a heartbeat fast-path to avoid head-of-line
Expand Down
2 changes: 1 addition & 1 deletion integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func MakeRaft(t *testing.T, conf *Config) *RaftEnv {
}

log.Printf("[INFO] Starting node at %v", trans.LocalAddr())
raft, err := NewRaft(conf, env.fsm, stable, stable, snap, trans, nil)
raft, err := NewRaft(conf, env.fsm, stable, stable, snap, trans)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
19 changes: 12 additions & 7 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ func makeCluster(n int, bootstrap bool, t *testing.T, conf *Config) *cluster {
}
}

raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, trans, nil)
raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, trans)
if err != nil {
c.FailNowf("[ERR] NewRaft failed: %v", err)
}
Expand Down Expand Up @@ -1322,7 +1322,7 @@ func TestRaft_SnapshotRestore(t *testing.T) {
r := leader
// Can't just reuse the old transport as it will be closed
_, trans2 := NewInmemTransport(r.trans.LocalAddr())
r, err := NewRaft(r.conf, r.fsm, r.logs, r.stable, r.snapshots, trans2, nil)
r, err := NewRaft(r.conf, r.fsm, r.logs, r.stable, r.snapshots, trans2)
if err != nil {
c.FailNowf("[ERR] err: %v", err)
}
Expand All @@ -1340,6 +1340,8 @@ func TestRaft_SnapshotRestore(t *testing.T) {
// TODO: Need a test to process old-style entries in the Raft log when starting
// up.

// TODO: Add a dedicated test for RecoverCluster() hitting the edges.

func testRecover(t *testing.T, protocolVersion int) {
// Make the cluster.
conf := inmemConfig(t)
Expand Down Expand Up @@ -1383,6 +1385,9 @@ func testRecover(t *testing.T, protocolVersion int) {
}
}

// Restart the Raft with new peers.
r := leader

// Gather the new peer address list.
var peers []string
peers = append(peers, fmt.Sprintf("%q", leader.trans.LocalAddr()))
Expand All @@ -1391,7 +1396,7 @@ func testRecover(t *testing.T, protocolVersion int) {
}
content := []byte(fmt.Sprintf("[%s]", strings.Join(peers, ",")))

// Set up a recovery manager.
// Perform a manual recovery on the cluster.
base, err := ioutil.TempDir("", "")
if err != nil {
c.FailNowf("[ERR] err: %v", err)
Expand All @@ -1405,13 +1410,13 @@ func testRecover(t *testing.T, protocolVersion int) {
if err != nil {
c.FailNowf("[ERR] err: %v", err)
}

// Restart the Raft with new peers.
r := leader
if err := RecoverCluster(r.conf, r.logs, r.trans, recovery); err != nil {
c.FailNowf("[ERR] err: %v", err)
}

// Can't just reuse the old transport as it will be closed.
_, trans2 := NewInmemTransport(r.trans.LocalAddr())
r, err = NewRaft(r.conf, r.fsm, r.logs, r.stable, r.snapshots, trans2, recovery)
r, err = NewRaft(r.conf, r.fsm, r.logs, r.stable, r.snapshots, trans2)
if err != nil {
c.FailNowf("[ERR] err: %v", err)
}
Expand Down
10 changes: 5 additions & 5 deletions recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
type Recovery interface {
// Override is called when Raft is starting up. If an override is
// requested, this returns true along with the new configuration. We
// include the latest configuration and index so that the recovery
// manager can examine those and decide whether or not to run, such as
// if we want to inject a configuration only under a specific index.
Override(latest Configuration, latestIndex uint64) (Configuration, bool)
// include the last index so that the recovery manager can examine
// that and decide whether or not to run, such as if we want to inject a
// configuration only under a specific index.
Override(lastIndex uint64) (Configuration, bool)

// Disarm is called whenever the recovery configuration becomes durable
// in the Raft system, such as when another configuration change is
Expand Down Expand Up @@ -84,7 +84,7 @@ func NewPeersJSONRecovery(base string) (*PeersJSONRecovery, error) {
}

// See the Recovery interface documentation for Override.
func (r *PeersJSONRecovery) Override(latest Configuration, latestIndex uint64) (Configuration, bool) {
func (r *PeersJSONRecovery) Override(lastIndex uint64) (Configuration, bool) {
return r.configuration, true
}

Expand Down
2 changes: 1 addition & 1 deletion recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestRecovery_PeersJSON(t *testing.T) {
t.Fatalf("err: %v", err)
}

configuration, ok := recovery.Override(Configuration{}, 0)
configuration, ok := recovery.Override(0)
if !ok {
t.Fatalf("bad: should have an override")
}
Expand Down

0 comments on commit 83c1730

Please sign in to comment.