Skip to content

Commit

Permalink
Merge pull request #59 from youzan/feature-compact-ttl
Browse files Browse the repository at this point in the history
merge fix from etcd
  • Loading branch information
absolute8511 authored Feb 17, 2020
2 parents b62ee80 + c31e0b8 commit 3e931d0
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 7 deletions.
14 changes: 12 additions & 2 deletions node/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,9 @@ func (rc *raftNode) serveChannels() {
busy := rc.IsBusySnapshot()
if !busy {
// Note: if LastIndex and FirstIndex are slow, we should avoid calling them on every step.
// Also, this may cause raft to send some overflowed messages, because raft sends
// up to MaxInflights*MaxSizePerMsg of log data in the pipeline (which may increase network bandwidth usage). So if we replace the memory raft
// storage, we can remove this to allow receiving all logs from the leader.
last, err := rc.raftStorage.LastIndex()
if err == nil {
fi, _ := rc.raftStorage.FirstIndex()
Expand Down Expand Up @@ -1311,14 +1314,21 @@ func (rc *raftNode) purgeFile(done chan struct{}, stopC chan struct{}) {
keepBackup = 10
}
var serrc, werrc <-chan error
serrc = fileutil.PurgeFile(rc.config.SnapDir, "snap", uint(keepBackup), time.Minute*10, rc.stopc)
werrc = fileutil.PurgeFile(rc.config.WALDir, "wal", uint(keep), time.Minute*10, rc.stopc)
var sdonec, wdonec <-chan struct{}
sdonec, serrc = fileutil.PurgeFileWithDoneNotify(rc.config.SnapDir, "snap", uint(keepBackup), time.Minute*10, stopC)
wdonec, werrc = fileutil.PurgeFileWithDoneNotify(rc.config.WALDir, "wal", uint(keep), time.Minute*10, stopC)
select {
case e := <-werrc:
rc.Infof("failed to purge wal file %v", e)
case e := <-serrc:
rc.Infof("failed to purge snap file %v", e)
case <-stopC:
if sdonec != nil {
<-sdonec
}
if wdonec != nil {
<-wdonec
}
return
}
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/fileutil/purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,22 @@ import (
)

// PurgeFile is the public entry point for file purging. It forwards all of
// its arguments to purgeFile, passing nil for the optional notification
// channel(s), and returns the error channel that purgeFile produces.
func PurgeFile(dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}) <-chan error {
return purgeFile(dirname, suffix, max, interval, stop, nil)
return purgeFile(dirname, suffix, max, interval, stop, nil, nil)
}

// PurgeFileWithDoneNotify behaves like PurgeFile but additionally returns a
// done channel. The done channel is handed to purgeFile, whose background
// goroutine closes it when that goroutine exits, so callers can wait for the
// purge loop to finish. No purged-file notification channel is supplied.
//
// The first return value is the done channel; the second is the error channel
// produced by purgeFile.
func PurgeFileWithDoneNotify(dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}) (<-chan struct{}, <-chan error) {
	finished := make(chan struct{})
	return finished, purgeFile(dirname, suffix, max, interval, stop, nil, finished)
}

// purgeFile is the internal implementation for PurgeFile which can post purged files to purgec if non-nil.
func purgeFile(dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}, purgec chan<- string) <-chan error {
func purgeFile(dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}, purgec chan<- string, donec chan<- struct{}) <-chan error {
errC := make(chan error, 1)
go func() {
if donec != nil {
defer close(donec)
}
for {
fnames, err := ReadDir(dirname)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/fileutil/purge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestPurgeFile(t *testing.T) {
stop, purgec := make(chan struct{}), make(chan string, 10)

// keep 3 most recent files
errch := purgeFile(dir, "test", 3, time.Millisecond, stop, purgec)
errch := purgeFile(dir, "test", 3, time.Millisecond, stop, purgec, nil)
select {
case f := <-purgec:
t.Errorf("unexpected purge on %q", f)
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestPurgeFileHoldingLockFile(t *testing.T) {
}

stop, purgec := make(chan struct{}), make(chan string, 10)
errch := purgeFile(dir, "test", 3, time.Millisecond, stop, purgec)
errch := purgeFile(dir, "test", 3, time.Millisecond, stop, purgec, nil)

for i := 0; i < 5; i++ {
select {
Expand Down
9 changes: 8 additions & 1 deletion raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ func (r *raft) Step(m pb.Message) error {
}

case m.Term < r.Term:
if r.checkQuorum && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
// We have received messages from a leader at a lower term. It is possible
// that these messages were simply delayed in the network, but this could
// also mean that this node has advanced its term number during a network
Expand All @@ -911,6 +911,13 @@ func (r *raft) Step(m pb.Message) error {
// but it will not receive MsgApp or MsgHeartbeat, so it will not create
// disruptive term increases
r.send(pb.Message{To: m.From, ToGroup: m.FromGroup, Type: pb.MsgAppResp})
} else if m.Type == pb.MsgPreVote {
// Before Pre-Vote was enabled, there may have been a candidate with a higher
// term but a shorter log. After upgrading to Pre-Vote, the cluster may deadlock
// if we drop messages with a lower term.
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
} else {
// ignore other cases
r.logger.Infof("%x(%v) [term: %d] ignored a %s message with lower term from %x [term: %d]",
Expand Down
144 changes: 144 additions & 0 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4021,6 +4021,150 @@ func TestNodeWithSmallerTermCanCompleteElection(t *testing.T) {
}
}

// newPreVoteMigrationCluster simulates a rolling update of a three-node
// cluster [n1, n2, n3] to Pre-Vote and returns the resulting network:
//   - n1 is the leader with term 2
//   - n2 is a follower with term 2
//   - n3 was partitioned; it is a candidate with term 4 and less log
//
// The helper aborts the calling test if the cluster does not reach this state.
func newPreVoteMigrationCluster(t *testing.T) *network {
	build := func(id uint64) *raft {
		return newTestRaft(id, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
	}
	n1, n2, n3 := build(1), build(2), build(3)

	for _, node := range []*raft{n1, n2, n3} {
		node.becomeFollower(1, None)
	}

	// PreVote is deliberately left disabled on n3 for now. This models a
	// rolling restart in which the cluster temporarily mixes replicas that
	// have PreVote enabled with replicas that do not.
	n1.preVote, n2.preVote = true, true

	nt := newNetwork(n1, n2, n3)
	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})

	// Partition n3 away, let the leader make progress, and have n3 campaign
	// twice on its own.
	nt.isolate(3)
	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
	for i := 0; i < 2; i++ {
		nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
	}

	// Expected states: n1 leader, n2 follower, n3 candidate.
	// Each failure is fatal, so only the first mismatch is ever reported.
	switch {
	case n1.state != StateLeader:
		t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
	case n2.state != StateFollower:
		t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
	case n3.state != StateCandidate:
		t.Fatalf("node 3 state: %s, want %s", n3.state, StateCandidate)
	}

	// Expected terms: n1 == 2, n2 == 2, n3 == 4.
	switch {
	case n1.Term != 2:
		t.Fatalf("node 1 term: %d, want %d", n1.Term, 2)
	case n2.Term != 2:
		t.Fatalf("node 2 term: %d, want %d", n2.Term, 2)
	case n3.Term != 4:
		t.Fatalf("node 3 term: %d, want %d", n3.Term, 4)
	}

	// Finish the migration by enabling PreVote on n3, then heal the partition.
	n3.preVote = true
	nt.recover()

	return nt
}

// TestPreVoteMigrationCanCompleteElection checks that, after the cluster built
// by newPreVoteMigrationCluster loses its leader, the remaining two nodes can
// still complete an election even though n3 holds a higher term with less log.
func TestPreVoteMigrationCanCompleteElection(t *testing.T) {
	nt := newPreVoteMigrationCluster(t)

	// n1 is leader with term 2
	// n2 is follower with term 2
	// n3 is pre-candidate with term 4, and less log
	n2 := nt.peers[2].(*raft)
	n3 := nt.peers[3].(*raft)

	// simulate leader down
	nt.isolate(1)

	// Call for elections from both n2 and n3.
	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
	nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})

	// check state
	// n2.state == Follower
	// n3.state == PreCandidate
	if n2.state != StateFollower {
		t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower)
	}
	if n3.state != StatePreCandidate {
		t.Errorf("node 3 state: %s, want %s", n3.state, StatePreCandidate)
	}

	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
	nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})

	// Do we have a leader? With n1 isolated and n3 holding less log, the
	// election is complete only when n2 is the leader and n3 follows it.
	// The two conditions are checked independently so that a leaderless
	// outcome (n2 not leader while n3 is a follower) cannot pass silently.
	if n2.state != StateLeader {
		t.Errorf("node 2 state: %s, want %s", n2.state, StateLeader)
	}
	if n3.state != StateFollower {
		t.Errorf("node 3 state: %s, want %s", n3.state, StateFollower)
	}
}

// TestPreVoteMigrationWithFreeStuckPreCandidate checks that n3, which campaigns
// as a pre-candidate with a higher term and less log, does not disturb the
// cluster by campaigning, and that a heartbeat from the leader afterwards makes
// the leader step down and leaves n1 and n3 at the same term.
func TestPreVoteMigrationWithFreeStuckPreCandidate(t *testing.T) {
	nt := newPreVoteMigrationCluster(t)

	// n1 is leader with term 2
	// n2 is follower with term 2
	// n3 is pre-candidate with term 4, and less log
	leader := nt.peers[1].(*raft)
	follower := nt.peers[2].(*raft)
	stuck := nt.peers[3].(*raft)

	// expectStable reports an error for every node whose role differs from
	// leader / follower / pre-candidate respectively.
	expectStable := func() {
		t.Helper()
		if leader.state != StateLeader {
			t.Errorf("node 1 state: %s, want %s", leader.state, StateLeader)
		}
		if follower.state != StateFollower {
			t.Errorf("node 2 state: %s, want %s", follower.state, StateFollower)
		}
		if stuck.state != StatePreCandidate {
			t.Errorf("node 3 state: %s, want %s", stuck.state, StatePreCandidate)
		}
	}

	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
	expectStable()

	// Pre-Vote again for safety
	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
	expectStable()

	// Disrupt the leader so that the stuck peer is freed
	nt.send(pb.Message{From: 1, To: 3, Type: pb.MsgHeartbeat, Term: leader.Term})

	if leader.state != StateFollower {
		t.Errorf("state = %s, want %s", leader.state, StateFollower)
	}
	if stuck.Term != leader.Term {
		t.Errorf("term = %d, want %d", stuck.Term, leader.Term)
	}
}

func entsWithConfig(configFunc func(*Config), terms ...uint64) *raft {
storage := NewMemoryStorage()
for i, term := range terms {
Expand Down

0 comments on commit 3e931d0

Please sign in to comment.