Skip to content

Commit

Permalink
Merge pull request hashicorp#159 from hashicorp/f-snapshot-restore
Browse files Browse the repository at this point in the history
Adds ability to access user snapshots and restore them.
  • Loading branch information
slackpad authored Sep 29, 2016
2 parents c69c15d + fe8cdcd commit 7c09f49
Show file tree
Hide file tree
Showing 8 changed files with 442 additions and 104 deletions.
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
language: go

go:
- 1.4
- 1.5
- 1.6
- tip

Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...)

test:
go test -timeout=45s ./...
go test -timeout=60s ./...

integ: test
INTEG_TESTS=yes go test -timeout=3s -run=Integ ./...
INTEG_TESTS=yes go test -timeout=5s -run=Integ ./...

deps:
go get -d -v ./...
Expand Down
104 changes: 86 additions & 18 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package raft
import (
"errors"
"fmt"
"io"
"log"
"os"
"strconv"
Expand Down Expand Up @@ -64,11 +65,14 @@ type Raft struct {
// FSM is the client state machine to apply commands to
fsm FSM

// fsmCommitCh is used to trigger async application of logs to the fsm
fsmCommitCh chan commitTuple

// fsmRestoreCh is used to trigger a restore from snapshot
fsmRestoreCh chan *restoreFuture
// fsmMutateCh is used to send state-changing updates to the FSM. This
// receives pointers to commitTuple structures when applying logs or
// pointers to restoreFuture structures when restoring a snapshot. We
// need control over the order of these operations when doing user
// restores so that we finish applying any old log applies before we
// take a user snapshot on the leader, otherwise we might restore the
// snapshot and apply old logs to it that were in the pipe.
fsmMutateCh chan interface{}

// fsmSnapshotCh is used to trigger a new snapshot being taken
fsmSnapshotCh chan *reqSnapshotFuture
Expand Down Expand Up @@ -118,8 +122,12 @@ type Raft struct {
// snapshots is used to store and retrieve snapshots
snapshots SnapshotStore

// snapshotCh is used for user triggered snapshots
snapshotCh chan *snapshotFuture
// userSnapshotCh is used for user-triggered snapshots
userSnapshotCh chan *userSnapshotFuture

// userRestoreCh is used for user-triggered restores of external
// snapshots
userRestoreCh chan *userRestoreFuture

// stable is a StableStore implementation for durable state
// It provides stable storage for many fields in raftState
Expand Down Expand Up @@ -429,8 +437,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
applyCh: make(chan *logFuture),
conf: *conf,
fsm: fsm,
fsmCommitCh: make(chan commitTuple, 128),
fsmRestoreCh: make(chan *restoreFuture),
fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
leaderCh: make(chan bool),
localID: localID,
Expand All @@ -441,7 +448,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
configurations: configurations{},
rpcCh: trans.Consumer(),
snapshots: snaps,
snapshotCh: make(chan *snapshotFuture),
userSnapshotCh: make(chan *userSnapshotFuture),
userRestoreCh: make(chan *userRestoreFuture),
shutdownCh: make(chan struct{}),
stable: stable,
trans: trans,
Expand Down Expand Up @@ -792,18 +800,78 @@ func (r *Raft) Shutdown() Future {
return &shutdownFuture{nil}
}

// Snapshot is used to manually force Raft to take a snapshot.
// Returns a future that can be used to block until complete.
func (r *Raft) Snapshot() Future {
snapFuture := &snapshotFuture{}
snapFuture.init()
// Snapshot is used to manually force Raft to take a snapshot. Returns a future
// that can be used to block until complete, and that contains a function that
// can be used to open the snapshot.
func (r *Raft) Snapshot() SnapshotFuture {
future := &userSnapshotFuture{}
future.init()
select {
case r.userSnapshotCh <- future:
return future
case <-r.shutdownCh:
future.respond(ErrRaftShutdown)
return future
}
}

// Restore is used to manually force Raft to consume an external snapshot, such
// as if restoring from a backup. We will use the current Raft configuration,
// not the one from the snapshot, so that we can restore into a new cluster. We
// will also use the higher of the index of the snapshot, or the current index,
// and then add 1 to that, so we force a new state with a hole in the Raft log,
// so that the snapshot will be sent to followers and used for any new joiners.
// This can only be run on the leader, and returns a future that can be used to
// block until complete.
//
// WARNING! This operation has the leader take on the state of the snapshot and
// then sets itself up so that it replicates that to its followers though the
// install snapshot process. This involves a potentially dangerous period where
// the leader commits ahead of its followers, so should only be used for disaster
// recovery into a fresh cluster, and should not be used in normal operations.
func (r *Raft) Restore(meta *SnapshotMeta, reader io.ReadCloser, timeout time.Duration) Future {
metrics.IncrCounter([]string{"raft", "restore"}, 1)
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
}

// Perform the restore.
restore := &userRestoreFuture{
meta: meta,
reader: reader,
}
restore.init()
select {
case r.snapshotCh <- snapFuture:
return snapFuture
case <-timer:
return errorFuture{ErrEnqueueTimeout}
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.userRestoreCh <- restore:
// If the restore is ingested then wait for it to complete.
if err := restore.Error(); err != nil {
return restore
}
}

// Apply a no-op log entry. Waiting for this allows us to wait until the
// followers have gotten the restore and replicated at least this new
// entry, which shows that we've also faulted and installed the
// snapshot with the contents of the restore.
noop := &logFuture{
log: Log{
Type: LogNoop,
},
}
noop.init()
select {
case <-timer:
return errorFuture{ErrEnqueueTimeout}
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.applyCh <- noop:
return noop
}
}

// State is used to return the current raft state.
Expand Down Expand Up @@ -870,7 +938,7 @@ func (r *Raft) Stats() map[string]string {
"last_log_term": toString(lastLogTerm),
"commit_index": toString(r.getCommitIndex()),
"applied_index": toString(r.getLastApplied()),
"fsm_pending": toString(uint64(len(r.fsmCommitCh))),
"fsm_pending": toString(uint64(len(r.fsmMutateCh))),
"last_snapshot_index": toString(lastSnapIndex),
"last_snapshot_term": toString(lastSnapTerm),
"protocol_version": toString(uint64(r.protocolVersion)),
Expand Down
124 changes: 72 additions & 52 deletions fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,67 +48,87 @@ type FSMSnapshot interface {
// the FSM to block our internal operations.
func (r *Raft) runFSM() {
var lastIndex, lastTerm uint64
for {
select {
case req := <-r.fsmRestoreCh:
// Open the snapshot
meta, source, err := r.snapshots.Open(req.ID)
if err != nil {
req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err))
continue
}

// Attempt to restore
commit := func(req *commitTuple) {
// Apply the log if a command
var resp interface{}
if req.log.Type == LogCommand {
start := time.Now()
if err := r.fsm.Restore(source); err != nil {
req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
source.Close()
continue
}
resp = r.fsm.Apply(req.log)
metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
}

// Update the indexes
lastIndex = req.log.Index
lastTerm = req.log.Term

// Invoke the future if given
if req.future != nil {
req.future.response = resp
req.future.respond(nil)
}
}

restore := func(req *restoreFuture) {
// Open the snapshot
meta, source, err := r.snapshots.Open(req.ID)
if err != nil {
req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err))
return
}

// Attempt to restore
start := time.Now()
if err := r.fsm.Restore(source); err != nil {
req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
source.Close()
metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)
return
}
source.Close()
metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)

// Update the last index and term
lastIndex = meta.Index
lastTerm = meta.Term
req.respond(nil)
// Update the last index and term
lastIndex = meta.Index
lastTerm = meta.Term
req.respond(nil)
}

case req := <-r.fsmSnapshotCh:
// Is there something to snapshot?
if lastIndex == 0 {
req.respond(ErrNothingNewToSnapshot)
continue
}
snapshot := func(req *reqSnapshotFuture) {
// Is there something to snapshot?
if lastIndex == 0 {
req.respond(ErrNothingNewToSnapshot)
return
}

// Start a snapshot
start := time.Now()
snap, err := r.fsm.Snapshot()
metrics.MeasureSince([]string{"raft", "fsm", "snapshot"}, start)

// Respond to the request
req.index = lastIndex
req.term = lastTerm
req.snapshot = snap
req.respond(err)

case commitEntry := <-r.fsmCommitCh:
// Apply the log if a command
var resp interface{}
if commitEntry.log.Type == LogCommand {
start := time.Now()
resp = r.fsm.Apply(commitEntry.log)
metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
}
// Start a snapshot
start := time.Now()
snap, err := r.fsm.Snapshot()
metrics.MeasureSince([]string{"raft", "fsm", "snapshot"}, start)

// Update the indexes
lastIndex = commitEntry.log.Index
lastTerm = commitEntry.log.Term
// Respond to the request
req.index = lastIndex
req.term = lastTerm
req.snapshot = snap
req.respond(err)
}

// Invoke the future if given
if commitEntry.future != nil {
commitEntry.future.response = resp
commitEntry.future.respond(nil)
for {
select {
case ptr := <-r.fsmMutateCh:
switch req := ptr.(type) {
case *commitTuple:
commit(req)

case *restoreFuture:
restore(req)

default:
panic(fmt.Errorf("bad type passed to fsmMutateCh: %#v", ptr))
}

case req := <-r.fsmSnapshotCh:
snapshot(req)

case <-r.shutdownCh:
return
}
Expand Down
48 changes: 46 additions & 2 deletions future.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package raft

import (
"fmt"
"io"
"sync"
"time"
)
Expand Down Expand Up @@ -46,6 +48,16 @@ type ConfigurationFuture interface {
Configuration() Configuration
}

// SnapshotFuture is used for waiting on a user-triggered snapshot to complete.
type SnapshotFuture interface {
Future

// Open is a function you can call to access the underlying snapshot and
// its metadata. This must not be called until after the Error method
// has returned.
Open() (*SnapshotMeta, io.ReadCloser, error)
}

// errorFuture is used to return a static error.
type errorFuture struct {
err error
Expand Down Expand Up @@ -150,9 +162,41 @@ func (s *shutdownFuture) Error() error {
return nil
}

// snapshotFuture is used for waiting on a snapshot to complete.
type snapshotFuture struct {
// userSnapshotFuture is used for waiting on a user-triggered snapshot to
// complete.
type userSnapshotFuture struct {
deferError

// opener is a function used to open the snapshot. This is filled in
// once the future returns with no error.
opener func() (*SnapshotMeta, io.ReadCloser, error)
}

// Open is a function you can call to access the underlying snapshot and its
// metadata.
func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) {
if u.opener == nil {
return nil, nil, fmt.Errorf("no snapshot available")
} else {
// Invalidate the opener so it can't get called multiple times,
// which isn't generally safe.
defer func() {
u.opener = nil
}()
return u.opener()
}
}

// userRestoreFuture is used for waiting on a user-triggered restore of an
// external snapshot to complete.
type userRestoreFuture struct {
deferError

// meta is the metadata that belongs with the snapshot.
meta *SnapshotMeta

// reader is the interface to read the snapshot contents from.
reader io.ReadCloser
}

// reqSnapshotFuture is used for requesting a snapshot start.
Expand Down
Loading

0 comments on commit 7c09f49

Please sign in to comment.