From c955ec14aab39a373984e1d34a65e03f4dc67d82 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 14 Sep 2018 17:19:14 -0700 Subject: [PATCH] Set the Applied index in Raft directly Set the Applied index in Raft directly, so it does not pick up an index older than the snapshot. Ensure that it is in sync with the Applied watermark. Also reduce the WaterMark mark channel buffer from 10000 to 100. This fixes #2581. --- conn/node.go | 13 +++++++++++++ dgraph/cmd/zero/raft.go | 3 +-- worker/draft.go | 3 +-- x/watermark.go | 2 +- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/conn/node.go b/conn/node.go index 6e44c2941eb..01a4b8eb006 100644 --- a/conn/node.go +++ b/conn/node.go @@ -63,6 +63,9 @@ type Node struct { } func NewNode(rc *intern.RaftContext, store *raftwal.DiskStorage) *Node { + first, err := store.FirstIndex() + x.Check(err) + n := &Node{ Id: rc.Id, MyAddr: rc.Addr, @@ -92,6 +95,14 @@ func NewNode(rc *intern.RaftContext, store *raftwal.DiskStorage) *Node { // increasing the term, if the node has a good chance of becoming // the leader. PreVote: true, + + // We can explicitly set Applied to the first index in the Raft log, + // so it does not derive it separately, thus avoiding a crash when + // the Applied is set to below snapshot index by Raft. + // In case this is a new Raft log, first would be 1, and therefore + // Applied would be zero, hence meeting the condition by the library + // that Applied should only be set during a restart. + Applied: first - 1, }, // processConfChange etc are not throttled so some extra delta, so that we don't // block tick when applyCh is full @@ -104,6 +115,8 @@ func NewNode(rc *intern.RaftContext, store *raftwal.DiskStorage) *Node { requestCh: make(chan linReadReq), } n.Applied.Init() + // This should match up to the Applied index set above. 
+ n.Applied.SetDoneUntil(n.Cfg.Applied) return n } diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index fff331f96ab..99cfe17083d 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -355,8 +355,7 @@ func (n *node) triggerLeaderChange() { } func (n *node) initAndStartNode() error { - idx, restart, err := n.PastLife() - n.Applied.SetDoneUntil(idx) + _, restart, err := n.PastLife() x.Check(err) if restart { diff --git a/worker/draft.go b/worker/draft.go index bde38860415..e2c51a7dce2 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -931,9 +931,8 @@ func (n *node) retryUntilSuccess(fn func() error, pause time.Duration) { // InitAndStartNode gets called after having at least one membership sync with the cluster. func (n *node) InitAndStartNode() { - idx, restart, err := n.PastLife() + _, restart, err := n.PastLife() x.Check(err) - n.Applied.SetDoneUntil(idx) if _, hasPeer := groups().MyPeer(); !restart && hasPeer { // The node has other peers, it might have crashed after joining the cluster and before diff --git a/x/watermark.go b/x/watermark.go index bb8d1597b7e..5d4e0be8826 100644 --- a/x/watermark.go +++ b/x/watermark.go @@ -56,7 +56,7 @@ type WaterMark struct { // Init initializes a WaterMark struct. MUST be called before using it. func (w *WaterMark) Init() { - w.markCh = make(chan mark, 10000) + w.markCh = make(chan mark, 100) w.elog = trace.NewEventLog("Watermark", w.Name) go w.process() }