diff --git a/etcdserver/raft.go b/etcdserver/raft.go index b87ceea54660..a7867db684d6 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -83,7 +83,8 @@ type RaftTimer interface { type apply struct { entries []raftpb.Entry snapshot raftpb.Snapshot - raftDone <-chan struct{} // rx {} after raft has persisted messages + // notifyc synchronizes etcd server applies with the raft node + notifyc chan struct{} } type raftNode struct { @@ -190,11 +191,11 @@ func (r *raftNode) start(rh *raftReadyHandler) { } } - raftDone := make(chan struct{}, 1) + notifyc := make(chan struct{}, 1) ap := apply{ entries: rd.CommittedEntries, snapshot: rd.Snapshot, - raftDone: raftDone, + notifyc: notifyc, } updateCommittedIndex(&ap, rh) @@ -227,10 +228,14 @@ func (r *raftNode) start(rh *raftReadyHandler) { if err := r.storage.SaveSnap(rd.Snapshot); err != nil { plog.Fatalf("raft save snapshot error: %v", err) } + // etcdserver now claim the snapshot has been persisted onto the disk + notifyc <- struct{}{} + // gofail: var raftAfterSaveSnap struct{} r.raftStorage.ApplySnapshot(rd.Snapshot) plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index) // gofail: var raftAfterApplySnap struct{} + } r.raftStorage.Append(rd.Entries) @@ -240,7 +245,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { msgs := r.processMessages(rd.Messages) // now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots - raftDone <- struct{}{} + notifyc <- struct{}{} // Candidate or follower needs to wait for all pending configuration // changes to be applied before sending messages. @@ -259,9 +264,9 @@ func (r *raftNode) start(rh *raftReadyHandler) { if waitApply { // blocks until 'applyAll' calls 'applyWait.Trigger' // to be in sync with scheduled config-change job - // (assume raftDone has cap of 1) + // (assume notifyc has cap of 1) select { - case raftDone <- struct{}{}: + case notifyc <- struct{}{}: case <-r.stopped: return } @@ -271,7 +276,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { r.transport.Send(msgs) } else { // leader already processed 'MsgSnap' and signaled - raftDone <- struct{}{} + notifyc <- struct{}{} } r.Advance() diff --git a/etcdserver/server.go b/etcdserver/server.go index 33430276bff6..f921f854dd83 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -274,20 +274,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { bepath := filepath.Join(cfg.SnapDir(), databaseFilename) beExist := fileutil.Exist(bepath) - var be backend.Backend - beOpened := make(chan struct{}) - go func() { - be = newBackend(bepath, cfg.QuotaBackendBytes) - beOpened <- struct{}{} - }() - - select { - case <-beOpened: - case <-time.After(time.Second): - plog.Warningf("another etcd process is running with the same data dir and holding the file lock.") - plog.Warningf("waiting for it to exit before starting...") - <-beOpened - } + be := loadBackend(bepath, cfg.QuotaBackendBytes) defer func() { if err != nil { @@ -385,6 +372,25 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { plog.Panicf("recovered store from snapshot error: %v", err) } plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index) + + // when follwer crashes before updating its in-state db + // and after persisting snapshot to disk from syncing with leader, + // snapshot can be newer than db where (snapshot.Metadata.Index > db.consistentIndex ). + // when that happen, load xxx.snap.db that matches snap index. + var cIndex consistentIndex + kv := mvcc.New(be, &lease.FakeLessor{}, &cIndex) + kvindex := kv.ConsistentIndex() + if snapshot.Metadata.Index > kvindex { + snapfn, err := snap.GetDBFilePathByID(cfg.SnapDir(), snapshot.Metadata.Index) + if err != nil { + plog.Panic(err) + } + if err := os.Rename(snapfn, bepath); err != nil { + plog.Panicf("rename snapshot file error: %v", err) + } + be.Close() + be = loadBackend(bepath, cfg.QuotaBackendBytes) + } } cfg.Print() if !cfg.ForceNewCluster { @@ -778,7 +784,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { // wait for the raft routine to finish the disk writes before triggering a // snapshot. or applied index might be greater than the last index in raft // storage, since the raft routine might be slower than apply routine. - <-apply.raftDone + <-apply.notifyc s.triggerSnapshot(ep) select { @@ -803,6 +809,9 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { apply.snapshot.Metadata.Index, ep.appliedi) } + // wait for raftNode to persist snashot onto the disk + <-apply.notifyc + snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index) if err != nil { plog.Panicf("get database snapshot file path error: %v", err) diff --git a/etcdserver/util.go b/etcdserver/util.go index e3896ffc2d3d..2d337820b4bb 100644 --- a/etcdserver/util.go +++ b/etcdserver/util.go @@ -18,6 +18,7 @@ import ( "time" "github.com/coreos/etcd/etcdserver/membership" + "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/rafthttp" ) @@ -95,3 +96,20 @@ func (nc *notifier) notify(err error) { nc.err = err close(nc.c) } + +func loadBackend(bepath string, quotaBackendBytes int64) (be backend.Backend) { + beOpened := make(chan struct{}) + go func() { + be = newBackend(bepath, quotaBackendBytes) + beOpened <- struct{}{} + }() + + select { + case <-beOpened: + case <-time.After(time.Second): + plog.Warningf("another etcd process is running with the same data dir and holding the file lock.") + plog.Warningf("waiting for it to exit before starting...") + <-beOpened + } + return be +} diff --git a/snap/db.go b/snap/db.go index ae3c743f80c1..d36d91318f36 100644 --- a/snap/db.go +++ b/snap/db.go @@ -15,6 +15,7 @@ package snap import ( + "errors" "fmt" "io" "io/ioutil" @@ -24,6 +25,8 @@ import ( "github.com/coreos/etcd/pkg/fileutil" ) +var ErrNoDBSnapshot = errors.New("snap: snapshot file doesn't exist") + // SaveDBFrom saves snapshot of the database from the given reader. It // guarantees the save operation is atomic. func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) { @@ -60,15 +63,19 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) { // DBFilePath returns the file path for the snapshot of the database with // given id. If the snapshot does not exist, it returns error. func (s *Snapshotter) DBFilePath(id uint64) (string, error) { - fns, err := fileutil.ReadDir(s.dir) + return GetDBFilePathByID(s.dir, id) +} + +func GetDBFilePathByID(dbPath string, id uint64) (string, error) { + fns, err := fileutil.ReadDir(dbPath) if err != nil { return "", err } wfn := fmt.Sprintf("%016x.snap.db", id) for _, fn := range fns { if fn == wfn { - return filepath.Join(s.dir, fn), nil + return filepath.Join(dbPath, fn), nil } } - return "", fmt.Errorf("snap: snapshot file doesn't exist") + return "", ErrNoDBSnapshot }