Skip to content

Commit

Permalink
Panic on storage failures
Browse files Browse the repository at this point in the history
  • Loading branch information
ongardie committed Oct 31, 2016
1 parent ded7e63 commit d62e269
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 24 deletions.
3 changes: 1 addition & 2 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,8 +523,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
for index := snapshotIndex + 1; index <= lastLog.Index; index++ {
var entry Log
if err := r.logs.GetLog(index, &entry); err != nil {
r.logger.Error("Failed to get log", "index", index, "error", err)
panic(err)
r.logger.Fatal("Failed to get log", "index", index, "error", err)
}
r.processMembershipLogEntry(&entry)
}
Expand Down
5 changes: 2 additions & 3 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,7 @@ func (rpc *appendEntriesRPC) prepare(shared *peerShared, control peerControl) er
term, err := getTerm(rpc.req.PrevLogEntry)
if err != nil {
if err != errNeedsSnapshot {
shared.logger.Error("Failed to get log term",
shared.logger.Fatal("Failed to get log term",
"index", rpc.req.PrevLogEntry,
"error", err)
}
Expand All @@ -1008,9 +1008,8 @@ func (rpc *appendEntriesRPC) prepare(shared *peerShared, control peerControl) er
return errNeedsSnapshot
}
if err != nil {
shared.logger.Error("Failed to get log entry",
shared.logger.Fatal("Failed to get log entry",
"index", i, "error", err)
return err
}
rpc.req.Entries = append(rpc.req.Entries, &entry)
}
Expand Down
34 changes: 15 additions & 19 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ func (r *raftServer) liveBootstrap(membership Membership) error {
// Make the configuration live.
var entry Log
if err := r.logs.GetLog(1, &entry); err != nil {
panic(err)
r.logger.Fatal("Could not read live-bootstrapped membership entry",
"error", err)
}
r.currentTerm = 1
r.persistCurrentTerm()
Expand Down Expand Up @@ -863,12 +864,7 @@ func (r *raftServer) dispatchLogs(applyLogs []*logFuture) {

// Write the log entry locally
if err := r.logs.StoreLogs(logs); err != nil {
r.logger.Error("Failed to commit logs", "error", err)
for _, applyLog := range applyLogs {
applyLog.respond(err)
}
r.stepDown()
return
r.logger.Fatal("Failed to store log entries", "error", err)
}

// Update the last log since it's on disk now
Expand Down Expand Up @@ -904,9 +900,8 @@ func (r *raftServer) processLogs(index Index, future *logFuture) {
} else {
l := new(Log)
if err := r.logs.GetLog(idx, l); err != nil {
r.logger.Error("Failed to get log",
r.logger.Fatal("Failed to get log entry",
"index", idx, "error", err)
panic(err)
}
r.processLog(l, nil)
}
Expand Down Expand Up @@ -1040,11 +1035,17 @@ func (r *raftServer) appendEntries(rpc RPC, a *AppendEntriesRequest) {
} else {
var prevLog Log
if err := r.logs.GetLog(a.PrevLogEntry, &prevLog); err != nil {
r.logger.Warn("Failed to get previous log entry",
if err == ErrLogNotFound {
r.logger.Warn("Failed to get previous log entry",
"prev_index", a.PrevLogEntry,
"last_index", lastIdx,
"error", err)
return
}
r.logger.Fatal("Failed to get previous log entry",
"prev_index", a.PrevLogEntry,
"last_index", lastIdx,
"error", err)
return
}
prevLogTerm = prevLog.Term
}
Expand All @@ -1071,16 +1072,14 @@ func (r *raftServer) appendEntries(rpc RPC, a *AppendEntriesRequest) {
}
var storeEntry Log
if err := r.logs.GetLog(entry.Index, &storeEntry); err != nil {
r.logger.Warn("Failed to get log entry",
r.logger.Fatal("Failed to get log entry",
"index", entry.Index, "error", err)
return
}
if entry.Term != storeEntry.Term {
r.logger.Warn("Clearing log suffix",
"from_index", entry.Index, "to_index", lastLogIdx)
if err := r.logs.DeleteRange(entry.Index, lastLogIdx); err != nil {
r.logger.Error("Failed to clear log suffix", "error", err)
return
r.logger.Fatal("Failed to clear log suffix", "error", err)
}
if entry.Index <= r.memberships.latestIndex {
r.memberships.latest = r.memberships.committed
Expand All @@ -1095,10 +1094,7 @@ func (r *raftServer) appendEntries(rpc RPC, a *AppendEntriesRequest) {
if n := len(newEntries); n > 0 {
// Append the new entries
if err := r.logs.StoreLogs(newEntries); err != nil {
r.logger.Error("Failed to append to logs", "error", err)
// TODO: leaving r.getLastLog() in the wrong
// state if there was a truncation above
return
r.logger.Fatal("Failed to append to logs", "error", err)
}

// Handle any new configuration changes
Expand Down

0 comments on commit d62e269

Please sign in to comment.