Skip to content

Commit

Permalink
fix the potential data loss for clusters with only one member
Browse files Browse the repository at this point in the history
For a cluster with only one member, the raft always send identical
unstable entries and committed entries to etcdserver, and etcd
responds to the client once it finishes (actually partially) the
applying workflow.

When the client receives the response, it doesn't mean etcd has already
successfully saved the data, including BoltDB and WAL, because:
   1. etcd commits the boltDB transaction periodically instead of on each request;
   2. etcd saves WAL entries in parallel with applying the committed entries.
Accordingly, it may run into a situation of data loss when the etcd crashes
immediately after responding to the client and before the boltDB and WAL
successfully save the data to disk.
Note that this issue can only happen for clusters with only one member.

For clusters with multiple members, it isn't an issue, because etcd will
not commit & apply the data before it being replicated to majority members.
When the client receives the response, it means the data must have been applied.
It further means the data must have been committed.
Note: for clusters with multiple members, the raft will never send identical
unstable entries and committed entries to etcdserver.

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Aug 28, 2022
1 parent a3f1564 commit 2b2bb3e
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 32 deletions.
68 changes: 58 additions & 10 deletions server/etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ func init() {
type toApply struct {
entries []raftpb.Entry
snapshot raftpb.Snapshot
// notifyc synchronizes etcd server applies with the raft node
notifyc chan struct{}
// snapNotifyc synchronizes etcd server applies with the raft node
snapNotifyc chan struct{}
// walNotifyc synchronizes etcd server applies with the WAL persistence
walNotifyc chan struct{}
}

type raftNode struct {
Expand Down Expand Up @@ -200,11 +202,16 @@ func (r *raftNode) start(rh *raftReadyHandler) {
}
}

notifyc := make(chan struct{}, 1)
snapNotifyc := make(chan struct{}, 1)
var walNotifyc chan struct{}
if shouldWaitWALSync(rd) {
walNotifyc = make(chan struct{}, 1)
}
ap := toApply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
notifyc: notifyc,
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
snapNotifyc: snapNotifyc,
walNotifyc: walNotifyc,
}

updateCommittedIndex(&ap, rh)
Expand Down Expand Up @@ -237,6 +244,12 @@ func (r *raftNode) start(rh *raftReadyHandler) {
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
}

if walNotifyc != nil {
// etcdserver should wait for this notification before responding to client.
walNotifyc <- struct{}{}
}

if !raft.IsEmptyHardState(rd.HardState) {
proposalsCommitted.Set(float64(rd.HardState.Commit))
}
Expand All @@ -252,7 +265,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
}

// etcdserver now claim the snapshot has been persisted onto the disk
notifyc <- struct{}{}
snapNotifyc <- struct{}{}

// gofail: var raftBeforeApplySnap struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
Expand All @@ -272,7 +285,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
msgs := r.processMessages(rd.Messages)

// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
notifyc <- struct{}{}
snapNotifyc <- struct{}{}

// Candidate or follower needs to wait for all pending configuration
// changes to be applied before sending messages.
Expand All @@ -293,7 +306,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// to be in sync with scheduled config-change job
// (assume notifyc has cap of 1)
select {
case notifyc <- struct{}{}:
case snapNotifyc <- struct{}{}:
case <-r.stopped:
return
}
Expand All @@ -303,7 +316,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
r.transport.Send(msgs)
} else {
// leader already processed 'MsgSnap' and signaled
notifyc <- struct{}{}
snapNotifyc <- struct{}{}
}

r.Advance()
Expand All @@ -314,6 +327,41 @@ func (r *raftNode) start(rh *raftReadyHandler) {
}()
}

// For a cluster with only one member, the raft always send identical
// unstable entries and committed entries to etcdserver, and etcd
// responds to the client once it finishes (actually partially) the
// applying workflow.
//
// When the client receives the response, it doesn't mean etcd has already
// successfully saved the data, including BoltDB and WAL, because:
// 1. etcd commits the boltDB transaction periodically instead of on each request;
// 2. etcd saves WAL entries in parallel with applying the committed entries.
// Accordingly, it might run into a situation of data loss when the etcd crashes
// immediately after responding to the client and before the boltDB and WAL
// successfully save the data to disk.
// Note that this issue can only happen for clusters with only one member.
//
// For clusters with multiple members, it isn't an issue, because etcd will
// not commit & apply the data before it being replicated to majority members.
// When the client receives the response, it means the data must have been applied.
// It further means the data must have been committed.
// Note: for clusters with multiple members, the raft will never send identical
// unstable entries and committed entries to etcdserver.
func shouldWaitWALSync(rd raft.Ready) bool {
if len(rd.CommittedEntries) == 0 || len(rd.Entries) == 0 {
return false
}

// Previously `reflect.DeepEqual` is used to compare the `unstableEntries`
// and `committedEntries`, but it's a little expensive, so we just compare
// the log index. The applying workflow should wait for the WAL sync if
// there is any log index overlap between them.
lastCommittedEntry := rd.CommittedEntries[len(rd.CommittedEntries)-1]
firstUnstableEntry := rd.Entries[0]
return lastCommittedEntry.Term > firstUnstableEntry.Term ||
(lastCommittedEntry.Term == firstUnstableEntry.Term && lastCommittedEntry.Index >= firstUnstableEntry.Index)
}

func updateCommittedIndex(ap *toApply, rh *raftReadyHandler) {
var ci uint64
if len(ap.entries) != 0 {
Expand Down
131 changes: 125 additions & 6 deletions server/etcdserver/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/raft/v3"
Expand Down Expand Up @@ -190,6 +191,52 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {

// TestConfigChangeBlocksApply ensures toApply blocks if committed entries contain config-change.
func TestConfigChangeBlocksApply(t *testing.T) {
testEtcdserverAndRaftInteraction(t, testConfigChangeBlockApply)
}

func TestWALSyncNotBlockApply(t *testing.T) {
testEtcdserverAndRaftInteraction(t, testWALSyncNotBlockApply)
}

func TestWALSyncBlockApply(t *testing.T) {
testEtcdserverAndRaftInteraction(t, testWALSyncBlockApply)
}

func testConfigChangeBlockApply(t *testing.T, srv *EtcdServer, n *readyNode) toApply {
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{RaftState: raft.StateFollower},
CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
}
return <-srv.r.applyc
}

func testWALSyncNotBlockApply(t *testing.T, srv *EtcdServer, n *readyNode) toApply {
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{RaftState: raft.StateFollower},
CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
}
ap := <-srv.r.applyc
if ap.walNotifyc != nil {
t.Error("unexpected ap.walNotifyc, expected nil")
}
return ap
}

func testWALSyncBlockApply(t *testing.T, srv *EtcdServer, n *readyNode) toApply {
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{RaftState: raft.StateFollower},
Entries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
}
ap := <-srv.r.applyc
if ap.walNotifyc == nil {
t.Error("unexpected ap.walNotifyc, expected not nil")
}
assert.NotEqual(t, nil, ap.walNotifyc)
return ap
}

func testEtcdserverAndRaftInteraction(t *testing.T, testFunc func(*testing.T, *EtcdServer, *readyNode) toApply) {
n := newNopReadyNode()

r := newRaftNode(raftNodeConfig{
Expand All @@ -208,11 +255,7 @@ func TestConfigChangeBlocksApply(t *testing.T) {
})
defer srv.r.Stop()

n.readyc <- raft.Ready{
SoftState: &raft.SoftState{RaftState: raft.StateFollower},
CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
}
ap := <-srv.r.applyc
ap := testFunc(t, srv, n)

continueC := make(chan struct{})
go func() {
Expand All @@ -228,7 +271,7 @@ func TestConfigChangeBlocksApply(t *testing.T) {
}

// finish toApply, unblock raft routine
<-ap.notifyc
<-ap.snapNotifyc

select {
case <-continueC:
Expand Down Expand Up @@ -317,3 +360,79 @@ func TestStopRaftNodeMoreThanOnce(t *testing.T) {
}
}
}

func TestShouldWaitWALSync(t *testing.T) {
testcases := []struct {
name string
unstableEntries []raftpb.Entry
commitedEntries []raftpb.Entry
expectedResult bool
}{
{
name: "both entries are nil",
unstableEntries: nil,
commitedEntries: nil,
expectedResult: false,
},
{
name: "both entries are empty slices",
unstableEntries: []raftpb.Entry{},
commitedEntries: []raftpb.Entry{},
expectedResult: false,
},
{
name: "one nil and the other empty",
unstableEntries: nil,
commitedEntries: []raftpb.Entry{},
expectedResult: false,
},
{
name: "one nil and the other has data",
unstableEntries: nil,
commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
expectedResult: false,
},
{
name: "one empty and the other has data",
unstableEntries: []raftpb.Entry{},
commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
expectedResult: false,
},
{
name: "has different term and index",
unstableEntries: []raftpb.Entry{{Term: 5, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
expectedResult: false,
},
{
name: "has identical data",
unstableEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
expectedResult: true,
},
{
name: "has overlapped entry",
unstableEntries: []raftpb.Entry{
{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}},
{Term: 4, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x44, 0x55, 0x66}},
{Term: 4, Index: 12, Type: raftpb.EntryNormal, Data: []byte{0x77, 0x88, 0x99}},
},
commitedEntries: []raftpb.Entry{
{Term: 4, Index: 8, Type: raftpb.EntryNormal, Data: []byte{0x07, 0x08, 0x09}},
{Term: 4, Index: 9, Type: raftpb.EntryNormal, Data: []byte{0x10, 0x11, 0x12}},
{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}},
},
expectedResult: true,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
shouldWALSync := shouldWaitWALSync(raft.Ready{
Entries: tc.unstableEntries,
CommittedEntries: tc.commitedEntries,
})
assert.Equal(t, tc.expectedResult, shouldWALSync)
})
}
}
Loading

0 comments on commit 2b2bb3e

Please sign in to comment.