Skip to content

Commit

Permalink
Set the Applied index in Raft directly
Browse files Browse the repository at this point in the history
Set the Applied index in Raft directly, so it does not pick up an index older than the snapshot. Ensure that it is in sync with the Applied watermark.

This fixes #2581.
  • Loading branch information
manishrjain authored Sep 15, 2018
1 parent 66d9f94 commit c955ec1
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 5 deletions.
13 changes: 13 additions & 0 deletions conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type Node struct {
}

func NewNode(rc *intern.RaftContext, store *raftwal.DiskStorage) *Node {
first, err := store.FirstIndex()
x.Check(err)

n := &Node{
Id: rc.Id,
MyAddr: rc.Addr,
Expand Down Expand Up @@ -92,6 +95,14 @@ func NewNode(rc *intern.RaftContext, store *raftwal.DiskStorage) *Node {
// increasing the term, if the node has a good chance of becoming
// the leader.
PreVote: true,

// We explicitly set Applied to one less than the first index in the Raft
// log, so that Raft does not derive it separately, thus avoiding a crash
// when Raft sets Applied to below the snapshot index.
// In case this is a new Raft log, first would be 1, and therefore
// Applied would be zero, hence meeting the condition by the library
// that Applied should only be set during a restart.
Applied: first - 1,
},
// processConfChange etc. are not throttled, so add some extra delta so that we
// don't block tick when applyCh is full.
Expand All @@ -104,6 +115,8 @@ func NewNode(rc *intern.RaftContext, store *raftwal.DiskStorage) *Node {
requestCh: make(chan linReadReq),
}
n.Applied.Init()
// This should match up to the Applied index set above.
n.Applied.SetDoneUntil(n.Cfg.Applied)
return n
}

Expand Down
3 changes: 1 addition & 2 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,7 @@ func (n *node) triggerLeaderChange() {
}

func (n *node) initAndStartNode() error {
idx, restart, err := n.PastLife()
n.Applied.SetDoneUntil(idx)
_, restart, err := n.PastLife()
x.Check(err)

if restart {
Expand Down
3 changes: 1 addition & 2 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,9 +931,8 @@ func (n *node) retryUntilSuccess(fn func() error, pause time.Duration) {

// InitAndStartNode gets called after having at least one membership sync with the cluster.
func (n *node) InitAndStartNode() {
idx, restart, err := n.PastLife()
_, restart, err := n.PastLife()
x.Check(err)
n.Applied.SetDoneUntil(idx)

if _, hasPeer := groups().MyPeer(); !restart && hasPeer {
// The node has other peers, it might have crashed after joining the cluster and before
Expand Down
2 changes: 1 addition & 1 deletion x/watermark.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type WaterMark struct {

// Init initializes a WaterMark struct. MUST be called before using it.
//
// It allocates the buffered markCh channel, creates the "Watermark" event log
// keyed by w.Name, and starts w.process in its own goroutine.
//
// NOTE(review): this listing is a rendered diff with the +/- markers stripped.
// The file header reports 1 addition and 1 deletion, so only one of the two
// markCh assignments below is in the committed file -- presumably the buffer
// shrinks from 10000 to 100; confirm against the repository.
func (w *WaterMark) Init() {
w.markCh = make(chan mark, 10000)
w.markCh = make(chan mark, 100)
w.elog = trace.NewEventLog("Watermark", w.Name)
// Runs w.process concurrently; its body is not visible in this view.
go w.process()
}
Expand Down

0 comments on commit c955ec1

Please sign in to comment.