etcdserver: renaming db happens after snapshot persists to wal and snap files

In the case that a follower receives a snapshot from the leader
and crashes after the snapshot has been persisted to the .wal and
.snap files but before renaming xxx.snap.db to db, restarting the
follower results in loading the old db alongside the new .wal and
.snap files. This causes an index mismatch between the snap
metadata index and the consistent index from the db.

This PR forces an ordering in which saving/renaming the db must
happen after the snapshot is persisted to the wal and snap files.
This guarantees that the wal and snap files are always newer than
the db. On server restart, the etcd server checks whether
snap index > db consistent index; if so, it attempts to load the
xxx.snap.db file where xxx = snap index, and panics if no such
file exists.

FIXES #7628
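As a minimal sketch of the ordering this commit enforces (illustrative only: the two code paths below stand in for etcd's raft and apply loops, and notifyc mirrors the channel introduced here):

package main

import "fmt"

func main() {
	// Cap-1 channel mirroring notifyc: the raft loop can signal once
	// without blocking even if the apply loop is not yet receiving.
	notifyc := make(chan struct{}, 1)

	// Stand-in for the raft goroutine: persist the snapshot to the
	// .wal and .snap files first, then signal.
	go func() {
		fmt.Println("raft: snapshot persisted to wal and snap files")
		notifyc <- struct{}{} // only now may the db be swapped
	}()

	// Stand-in for the apply path: block until the snapshot is
	// durable; only then is it safe to rename xxx.snap.db to db.
	<-notifyc
	fmt.Println("apply: renaming xxx.snap.db to db")
}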
fanminshi committed May 8, 2017
1 parent 505bf8c commit d591c97
Showing 4 changed files with 64 additions and 25 deletions.
19 changes: 12 additions & 7 deletions etcdserver/raft.go
@@ -83,7 +83,8 @@ type RaftTimer interface {
type apply struct {
entries []raftpb.Entry
snapshot raftpb.Snapshot
- raftDone <-chan struct{} // rx {} after raft has persisted messages
+ // notifyc synchronizes etcd server applies with the raft node
+ notifyc chan struct{}
}

type raftNode struct {
@@ -190,11 +191,11 @@ func (r *raftNode) start(rh *raftReadyHandler) {
}
}

- raftDone := make(chan struct{}, 1)
+ notifyc := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
- raftDone: raftDone,
+ notifyc: notifyc,
}

updateCommittedIndex(&ap, rh)
@@ -227,10 +228,14 @@ func (r *raftNode) start(rh *raftReadyHandler) {
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
plog.Fatalf("raft save snapshot error: %v", err)
}
+ // etcdserver now claims the snapshot has been persisted onto the disk
+ notifyc <- struct{}{}
+
// gofail: var raftAfterSaveSnap struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
// gofail: var raftAfterApplySnap struct{}
+
}

r.raftStorage.Append(rd.Entries)
@@ -240,7 +245,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
msgs := r.processMessages(rd.Messages)

// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
- raftDone <- struct{}{}
+ notifyc <- struct{}{}

// Candidate or follower needs to wait for all pending configuration
// changes to be applied before sending messages.
@@ -259,9 +264,9 @@ func (r *raftNode) start(rh *raftReadyHandler) {
if waitApply {
// blocks until 'applyAll' calls 'applyWait.Trigger'
// to be in sync with scheduled config-change job
- // (assume raftDone has cap of 1)
+ // (assume notifyc has cap of 1)
select {
- case raftDone <- struct{}{}:
+ case notifyc <- struct{}{}:
case <-r.stopped:
return
}
@@ -271,7 +276,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
r.transport.Send(msgs)
} else {
// leader already processed 'MsgSnap' and signaled
- raftDone <- struct{}{}
+ notifyc <- struct{}{}
}

r.Advance()
39 changes: 24 additions & 15 deletions etcdserver/server.go
@@ -274,20 +274,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
bepath := filepath.Join(cfg.SnapDir(), databaseFilename)
beExist := fileutil.Exist(bepath)

- var be backend.Backend
- beOpened := make(chan struct{})
- go func() {
- be = newBackend(bepath, cfg.QuotaBackendBytes)
- beOpened <- struct{}{}
- }()
-
- select {
- case <-beOpened:
- case <-time.After(time.Second):
- plog.Warningf("another etcd process is running with the same data dir and holding the file lock.")
- plog.Warningf("waiting for it to exit before starting...")
- <-beOpened
- }
+ be := loadBackend(bepath, cfg.QuotaBackendBytes)

defer func() {
if err != nil {
@@ -385,6 +372,25 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
plog.Panicf("recovered store from snapshot error: %v", err)
}
plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
+
+ // when a follower crashes before updating its db
+ // but after persisting a snapshot received from the leader to disk,
+ // the snapshot can be newer than the db (snapshot.Metadata.Index > db.consistentIndex).
+ // when that happens, load the xxx.snap.db that matches the snap index.
+ var cIndex consistentIndex
+ kv := mvcc.New(be, &lease.FakeLessor{}, &cIndex)
+ kvindex := kv.ConsistentIndex()
+ if snapshot.Metadata.Index > kvindex {
+ snapfn, err := snap.GetDBFilePathByID(cfg.SnapDir(), snapshot.Metadata.Index)
+ if err != nil {
+ plog.Panic(err)
+ }
+ if err := os.Rename(snapfn, bepath); err != nil {
+ plog.Panicf("rename snapshot file error: %v", err)
+ }
+ be.Close()
+ be = loadBackend(bepath, cfg.QuotaBackendBytes)
+ }
}
cfg.Print()
if !cfg.ForceNewCluster {
@@ -778,7 +784,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
// wait for the raft routine to finish the disk writes before triggering a
// snapshot. or applied index might be greater than the last index in raft
// storage, since the raft routine might be slower than apply routine.
- <-apply.raftDone
+ <-apply.notifyc

s.triggerSnapshot(ep)
select {
@@ -803,6 +809,9 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
apply.snapshot.Metadata.Index, ep.appliedi)
}

+ // wait for raftNode to persist the snapshot onto the disk
+ <-apply.notifyc
+
snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
if err != nil {
plog.Panicf("get database snapshot file path error: %v", err)
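The startup check added to server.go above can be summarized in a hedged sketch. recoverDB and its inputs are illustrative, not the exact etcd code: snapIndex is assumed to come from the newest .snap file and dbConsistentIndex from the backend db, and the real path panics via plog on errors instead of returning them.

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// recoverDB sketches the recovery decision: if the snapshot index is
// ahead of the db's consistent index, the crash happened before
// xxx.snap.db was renamed, so that file must replace the stale db.
func recoverDB(snapIndex, dbConsistentIndex uint64, snapDir, dbPath string) error {
	if snapIndex <= dbConsistentIndex {
		// db is at least as new as the snapshot; nothing to repair.
		return nil
	}
	// The new persist ordering guarantees the wal/snap files are never
	// older than the db, so the matching snapshot db must exist.
	snapDBPath := filepath.Join(snapDir, fmt.Sprintf("%016x.snap.db", snapIndex))
	if _, err := os.Stat(snapDBPath); err != nil {
		return fmt.Errorf("snapshot index %d ahead of db index %d but %s is missing: %v",
			snapIndex, dbConsistentIndex, snapDBPath, err)
	}
	return os.Rename(snapDBPath, dbPath)
}

func main() {
	if err := recoverDB(10, 7, "member/snap", "member/snap/db"); err != nil {
		fmt.Println("recovery failed:", err)
	}
}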
18 changes: 18 additions & 0 deletions etcdserver/util.go
@@ -18,6 +18,7 @@ import (
"time"

"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"
)
@@ -95,3 +96,20 @@ func (nc *notifier) notify(err error) {
nc.err = err
close(nc.c)
}

+ func loadBackend(bepath string, quotaBackendBytes int64) (be backend.Backend) {
+ beOpened := make(chan struct{})
+ go func() {
+ be = newBackend(bepath, quotaBackendBytes)
+ beOpened <- struct{}{}
+ }()
+
+ select {
+ case <-beOpened:
+ case <-time.After(time.Second):
+ plog.Warningf("another etcd process is running with the same data dir and holding the file lock.")
+ plog.Warningf("waiting for it to exit before starting...")
+ <-beOpened
+ }
+ return be
+ }
13 changes: 10 additions & 3 deletions snap/db.go
@@ -15,6 +15,7 @@
package snap

import (
"errors"
"fmt"
"io"
"io/ioutil"
@@ -24,6 +25,8 @@ import (
"github.com/coreos/etcd/pkg/fileutil"
)

+ var ErrNoDBSnapshot = errors.New("snap: snapshot file doesn't exist")
+
// SaveDBFrom saves snapshot of the database from the given reader. It
// guarantees the save operation is atomic.
func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
@@ -60,15 +63,19 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
// DBFilePath returns the file path for the snapshot of the database with
// given id. If the snapshot does not exist, it returns error.
func (s *Snapshotter) DBFilePath(id uint64) (string, error) {
- fns, err := fileutil.ReadDir(s.dir)
+ return GetDBFilePathByID(s.dir, id)
+ }
+
+ func GetDBFilePathByID(dbPath string, id uint64) (string, error) {
+ fns, err := fileutil.ReadDir(dbPath)
if err != nil {
return "", err
}
wfn := fmt.Sprintf("%016x.snap.db", id)
for _, fn := range fns {
if fn == wfn {
- return filepath.Join(s.dir, fn), nil
+ return filepath.Join(dbPath, fn), nil
}
}
return "", fmt.Errorf("snap: snapshot file doesn't exist")
return "", ErrNoDBSnapshot
}
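Exporting ErrNoDBSnapshot lets callers branch on a sentinel value instead of matching an error string. A hypothetical caller (findSnapshotDB and its inputs are illustrative, not part of this commit) might look like:

package main

import (
	"fmt"

	"github.com/coreos/etcd/snap"
)

// findSnapshotDB is a hypothetical helper: it reports whether an
// xxx.snap.db file exists for the given index, treating the new
// ErrNoDBSnapshot sentinel as "not found" rather than a hard failure.
func findSnapshotDB(snapDir string, index uint64) (string, bool, error) {
	path, err := snap.GetDBFilePathByID(snapDir, index)
	if err == snap.ErrNoDBSnapshot {
		return "", false, nil // no matching snapshot db on disk
	}
	if err != nil {
		return "", false, err
	}
	return path, true, nil
}

func main() {
	if path, ok, err := findSnapshotDB("member/snap", 42); err == nil && ok {
		fmt.Println("found snapshot db at", path)
	}
}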
