Refactor recoverCluster (hashicorp#329)
* Refactor recoverCluster into private functions

* added lastSnapIndex and some is
schristoff authored Jul 17, 2019
1 parent 8ca366b commit 904f3b9
Showing 2 changed files with 91 additions and 47 deletions.
127 changes: 86 additions & 41 deletions api.go
@@ -260,13 +260,59 @@ func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
 // the usual APIs in order to bring the cluster back into a known state.
 func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
     snaps SnapshotStore, trans Transport, configuration Configuration) error {
+    var err error
+    var lastSnapIndex, lastTerm, lastLogIndex uint64
+    // Check error conditions for a misconfigured or unreadable configuration and state.
+    if err = validateState(conf, logs, stable, snaps, configuration); err != nil {
+        return err
+    }
+
+    // The snapshot information is the best known end point for the data
+    // until we play back the Raft log entries.
+    // Attempt to restore any snapshots we find, newest to oldest.
+    lastSnapIndex, lastTerm, err = restoreSnapshots(snaps, fsm)
+    if err != nil {
+        return err
+    }
+
+    // Apply any Raft log entries past the snapshot, carrying the snapshot
+    // position forward so an empty log cannot zero it out.
+    lastSnapIndex, lastTerm, lastLogIndex, err = applyLogs(logs, fsm, lastSnapIndex, lastTerm)
+    if err != nil {
+        return err
+    }
+
+    // Take a new snapshot.
+    if err = commitSnapshot(conf, logs, snaps, fsm, configuration, trans, lastSnapIndex, lastTerm); err != nil {
+        return err
+    }
+
+    // Compact the log so that we don't get bad interference from any
+    // configuration change log entries that might be there.
+    var firstLogIndex uint64
+    firstLogIndex, err = logs.FirstIndex()
+    if err != nil {
+        return fmt.Errorf("failed to get first log index: %v", err)
+    }
+    if err = logs.DeleteRange(firstLogIndex, lastLogIndex); err != nil {
+        return fmt.Errorf("log compaction failed: %v", err)
+    }
+
+    return nil
+}
+
+// validateState anticipates a couple of different error conditions
+// for recovering the Raft cluster.
+func validateState(conf *Config, logs LogStore, stable StableStore,
+    snaps SnapshotStore, configuration Configuration) error {
+    var err error
+    var hasState bool
     // Validate the Raft server config.
-    if err := ValidateConfig(conf); err != nil {
+    if err = ValidateConfig(conf); err != nil {
         return err
     }
 
     // Sanity check the Raft peer configuration.
-    if err := checkConfiguration(configuration); err != nil {
+    if err = checkConfiguration(configuration); err != nil {
         return err
     }
 
@@ -275,23 +321,29 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
     // expect data to be there and it's not. By refusing, we force them
     // to show intent to start a cluster fresh by explicitly doing a
     // bootstrap, rather than quietly fire up a fresh cluster here.
-    hasState, err := HasExistingState(logs, stable, snaps)
+    hasState, err = HasExistingState(logs, stable, snaps)
     if err != nil {
         return fmt.Errorf("failed to check for existing state: %v", err)
     }
     if !hasState {
         return fmt.Errorf("refused to recover cluster with no initial state, this is probably an operator error")
     }
+    return nil
+}

-    // Attempt to restore any snapshots we find, newest to oldest.
-    var snapshotIndex uint64
-    var snapshotTerm uint64
-    snapshots, err := snaps.List()
+// restoreSnapshots attempts to restore each snapshot, newest to oldest.
+func restoreSnapshots(snaps SnapshotStore, fsm FSM) (index, term uint64, err error) {
+    var snapshots []*SnapshotMeta
+    snapshots, err = snaps.List()
     if err != nil {
-        return fmt.Errorf("failed to list snapshots: %v", err)
+        return 0, 0, fmt.Errorf("failed to list snapshots: %v", err)
     }
 
     for _, snapshot := range snapshots {
-        _, source, err := snaps.Open(snapshot.ID)
+        var source io.ReadCloser
+
+        _, source, err = snaps.Open(snapshot.ID)
         if err != nil {
             // Skip this one and try the next. We will detect if we
             // couldn't open any snapshots.
@@ -305,65 +357,58 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
             // Same here, skip and try the next one.
             continue
         }
 
-        snapshotIndex = snapshot.Index
-        snapshotTerm = snapshot.Term
+        index, term = snapshot.Index, snapshot.Term
         break
     }
-    if len(snapshots) > 0 && (snapshotIndex == 0 || snapshotTerm == 0) {
-        return fmt.Errorf("failed to restore any of the available snapshots")
-    }
-
-    // The snapshot information is the best known end point for the data
-    // until we play back the Raft log entries.
-    lastIndex := snapshotIndex
-    lastTerm := snapshotTerm
+    if len(snapshots) > 0 && (index == 0 || term == 0) {
+        err = fmt.Errorf("failed to restore any of the available snapshots")
+        return index, term, err
+    }
+    return index, term, nil
+}

+// applyLogs applies any log entries that exist past the snapshot point.
+func applyLogs(logs LogStore, fsm FSM, lastSnapIndex, lastSnapTerm uint64) (lastIndex, lastTerm, lastLogIndex uint64, err error) {
     // Apply any Raft log entries past the snapshot.
-    lastLogIndex, err := logs.LastIndex()
+    lastIndex, lastTerm = lastSnapIndex, lastSnapTerm
+    lastLogIndex, err = logs.LastIndex()
     if err != nil {
-        return fmt.Errorf("failed to find last log: %v", err)
+        return 0, 0, 0, fmt.Errorf("failed to find last log: %v", err)
     }
-    for index := snapshotIndex + 1; index <= lastLogIndex; index++ {
+    for index := lastSnapIndex + 1; index <= lastLogIndex; index++ {
         var entry Log
-        if err := logs.GetLog(index, &entry); err != nil {
-            return fmt.Errorf("failed to get log at index %d: %v", index, err)
+        if err = logs.GetLog(index, &entry); err != nil {
+            return 0, 0, 0, fmt.Errorf("failed to get log at index %d: %v", index, err)
         }
         if entry.Type == LogCommand {
             _ = fsm.Apply(&entry)
         }
         lastIndex = entry.Index
         lastTerm = entry.Term
     }
+    return lastIndex, lastTerm, lastLogIndex, nil
+}

 // Create a new snapshot, placing the configuration in as if it was
 // committed at index 1.
-    snapshot, err := fsm.Snapshot()
+func commitSnapshot(conf *Config, log LogStore, snaps SnapshotStore, fsm FSM, configuration Configuration, trans Transport,
+    lastIndex, lastTerm uint64) error {
+    var err error
+    var snapshot FSMSnapshot
+    snapshot, err = fsm.Snapshot()
     if err != nil {
         return fmt.Errorf("failed to snapshot FSM: %v", err)
     }
     version := getSnapshotVersion(conf.ProtocolVersion)
-    sink, err := snaps.Create(version, lastIndex, lastTerm, configuration, 1, trans)
+    var sink SnapshotSink
+    sink, err = snaps.Create(version, lastIndex, lastTerm, configuration, 1, trans)
     if err != nil {
         return fmt.Errorf("failed to create snapshot: %v", err)
     }
-    if err := snapshot.Persist(sink); err != nil {
+    if err = snapshot.Persist(sink); err != nil {
         return fmt.Errorf("failed to persist snapshot: %v", err)
     }
-    if err := sink.Close(); err != nil {
+    if err = sink.Close(); err != nil {
         return fmt.Errorf("failed to finalize snapshot: %v", err)
     }
 
-    // Compact the log so that we don't get bad interference from any
-    // configuration change log entries that might be there.
-    firstLogIndex, err := logs.FirstIndex()
-    if err != nil {
-        return fmt.Errorf("failed to get first log index: %v", err)
-    }
-    if err := logs.DeleteRange(firstLogIndex, lastLogIndex); err != nil {
-        return fmt.Errorf("log compaction failed: %v", err)
-    }
-
     return nil
 }

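A minimal sketch (not part of this commit) of how an operator might call the refactored entry point during disaster recovery. It assumes the standard hashicorp/raft constructors (NewInmemStore, NewFileSnapshotStore, NewTCPTransport); the noopFSM, node ID, bind address, and snapshot path are hypothetical placeholders, and a real node would run this against its existing durable stores, which must already contain state or RecoverCluster refuses:

package main

import (
    "io"
    "log"
    "os"
    "time"

    "github.com/hashicorp/raft"
)

// noopFSM is a placeholder so the sketch compiles; a real application
// supplies its own state machine here.
type noopFSM struct{}

func (noopFSM) Apply(*raft.Log) interface{}         { return nil }
func (noopFSM) Snapshot() (raft.FSMSnapshot, error) { return noopSnap{}, nil }
func (noopFSM) Restore(io.ReadCloser) error         { return nil }

type noopSnap struct{}

func (noopSnap) Persist(sink raft.SnapshotSink) error { return sink.Close() }
func (noopSnap) Release()                             {}

func main() {
    conf := raft.DefaultConfig()
    conf.LocalID = raft.ServerID("node1") // hypothetical server ID

    // In-memory stores keep the sketch short; real recovery runs against
    // the node's existing on-disk log and stable stores.
    logs := raft.NewInmemStore()
    stable := raft.NewInmemStore()

    snaps, err := raft.NewFileSnapshotStore("/tmp/raft-recovery", 2, os.Stderr)
    if err != nil {
        log.Fatalf("snapshot store: %v", err)
    }
    trans, err := raft.NewTCPTransport("127.0.0.1:0", nil, 3, 10*time.Second, os.Stderr)
    if err != nil {
        log.Fatalf("transport: %v", err)
    }

    // The manually corrected peer set the cluster should come back with.
    configuration := raft.Configuration{
        Servers: []raft.Server{
            {ID: conf.LocalID, Address: trans.LocalAddr()},
        },
    }

    // Rewrites the stores so the node restarts with `configuration`.
    if err := raft.RecoverCluster(conf, noopFSM{}, logs, stable, snaps, trans, configuration); err != nil {
        log.Fatalf("recovery failed: %v", err)
    }

    // Only after recovery succeeds is the node started normally.
    r, err := raft.NewRaft(conf, noopFSM{}, logs, stable, snaps, trans)
    if err != nil {
        log.Fatalf("raft: %v", err)
    }
    defer r.Shutdown()
}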
11 changes: 5 additions & 6 deletions net_transport_test.go
@@ -38,7 +38,7 @@ func TestNetworkTransport_CloseStreams(t *testing.T) {
         PrevLogEntry: 100,
         PrevLogTerm:  4,
         Entries: []*Log{
-            &Log{
+            {
                 Index: 101,
                 Term:  4,
                 Type:  LogNoop,
@@ -76,7 +76,6 @@ func TestNetworkTransport_CloseStreams(t *testing.T) {
         t.Fatalf("err: %v", err)
     }
     defer trans2.Close()
-
     for i := 0; i < 2; i++ {
         // Create wait group
         wg := &sync.WaitGroup{}
@@ -199,7 +198,7 @@ func TestNetworkTransport_AppendEntries(t *testing.T) {
         PrevLogEntry: 100,
         PrevLogTerm:  4,
         Entries: []*Log{
-            &Log{
+            {
                 Index: 101,
                 Term:  4,
                 Type:  LogNoop,
@@ -268,7 +267,7 @@ func TestNetworkTransport_AppendEntriesPipeline(t *testing.T) {
         PrevLogEntry: 100,
         PrevLogTerm:  4,
         Entries: []*Log{
-            &Log{
+            {
                 Index: 101,
                 Term:  4,
                 Type:  LogNoop,
@@ -351,7 +350,7 @@ func TestNetworkTransport_AppendEntriesPipeline_CloseStreams(t *testing.T) {
         PrevLogEntry: 100,
         PrevLogTerm:  4,
         Entries: []*Log{
-            &Log{
+            {
                 Index: 101,
                 Term:  4,
                 Type:  LogNoop,
@@ -625,7 +624,7 @@ func TestNetworkTransport_PooledConn(t *testing.T) {
         PrevLogEntry: 100,
         PrevLogTerm:  4,
         Entries: []*Log{
-            &Log{
+            {
                 Index: 101,
                 Term:  4,
                 Type:  LogNoop,
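The test-file changes are purely cosmetic: inside a []*Log composite literal the element type is already known, so Go permits eliding the redundant &Log. A standalone sketch illustrating the rule; the two-field Log below is a stand-in for the repository's type:

package main

import "fmt"

// Log is a stand-in with just enough fields for the example.
type Log struct {
    Index uint64
    Term  uint64
}

func main() {
    // Equivalent literals: in a []*Log, `&Log{...}` may be written `{...}`
    // because the element type (and the address-of) are implied.
    verbose := []*Log{
        &Log{Index: 101, Term: 4},
    }
    concise := []*Log{
        {Index: 101, Term: 4},
    }
    fmt.Println(verbose[0].Index == concise[0].Index) // prints true
}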
