diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index 3396e60f0726..757826cc9e1e 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -211,7 +211,7 @@ func TestConfgChangeBlocksApply(t *testing.T) { } // finish apply, unblock raft routine - <-ap.raftDone + <-ap.notifyc select { case <-continueC: diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 457b27c8b7e0..9db8b577ece7 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -951,7 +951,7 @@ func TestSnapshot(t *testing.T) { <-ch } -// snapshot should snapshot the store and cut the persistent +// TestSnapshotOrdering ensures that when applying snapshot etcdserver renames snap db to db before raft persists snapshot to wal and snap files. func TestSnapshotOrdering(t *testing.T) { n := newNopReadyNode() st := store.New() @@ -968,12 +968,13 @@ func TestSnapshotOrdering(t *testing.T) { } rs := raft.NewMemoryStorage() + p := mockstorage.NewStorageRecorderStream(testdir) tr, snapDoneC := rafthttp.NewSnapTransporter(testdir) r := newRaftNode(raftNodeConfig{ isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, Node: n, transport: tr, - storage: mockstorage.NewStorageRecorder(testdir), + storage: p, raftStorage: rs, }) s := &EtcdServer{ @@ -997,52 +998,41 @@ func TestSnapshotOrdering(t *testing.T) { s.start() defer s.Stop() - // submit applied entries and snap entries - idx := uint64(0) - outdated := 0 - accepted := 0 - for k := 1; k <= 101; k++ { - idx++ - ch := s.w.Register(uint64(idx)) - req := &pb.Request{Method: "QGET", ID: uint64(idx)} - ent := raftpb.Entry{Index: uint64(idx), Data: pbutil.MustMarshal(req)} - ready := raft.Ready{Entries: []raftpb.Entry{ent}} - n.readyc <- ready - - ready = raft.Ready{CommittedEntries: []raftpb.Entry{ent}} - n.readyc <- ready - - // "idx" applied - <-ch - - // one snapshot for every two messages - if k%2 != 0 { - continue - } - - n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}} - // get the snapshot sent by the transport - snapMsg := <-snapDoneC - // If the snapshot trails applied records, recovery will panic - // since there's no allocated snapshot at the place of the - // snapshot record. This only happens when the applier and the - // snapshot sender get out of sync. - if snapMsg.Snapshot.Metadata.Index == idx { - idx++ - snapMsg.Snapshot.Metadata.Index = idx - ready = raft.Ready{Snapshot: snapMsg.Snapshot} - n.readyc <- ready - accepted++ - } else { - outdated++ + actionc := p.Chan() + n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}} + ac := <-actionc + // MsgSnap triggers raftNode to call Save() + if ac.Name != "Save" { + t.Fatalf("expect save() is called, but got %v", ac.Name) + } + // get the snapshot sent by the transport + snapMsg := <-snapDoneC + + // Snapshot ready triggers etcd server to rename snapshot db to db the first + // and raftnode to persist snapshot to wal and snap files the second. + snapMsg.Snapshot.Metadata.Index = 1 + ready := raft.Ready{Snapshot: snapMsg.Snapshot} + n.readyc <- ready + var seenDBFilePath bool + timer := time.After(5 * time.Second) + for { + select { + case ac := <-actionc: + switch ac.Name { + // DBFilePath() is called immediately before snapshot renaming. + case "DBFilePath": + seenDBFilePath = true + case "SaveSnap": + if !seenDBFilePath { + t.Fatalf("expect DBFilePath calls before SaveSnap, but it is other way around") + } + return + default: + continue + } + case <-timer: + t.Fatalf("timeout waiting on actions") } - // don't wait for the snapshot to complete, move to next message - } - if accepted != 50 { - t.Errorf("accepted=%v, want 50", accepted) - } - if outdated != 0 { - t.Errorf("outdated=%v, want 0", outdated) } }