From 55390aaa514ac1e66a7691b40c1f95bad66487a7 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 9 Dec 2020 22:34:47 -0800 Subject: [PATCH] Fix(Zero): Pick up checkpoint timestamp In the last commit, I was picking up checkpoint index instead of the timestamp corresponding to the checkpoint from Alpha leaders. This is a fix for that. --- dgraph/cmd/zero/raft.go | 14 +++++++++----- raftwal/storage.go | 2 ++ worker/draft.go | 8 +++++--- worker/groups.go | 4 +--- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index 8b37f167127..7870955b3a3 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -708,12 +708,16 @@ func (n *node) calculateAndProposeSnapshot() error { s := n.server s.RLock() if len(s.state.Groups) != len(s.checkpointPerGroup) { - glog.Infof("Skipping creating a snapshot. Num groups: %d, Num max assigned: %d", + log := fmt.Sprintf("Skipping creating a snapshot."+ + " Num groups: %d, Num checkpoints: %d\n", len(s.state.Groups), len(s.checkpointPerGroup)) s.RUnlock() + span.Annotatef(nil, log) + glog.Infof(log) return nil } - for _, ts := range s.checkpointPerGroup { + for gid, ts := range s.checkpointPerGroup { + span.Annotatef(nil, "Group: %d Checkpoint Ts: %d", gid, ts) discardBelow = x.Min(discardBelow, ts) } s.RUnlock() @@ -730,7 +734,7 @@ func (n *node) calculateAndProposeSnapshot() error { return err } - span.Annotatef(nil, "First index: %d. Last index: %d. Discard Below: %d", + span.Annotatef(nil, "First index: %d. Last index: %d. Discard Below Ts: %d", first, last, discardBelow) var snapshotIndex uint64 @@ -764,7 +768,7 @@ func (n *node) calculateAndProposeSnapshot() error { if snapshotIndex == 0 { return nil } - span.Annotatef(nil, "Taking snapshot at: %d", snapshotIndex) + span.Annotatef(nil, "Taking snapshot at index: %d", snapshotIndex) state := n.server.membershipState() zs := &pb.ZeroSnapshot{ @@ -772,7 +776,7 @@ func (n *node) calculateAndProposeSnapshot() error { CheckpointTs: discardBelow, State: state, } - glog.V(2).Infof("Proposing snapshot at Index: %d Checkpoint Ts: %d\n", + glog.V(2).Infof("Proposing snapshot at index: %d, checkpoint ts: %d\n", zs.Index, zs.CheckpointTs) zp := &pb.ZeroProposal{Snapshot: zs} if err = n.proposeAndWait(n.ctx, zp); err != nil { diff --git a/raftwal/storage.go b/raftwal/storage.go index ee6fa111a1b..5af6a0181d9 100644 --- a/raftwal/storage.go +++ b/raftwal/storage.go @@ -152,6 +152,8 @@ func (w *DiskStorage) HardState() (raftpb.HardState, error) { } return w.meta.HardState() } + +// Checkpoint returns the Raft index corresponding to the checkpoint. func (w *DiskStorage) Checkpoint() (uint64, error) { if w.meta == nil { return 0, errors.Errorf("uninitialized meta file") diff --git a/worker/draft.go b/worker/draft.go index c00f24f295d..fb458825032 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -62,7 +62,8 @@ type node struct { gid uint32 closer *z.Closer - streaming int32 // Used to avoid calculating snapshot + checkpointTs uint64 // Timestamp corresponding to checkpoint. + streaming int32 // Used to avoid calculating snapshot // Used to track the ops going on in the system. ops map[op]*z.Closer @@ -600,7 +601,7 @@ func (n *node) applyCommitted(proposal *pb.Proposal) error { return nil } n.elog.Printf("Creating snapshot: %+v", snap) - glog.Infof("Creating snapshot at index: %d. ReadTs: %d.\n", snap.Index, snap.ReadTs) + glog.Infof("Creating snapshot at Index: %d, ReadTs: %d\n", snap.Index, snap.ReadTs) data, err := snap.Marshal() x.Check(err) @@ -936,9 +937,10 @@ func (n *node) updateRaftProgress() error { if err != nil || snap == nil || snap.Index <= applied { return err } + atomic.StoreUint64(&n.checkpointTs, snap.ReadTs) n.Store.SetUint(raftwal.CheckpointIndex, snap.GetIndex()) - glog.V(2).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index) + glog.V(2).Infof("[%#x] Set Raft progress to index: %d, ts: %d.", n.Id, snap.Index, snap.ReadTs) return nil } diff --git a/worker/groups.go b/worker/groups.go index 81f38f46c04..31e55da6db3 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -712,9 +712,7 @@ func (g *groupi) doSendMembership(tablets map[string]*pb.Tablet) error { if snap, err := g.Node.Snapshot(); err == nil { group.SnapshotTs = snap.ReadTs } - if cidx, err := g.Node.Store.Checkpoint(); err == nil { - group.CheckpointTs = cidx - } + group.CheckpointTs = atomic.LoadUint64(&g.Node.checkpointTs) } pl := g.connToZeroLeader()