diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 704e45a7aac3..d5bf82d423e8 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -68,8 +68,10 @@ func init() { type toApply struct { entries []raftpb.Entry snapshot raftpb.Snapshot - // notifyc synchronizes etcd server applies with the raft node - notifyc chan struct{} + // snapNotifyc synchronizes etcd server applies with the raft node + snapNotifyc chan struct{} + // walNotifyc synchronizes etcd server applies with the WAL persistence + walNotifyc chan struct{} } type raftNode struct { @@ -200,11 +202,16 @@ func (r *raftNode) start(rh *raftReadyHandler) { } } - notifyc := make(chan struct{}, 1) + snapNotifyc := make(chan struct{}, 1) + var walNotifyc chan struct{} + if shouldWaitWALSync(rd) { + walNotifyc = make(chan struct{}, 1) + } ap := toApply{ - entries: rd.CommittedEntries, - snapshot: rd.Snapshot, - notifyc: notifyc, + entries: rd.CommittedEntries, + snapshot: rd.Snapshot, + snapNotifyc: snapNotifyc, + walNotifyc: walNotifyc, } updateCommittedIndex(&ap, rh) @@ -237,6 +244,12 @@ func (r *raftNode) start(rh *raftReadyHandler) { if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err)) } + + if walNotifyc != nil { + // etcdserver should wait for this notification before responding to client. + walNotifyc <- struct{}{} + } + if !raft.IsEmptyHardState(rd.HardState) { proposalsCommitted.Set(float64(rd.HardState.Commit)) } @@ -252,7 +265,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { } // etcdserver now claim the snapshot has been persisted onto the disk - notifyc <- struct{}{} + snapNotifyc <- struct{}{} // gofail: var raftBeforeApplySnap struct{} r.raftStorage.ApplySnapshot(rd.Snapshot) @@ -272,7 +285,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { msgs := r.processMessages(rd.Messages) // now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots - notifyc <- struct{}{} + snapNotifyc <- struct{}{} // Candidate or follower needs to wait for all pending configuration // changes to be applied before sending messages. @@ -293,7 +306,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { // to be in sync with scheduled config-change job // (assume notifyc has cap of 1) select { - case notifyc <- struct{}{}: + case snapNotifyc <- struct{}{}: case <-r.stopped: return } @@ -303,7 +316,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { r.transport.Send(msgs) } else { // leader already processed 'MsgSnap' and signaled - notifyc <- struct{}{} + snapNotifyc <- struct{}{} } r.Advance() @@ -314,6 +327,41 @@ func (r *raftNode) start(rh *raftReadyHandler) { }() } +// For a cluster with only one member, the raft always send identical +// unstable entries and committed entries to etcdserver, and etcd +// responds to the client once it finishes (actually partially) the +// applying workflow. +// +// When the client receives the response, it doesn't mean etcd has already +// successfully saved the data, including BoltDB and WAL, because: +// 1. etcd commits the boltDB transaction periodically instead of on each request; +// 2. etcd saves WAL entries in parallel with applying the committed entries. +// Accordingly, it might run into a situation of data loss when the etcd crashes +// immediately after responding to the client and before the boltDB and WAL +// successfully save the data to disk. +// Note that this issue can only happen for clusters with only one member. +// +// For clusters with multiple members, it isn't an issue, because etcd will +// not commit & apply the data before it being replicated to majority members. +// When the client receives the response, it means the data must have been applied. +// It further means the data must have been committed. +// Note: for clusters with multiple members, the raft will never send identical +// unstable entries and committed entries to etcdserver. +func shouldWaitWALSync(rd raft.Ready) bool { + if len(rd.CommittedEntries) == 0 || len(rd.Entries) == 0 { + return false + } + + // Previously `reflect.DeepEqual` is used to compare the `unstableEntries` + // and `committedEntries`, but it's a little expensive, so we just compare + // the log index. The applying workflow should wait for the WAL sync if + // there is any log index overlap between them. + lastCommittedEntry := rd.CommittedEntries[len(rd.CommittedEntries)-1] + firstUnstableEntry := rd.Entries[0] + return lastCommittedEntry.Term > firstUnstableEntry.Term || + (lastCommittedEntry.Term == firstUnstableEntry.Term && lastCommittedEntry.Index >= firstUnstableEntry.Index) +} + func updateCommittedIndex(ap *toApply, rh *raftReadyHandler) { var ci uint64 if len(ap.entries) != 0 { diff --git a/server/etcdserver/raft_test.go b/server/etcdserver/raft_test.go index 8e5585ad0c9e..7d26ff28db37 100644 --- a/server/etcdserver/raft_test.go +++ b/server/etcdserver/raft_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/raft/v3" @@ -190,6 +191,52 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { // TestConfigChangeBlocksApply ensures toApply blocks if committed entries contain config-change. func TestConfigChangeBlocksApply(t *testing.T) { + testEtcdserverAndRaftInteraction(t, testConfigChangeBlockApply) +} + +func TestWALSyncNotBlockApply(t *testing.T) { + testEtcdserverAndRaftInteraction(t, testWALSyncNotBlockApply) +} + +func TestWALSyncBlockApply(t *testing.T) { + testEtcdserverAndRaftInteraction(t, testWALSyncBlockApply) +} + +func testConfigChangeBlockApply(t *testing.T, srv *EtcdServer, n *readyNode) toApply { + n.readyc <- raft.Ready{ + SoftState: &raft.SoftState{RaftState: raft.StateFollower}, + CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}}, + } + return <-srv.r.applyc +} + +func testWALSyncNotBlockApply(t *testing.T, srv *EtcdServer, n *readyNode) toApply { + n.readyc <- raft.Ready{ + SoftState: &raft.SoftState{RaftState: raft.StateFollower}, + CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}}, + } + ap := <-srv.r.applyc + if ap.walNotifyc != nil { + t.Error("unexpected ap.walNotifyc, expected nil") + } + return ap +} + +func testWALSyncBlockApply(t *testing.T, srv *EtcdServer, n *readyNode) toApply { + n.readyc <- raft.Ready{ + SoftState: &raft.SoftState{RaftState: raft.StateFollower}, + Entries: []raftpb.Entry{{Type: raftpb.EntryConfChange}}, + CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}}, + } + ap := <-srv.r.applyc + if ap.walNotifyc == nil { + t.Error("unexpected ap.walNotifyc, expected not nil") + } + assert.NotEqual(t, nil, ap.walNotifyc) + return ap +} + +func testEtcdserverAndRaftInteraction(t *testing.T, testFunc func(*testing.T, *EtcdServer, *readyNode) toApply) { n := newNopReadyNode() r := newRaftNode(raftNodeConfig{ @@ -208,11 +255,7 @@ func TestConfigChangeBlocksApply(t *testing.T) { }) defer srv.r.Stop() - n.readyc <- raft.Ready{ - SoftState: &raft.SoftState{RaftState: raft.StateFollower}, - CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}}, - } - ap := <-srv.r.applyc + ap := testFunc(t, srv, n) continueC := make(chan struct{}) go func() { @@ -228,7 +271,7 @@ func TestConfigChangeBlocksApply(t *testing.T) { } // finish toApply, unblock raft routine - <-ap.notifyc + <-ap.snapNotifyc select { case <-continueC: @@ -317,3 +360,79 @@ func TestStopRaftNodeMoreThanOnce(t *testing.T) { } } } + +func TestShouldWaitWALSync(t *testing.T) { + testcases := []struct { + name string + unstableEntries []raftpb.Entry + commitedEntries []raftpb.Entry + expectedResult bool + }{ + { + name: "both entries are nil", + unstableEntries: nil, + commitedEntries: nil, + expectedResult: false, + }, + { + name: "both entries are empty slices", + unstableEntries: []raftpb.Entry{}, + commitedEntries: []raftpb.Entry{}, + expectedResult: false, + }, + { + name: "one nil and the other empty", + unstableEntries: nil, + commitedEntries: []raftpb.Entry{}, + expectedResult: false, + }, + { + name: "one nil and the other has data", + unstableEntries: nil, + commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}}, + expectedResult: false, + }, + { + name: "one empty and the other has data", + unstableEntries: []raftpb.Entry{}, + commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}}, + expectedResult: false, + }, + { + name: "has different term and index", + unstableEntries: []raftpb.Entry{{Term: 5, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}}, + commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}}, + expectedResult: false, + }, + { + name: "has identical data", + unstableEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}}, + commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}}, + expectedResult: true, + }, + { + name: "has overlapped entry", + unstableEntries: []raftpb.Entry{ + {Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}, + {Term: 4, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x44, 0x55, 0x66}}, + {Term: 4, Index: 12, Type: raftpb.EntryNormal, Data: []byte{0x77, 0x88, 0x99}}, + }, + commitedEntries: []raftpb.Entry{ + {Term: 4, Index: 8, Type: raftpb.EntryNormal, Data: []byte{0x07, 0x08, 0x09}}, + {Term: 4, Index: 9, Type: raftpb.EntryNormal, Data: []byte{0x10, 0x11, 0x12}}, + {Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}, + }, + expectedResult: true, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + shouldWALSync := shouldWaitWALSync(raft.Ready{ + Entries: tc.unstableEntries, + CommittedEntries: tc.commitedEntries, + }) + assert.Equal(t, tc.expectedResult, shouldWALSync) + }) + } +} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 99a2159d9932..0706cbbf5f4a 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -927,7 +927,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) { // wait for the raft routine to finish the disk writes before triggering a // snapshot. or applied index might be greater than the last index in raft // storage, since the raft routine might be slower than toApply routine. - <-apply.notifyc + <-apply.snapNotifyc s.triggerSnapshot(ep) select { @@ -975,7 +975,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) { } // wait for raftNode to persist snapshot onto the disk - <-toApply.notifyc + <-toApply.snapNotifyc newbe, err := serverstorage.OpenSnapshotBackend(s.Cfg, s.snapshotter, toApply.snapshot, s.beHooks) if err != nil { @@ -1122,8 +1122,9 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) { if len(ents) == 0 { return } + var shouldstop bool - if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop { + if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState, apply.walNotifyc); shouldstop { go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster")) } } @@ -1792,8 +1793,10 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) { func (s *EtcdServer) apply( es []raftpb.Entry, confState *raftpb.ConfState, + walNotifyc chan struct{}, ) (appliedt uint64, appliedi uint64, shouldStop bool) { s.lg.Debug("Applying entries", zap.Int("num-entries", len(es))) + walNotified := false for i := range es { e := es[i] s.lg.Debug("Applying entry", @@ -1802,7 +1805,16 @@ func (s *EtcdServer) apply( zap.Stringer("type", e.Type)) switch e.Type { case raftpb.EntryNormal: - s.applyEntryNormal(&e) + postApplyFunc := s.applyEntryNormal(&e) + if postApplyFunc != nil { + // wait for the WAL entries to be synced. Note we only need + // to wait for the notification once. + if walNotifyc != nil && !walNotified { + walNotified = true + <-walNotifyc + } + postApplyFunc() + } s.setAppliedIndex(e.Index) s.setTerm(e.Term) @@ -1823,6 +1835,12 @@ func (s *EtcdServer) apply( s.setAppliedIndex(e.Index) s.setTerm(e.Term) shouldStop = shouldStop || removedSelf + // wait for the WAL entries to be synced. Note we only need + // to wait for the notification once. + if walNotifyc != nil && !walNotified { + walNotified = true + <-walNotifyc + } s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err}) default: @@ -1838,7 +1856,7 @@ func (s *EtcdServer) apply( } // applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer -func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { +func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) func() { shouldApplyV3 := membership.ApplyV2storeOnly applyV3Performed := false var ar *apply.Result @@ -1870,7 +1888,8 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { if s.isLeader() { s.lessor.Promote(s.Cfg.ElectionTimeout()) } - return + + return nil } var raftReq pb.InternalRaftRequest @@ -1879,15 +1898,17 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { rp := &r pbutil.MustUnmarshal(rp, e.Data) s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp)) - s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp), shouldApplyV3)) - return + return func() { + s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp), shouldApplyV3)) + } } s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq)) if raftReq.V2 != nil { req := (*RequestV2)(raftReq.V2) - s.w.Trigger(req.ID, s.applyV2Request(req, shouldApplyV3)) - return + return func() { + s.w.Trigger(req.ID, s.applyV2Request(req, shouldApplyV3)) + } } id := raftReq.ID @@ -1909,16 +1930,17 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { // do not re-toApply applied entries. if !shouldApplyV3 { - return + return nil } if ar == nil { - return + return nil } if ar.Err != errors.ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 { - s.w.Trigger(id, ar) - return + return func() { + s.w.Trigger(id, ar) + } } lg := s.Logger() @@ -1938,6 +1960,8 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a}) s.w.Trigger(id, ar) }) + + return nil } func noSideEffect(r *pb.InternalRaftRequest) bool { diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index f60f73f4d362..8f1cdbca882e 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -688,7 +688,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { Data: pbutil.MustMarshal(cc), }} - _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}) + _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}, nil) consistIndex := srv.consistIndex.ConsistentIndex() assert.Equal(t, uint64(2), appliedi) @@ -762,7 +762,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { ents = append(ents, ent) } - _, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}) + _, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}, nil) if !shouldStop { t.Errorf("shouldStop = %t, want %t", shouldStop, true) }