From d86a3102cf50142ac79c48bbf80121499bcdd780 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 8 May 2019 16:36:53 -0700 Subject: [PATCH 1/6] Report Heartbeat comms --- conn/node.go | 13 +++++++++++++ conn/raft_server.go | 2 ++ worker/draft.go | 5 +++++ 3 files changed, 20 insertions(+) diff --git a/conn/node.go b/conn/node.go index a33b9236d81..9723a0a48e7 100644 --- a/conn/node.go +++ b/conn/node.go @@ -72,6 +72,8 @@ type Node struct { // The stages are proposed -> committed (accepted by cluster) -> // applied (to PL) -> synced (to BadgerDB). Applied y.WaterMark + + Heartbeats int64 } type ToGlog struct { @@ -155,6 +157,16 @@ func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage) *Node { return n } +func (n *Node) ReportRaftComms() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for range ticker.C { + num := atomic.SwapInt64(&n.Heartbeats, 0) + glog.V(2).Infof("RaftComm: [%#x] Heartbeats exchanged since last report: %d", n.Id, num) + } +} + // SetRaft would set the provided raft.Node to this node. // It would check fail if the node is already set. func (n *Node) SetRaft(r raft.Node) { @@ -233,6 +245,7 @@ func (n *Node) Send(msg raftpb.Message) { if glog.V(2) { switch msg.Type { case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp: + atomic.AddInt64(&n.Heartbeats, 1) case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp: case raftpb.MsgApp, raftpb.MsgAppResp: case raftpb.MsgProp: diff --git a/conn/raft_server.go b/conn/raft_server.go index 03a04077d4c..3a2074e5da3 100644 --- a/conn/raft_server.go +++ b/conn/raft_server.go @@ -21,6 +21,7 @@ import ( "encoding/binary" "math/rand" "sync" + "sync/atomic" "time" "github.com/dgraph-io/dgo/protos/api" @@ -228,6 +229,7 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error { if glog.V(2) { switch msg.Type { case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp: + atomic.AddInt64(&n.Heartbeats, 1) case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp: case raftpb.MsgApp, raftpb.MsgAppResp: case raftpb.MsgProp: diff --git a/worker/draft.go b/worker/draft.go index 0c4e46cd179..c9eb14a681d 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -752,6 +752,8 @@ func (n *node) Run() { close(done) }() + go n.ReportRaftComms() + applied, err := n.findRaftProgress() if err != nil { glog.Errorf("While trying to find raft progress: %v", err) @@ -802,6 +804,9 @@ func (n *node) Run() { otrace.WithSampler(otrace.ProbabilitySampler(0.001))) if rd.SoftState != nil { + glog.V(2).Infof("RaftComm: Resetting heartbeats") + atomic.StoreInt64(&n.Node.Heartbeats, 0) + groups().triggerMembershipSync() leader = rd.RaftState == raft.StateLeader } From 6be5733facbea13f87a07761667e29d72dc19144 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 8 May 2019 18:26:39 -0700 Subject: [PATCH 2/6] Add logs around heartbeats. --- conn/node.go | 15 ++++++++++----- conn/raft_server.go | 2 +- worker/draft.go | 3 --- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/conn/node.go b/conn/node.go index 9723a0a48e7..2d286fefc2d 100644 --- a/conn/node.go +++ b/conn/node.go @@ -73,7 +73,8 @@ type Node struct { // applied (to PL) -> synced (to BadgerDB). 
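	// A minimal sketch of the swap-and-report counter pattern used by the
	// heartbeat fields below and by ReportRaftComms: the send/receive paths
	// bump an int64 through sync/atomic, and a ticker goroutine atomically
	// swaps it back to zero when logging, so counting stays lock-free on the
	// hot path.
	//
	//	var heartbeats int64
	//	atomic.AddInt64(&heartbeats, 1)       // on MsgHeartbeat / MsgHeartbeatResp
	//	...
	//	n := atomic.SwapInt64(&heartbeats, 0) // read and reset in one step
	//	glog.Infof("Heartbeats since last report: %d", n)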
Applied y.WaterMark - Heartbeats int64 + heartbeatsOut int64 + heartbeatsIn int64 } type ToGlog struct { @@ -158,12 +159,16 @@ func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage) *Node { } func (n *Node) ReportRaftComms() { - ticker := time.NewTicker(10 * time.Second) + if !glog.V(3) { + return + } + ticker := time.NewTicker(time.Second) defer ticker.Stop() for range ticker.C { - num := atomic.SwapInt64(&n.Heartbeats, 0) - glog.V(2).Infof("RaftComm: [%#x] Heartbeats exchanged since last report: %d", n.Id, num) + out := atomic.SwapInt64(&n.heartbeatsOut, 0) + in := atomic.SwapInt64(&n.heartbeatsIn, 0) + glog.Infof("RaftComm: [%#x] Heartbeats out: %d, in: %d", n.Id, out, in) } } @@ -245,7 +250,7 @@ func (n *Node) Send(msg raftpb.Message) { if glog.V(2) { switch msg.Type { case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp: - atomic.AddInt64(&n.Heartbeats, 1) + atomic.AddInt64(&n.heartbeatsOut, 1) case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp: case raftpb.MsgApp, raftpb.MsgAppResp: case raftpb.MsgProp: diff --git a/conn/raft_server.go b/conn/raft_server.go index 3a2074e5da3..06d6f08c3fd 100644 --- a/conn/raft_server.go +++ b/conn/raft_server.go @@ -229,7 +229,7 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error { if glog.V(2) { switch msg.Type { case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp: - atomic.AddInt64(&n.Heartbeats, 1) + atomic.AddInt64(&n.heartbeatsIn, 1) case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp: case raftpb.MsgApp, raftpb.MsgAppResp: case raftpb.MsgProp: diff --git a/worker/draft.go b/worker/draft.go index c9eb14a681d..925400442b6 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -804,9 +804,6 @@ func (n *node) Run() { otrace.WithSampler(otrace.ProbabilitySampler(0.001))) if rd.SoftState != nil { - glog.V(2).Infof("RaftComm: Resetting heartbeats") - atomic.StoreInt64(&n.Node.Heartbeats, 0) - groups().triggerMembershipSync() leader = rd.RaftState == raft.StateLeader } From e2514e43039c1ef522d84eda5bdcbae758eeb81a Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 8 May 2019 18:56:57 -0700 Subject: [PATCH 3/6] Move snapshot and checkpoint calculation outside of the main Raft loop. Capture the latency of individual components in Raft.Ready better. --- worker/draft.go | 90 ++++++++++++++++++++++++++++--------------------- x/x.go | 22 ++++++++---- 2 files changed, 67 insertions(+), 45 deletions(-) diff --git a/worker/draft.go b/worker/draft.go index 925400442b6..51f988c797a 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -724,7 +724,7 @@ func (n *node) updateRaftProgress() error { if err := txn.CommitAt(1, nil); err != nil { return err } - glog.V(1).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index) + glog.V(2).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index) return nil } @@ -737,47 +737,22 @@ func (n *node) Run() { ticker := time.NewTicker(20 * time.Millisecond) defer ticker.Stop() - slowTicker := time.NewTicker(30 * time.Second) - defer slowTicker.Stop() - done := make(chan struct{}) go func() { - <-n.closer.HasBeenClosed() - glog.Infof("Stopping node.Run") - if peerId, has := groups().MyPeer(); has && n.AmLeader() { - n.Raft().TransferLeadership(n.ctx, x.WorkerConfig.RaftId, peerId) - time.Sleep(time.Second) // Let transfer happen. 
- } - n.Raft().Stop() - close(done) - }() - - go n.ReportRaftComms() - - applied, err := n.findRaftProgress() - if err != nil { - glog.Errorf("While trying to find raft progress: %v", err) - } else { - glog.Infof("Found Raft progress in p directory: %d", applied) - } + slowTicker := time.NewTicker(time.Minute) + defer slowTicker.Stop() - for { select { - case <-done: - // We use done channel here instead of closer.HasBeenClosed so that we can transfer - // leadership in a goroutine. The push to n.applyCh happens in this loop, so the close - // should happen here too. Otherwise, race condition between push and close happens. - close(n.applyCh) - glog.Infoln("Raft node done.") - return - case <-slowTicker.C: + // Do these operations asynchronously away from the main Run loop to allow heartbeats to + // be sent on time. Otherwise, followers would just keep running elections. + n.elog.Printf("Size of applyCh: %d", len(n.applyCh)) if err := n.updateRaftProgress(); err != nil { glog.Errorf("While updating Raft progress: %v", err) } - if leader { + if n.AmLeader() { // We keep track of the applied index in the p directory. Even if we don't take // snapshot for a while and let the Raft logs grow and restart, we would not have to // run all the log entries, because we can tell Raft.Config to set Applied to that @@ -795,11 +770,46 @@ func (n *node) Run() { go n.abortOldTransactions() } + case <-n.closer.HasBeenClosed(): + glog.Infof("Stopping node.Run") + if peerId, has := groups().MyPeer(); has && n.AmLeader() { + n.Raft().TransferLeadership(n.ctx, x.WorkerConfig.RaftId, peerId) + time.Sleep(time.Second) // Let transfer happen. + } + n.Raft().Stop() + close(done) + } + }() + + go n.ReportRaftComms() + + applied, err := n.findRaftProgress() + if err != nil { + glog.Errorf("While trying to find raft progress: %v", err) + } else { + glog.Infof("Found Raft progress in p directory: %d", applied) + } + + var timer x.Timer + for { + select { + case <-done: + // We use done channel here instead of closer.HasBeenClosed so that we can transfer + // leadership in a goroutine. The push to n.applyCh happens in this loop, so the close + // should happen here too. Otherwise, race condition between push and close happens. + close(n.applyCh) + glog.Infoln("Raft node done.") + return + + // Slow ticker can't be placed here because figuring out checkpoints and snapshots takes + // time and if the leader does not send heartbeats out during this time, the followers + // start an election process. And that election process would just continue to happen + // indefinitely because checkpoints and snapshots are being calculated indefinitely. case <-ticker.C: n.Raft().Tick() case rd := <-n.Raft().Ready(): - start := time.Now() + timer.Start() _, span := otrace.StartSpan(n.ctx, "Alpha.RunLoop", otrace.WithSampler(otrace.ProbabilitySampler(0.001))) @@ -877,13 +887,13 @@ func (n *node) Run() { // Store the hardstate and entries. Note that these are not CommittedEntries. n.SaveToStorage(rd.HardState, rd.Entries, rd.Snapshot) - diskDur := time.Since(start) if span != nil { span.Annotatef(nil, "Saved %d entries. Snapshot, HardState empty? (%v, %v)", len(rd.Entries), raft.IsEmptySnap(rd.Snapshot), raft.IsEmptyHardState(rd.HardState)) } + timer.Record("disk") // Now schedule or apply committed entries. 
var proposals []*pb.Proposal @@ -955,8 +965,11 @@ func (n *node) Run() { if span != nil { span.Annotate(nil, "Followed queued messages.") } + timer.Record("proposals") n.Raft().Advance() + timer.Record("advance") + if firstRun && n.canCampaign { go n.Raft().Campaign(n.ctx) firstRun = false @@ -966,14 +979,13 @@ func (n *node) Run() { span.End() ostats.RecordWithTags(context.Background(), []tag.Mutator{tag.Upsert(x.KeyMethod, "alpha.RunLoop")}, - x.LatencyMs.M(x.SinceMs(start))) + x.LatencyMs.M(float64(timer.Total())/1e6)) } - if time.Since(start) > 100*time.Millisecond { + if timer.Total() > 100*time.Millisecond { glog.Warningf( - "Raft.Ready took too long to process: %v. Most likely due to slow disk: %v."+ + "Raft.Ready took too long to process: %s"+ " Num entries: %d. MustSync: %v", - time.Since(start).Round(time.Millisecond), diskDur.Round(time.Millisecond), - len(rd.Entries), rd.MustSync) + timer.String(), len(rd.Entries), rd.MustSync) } } } diff --git a/x/x.go b/x/x.go index 3e3b6c275b0..96a0c83d608 100644 --- a/x/x.go +++ b/x/x.go @@ -379,10 +379,14 @@ func (b *BytesBuffer) TruncateBy(n int) { AssertTrue(b.off >= 0 && b.sz >= 0) } +type record struct { + Name string + Dur time.Duration +} type Timer struct { start time.Time last time.Time - records []time.Duration + records []record } func (t *Timer) Start() { @@ -391,18 +395,24 @@ func (t *Timer) Start() { t.records = t.records[:0] } -func (t *Timer) Record() { +func (t *Timer) Record(name string) { now := time.Now() - t.records = append(t.records, now.Sub(t.last)) + t.records = append(t.records, record{ + Name: name, + Dur: now.Sub(t.last).Round(time.Millisecond), + }) t.last = now } func (t *Timer) Total() time.Duration { - return time.Since(t.start) + return time.Since(t.start).Round(time.Millisecond) } -func (t *Timer) All() []time.Duration { - return t.records +func (t *Timer) String() string { + sort.Slice(t.records, func(i, j int) bool { + return t.records[i].Dur > t.records[j].Dur + }) + return fmt.Sprintf("Timer Total: %s. Breakdown: %v", t.Total(), t.records) } // PredicateLang extracts the language from a predicate (or facet) name. From cd1f778c35646d599dd5c76bfa8a472bb8e92c21 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 8 May 2019 19:09:14 -0700 Subject: [PATCH 4/6] Add timer to Zero as well. Fix a bug: Use a for loop when going over slow ticker. --- dgraph/cmd/zero/raft.go | 15 +++++---- worker/draft.go | 67 +++++++++++++++++++++-------------------- 2 files changed, 44 insertions(+), 38 deletions(-) diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index 169e1497ec0..a5ad758acf5 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -618,6 +618,7 @@ func (n *node) Run() { // We only stop runReadIndexLoop after the for loop below has finished interacting with it. // That way we know sending to readStateCh will not deadlock. 
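	// A minimal usage sketch of the x.Timer helper added in x/x.go above:
	// Start() resets the records, each Record(name) captures the duration
	// since the previous mark under that name, Total() is the time since
	// Start(), and String() prints the named phases sorted by duration.
	//
	//	var t x.Timer
	//	t.Start()
	//	// ... write entries and hard state to disk ...
	//	t.Record("disk")
	//	// ... send messages / forward proposals ...
	//	t.Record("proposals")
	//	if t.Total() > 100*time.Millisecond {
	//		glog.Warningf("Loop took too long: %s", t.String())
	//	}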
+ var timer x.Timer for { select { case <-n.closer.HasBeenClosed(): @@ -626,7 +627,7 @@ func (n *node) Run() { case <-ticker.C: n.Raft().Tick() case rd := <-n.Raft().Ready(): - start := time.Now() + timer.Start() _, span := otrace.StartSpan(n.ctx, "Zero.RunLoop", otrace.WithSampler(otrace.ProbabilitySampler(0.001))) for _, rs := range rd.ReadStates { @@ -654,7 +655,7 @@ func (n *node) Run() { } n.SaveToStorage(rd.HardState, rd.Entries, rd.Snapshot) span.Annotatef(nil, "Saved to storage") - diskDur := time.Since(start) + timer.Record("disk") if !raft.IsEmptySnap(rd.Snapshot) { var state pb.MembershipState @@ -689,16 +690,18 @@ func (n *node) Run() { } } span.Annotate(nil, "Sent messages") + timer.Record("proposals") n.Raft().Advance() span.Annotate(nil, "Advanced Raft") + timer.Record("advance") + span.End() - if time.Since(start) > 100*time.Millisecond { + if timer.Total() > 100*time.Millisecond { glog.Warningf( - "Raft.Ready took too long to process: %v. Most likely due to slow disk: %v."+ + "Raft.Ready took too long to process: %s."+ " Num entries: %d. MustSync: %v", - time.Since(start).Round(time.Millisecond), diskDur.Round(time.Millisecond), - len(rd.Entries), rd.MustSync) + timer.String(), len(rd.Entries), rd.MustSync) } } } diff --git a/worker/draft.go b/worker/draft.go index 51f988c797a..6de2dce5ce1 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -742,42 +742,45 @@ func (n *node) Run() { slowTicker := time.NewTicker(time.Minute) defer slowTicker.Stop() - select { - case <-slowTicker.C: - // Do these operations asynchronously away from the main Run loop to allow heartbeats to - // be sent on time. Otherwise, followers would just keep running elections. - - n.elog.Printf("Size of applyCh: %d", len(n.applyCh)) - if err := n.updateRaftProgress(); err != nil { - glog.Errorf("While updating Raft progress: %v", err) - } + for { + select { + case <-slowTicker.C: + // Do these operations asynchronously away from the main Run loop to allow heartbeats to + // be sent on time. Otherwise, followers would just keep running elections. + + n.elog.Printf("Size of applyCh: %d", len(n.applyCh)) + if err := n.updateRaftProgress(); err != nil { + glog.Errorf("While updating Raft progress: %v", err) + } - if n.AmLeader() { - // We keep track of the applied index in the p directory. Even if we don't take - // snapshot for a while and let the Raft logs grow and restart, we would not have to - // run all the log entries, because we can tell Raft.Config to set Applied to that - // index. - // This applied index tracking also covers the case when we have a big index - // rebuild. The rebuild would be tracked just like others and would not need to be - // replayed after a restart, because the Applied config would let us skip right - // through it. - // We use disk based storage for Raft. So, we're not too concerned about - // snapshotting. We just need to do enough, so that we don't have a huge backlog of - // entries to process on a restart. - if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil { - x.Errorf("While calculating and proposing snapshot: %v", err) + if n.AmLeader() { + // We keep track of the applied index in the p directory. Even if we don't take + // snapshot for a while and let the Raft logs grow and restart, we would not have to + // run all the log entries, because we can tell Raft.Config to set Applied to that + // index. + // This applied index tracking also covers the case when we have a big index + // rebuild. 
The rebuild would be tracked just like others and would not need to be + // replayed after a restart, because the Applied config would let us skip right + // through it. + // We use disk based storage for Raft. So, we're not too concerned about + // snapshotting. We just need to do enough, so that we don't have a huge backlog of + // entries to process on a restart. + if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil { + x.Errorf("While calculating and proposing snapshot: %v", err) + } + go n.abortOldTransactions() } - go n.abortOldTransactions() - } - case <-n.closer.HasBeenClosed(): - glog.Infof("Stopping node.Run") - if peerId, has := groups().MyPeer(); has && n.AmLeader() { - n.Raft().TransferLeadership(n.ctx, x.WorkerConfig.RaftId, peerId) - time.Sleep(time.Second) // Let transfer happen. + case <-n.closer.HasBeenClosed(): + glog.Infof("Stopping node.Run") + if peerId, has := groups().MyPeer(); has && n.AmLeader() { + n.Raft().TransferLeadership(n.ctx, x.WorkerConfig.RaftId, peerId) + time.Sleep(time.Second) // Let transfer happen. + } + n.Raft().Stop() + close(done) + return } - n.Raft().Stop() - close(done) } }() From fb45daa71920871de69b7fa8e0984cdd8e937251 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 8 May 2019 19:13:04 -0700 Subject: [PATCH 5/6] Move num pending txns to V(2). --- worker/draft.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/draft.go b/worker/draft.go index 6de2dce5ce1..00e38a57bec 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -1231,7 +1231,7 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) { } if num := posting.Oracle().NumPendingTxns(); num > 0 { - glog.Infof("Num pending txns: %d", num) + glog.V(2).Infof("Num pending txns: %d", num) } // We can't rely upon the Raft entries to determine the minPendingStart, // because there are many cases during mutations where we don't commit or From f5f170e254f0d690f5e4154255305afdfb3b2f38 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 8 May 2019 19:26:13 -0700 Subject: [PATCH 6/6] Move the checkpointing code outside of the Run func. --- worker/draft.go | 93 +++++++++++++++++++++++++------------------------ 1 file changed, 47 insertions(+), 46 deletions(-) diff --git a/worker/draft.go b/worker/draft.go index 00e38a57bec..1d54b0c7eff 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -728,6 +728,52 @@ func (n *node) updateRaftProgress() error { return nil } +func (n *node) checkpointAndClose(done chan struct{}) { + slowTicker := time.NewTicker(time.Minute) + defer slowTicker.Stop() + + for { + select { + case <-slowTicker.C: + // Do these operations asynchronously away from the main Run loop to allow heartbeats to + // be sent on time. Otherwise, followers would just keep running elections. + + n.elog.Printf("Size of applyCh: %d", len(n.applyCh)) + if err := n.updateRaftProgress(); err != nil { + glog.Errorf("While updating Raft progress: %v", err) + } + + if n.AmLeader() { + // We keep track of the applied index in the p directory. Even if we don't take + // snapshot for a while and let the Raft logs grow and restart, we would not have to + // run all the log entries, because we can tell Raft.Config to set Applied to that + // index. + // This applied index tracking also covers the case when we have a big index + // rebuild. The rebuild would be tracked just like others and would not need to be + // replayed after a restart, because the Applied config would let us skip right + // through it. 
+ // We use disk based storage for Raft. So, we're not too concerned about + // snapshotting. We just need to do enough, so that we don't have a huge backlog of + // entries to process on a restart. + if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil { + x.Errorf("While calculating and proposing snapshot: %v", err) + } + go n.abortOldTransactions() + } + + case <-n.closer.HasBeenClosed(): + glog.Infof("Stopping node.Run") + if peerId, has := groups().MyPeer(); has && n.AmLeader() { + n.Raft().TransferLeadership(n.ctx, x.WorkerConfig.RaftId, peerId) + time.Sleep(time.Second) // Let transfer happen. + } + n.Raft().Stop() + close(done) + return + } + } +} + func (n *node) Run() { defer n.closer.Done() // CLOSER:1 @@ -738,52 +784,7 @@ func (n *node) Run() { defer ticker.Stop() done := make(chan struct{}) - go func() { - slowTicker := time.NewTicker(time.Minute) - defer slowTicker.Stop() - - for { - select { - case <-slowTicker.C: - // Do these operations asynchronously away from the main Run loop to allow heartbeats to - // be sent on time. Otherwise, followers would just keep running elections. - - n.elog.Printf("Size of applyCh: %d", len(n.applyCh)) - if err := n.updateRaftProgress(); err != nil { - glog.Errorf("While updating Raft progress: %v", err) - } - - if n.AmLeader() { - // We keep track of the applied index in the p directory. Even if we don't take - // snapshot for a while and let the Raft logs grow and restart, we would not have to - // run all the log entries, because we can tell Raft.Config to set Applied to that - // index. - // This applied index tracking also covers the case when we have a big index - // rebuild. The rebuild would be tracked just like others and would not need to be - // replayed after a restart, because the Applied config would let us skip right - // through it. - // We use disk based storage for Raft. So, we're not too concerned about - // snapshotting. We just need to do enough, so that we don't have a huge backlog of - // entries to process on a restart. - if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil { - x.Errorf("While calculating and proposing snapshot: %v", err) - } - go n.abortOldTransactions() - } - - case <-n.closer.HasBeenClosed(): - glog.Infof("Stopping node.Run") - if peerId, has := groups().MyPeer(); has && n.AmLeader() { - n.Raft().TransferLeadership(n.ctx, x.WorkerConfig.RaftId, peerId) - time.Sleep(time.Second) // Let transfer happen. - } - n.Raft().Stop() - close(done) - return - } - } - }() - + go n.checkpointAndClose(done) go n.ReportRaftComms() applied, err := n.findRaftProgress()
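
The final patch's core idea can be sketched in isolation: the hot loop only ticks Raft and drains Ready, while a separate goroutine owns the slow ticker, the checkpoint and snapshot work, and the shutdown sequence, reporting completion over a done channel. The sketch below is a minimal, self-contained illustration of that shape; the placeholder prints and timings stand in for n.closer, Raft().Tick(), updateRaftProgress, and proposeSnapshot in the real code.

    package main

    import (
    	"fmt"
    	"time"
    )

    // checkpointAndClose mirrors the shape of the helper introduced in the last
    // patch: slow, non-urgent work runs on its own ticker, and shutdown is
    // acknowledged back to the main loop over the done channel.
    func checkpointAndClose(quit <-chan struct{}, done chan<- struct{}) {
    	slowTicker := time.NewTicker(100 * time.Millisecond) // stands in for time.Minute
    	defer slowTicker.Stop()

    	for {
    		select {
    		case <-slowTicker.C:
    			// Placeholder for updateRaftProgress / proposeSnapshot: the
    			// expensive work happens here so the hot loop never blocks on it.
    			fmt.Println("checkpoint: updating progress, maybe proposing a snapshot")
    		case <-quit:
    			fmt.Println("stopping background work")
    			close(done)
    			return
    		}
    	}
    }

    // run mirrors node.Run after the refactor: only fast work stays here, so
    // heartbeats go out on time and followers do not start spurious elections.
    func run(quit <-chan struct{}) {
    	ticker := time.NewTicker(20 * time.Millisecond)
    	defer ticker.Stop()

    	done := make(chan struct{})
    	go checkpointAndClose(quit, done)

    	for {
    		select {
    		case <-done:
    			// Exit only after the background goroutine has wound down,
    			// mirroring how Run closes applyCh once done is closed.
    			fmt.Println("run loop done")
    			return
    		case <-ticker.C:
    			// Fast path only: the real loop calls Raft().Tick() and drains
    			// Raft().Ready() here.
    		}
    	}
    }

    func main() {
    	quit := make(chan struct{})
    	go func() {
    		time.Sleep(350 * time.Millisecond) // simulate a shutdown request
    		close(quit)
    	}()
    	run(quit)
    }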