diff --git a/.travis.yml b/.travis.yml
index 94eb8668b66..d16fb1ada7d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,8 +1,6 @@
 language: go
 
 go:
-  - 1.4
-  - 1.5
   - 1.6
   - tip
 
diff --git a/Makefile b/Makefile
index 61499c50760..49f82992397 100644
--- a/Makefile
+++ b/Makefile
@@ -1,10 +1,10 @@
 DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...)
 
 test:
-	go test -timeout=45s ./...
+	go test -timeout=60s ./...
 
 integ: test
-	INTEG_TESTS=yes go test -timeout=3s -run=Integ ./...
+	INTEG_TESTS=yes go test -timeout=5s -run=Integ ./...
 
 deps:
 	go get -d -v ./...
diff --git a/api.go b/api.go
index ff14131c4aa..26fb77d2cdc 100644
--- a/api.go
+++ b/api.go
@@ -3,6 +3,7 @@ package raft
 import (
 	"errors"
 	"fmt"
+	"io"
 	"log"
 	"os"
 	"strconv"
@@ -64,11 +65,14 @@ type Raft struct {
 	// FSM is the client state machine to apply commands to
 	fsm FSM
 
-	// fsmCommitCh is used to trigger async application of logs to the fsm
-	fsmCommitCh chan commitTuple
-
-	// fsmRestoreCh is used to trigger a restore from snapshot
-	fsmRestoreCh chan *restoreFuture
+	// fsmMutateCh is used to send state-changing updates to the FSM. This
+	// receives pointers to commitTuple structures when applying logs or
+	// pointers to restoreFuture structures when restoring a snapshot. We
+	// need control over the order of these operations when doing user
+	// restores so that we finish applying any old log applies before we
+	// take a user snapshot on the leader, otherwise we might restore the
+	// snapshot and apply old logs to it that were in the pipe.
+	fsmMutateCh chan interface{}
 
 	// fsmSnapshotCh is used to trigger a new snapshot being taken
 	fsmSnapshotCh chan *reqSnapshotFuture
@@ -118,8 +122,12 @@ type Raft struct {
 	// snapshots is used to store and retrieve snapshots
 	snapshots SnapshotStore
 
-	// snapshotCh is used for user triggered snapshots
-	snapshotCh chan *snapshotFuture
+	// userSnapshotCh is used for user-triggered snapshots
+	userSnapshotCh chan *userSnapshotFuture
+
+	// userRestoreCh is used for user-triggered restores of external
+	// snapshots
+	userRestoreCh chan *userRestoreFuture
 
 	// stable is a StableStore implementation for durable state
 	// It provides stable storage for many fields in raftState
@@ -429,8 +437,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 		applyCh:               make(chan *logFuture),
 		conf:                  *conf,
 		fsm:                   fsm,
-		fsmCommitCh:           make(chan commitTuple, 128),
-		fsmRestoreCh:          make(chan *restoreFuture),
+		fsmMutateCh:           make(chan interface{}, 128),
 		fsmSnapshotCh:         make(chan *reqSnapshotFuture),
 		leaderCh:              make(chan bool),
 		localID:               localID,
@@ -441,7 +448,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 		configurations:        configurations{},
 		rpcCh:                 trans.Consumer(),
 		snapshots:             snaps,
-		snapshotCh:            make(chan *snapshotFuture),
+		userSnapshotCh:        make(chan *userSnapshotFuture),
+		userRestoreCh:         make(chan *userRestoreFuture),
 		shutdownCh:            make(chan struct{}),
 		stable:                stable,
 		trans:                 trans,
@@ -792,18 +800,78 @@ func (r *Raft) Shutdown() Future {
 	return &shutdownFuture{nil}
 }
 
-// Snapshot is used to manually force Raft to take a snapshot.
-// Returns a future that can be used to block until complete.
-func (r *Raft) Snapshot() Future {
-	snapFuture := &snapshotFuture{}
-	snapFuture.init()
+// Snapshot is used to manually force Raft to take a snapshot. Returns a future
+// that can be used to block until complete, and that contains a function that
+// can be used to open the snapshot.
+func (r *Raft) Snapshot() SnapshotFuture {
+	future := &userSnapshotFuture{}
+	future.init()
+	select {
+	case r.userSnapshotCh <- future:
+		return future
+	case <-r.shutdownCh:
+		future.respond(ErrRaftShutdown)
+		return future
+	}
+}
+
+// Restore is used to manually force Raft to consume an external snapshot, such
+// as if restoring from a backup. We will use the current Raft configuration,
+// not the one from the snapshot, so that we can restore into a new cluster. We
+// will also use the higher of the index of the snapshot, or the current index,
+// and then add 1 to that, so we force a new state with a hole in the Raft log,
+// so that the snapshot will be sent to followers and used for any new joiners.
+// This can only be run on the leader, and returns a future that can be used to
+// block until complete.
+//
+// WARNING! This operation has the leader take on the state of the snapshot and
+// then sets itself up so that it replicates that to its followers through the
+// install snapshot process. This involves a potentially dangerous period where
+// the leader commits ahead of its followers, so should only be used for disaster
+// recovery into a fresh cluster, and should not be used in normal operations.
+func (r *Raft) Restore(meta *SnapshotMeta, reader io.ReadCloser, timeout time.Duration) Future {
+	metrics.IncrCounter([]string{"raft", "restore"}, 1)
+	var timer <-chan time.Time
+	if timeout > 0 {
+		timer = time.After(timeout)
+	}
+
+	// Perform the restore.
+	restore := &userRestoreFuture{
+		meta:   meta,
+		reader: reader,
+	}
+	restore.init()
 	select {
-	case r.snapshotCh <- snapFuture:
-		return snapFuture
+	case <-timer:
+		return errorFuture{ErrEnqueueTimeout}
 	case <-r.shutdownCh:
 		return errorFuture{ErrRaftShutdown}
+	case r.userRestoreCh <- restore:
+		// If the restore is ingested then wait for it to complete.
+		if err := restore.Error(); err != nil {
+			return restore
+		}
 	}
+
+	// Apply a no-op log entry. Waiting for this allows us to wait until the
+	// followers have gotten the restore and replicated at least this new
+	// entry, which shows that we've also faulted and installed the
+	// snapshot with the contents of the restore.
+	noop := &logFuture{
+		log: Log{
+			Type: LogNoop,
+		},
+	}
+	noop.init()
+	select {
+	case <-timer:
+		return errorFuture{ErrEnqueueTimeout}
+	case <-r.shutdownCh:
+		return errorFuture{ErrRaftShutdown}
+	case r.applyCh <- noop:
+		return noop
+	}
 }
 
 // State is used to return the current raft state.
@@ -870,7 +938,7 @@ func (r *Raft) Stats() map[string]string {
 		"last_log_term":       toString(lastLogTerm),
 		"commit_index":        toString(r.getCommitIndex()),
 		"applied_index":       toString(r.getLastApplied()),
-		"fsm_pending":         toString(uint64(len(r.fsmCommitCh))),
+		"fsm_pending":         toString(uint64(len(r.fsmMutateCh))),
 		"last_snapshot_index": toString(lastSnapIndex),
 		"last_snapshot_term":  toString(lastSnapTerm),
 		"protocol_version":    toString(uint64(r.protocolVersion)),
diff --git a/fsm.go b/fsm.go
index 23da1e99b38..c89986c0fad 100644
--- a/fsm.go
+++ b/fsm.go
@@ -48,67 +48,87 @@ type FSMSnapshot interface {
 	// the FSM to block our internal operations.
 func (r *Raft) runFSM() {
 	var lastIndex, lastTerm uint64
-	for {
-		select {
-		case req := <-r.fsmRestoreCh:
-			// Open the snapshot
-			meta, source, err := r.snapshots.Open(req.ID)
-			if err != nil {
-				req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err))
-				continue
-			}
-			// Attempt to restore
+	commit := func(req *commitTuple) {
+		// Apply the log if a command
+		var resp interface{}
+		if req.log.Type == LogCommand {
 			start := time.Now()
-			if err := r.fsm.Restore(source); err != nil {
-				req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
-				source.Close()
-				continue
-			}
+			resp = r.fsm.Apply(req.log)
+			metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
+		}
+
+		// Update the indexes
+		lastIndex = req.log.Index
+		lastTerm = req.log.Term
+
+		// Invoke the future if given
+		if req.future != nil {
+			req.future.response = resp
+			req.future.respond(nil)
+		}
+	}
+
+	restore := func(req *restoreFuture) {
+		// Open the snapshot
+		meta, source, err := r.snapshots.Open(req.ID)
+		if err != nil {
+			req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err))
+			return
+		}
+
+		// Attempt to restore
+		start := time.Now()
+		if err := r.fsm.Restore(source); err != nil {
+			req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
 			source.Close()
-			metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)
+			return
+		}
+		source.Close()
+		metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)
 
-			// Update the last index and term
-			lastIndex = meta.Index
-			lastTerm = meta.Term
-			req.respond(nil)
+		// Update the last index and term
+		lastIndex = meta.Index
+		lastTerm = meta.Term
+		req.respond(nil)
+	}
 
-		case req := <-r.fsmSnapshotCh:
-			// Is there something to snapshot?
-			if lastIndex == 0 {
-				req.respond(ErrNothingNewToSnapshot)
-				continue
-			}
+	snapshot := func(req *reqSnapshotFuture) {
+		// Is there something to snapshot?
+		if lastIndex == 0 {
+			req.respond(ErrNothingNewToSnapshot)
+			return
+		}
 
-			// Start a snapshot
-			start := time.Now()
-			snap, err := r.fsm.Snapshot()
-			metrics.MeasureSince([]string{"raft", "fsm", "snapshot"}, start)
-
-			// Respond to the request
-			req.index = lastIndex
-			req.term = lastTerm
-			req.snapshot = snap
-			req.respond(err)
-
-		case commitEntry := <-r.fsmCommitCh:
-			// Apply the log if a command
-			var resp interface{}
-			if commitEntry.log.Type == LogCommand {
-				start := time.Now()
-				resp = r.fsm.Apply(commitEntry.log)
-				metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
-			}
+		// Start a snapshot
+		start := time.Now()
+		snap, err := r.fsm.Snapshot()
+		metrics.MeasureSince([]string{"raft", "fsm", "snapshot"}, start)
 
-			// Update the indexes
-			lastIndex = commitEntry.log.Index
-			lastTerm = commitEntry.log.Term
+		// Respond to the request
+		req.index = lastIndex
+		req.term = lastTerm
+		req.snapshot = snap
+		req.respond(err)
+	}
 
-			// Invoke the future if given
-			if commitEntry.future != nil {
-				commitEntry.future.response = resp
-				commitEntry.future.respond(nil)
+	for {
+		select {
+		case ptr := <-r.fsmMutateCh:
+			switch req := ptr.(type) {
+			case *commitTuple:
+				commit(req)
+
+			case *restoreFuture:
+				restore(req)
+
+			default:
+				panic(fmt.Errorf("bad type passed to fsmMutateCh: %#v", ptr))
 			}
+
+		case req := <-r.fsmSnapshotCh:
+			snapshot(req)
+
 		case <-r.shutdownCh:
 			return
 		}
diff --git a/future.go b/future.go
index 67c74fc42ee..b77e25a47f6 100644
--- a/future.go
+++ b/future.go
@@ -1,6 +1,8 @@
 package raft
 
 import (
+	"fmt"
+	"io"
 	"sync"
 	"time"
 )
@@ -46,6 +48,16 @@ type ConfigurationFuture interface {
 	Configuration() Configuration
 }
 
+// SnapshotFuture is used for waiting on a user-triggered snapshot to complete.
+type SnapshotFuture interface {
+	Future
+
+	// Open is a function you can call to access the underlying snapshot and
+	// its metadata. This must not be called until after the Error method
+	// has returned.
+	Open() (*SnapshotMeta, io.ReadCloser, error)
+}
+
 // errorFuture is used to return a static error.
 type errorFuture struct {
 	err error
@@ -150,9 +162,41 @@ func (s *shutdownFuture) Error() error {
 	return nil
 }
 
-// snapshotFuture is used for waiting on a snapshot to complete.
-type snapshotFuture struct {
+// userSnapshotFuture is used for waiting on a user-triggered snapshot to
+// complete.
+type userSnapshotFuture struct {
+	deferError
+
+	// opener is a function used to open the snapshot. This is filled in
+	// once the future returns with no error.
+	opener func() (*SnapshotMeta, io.ReadCloser, error)
+}
+
+// Open is a function you can call to access the underlying snapshot and its
+// metadata.
+func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) {
+	if u.opener == nil {
+		return nil, nil, fmt.Errorf("no snapshot available")
+	} else {
+		// Invalidate the opener so it can't get called multiple times,
+		// which isn't generally safe.
+		defer func() {
+			u.opener = nil
+		}()
+		return u.opener()
+	}
+}
+
+// userRestoreFuture is used for waiting on a user-triggered restore of an
+// external snapshot to complete.
+type userRestoreFuture struct {
 	deferError
+
+	// meta is the metadata that belongs with the snapshot.
+	meta *SnapshotMeta
+
+	// reader is the interface to read the snapshot contents from.
+	reader io.ReadCloser
 }
 
 // reqSnapshotFuture is used for requesting a snapshot start.
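As an aside for readers of this patch, here is a rough sketch of how the new user-triggered snapshot API (Snapshot returning a SnapshotFuture with Open) is meant to be driven from client code. It is illustrative only: it assumes the usual github.com/hashicorp/raft import, and the backupSnapshot helper, package name, and destination path are made up for the example rather than part of this change.

package backup

import (
	"fmt"
	"io"
	"log"
	"os"

	"github.com/hashicorp/raft"
)

// backupSnapshot forces a user snapshot and streams its contents to a file.
// Sketch only; error handling and the destination path are illustrative.
func backupSnapshot(r *raft.Raft, path string) error {
	// Ask Raft to take a snapshot and wait for it to complete.
	future := r.Snapshot()
	if err := future.Error(); err != nil {
		return fmt.Errorf("snapshot failed: %v", err)
	}

	// Open may only be called after Error has returned, and the opener is
	// invalidated after the first call, so open the snapshot exactly once.
	meta, source, err := future.Open()
	if err != nil {
		return fmt.Errorf("failed to open snapshot: %v", err)
	}
	defer source.Close()

	out, err := os.Create(path)
	if err != nil {
		return err
	}
	defer out.Close()

	if _, err := io.Copy(out, source); err != nil {
		return err
	}
	log.Printf("saved snapshot %s (index=%d, term=%d)", meta.ID, meta.Index, meta.Term)
	return nil
}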
diff --git a/raft.go b/raft.go
index a6c729413a8..fbb0772d202 100644
--- a/raft.go
+++ b/raft.go
@@ -160,6 +160,10 @@ func (r *Raft) runFollower() {
 			// Reject any operations since we are not the leader
 			v.respond(ErrNotLeader)
 
+		case r := <-r.userRestoreCh:
+			// Reject any restores since we are not the leader
+			r.respond(ErrNotLeader)
+
 		case c := <-r.configurationsCh:
 			c.configurations = r.configurations.Clone()
 			c.respond(nil)
@@ -283,6 +287,10 @@ func (r *Raft) runCandidate() {
 			// Reject any operations since we are not the leader
 			v.respond(ErrNotLeader)
 
+		case r := <-r.userRestoreCh:
+			// Reject any restores since we are not the leader
+			r.respond(ErrNotLeader)
+
 		case c := <-r.configurationsCh:
 			c.configurations = r.configurations.Clone()
 			c.respond(nil)
@@ -550,6 +558,10 @@ func (r *Raft) leaderLoop() {
 				v.respond(nil)
 			}
 
+		case future := <-r.userRestoreCh:
+			err := r.restoreUserSnapshot(future.meta, future.reader)
+			future.respond(err)
+
 		case c := <-r.configurationsCh:
 			c.configurations = r.configurations.Clone()
 			c.respond(nil)
@@ -680,6 +692,92 @@ func (r *Raft) quorumSize() int {
 	return voters/2 + 1
 }
 
+// restoreUserSnapshot is used to manually consume an external snapshot, such
+// as if restoring from a backup. We will use the current Raft configuration,
+// not the one from the snapshot, so that we can restore into a new cluster. We
+// will also use the higher of the index of the snapshot, or the current index,
+// and then add 1 to that, so we force a new state with a hole in the Raft log,
+// so that the snapshot will be sent to followers and used for any new joiners.
+// This can only be run on the leader, and returns a future that can be used to
+// block until complete.
+func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.ReadCloser) error {
+	defer metrics.MeasureSince([]string{"raft", "restoreUserSnapshot"}, time.Now())
+
+	// Sanity check the version.
+	version := meta.Version
+	if version < SnapshotVersionMin || version > SnapshotVersionMax {
+		return fmt.Errorf("unsupported snapshot version %d", version)
+	}
+
+	// We don't support snapshots while there's a config change
+	// outstanding since the snapshot doesn't have a means to
+	// represent this state.
+	committedIndex := r.configurations.committedIndex
+	latestIndex := r.configurations.latestIndex
+	if committedIndex != latestIndex {
+		return fmt.Errorf("cannot restore snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
+			latestIndex, committedIndex)
+	}
+
+	// We will overwrite the snapshot metadata with the current term,
+	// an index that's greater than the current index, or the last
+	// index in the snapshot. It's important that we leave a hole in
+	// the index so we know there's nothing in the Raft log there and
+	// replication will fault and send the snapshot.
+	term := r.getCurrentTerm()
+	lastIndex := r.getLastIndex()
+	if meta.Index > lastIndex {
+		lastIndex = meta.Index
+	}
+	lastIndex++
+
+	// Dump the snapshot. Note that we use the latest configuration,
+	// not the one that came with the snapshot.
+	sink, err := r.snapshots.Create(version, lastIndex, term,
+		r.configurations.latest, r.configurations.latestIndex, r.trans)
+	if err != nil {
+		return fmt.Errorf("failed to create snapshot: %v", err)
+	}
+	n, err := io.Copy(sink, reader)
+	if err != nil {
+		sink.Cancel()
+		return fmt.Errorf("failed to write snapshot: %v", err)
+	}
+	if n != meta.Size {
+		sink.Cancel()
+		return fmt.Errorf("failed to write snapshot, size didn't match (%d != %d)", n, meta.Size)
+	}
+	if err := sink.Close(); err != nil {
+		return fmt.Errorf("failed to close snapshot: %v", err)
+	}
+	r.logger.Printf("[INFO] raft: Copied %d bytes to local snapshot", n)
+
+	// Restore the snapshot into the FSM. If this fails we are in a
+	// bad state so we panic to take ourselves out.
+	fsm := &restoreFuture{ID: sink.ID()}
+	fsm.init()
+	select {
+	case r.fsmMutateCh <- fsm:
+	case <-r.shutdownCh:
+		return ErrRaftShutdown
+	}
+	if err := fsm.Error(); err != nil {
+		panic(fmt.Errorf("failed to restore snapshot: %v", err))
+	}
+
+	// We set the last log so it looks like we've stored the empty
+	// index we burned. The last applied is set because we made the
+	// FSM take the snapshot state, and we store the last snapshot
+	// in the stable store since we created a snapshot as part of
+	// this process.
+	r.setLastLog(lastIndex, term)
+	r.setLastApplied(lastIndex)
+	r.setLastSnapshot(lastIndex, term)
+
+	r.logger.Printf("[INFO] raft: Restored user snapshot (index %d)", lastIndex)
+	return nil
+}
+
 // appendConfigurationEntry changes the configuration and adds a new
 // configuration entry to the log. This must only be called from the
 // main thread.
@@ -804,7 +902,7 @@ func (r *Raft) processLog(l *Log, future *logFuture) {
 	case LogCommand:
 		// Forward to the fsm handler
 		select {
-		case r.fsmCommitCh <- commitTuple{l, future}:
+		case r.fsmMutateCh <- &commitTuple{l, future}:
 		case <-r.shutdownCh:
 			if future != nil {
 				future.respond(ErrRaftShutdown)
@@ -1204,7 +1302,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
 	future := &restoreFuture{ID: sink.ID()}
 	future.init()
 	select {
-	case r.fsmRestoreCh <- future:
+	case r.fsmMutateCh <- future:
 	case <-r.shutdownCh:
 		future.respond(ErrRaftShutdown)
 		return
diff --git a/raft_test.go b/raft_test.go
index 407531b928c..6dccd25ccf9 100644
--- a/raft_test.go
+++ b/raft_test.go
@@ -1683,21 +1683,50 @@ func TestRaft_AutoSnapshot(t *testing.T) {
 	}
 }
 
-func TestRaft_ManualSnapshot(t *testing.T) {
-	// Make the cluster
+func TestRaft_UserSnapshot(t *testing.T) {
+	// Make the cluster.
 	conf := inmemConfig(t)
 	conf.SnapshotThreshold = 50
 	conf.TrailingLogs = 10
 	c := MakeCluster(1, t, conf)
 	defer c.Close()
 
+	// With nothing committed, asking for a snapshot should return an error.
 	leader := c.Leader()
-	// with nothing commited, asking for a snapshot should return an error
-	ssErr := leader.Snapshot().Error()
-	if ssErr != ErrNothingNewToSnapshot {
-		t.Errorf("Attempt to manualy create snapshot should of errored because there's nothing to do: %v", ssErr)
+	if err := leader.Snapshot().Error(); err != ErrNothingNewToSnapshot {
+		c.FailNowf("[ERR] Request for Snapshot failed: %v", err)
+	}
+
+	// Commit some things.
+	var future Future
+	for i := 0; i < 10; i++ {
+		future = leader.Apply([]byte(fmt.Sprintf("test %d", i)), 0)
+	}
+	if err := future.Error(); err != nil {
+		c.FailNowf("[ERR] Error Apply new log entries: %v", err)
+	}
+
+	// Now we should be able to ask for a snapshot without getting an error.
+	if err := leader.Snapshot().Error(); err != nil {
+		c.FailNowf("[ERR] Request for Snapshot failed: %v", err)
+	}
+
+	// Check for snapshot
+	if snaps, _ := leader.snapshots.List(); len(snaps) == 0 {
+		c.FailNowf("[ERR] should have a snapshot")
 	}
-	// commit some things
+}
+
+// snapshotAndRestore does a snapshot and restore sequence and applies the given
+// offset to the snapshot index, so we can try out different situations.
+func snapshotAndRestore(t *testing.T, offset uint64) {
+	// Make the cluster.
+	conf := inmemConfig(t)
+	c := MakeCluster(3, t, conf)
+	defer c.Close()
+
+	// Wait for things to get stable and commit some things.
+	leader := c.Leader()
 	var future Future
 	for i := 0; i < 10; i++ {
 		future = leader.Apply([]byte(fmt.Sprintf("test %d", i)), 0)
@@ -1705,11 +1734,87 @@ func TestRaft_ManualSnapshot(t *testing.T) {
 	if err := future.Error(); err != nil {
 		c.FailNowf("[ERR] Error Apply new log entries: %v", err)
 	}
-	// now we should be able to ask for a snapshot without getting an error
-	ssErr = leader.Snapshot().Error()
-	if ssErr != nil {
-		t.Errorf("Request for Snapshot failed: %v", ssErr)
+
+	// Take a snapshot.
+	snap := leader.Snapshot()
+	if err := snap.Error(); err != nil {
+		c.FailNowf("[ERR] Request for Snapshot failed: %v", err)
+	}
+
+	// Commit some more things.
+	for i := 10; i < 20; i++ {
+		future = leader.Apply([]byte(fmt.Sprintf("test %d", i)), 0)
+	}
+	if err := future.Error(); err != nil {
+		c.FailNowf("[ERR] Error Apply new log entries: %v", err)
+	}
+
+	// Get the last index before the restore.
+	preIndex := leader.getLastIndex()
+
+	// Restore the snapshot, twiddling the index with the offset.
+	meta, reader, err := snap.Open()
+	if err != nil {
+		c.FailNowf("[ERR] Snapshot open failed: %v", err)
+	}
+	meta.Index += offset
+	defer reader.Close()
+	restore := leader.Restore(meta, reader, 5*time.Second)
+	if err := restore.Error(); err != nil {
+		c.FailNowf("[ERR] Restore failed: %v", err)
+	}
+
+	// Make sure the index was updated correctly. We add 2 because we burn
+	// an index to create a hole, and then we apply a no-op after the
+	// restore.
+	var expected uint64
+	if meta.Index < preIndex {
+		expected = preIndex + 2
+	} else {
+		expected = meta.Index + 2
+	}
+	lastIndex := leader.getLastIndex()
+	if lastIndex != expected {
+		c.FailNowf("[ERR] Index was not updated correctly: %d vs. %d", lastIndex, expected)
 	}
+
+	// Ensure all the logs are the same and that we have everything that was
+	// part of the original snapshot, and that the contents after were
+	// reverted.
+	c.EnsureSame(t)
+	fsm := c.fsms[0]
+	fsm.Lock()
+	if len(fsm.logs) != 10 {
+		c.FailNowf("[ERR] Log length bad: %d", len(fsm.logs))
+	}
+	for i, entry := range fsm.logs {
+		expected := []byte(fmt.Sprintf("test %d", i))
+		if bytes.Compare(entry, expected) != 0 {
+			c.FailNowf("[ERR] Log entry bad: %v", entry)
+		}
+	}
+	fsm.Unlock()
+
+	// Commit some more things.
+	for i := 20; i < 30; i++ {
+		future = leader.Apply([]byte(fmt.Sprintf("test %d", i)), 0)
+	}
+	if err := future.Error(); err != nil {
+		c.FailNowf("[ERR] Error Apply new log entries: %v", err)
+	}
+	c.EnsureSame(t)
+}
+
+func TestRaft_UserRestore(t *testing.T) {
+	// Snapshots from the past.
+	snapshotAndRestore(t, 0)
+	snapshotAndRestore(t, 1)
+	snapshotAndRestore(t, 2)
+
+	// Snapshots from the future.
+	snapshotAndRestore(t, 100)
+	snapshotAndRestore(t, 1000)
+	snapshotAndRestore(t, 10000)
 }
 
 func TestRaft_SendSnapshotFollower(t *testing.T) {
diff --git a/snapshot.go b/snapshot.go
index 8402e093807..5287ebc4183 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -76,15 +76,19 @@ func (r *Raft) runSnapshots() {
 			}
 
 			// Trigger a snapshot
-			if err := r.takeSnapshot(); err != nil {
+			if _, err := r.takeSnapshot(); err != nil {
 				r.logger.Printf("[ERR] raft: Failed to take snapshot: %v", err)
 			}
 
-		case future := <-r.snapshotCh:
+		case future := <-r.userSnapshotCh:
 			// User-triggered, run immediately
-			err := r.takeSnapshot()
+			id, err := r.takeSnapshot()
 			if err != nil {
 				r.logger.Printf("[ERR] raft: Failed to take snapshot: %v", err)
+			} else {
+				future.opener = func() (*SnapshotMeta, io.ReadCloser, error) {
+					return r.snapshots.Open(id)
+				}
 			}
 			future.respond(err)
 
@@ -113,8 +117,9 @@ func (r *Raft) shouldSnapshot() bool {
 }
 
 // takeSnapshot is used to take a new snapshot. This must only be called from
-// the snapshot thread, never the main thread.
-func (r *Raft) takeSnapshot() error {
+// the snapshot thread, never the main thread. This returns the ID of the new
+// snapshot, along with an error.
+func (r *Raft) takeSnapshot() (string, error) {
 	defer metrics.MeasureSince([]string{"raft", "snapshot", "takeSnapshot"}, time.Now())
 
 	// Create a request for the FSM to perform a snapshot.
@@ -125,7 +130,7 @@ func (r *Raft) takeSnapshot() error {
 	select {
 	case r.fsmSnapshotCh <- snapReq:
 	case <-r.shutdownCh:
-		return ErrRaftShutdown
+		return "", ErrRaftShutdown
 	}
 
 	// Wait until we get a response
@@ -133,7 +138,7 @@ func (r *Raft) takeSnapshot() error {
 		if err != ErrNothingNewToSnapshot {
 			err = fmt.Errorf("failed to start snapshot: %v", err)
 		}
-		return err
+		return "", err
 	}
 	defer snapReq.snapshot.Release()
 
@@ -145,10 +150,10 @@ func (r *Raft) takeSnapshot() error {
 	select {
 	case r.configurationsCh <- configReq:
 	case <-r.shutdownCh:
-		return ErrRaftShutdown
+		return "", ErrRaftShutdown
 	}
 	if err := configReq.Error(); err != nil {
-		return err
+		return "", err
 	}
 	committed := configReq.configurations.committed
 	committedIndex := configReq.configurations.committedIndex
@@ -162,7 +167,7 @@ func (r *Raft) takeSnapshot() error {
 	// then it's not crucial that we snapshot, since there's not much going
 	// on Raft-wise.
 	if snapReq.index < committedIndex {
-		return fmt.Errorf("cannot take snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
+		return "", fmt.Errorf("cannot take snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
 			committedIndex, snapReq.index)
 	}
 
@@ -172,7 +177,7 @@ func (r *Raft) takeSnapshot() error {
 	version := getSnapshotVersion(r.protocolVersion)
 	sink, err := r.snapshots.Create(version, snapReq.index, snapReq.term, committed, committedIndex, r.trans)
 	if err != nil {
-		return fmt.Errorf("failed to create snapshot: %v", err)
+		return "", fmt.Errorf("failed to create snapshot: %v", err)
 	}
 	metrics.MeasureSince([]string{"raft", "snapshot", "create"}, start)
 
@@ -180,13 +185,13 @@ func (r *Raft) takeSnapshot() error {
 	start = time.Now()
 	if err := snapReq.snapshot.Persist(sink); err != nil {
 		sink.Cancel()
-		return fmt.Errorf("failed to persist snapshot: %v", err)
+		return "", fmt.Errorf("failed to persist snapshot: %v", err)
 	}
 	metrics.MeasureSince([]string{"raft", "snapshot", "persist"}, start)
 
 	// Close and check for error.
 	if err := sink.Close(); err != nil {
-		return fmt.Errorf("failed to close snapshot: %v", err)
+		return "", fmt.Errorf("failed to close snapshot: %v", err)
 	}
 
 	// Update the last stable snapshot info.
@@ -194,11 +199,11 @@ func (r *Raft) takeSnapshot() error {
 
 	// Compact the logs.
 	if err := r.compactLogs(snapReq.index); err != nil {
-		return err
+		return "", err
 	}
 
 	r.logger.Printf("[INFO] raft: Snapshot to %d complete", snapReq.index)
-	return nil
+	return sink.ID(), nil
 }
 
 // compactLogs takes the last inclusive index of a snapshot
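For completeness, a similarly hedged sketch of the restore side of the feature: feeding a previously saved snapshot back into a (typically fresh) cluster through the new Restore API. The restoreBackup helper, package name, path handling, and timeout value are illustrative assumptions; in particular it presumes the caller kept the SnapshotMeta from the original backup, and it must be invoked against the current leader.

package backup

import (
	"os"
	"time"

	"github.com/hashicorp/raft"
)

// restoreBackup feeds an external snapshot into Raft, e.g. during disaster
// recovery into a new cluster. Sketch only; meta must be the metadata that was
// saved alongside the snapshot data when the backup was taken.
func restoreBackup(r *raft.Raft, meta *raft.SnapshotMeta, path string) error {
	source, err := os.Open(path)
	if err != nil {
		return err
	}
	// Restore copies the data into a local snapshot; the caller still owns
	// and closes the reader, mirroring what the test in this patch does.
	defer source.Close()

	// The returned future also covers the no-op entry applied after the
	// snapshot is installed, so Error does not return until that entry has
	// been committed as well.
	return r.Restore(meta, source, 30*time.Second).Error()
}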