diff --git a/conn/node.go b/conn/node.go
index a33b9236d81..2d286fefc2d 100644
--- a/conn/node.go
+++ b/conn/node.go
@@ -72,6 +72,9 @@ type Node struct {
     // The stages are proposed -> committed (accepted by cluster) ->
     // applied (to PL) -> synced (to BadgerDB).
     Applied y.WaterMark
+
+    heartbeatsOut int64
+    heartbeatsIn  int64
 }
 
 type ToGlog struct {
@@ -155,6 +158,20 @@ func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage) *Node {
     return n
 }
 
+func (n *Node) ReportRaftComms() {
+    if !glog.V(3) {
+        return
+    }
+    ticker := time.NewTicker(time.Second)
+    defer ticker.Stop()
+
+    for range ticker.C {
+        out := atomic.SwapInt64(&n.heartbeatsOut, 0)
+        in := atomic.SwapInt64(&n.heartbeatsIn, 0)
+        glog.Infof("RaftComm: [%#x] Heartbeats out: %d, in: %d", n.Id, out, in)
+    }
+}
+
 // SetRaft would set the provided raft.Node to this node.
 // It would check fail if the node is already set.
 func (n *Node) SetRaft(r raft.Node) {
@@ -233,6 +250,7 @@ func (n *Node) Send(msg raftpb.Message) {
     if glog.V(2) {
         switch msg.Type {
         case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
+            atomic.AddInt64(&n.heartbeatsOut, 1)
         case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp:
         case raftpb.MsgApp, raftpb.MsgAppResp:
         case raftpb.MsgProp:
diff --git a/conn/raft_server.go b/conn/raft_server.go
index 03a04077d4c..06d6f08c3fd 100644
--- a/conn/raft_server.go
+++ b/conn/raft_server.go
@@ -21,6 +21,7 @@ import (
     "encoding/binary"
     "math/rand"
     "sync"
+    "sync/atomic"
     "time"
 
     "github.com/dgraph-io/dgo/protos/api"
@@ -228,6 +229,7 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error {
         if glog.V(2) {
             switch msg.Type {
             case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
+                atomic.AddInt64(&n.heartbeatsIn, 1)
             case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp:
             case raftpb.MsgApp, raftpb.MsgAppResp:
             case raftpb.MsgProp:
diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go
index 169e1497ec0..a5ad758acf5 100644
--- a/dgraph/cmd/zero/raft.go
+++ b/dgraph/cmd/zero/raft.go
@@ -618,6 +618,7 @@ func (n *node) Run() {
     // We only stop runReadIndexLoop after the for loop below has finished interacting with it.
     // That way we know sending to readStateCh will not deadlock.
 
+    var timer x.Timer
     for {
         select {
         case <-n.closer.HasBeenClosed():
@@ -626,7 +627,7 @@
         case <-ticker.C:
             n.Raft().Tick()
         case rd := <-n.Raft().Ready():
-            start := time.Now()
+            timer.Start()
             _, span := otrace.StartSpan(n.ctx, "Zero.RunLoop",
                 otrace.WithSampler(otrace.ProbabilitySampler(0.001)))
             for _, rs := range rd.ReadStates {
@@ -654,7 +655,7 @@
             }
             n.SaveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
             span.Annotatef(nil, "Saved to storage")
-            diskDur := time.Since(start)
+            timer.Record("disk")
 
             if !raft.IsEmptySnap(rd.Snapshot) {
                 var state pb.MembershipState
@@ -689,16 +690,18 @@
                 }
             }
             span.Annotate(nil, "Sent messages")
+            timer.Record("proposals")
             n.Raft().Advance()
             span.Annotate(nil, "Advanced Raft")
+            timer.Record("advance")
+
             span.End()
-            if time.Since(start) > 100*time.Millisecond {
+            if timer.Total() > 100*time.Millisecond {
                 glog.Warningf(
-                    "Raft.Ready took too long to process: %v. Most likely due to slow disk: %v."+
+                    "Raft.Ready took too long to process: %s."+
                        " Num entries: %d. MustSync: %v",
-                    time.Since(start).Round(time.Millisecond), diskDur.Round(time.Millisecond),
-                    len(rd.Entries), rd.MustSync)
+                    timer.String(), len(rd.Entries), rd.MustSync)
             }
         }
     }
diff --git a/worker/draft.go b/worker/draft.go
index 0c4e46cd179..1d54b0c7eff 100644
--- a/worker/draft.go
+++ b/worker/draft.go
@@ -724,10 +724,56 @@ func (n *node) updateRaftProgress() error {
     if err := txn.CommitAt(1, nil); err != nil {
         return err
     }
-    glog.V(1).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index)
+    glog.V(2).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index)
     return nil
 }
 
+func (n *node) checkpointAndClose(done chan struct{}) {
+    slowTicker := time.NewTicker(time.Minute)
+    defer slowTicker.Stop()
+
+    for {
+        select {
+        case <-slowTicker.C:
+            // Do these operations asynchronously away from the main Run loop to allow heartbeats to
+            // be sent on time. Otherwise, followers would just keep running elections.
+
+            n.elog.Printf("Size of applyCh: %d", len(n.applyCh))
+            if err := n.updateRaftProgress(); err != nil {
+                glog.Errorf("While updating Raft progress: %v", err)
+            }
+
+            if n.AmLeader() {
+                // We keep track of the applied index in the p directory. Even if we don't take
+                // snapshot for a while and let the Raft logs grow and restart, we would not have to
+                // run all the log entries, because we can tell Raft.Config to set Applied to that
+                // index.
+                // This applied index tracking also covers the case when we have a big index
+                // rebuild. The rebuild would be tracked just like others and would not need to be
+                // replayed after a restart, because the Applied config would let us skip right
+                // through it.
+                // We use disk based storage for Raft. So, we're not too concerned about
+                // snapshotting. We just need to do enough, so that we don't have a huge backlog of
+                // entries to process on a restart.
+                if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil {
+                    x.Errorf("While calculating and proposing snapshot: %v", err)
+                }
+                go n.abortOldTransactions()
+            }
+
+        case <-n.closer.HasBeenClosed():
+            glog.Infof("Stopping node.Run")
+            if peerId, has := groups().MyPeer(); has && n.AmLeader() {
+                n.Raft().TransferLeadership(n.ctx, x.WorkerConfig.RaftId, peerId)
+                time.Sleep(time.Second) // Let transfer happen.
+            }
+            n.Raft().Stop()
+            close(done)
+            return
+        }
+    }
+}
+
 func (n *node) Run() {
     defer n.closer.Done() // CLOSER:1
@@ -737,20 +783,9 @@
     ticker := time.NewTicker(20 * time.Millisecond)
     defer ticker.Stop()
 
-    slowTicker := time.NewTicker(30 * time.Second)
-    defer slowTicker.Stop()
-
     done := make(chan struct{})
-    go func() {
-        <-n.closer.HasBeenClosed()
-        glog.Infof("Stopping node.Run")
-        if peerId, has := groups().MyPeer(); has && n.AmLeader() {
-            n.Raft().TransferLeadership(n.ctx, x.WorkerConfig.RaftId, peerId)
-            time.Sleep(time.Second) // Let transfer happen.
-        }
-        n.Raft().Stop()
-        close(done)
-    }()
+    go n.checkpointAndClose(done)
+    go n.ReportRaftComms()
 
     applied, err := n.findRaftProgress()
     if err != nil {
@@ -759,6 +794,7 @@
         glog.Infof("Found Raft progress in p directory: %d", applied)
     }
 
+    var timer x.Timer
     for {
         select {
        case <-done:
@@ -769,35 +805,15 @@
            glog.Infoln("Raft node done.")
            return
 
-        case <-slowTicker.C:
-            n.elog.Printf("Size of applyCh: %d", len(n.applyCh))
-            if err := n.updateRaftProgress(); err != nil {
-                glog.Errorf("While updating Raft progress: %v", err)
-            }
-
-            if leader {
-                // We keep track of the applied index in the p directory. Even if we don't take
-                // snapshot for a while and let the Raft logs grow and restart, we would not have to
-                // run all the log entries, because we can tell Raft.Config to set Applied to that
-                // index.
-                // This applied index tracking also covers the case when we have a big index
-                // rebuild. The rebuild would be tracked just like others and would not need to be
-                // replayed after a restart, because the Applied config would let us skip right
-                // through it.
-                // We use disk based storage for Raft. So, we're not too concerned about
-                // snapshotting. We just need to do enough, so that we don't have a huge backlog of
-                // entries to process on a restart.
-                if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil {
-                    x.Errorf("While calculating and proposing snapshot: %v", err)
-                }
-                go n.abortOldTransactions()
-            }
-
+        // Slow ticker can't be placed here because figuring out checkpoints and snapshots takes
+        // time and if the leader does not send heartbeats out during this time, the followers
+        // start an election process. And that election process would just continue to happen
+        // indefinitely because checkpoints and snapshots are being calculated indefinitely.
         case <-ticker.C:
             n.Raft().Tick()
         case rd := <-n.Raft().Ready():
-            start := time.Now()
+            timer.Start()
             _, span := otrace.StartSpan(n.ctx, "Alpha.RunLoop",
                 otrace.WithSampler(otrace.ProbabilitySampler(0.001)))
@@ -875,13 +891,13 @@
             // Store the hardstate and entries. Note that these are not CommittedEntries.
             n.SaveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
-            diskDur := time.Since(start)
             if span != nil {
                 span.Annotatef(nil,
                     "Saved %d entries. Snapshot, HardState empty? (%v, %v)",
                     len(rd.Entries), raft.IsEmptySnap(rd.Snapshot),
                     raft.IsEmptyHardState(rd.HardState))
             }
+            timer.Record("disk")
 
             // Now schedule or apply committed entries.
             var proposals []*pb.Proposal
@@ -953,8 +969,11 @@
             if span != nil {
                 span.Annotate(nil, "Followed queued messages.")
             }
+            timer.Record("proposals")
 
             n.Raft().Advance()
+            timer.Record("advance")
+
             if firstRun && n.canCampaign {
                 go n.Raft().Campaign(n.ctx)
                 firstRun = false
@@ -964,14 +983,13 @@
                 span.End()
                 ostats.RecordWithTags(context.Background(),
                     []tag.Mutator{tag.Upsert(x.KeyMethod, "alpha.RunLoop")},
-                    x.LatencyMs.M(x.SinceMs(start)))
+                    x.LatencyMs.M(float64(timer.Total())/1e6))
             }
-            if time.Since(start) > 100*time.Millisecond {
+            if timer.Total() > 100*time.Millisecond {
                 glog.Warningf(
-                    "Raft.Ready took too long to process: %v. Most likely due to slow disk: %v."+
+                    "Raft.Ready took too long to process: %s"+
                        " Num entries: %d. MustSync: %v",
-                    time.Since(start).Round(time.Millisecond), diskDur.Round(time.Millisecond),
-                    len(rd.Entries), rd.MustSync)
+                    timer.String(), len(rd.Entries), rd.MustSync)
             }
         }
     }
@@ -1214,7 +1232,7 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
     }
 
     if num := posting.Oracle().NumPendingTxns(); num > 0 {
-        glog.Infof("Num pending txns: %d", num)
+        glog.V(2).Infof("Num pending txns: %d", num)
     }
     // We can't rely upon the Raft entries to determine the minPendingStart,
     // because there are many cases during mutations where we don't commit or
diff --git a/x/x.go b/x/x.go
index 3e3b6c275b0..96a0c83d608 100644
--- a/x/x.go
+++ b/x/x.go
@@ -379,10 +379,14 @@ func (b *BytesBuffer) TruncateBy(n int) {
     AssertTrue(b.off >= 0 && b.sz >= 0)
 }
 
+type record struct {
+    Name string
+    Dur  time.Duration
+}
 type Timer struct {
     start   time.Time
     last    time.Time
-    records []time.Duration
+    records []record
 }
 
 func (t *Timer) Start() {
@@ -391,18 +395,24 @@
-func (t *Timer) Record() {
+func (t *Timer) Record(name string) {
     now := time.Now()
-    t.records = append(t.records, now.Sub(t.last))
+    t.records = append(t.records, record{
+        Name: name,
+        Dur:  now.Sub(t.last).Round(time.Millisecond),
+    })
     t.last = now
 }
 
 func (t *Timer) Total() time.Duration {
-    return time.Since(t.start)
+    return time.Since(t.start).Round(time.Millisecond)
 }
 
-func (t *Timer) All() []time.Duration {
-    return t.records
+func (t *Timer) String() string {
+    sort.Slice(t.records, func(i, j int) bool {
+        return t.records[i].Dur > t.records[j].Dur
+    })
+    return fmt.Sprintf("Timer Total: %s. Breakdown: %v", t.Total(), t.records)
 }
 
 // PredicateLang extracts the language from a predicate (or facet) name.
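For context on how the reworked `x.Timer` is meant to be used in the Run loops above, here is a minimal, self-contained sketch. It is not part of the patch: the `Timer` type mirrors the x/x.go changes in this diff, while the `Start` body and the simulated stage durations in `main` are assumptions added purely for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// record and Timer mirror the x.Timer changes in this diff.
type record struct {
	Name string
	Dur  time.Duration
}

type Timer struct {
	start   time.Time
	last    time.Time
	records []record
}

// Start resets the timer at the top of a Raft.Ready iteration.
// (Body assumed; the diff only shows Start as unchanged context.)
func (t *Timer) Start() {
	t.start = time.Now()
	t.last = t.start
	t.records = t.records[:0]
}

// Record captures the time spent since the previous stage under a name.
func (t *Timer) Record(name string) {
	now := time.Now()
	t.records = append(t.records, record{
		Name: name,
		Dur:  now.Sub(t.last).Round(time.Millisecond),
	})
	t.last = now
}

func (t *Timer) Total() time.Duration {
	return time.Since(t.start).Round(time.Millisecond)
}

// String sorts stages slowest-first, so the likely culprit leads the warning.
func (t *Timer) String() string {
	sort.Slice(t.records, func(i, j int) bool {
		return t.records[i].Dur > t.records[j].Dur
	})
	return fmt.Sprintf("Timer Total: %s. Breakdown: %v", t.Total(), t.records)
}

func main() {
	var timer Timer
	timer.Start()

	time.Sleep(120 * time.Millisecond) // stand-in for SaveToStorage
	timer.Record("disk")
	time.Sleep(30 * time.Millisecond) // stand-in for applying proposals
	timer.Record("proposals")
	time.Sleep(5 * time.Millisecond) // stand-in for Raft().Advance()
	timer.Record("advance")

	if timer.Total() > 100*time.Millisecond {
		// Made-up entry count and MustSync value, only to show the log shape.
		fmt.Printf("Raft.Ready took too long to process: %s. Num entries: %d. MustSync: %v\n",
			timer.String(), 42, true)
	}
}
```

Because the breakdown is sorted by duration, the slowest stage (disk, proposals, or advance) appears first in the warning, replacing the old hard-coded "Most likely due to slow disk" guess with measured per-stage timings.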