diff --git a/conn/node.go b/conn/node.go index 6e44c2941eb..01a4b8eb006 100644 --- a/conn/node.go +++ b/conn/node.go @@ -63,6 +63,9 @@ type Node struct { } func NewNode(rc *intern.RaftContext, store *raftwal.DiskStorage) *Node { + first, err := store.FirstIndex() + x.Check(err) + n := &Node{ Id: rc.Id, MyAddr: rc.Addr, @@ -92,6 +95,14 @@ func NewNode(rc *intern.RaftContext, store *raftwal.DiskStorage) *Node { // increasing the term, if the node has a good chance of becoming // the leader. PreVote: true, + + // We can explicitly set Applied to the first index in the Raft log, + // so it does not derive it separately, thus avoiding a crash when + // the Applied is set to below snapshot index by Raft. + // In case this is a new Raft log, first would be 1, and therefore + // Applied would be zero, hence meeting the condition by the library + // that Applied should only be set during a restart. + Applied: first - 1, }, // processConfChange etc are not throttled so some extra delta, so that we don't // block tick when applyCh is full @@ -104,6 +115,8 @@ func NewNode(rc *intern.RaftContext, store *raftwal.DiskStorage) *Node { requestCh: make(chan linReadReq), } n.Applied.Init() + // This should match up to the Applied index set above. + n.Applied.SetDoneUntil(n.Cfg.Applied) return n } diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index fff331f96ab..99cfe17083d 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -355,8 +355,7 @@ func (n *node) triggerLeaderChange() { } func (n *node) initAndStartNode() error { - idx, restart, err := n.PastLife() - n.Applied.SetDoneUntil(idx) + _, restart, err := n.PastLife() x.Check(err) if restart { diff --git a/worker/draft.go b/worker/draft.go index bde38860415..e2c51a7dce2 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -931,9 +931,8 @@ func (n *node) retryUntilSuccess(fn func() error, pause time.Duration) { // InitAndStartNode gets called after having at least one membership sync with the cluster. func (n *node) InitAndStartNode() { - idx, restart, err := n.PastLife() + _, restart, err := n.PastLife() x.Check(err) - n.Applied.SetDoneUntil(idx) if _, hasPeer := groups().MyPeer(); !restart && hasPeer { // The node has other peers, it might have crashed after joining the cluster and before diff --git a/x/watermark.go b/x/watermark.go index bb8d1597b7e..5d4e0be8826 100644 --- a/x/watermark.go +++ b/x/watermark.go @@ -56,7 +56,7 @@ type WaterMark struct { // Init initializes a WaterMark struct. MUST be called before using it. func (w *WaterMark) Init() { - w.markCh = make(chan mark, 10000) + w.markCh = make(chan mark, 100) w.elog = trace.NewEventLog("Watermark", w.Name) go w.process() }