diff --git a/etcdserver/server.go b/etcdserver/server.go index 33430276bff6..3efde8353fab 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -57,6 +57,7 @@ import ( "github.com/coreos/etcd/wal" "github.com/coreos/go-semver/semver" "github.com/coreos/pkg/capnslog" + "golang.org/x/net/context" ) @@ -275,19 +276,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { beExist := fileutil.Exist(bepath) var be backend.Backend - beOpened := make(chan struct{}) - go func() { - be = newBackend(bepath, cfg.QuotaBackendBytes) - beOpened <- struct{}{} - }() - - select { - case <-beOpened: - case <-time.After(time.Second): - plog.Warningf("another etcd process is running with the same data dir and holding the file lock.") - plog.Warningf("waiting for it to exit before starting...") - <-beOpened - } + loadBackend(bepath, &be, cfg.QuotaBackendBytes) defer func() { if err != nil { @@ -385,6 +374,23 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { plog.Panicf("recovered store from snapshot error: %v", err) } plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index) + + // if follower recieves snapshot from leader and crashes before renaming xxx.snap.db to db, + // restarting follower results loading a outdated db. + // In this case: + // 1. check if xxx.snap.db (xxx==snapshot.Metadata.Index) exists. + // 2. rename xxx.snap.db to db if exists. + // 3. load backend again with the new db file. + snapfn, err := snap.GetDBFilePathByID(cfg.SnapDir(), snapshot.Metadata.Index) + if err != nil && err != snap.ErrDBSnapFileNotFound { + return nil, err + } + if snapfn != "" { + if err := os.Rename(snapfn, bepath); err != nil { + plog.Panicf("rename snapshot file error: %v", err) + } + loadBackend(bepath, &be, cfg.QuotaBackendBytes) + } } cfg.Print() if !cfg.ForceNewCluster { @@ -454,6 +460,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { kvindex := srv.kv.ConsistentIndex() // TODO: remove kvindex != 0 checking when we do not expect users to upgrade // etcd from pre-3.0 release. + // plog.Printf("DEBUG: db index %v snapshot index %v ", kvindex, snapshot.Metadata.Index) if snapshot != nil && kvindex < snapshot.Metadata.Index { if kvindex != 0 { return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d).", bepath, kvindex, snapshot.Metadata.Index) diff --git a/etcdserver/util.go b/etcdserver/util.go index e3896ffc2d3d..3aeb26317bc4 100644 --- a/etcdserver/util.go +++ b/etcdserver/util.go @@ -18,6 +18,7 @@ import ( "time" "github.com/coreos/etcd/etcdserver/membership" + "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/rafthttp" ) @@ -95,3 +96,19 @@ func (nc *notifier) notify(err error) { nc.err = err close(nc.c) } + +func loadBackend(bepath string, be *backend.Backend, QuotaBackendBytes int64) { + beOpened := make(chan struct{}) + go func() { + *be = newBackend(bepath, QuotaBackendBytes) + beOpened <- struct{}{} + }() + + select { + case <-beOpened: + case <-time.After(time.Second): + plog.Warningf("another etcd process is running with the same data dir and holding the file lock.") + plog.Warningf("waiting for it to exit before starting...") + <-beOpened + } +}