Skip to content

Commit

Permalink
Puts FSM commits and restores into a single pipe so we control order.
Browse files Browse the repository at this point in the history
  • Loading branch information
slackpad committed Sep 29, 2016
1 parent 41b9e23 commit fe8cdcd
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 63 deletions.
18 changes: 10 additions & 8 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,14 @@ type Raft struct {
// FSM is the client state machine to apply commands to
fsm FSM

// fsmCommitCh is used to trigger async application of logs to the fsm
fsmCommitCh chan commitTuple

// fsmRestoreCh is used to trigger a restore from snapshot
fsmRestoreCh chan *restoreFuture
// fsmMutateCh is used to send state-changing updates to the FSM. This
// receives pointers to commitTuple structures when applying logs or
// pointers to restoreFuture structures when restoring a snapshot. We
// need control over the order of these operations when doing user
// restores so that we finish applying any old log applies before we
// take a user snapshot on the leader, otherwise we might restore the
// snapshot and apply old logs to it that were in the pipe.
fsmMutateCh chan interface{}

// fsmSnapshotCh is used to trigger a new snapshot being taken
fsmSnapshotCh chan *reqSnapshotFuture
Expand Down Expand Up @@ -434,8 +437,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
applyCh: make(chan *logFuture),
conf: *conf,
fsm: fsm,
fsmCommitCh: make(chan commitTuple, 128),
fsmRestoreCh: make(chan *restoreFuture),
fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
leaderCh: make(chan bool),
localID: localID,
Expand Down Expand Up @@ -936,7 +938,7 @@ func (r *Raft) Stats() map[string]string {
"last_log_term": toString(lastLogTerm),
"commit_index": toString(r.getCommitIndex()),
"applied_index": toString(r.getLastApplied()),
"fsm_pending": toString(uint64(len(r.fsmCommitCh))),
"fsm_pending": toString(uint64(len(r.fsmMutateCh))),
"last_snapshot_index": toString(lastSnapIndex),
"last_snapshot_term": toString(lastSnapTerm),
"protocol_version": toString(uint64(r.protocolVersion)),
Expand Down
123 changes: 71 additions & 52 deletions fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,68 +48,87 @@ type FSMSnapshot interface {
// the FSM to block our internal operations.
func (r *Raft) runFSM() {
var lastIndex, lastTerm uint64
for {
select {
case req := <-r.fsmRestoreCh:
// Open the snapshot
meta, source, err := r.snapshots.Open(req.ID)
if err != nil {
req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err))
continue
}

// Attempt to restore
commit := func(req *commitTuple) {
// Apply the log if a command
var resp interface{}
if req.log.Type == LogCommand {
start := time.Now()
if err := r.fsm.Restore(source); err != nil {
req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
source.Close()
continue
}
resp = r.fsm.Apply(req.log)
metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
}

// Update the indexes
lastIndex = req.log.Index
lastTerm = req.log.Term

// Invoke the future if given
if req.future != nil {
req.future.response = resp
req.future.respond(nil)
}
}

restore := func(req *restoreFuture) {
// Open the snapshot
meta, source, err := r.snapshots.Open(req.ID)
if err != nil {
req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err))
return
}

// Attempt to restore
start := time.Now()
if err := r.fsm.Restore(source); err != nil {
req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
source.Close()
metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)
return
}
source.Close()
metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)

// Update the last index and term
lastIndex = meta.Index
lastTerm = meta.Term
req.respond(nil)
// Update the last index and term
lastIndex = meta.Index
lastTerm = meta.Term
req.respond(nil)
}

case req := <-r.fsmSnapshotCh:
// Is there something to snapshot?
if lastIndex == 0 {
req.respond(ErrNothingNewToSnapshot)
continue
}
snapshot := func(req *reqSnapshotFuture) {
// Is there something to snapshot?
if lastIndex == 0 {
req.respond(ErrNothingNewToSnapshot)
return
}

// Start a snapshot
start := time.Now()
snap, err := r.fsm.Snapshot()
metrics.MeasureSince([]string{"raft", "fsm", "snapshot"}, start)

// Respond to the request
req.index = lastIndex
req.term = lastTerm
req.snapshot = snap
req.respond(err)

case commitEntry := <-r.fsmCommitCh:
// Apply the log if a command
var resp interface{}
if commitEntry.log.Type == LogCommand {
start := time.Now()
resp = r.fsm.Apply(commitEntry.log)
metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
}
// Start a snapshot
start := time.Now()
snap, err := r.fsm.Snapshot()
metrics.MeasureSince([]string{"raft", "fsm", "snapshot"}, start)

// Update the indexes
lastIndex = commitEntry.log.Index
lastTerm = commitEntry.log.Term
// Respond to the request
req.index = lastIndex
req.term = lastTerm
req.snapshot = snap
req.respond(err)
}

for {
select {
case ptr := <-r.fsmMutateCh:
switch req := ptr.(type) {
case *commitTuple:
commit(req)

// Invoke the future if given
if commitEntry.future != nil {
commitEntry.future.response = resp
commitEntry.future.respond(nil)
case *restoreFuture:
restore(req)

default:
panic(fmt.Errorf("bad type passed to fsmMutateCh: %#v", ptr))
}

case req := <-r.fsmSnapshotCh:
snapshot(req)

case <-r.shutdownCh:
return
}
Expand Down
6 changes: 3 additions & 3 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.ReadCloser) err
fsm := &restoreFuture{ID: sink.ID()}
fsm.init()
select {
case r.fsmRestoreCh <- fsm:
case r.fsmMutateCh <- fsm:
case <-r.shutdownCh:
return ErrRaftShutdown
}
Expand Down Expand Up @@ -902,7 +902,7 @@ func (r *Raft) processLog(l *Log, future *logFuture) {
case LogCommand:
// Forward to the fsm handler
select {
case r.fsmCommitCh <- commitTuple{l, future}:
case r.fsmMutateCh <- &commitTuple{l, future}:
case <-r.shutdownCh:
if future != nil {
future.respond(ErrRaftShutdown)
Expand Down Expand Up @@ -1302,7 +1302,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
future := &restoreFuture{ID: sink.ID()}
future.init()
select {
case r.fsmRestoreCh <- future:
case r.fsmMutateCh <- future:
case <-r.shutdownCh:
future.respond(ErrRaftShutdown)
return
Expand Down

0 comments on commit fe8cdcd

Please sign in to comment.