Skip to content

Commit

Permalink
etcdserver: add a test to ensure renaming db happens before persistin…
Browse files Browse the repository at this point in the history
…g wal and snap files.
  • Loading branch information
fanminshi committed May 5, 2017
1 parent c1713e0 commit 7590bf3
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 48 deletions.
2 changes: 1 addition & 1 deletion etcdserver/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func TestConfgChangeBlocksApply(t *testing.T) {
}

// finish apply, unblock raft routine
<-ap.raftDone
<-ap.notifyc

select {
case <-continueC:
Expand Down
142 changes: 95 additions & 47 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ func TestSnapshot(t *testing.T) {
<-ch
}

// snapshot should snapshot the store and cut the persistent
// TestSnapshotOrdering ensures that when apply snapshot, etcdserver renames snap db to db before raft persists snapshot to wal and snap files.
func TestSnapshotOrdering(t *testing.T) {
n := newNopReadyNode()
st := store.New()
Expand All @@ -968,12 +968,13 @@ func TestSnapshotOrdering(t *testing.T) {
}

rs := raft.NewMemoryStorage()
p := mockstorage.NewStorageRecorderStream(testdir)
tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
r := newRaftNode(raftNodeConfig{
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
transport: tr,
storage: mockstorage.NewStorageRecorder(testdir),
storage: p,
raftStorage: rs,
})
s := &EtcdServer{
Expand All @@ -997,55 +998,102 @@ func TestSnapshotOrdering(t *testing.T) {
s.start()
defer s.Stop()

// submit applied entries and snap entries
idx := uint64(0)
outdated := 0
accepted := 0
for k := 1; k <= 101; k++ {
idx++
ch := s.w.Register(uint64(idx))
req := &pb.Request{Method: "QGET", ID: uint64(idx)}
ent := raftpb.Entry{Index: uint64(idx), Data: pbutil.MustMarshal(req)}
ready := raft.Ready{Entries: []raftpb.Entry{ent}}
n.readyc <- ready

ready = raft.Ready{CommittedEntries: []raftpb.Entry{ent}}
n.readyc <- ready

// "idx" applied
<-ch

// one snapshot for every two messages
if k%2 != 0 {
continue
}

n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
// get the snapshot sent by the transport
snapMsg := <-snapDoneC
// If the snapshot trails applied records, recovery will panic
// since there's no allocated snapshot at the place of the
// snapshot record. This only happens when the applier and the
// snapshot sender get out of sync.
if snapMsg.Snapshot.Metadata.Index == idx {
idx++
snapMsg.Snapshot.Metadata.Index = idx
ready = raft.Ready{Snapshot: snapMsg.Snapshot}
n.readyc <- ready
accepted++
} else {
outdated++
actionc := p.Chan()
n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
ac := <-actionc
// MsgSnap triggers raftNode to call Save()
if ac.Name != "Save" {
t.Fatalf("expect save() is called, but got %v", ac.Name)
}
// get the snapshot sent by the transport
snapMsg := <-snapDoneC

// Snapshot ready triggers etcd server to rename snapshot db to db the first
// and raftnode to persist snapshot to wal and snap files the second.
snapMsg.Snapshot.Metadata.Index = 1
ready := raft.Ready{Snapshot: snapMsg.Snapshot}
n.readyc <- ready
var seenDBFilePath bool
timer := time.After(5 * time.Second)
for {
select {
case ac := <-actionc:
switch ac.Name {
case "DBFilePath":
seenDBFilePath = true
case "SaveSnap":
if !seenDBFilePath {
t.Fatalf("expect DBFilePath calls before SaveSnap, but it is other way around")
}
return
default:
continue
}
case <-timer:
t.Fatalf("timeout waiting on actions")
}
// don't wait for the snapshot to complete, move to next message
}
if accepted != 50 {
t.Errorf("accepted=%v, want 50", accepted)
}
if outdated != 0 {
t.Errorf("outdated=%v, want 0", outdated)
}
}

// func TestSnapshotOrdering2(t *testing.T) {
// testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
// if err != nil {
// t.Fatalf("Couldn't open tempdir (%v)", err)
// }
// defer os.RemoveAll(testdir)
// snapDir := testdir + "/member/snap"
// if err := os.MkdirAll(snapDir, 0755); err != nil {
// t.Fatalf("Couldn't make snap dir (%v)", err)
// }

// n := newNopReadyNode()
// s := raft.NewMemoryStorage()
// s.Append([]raftpb.Entry{{Index: 1}})
// st := mockstore.NewRecorderStream()
// p := mockstorage.NewStorageRecorderStream(snapDir)
// r := newRaftNode(raftNodeConfig{
// Node: n,
// raftStorage: s,
// storage: p,
// transport: rafthttp.NewNopTransporter(),
// })
// srv := &EtcdServer{
// Cfg: &ServerConfig{
// DataDir: testdir,
// },
// r: *r,
// store: st,
// cluster: membership.NewCluster("abc"),
// }
// // create snap db file.
// beconf := backend.DefaultBackendConfig()
// beconf.Path = snapDir + "/0000000000000001.snap.db"
// be := backend.New(beconf)
// kv := mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex)
// // write consistIndex to db
// kv.Commit()
// kv.Close()
// be.Close()
// // create normal db file.
// beconf = backend.DefaultBackendConfig()
// beconf.Path = snapDir + "/db"
// be = backend.New(beconf)
// srv.be = be
// srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex)

// srv.start()
// defer srv.Stop()

// snap := raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}
// ready := raft.Ready{Snapshot: snap}
// n.readyc <- ready
// acs, err := p.Wait(2)
// if err != nil {
// t.Fatal(err)
// }
// log.Printf("actions %+v", acs)
// }

// Applied > SnapCount should trigger a SaveSnap event
func TestTriggerSnap(t *testing.T) {
be, tmpPath := backend.NewDefaultTmpBackend()
Expand Down

0 comments on commit 7590bf3

Please sign in to comment.