diff --git a/api.go b/api.go
index 6cb8065db1a..6bbc74d910f 100644
--- a/api.go
+++ b/api.go
@@ -260,13 +260,59 @@ func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
 // the usual APIs in order to bring the cluster back into a known state.
 func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
 	snaps SnapshotStore, trans Transport, configuration Configuration) error {
+	var err error
+	var lastIndex, lastTerm, lastLogIndex uint64
+	// Check for a misconfigured server, a bad peer configuration, or unreadable state.
+	if err = validateState(conf, logs, stable, snaps, configuration); err != nil {
+		return err
+	}
+
+	// The snapshot information is the best known end point for the data
+	// until we play back the Raft log entries.
+	// Attempt to restore any snapshots we find, newest to oldest.
+	lastIndex, lastTerm, err = restoreSnapshots(snaps, fsm)
+	if err != nil {
+		return err
+	}
+
+	// Apply any Raft log entries past the snapshot.
+	lastIndex, lastTerm, lastLogIndex, err = applyLogs(logs, fsm, lastIndex, lastTerm)
+	if err != nil {
+		return err
+	}
+
+	// Take a new snapshot covering everything recovered so far.
+	if err = commitSnapshot(conf, snaps, fsm, configuration, trans, lastIndex, lastTerm); err != nil {
+		return err
+	}
+
+	// Compact the log so that we don't get bad interference from any
+	// configuration change log entries that might be there.
+	var firstLogIndex uint64
+	firstLogIndex, err = logs.FirstIndex()
+	if err != nil {
+		return fmt.Errorf("failed to get first log index: %v", err)
+	}
+	if err = logs.DeleteRange(firstLogIndex, lastLogIndex); err != nil {
+		return fmt.Errorf("log compaction failed: %v", err)
+	}
+
+	return nil
+}
+
+// validateState checks the error conditions that should abort a recovery:
+// a bad server config, a bad peer configuration, or no existing state.
+func validateState(conf *Config, logs LogStore, stable StableStore,
+	snaps SnapshotStore, configuration Configuration) error {
+	var err error
+	var hasState bool
 	// Validate the Raft server config.
-	if err := ValidateConfig(conf); err != nil {
+	if err = ValidateConfig(conf); err != nil {
 		return err
 	}
 
 	// Sanity check the Raft peer configuration.
-	if err := checkConfiguration(configuration); err != nil {
+	if err = checkConfiguration(configuration); err != nil {
 		return err
 	}
 
@@ -275,23 +321,29 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
 	// expect data to be there and it's not. By refusing, we force them
 	// to show intent to start a cluster fresh by explicitly doing a
 	// bootstrap, rather than quietly fire up a fresh cluster here.
-	hasState, err := HasExistingState(logs, stable, snaps)
+	hasState, err = HasExistingState(logs, stable, snaps)
 	if err != nil {
 		return fmt.Errorf("failed to check for existing state: %v", err)
 	}
 	if !hasState {
 		return fmt.Errorf("refused to recover cluster with no initial state, this is probably an operator error")
 	}
+	return nil
+}
+
+// restoreSnapshots restores the newest snapshot that can be opened and
+// applied to the FSM, trying each available snapshot from newest to oldest.
+func restoreSnapshots(snaps SnapshotStore, fsm FSM) (index, term uint64, err error) {
+	var snapshots []*SnapshotMeta
+	snapshots, err = snaps.List()
-
-	// Attempt to restore any snapshots we find, newest to oldest.
-	var snapshotIndex uint64
-	var snapshotTerm uint64
-	snapshots, err := snaps.List()
 	if err != nil {
-		return fmt.Errorf("failed to list snapshots: %v", err)
+		return 0, 0, fmt.Errorf("failed to list snapshots: %v", err)
 	}
+
 	for _, snapshot := range snapshots {
-		_, source, err := snaps.Open(snapshot.ID)
+		var source io.ReadCloser
+
+		_, source, err = snaps.Open(snapshot.ID)
 		if err != nil {
 			// Skip this one and try the next. We will detect if we
 			// couldn't open any snapshots.
@@ -305,29 +357,28 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
 			// Same here, skip and try the next one.
 			continue
 		}
-
-		snapshotIndex = snapshot.Index
-		snapshotTerm = snapshot.Term
+		index, term = snapshot.Index, snapshot.Term
 		break
 	}
-	if len(snapshots) > 0 && (snapshotIndex == 0 || snapshotTerm == 0) {
-		return fmt.Errorf("failed to restore any of the available snapshots")
-	}
-
-	// The snapshot information is the best known end point for the data
-	// until we play back the Raft log entries.
-	lastIndex := snapshotIndex
-	lastTerm := snapshotTerm
+	if len(snapshots) > 0 && (index == 0 || term == 0) {
+		return index, term, fmt.Errorf("failed to restore any of the available snapshots")
+	}
+	return index, term, nil
+}
+
+// applyLogs applies any Raft log entries found past the snapshot point,
+// picking up from the snapshot's index and term.
+func applyLogs(logs LogStore, fsm FSM, snapIndex, snapTerm uint64) (lastIndex, lastTerm, lastLogIndex uint64, err error) {
+	lastIndex, lastTerm = snapIndex, snapTerm
+
 	// Apply any Raft log entries past the snapshot.
-	lastLogIndex, err := logs.LastIndex()
+	lastLogIndex, err = logs.LastIndex()
 	if err != nil {
-		return fmt.Errorf("failed to find last log: %v", err)
+		return lastIndex, lastTerm, lastLogIndex, fmt.Errorf("failed to find last log: %v", err)
 	}
-	for index := snapshotIndex + 1; index <= lastLogIndex; index++ {
+	for index := snapIndex + 1; index <= lastLogIndex; index++ {
 		var entry Log
-		if err := logs.GetLog(index, &entry); err != nil {
-			return fmt.Errorf("failed to get log at index %d: %v", index, err)
+		if err = logs.GetLog(index, &entry); err != nil {
+			return lastIndex, lastTerm, lastLogIndex, fmt.Errorf("failed to get log at index %d: %v", index, err)
 		}
 		if entry.Type == LogCommand {
 			_ = fsm.Apply(&entry)
 		}
@@ -335,35 +386,29 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
 		lastIndex = entry.Index
 		lastTerm = entry.Term
 	}
+	return lastIndex, lastTerm, lastLogIndex, nil
+}
 
-	// Create a new snapshot, placing the configuration in as if it was
-	// committed at index 1.
-	snapshot, err := fsm.Snapshot()
+// commitSnapshot creates a new snapshot, placing the configuration in as if
+// it was committed at index 1.
+func commitSnapshot(conf *Config, snaps SnapshotStore, fsm FSM, configuration Configuration, trans Transport,
+	lastIndex, lastTerm uint64) error {
+	var err error
+	var snapshot FSMSnapshot
+	snapshot, err = fsm.Snapshot()
 	if err != nil {
 		return fmt.Errorf("failed to snapshot FSM: %v", err)
 	}
 	version := getSnapshotVersion(conf.ProtocolVersion)
-	sink, err := snaps.Create(version, lastIndex, lastTerm, configuration, 1, trans)
+	var sink SnapshotSink
+	sink, err = snaps.Create(version, lastIndex, lastTerm, configuration, 1, trans)
 	if err != nil {
 		return fmt.Errorf("failed to create snapshot: %v", err)
 	}
-	if err := snapshot.Persist(sink); err != nil {
+	if err = snapshot.Persist(sink); err != nil {
 		return fmt.Errorf("failed to persist snapshot: %v", err)
 	}
-	if err := sink.Close(); err != nil {
+	if err = sink.Close(); err != nil {
 		return fmt.Errorf("failed to finalize snapshot: %v", err)
 	}
-
-	// Compact the log so that we don't get bad interference from any
-	// configuration change log entries that might be there.
-	firstLogIndex, err := logs.FirstIndex()
-	if err != nil {
-		return fmt.Errorf("failed to get first log index: %v", err)
-	}
-	if err := logs.DeleteRange(firstLogIndex, lastLogIndex); err != nil {
-		return fmt.Errorf("log compaction failed: %v", err)
-	}
-
 	return nil
 }
diff --git a/net_transport_test.go b/net_transport_test.go
index 5c614b7ecd4..869b45a9921 100644
--- a/net_transport_test.go
+++ b/net_transport_test.go
@@ -38,7 +38,7 @@ func TestNetworkTransport_CloseStreams(t *testing.T) {
 		PrevLogEntry: 100,
 		PrevLogTerm:  4,
 		Entries: []*Log{
-			&Log{
+			{
 				Index: 101,
 				Term:  4,
 				Type:  LogNoop,
@@ -76,7 +76,6 @@ func TestNetworkTransport_CloseStreams(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 	defer trans2.Close()
-
 	for i := 0; i < 2; i++ {
 		// Create wait group
 		wg := &sync.WaitGroup{}
@@ -199,7 +198,7 @@ func TestNetworkTransport_AppendEntries(t *testing.T) {
 		PrevLogEntry: 100,
 		PrevLogTerm:  4,
 		Entries: []*Log{
-			&Log{
+			{
 				Index: 101,
 				Term:  4,
 				Type:  LogNoop,
@@ -268,7 +267,7 @@ func TestNetworkTransport_AppendEntriesPipeline(t *testing.T) {
 		PrevLogEntry: 100,
 		PrevLogTerm:  4,
 		Entries: []*Log{
-			&Log{
+			{
 				Index: 101,
 				Term:  4,
 				Type:  LogNoop,
@@ -351,7 +350,7 @@ func TestNetworkTransport_AppendEntriesPipeline_CloseStreams(t *testing.T) {
 		PrevLogEntry: 100,
 		PrevLogTerm:  4,
 		Entries: []*Log{
-			&Log{
+			{
 				Index: 101,
 				Term:  4,
 				Type:  LogNoop,
@@ -625,7 +624,7 @@ func TestNetworkTransport_PooledConn(t *testing.T) {
 		PrevLogEntry: 100,
 		PrevLogTerm:  4,
 		Entries: []*Log{
-			&Log{
+			{
 				Index: 101,
 				Term:  4,
 				Type:  LogNoop,
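
For context on the function being reorganized above, here is a minimal sketch of how an operator tool might drive RecoverCluster after a cluster loses quorum. The BoltDB log/stable store, the file snapshot store, the addresses, and the recoverNode helper are assumptions made purely for illustration; none of them are part of this diff.

package recovery

import (
	"os"
	"path/filepath"
	"time"

	"github.com/hashicorp/raft"
	raftboltdb "github.com/hashicorp/raft-boltdb"
)

// recoverNode is a hypothetical helper: it reopens the stores a node was
// already running with and asks RecoverCluster to rewrite them so the node
// restarts with the supplied membership.
func recoverNode(dataDir string, fsm raft.FSM, servers []raft.Server) error {
	conf := raft.DefaultConfig()
	conf.LocalID = raft.ServerID("node1") // placeholder ID for the sketch

	// RecoverCluster refuses to run against empty state, so these must point
	// at the node's existing data directory, not fresh stores.
	store, err := raftboltdb.NewBoltStore(filepath.Join(dataDir, "raft.db"))
	if err != nil {
		return err
	}
	snaps, err := raft.NewFileSnapshotStore(dataDir, 2, os.Stderr)
	if err != nil {
		return err
	}
	trans, err := raft.NewTCPTransport("127.0.0.1:8300", nil, 3, 10*time.Second, os.Stderr)
	if err != nil {
		return err
	}

	// The surviving membership the cluster should come back with.
	configuration := raft.Configuration{Servers: servers}

	// Validates config and state, restores the newest readable snapshot,
	// replays log entries into fsm, writes a fresh snapshot carrying
	// configuration, and compacts the log.
	return raft.RecoverCluster(conf, fsm, store, store, snaps, trans, configuration)
}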