Skip to content

Commit

Permalink
Fix(Zero): Pick up checkpoint timestamp
Browse files Browse the repository at this point in the history
In the last commit, I was picking up the checkpoint index instead of the
timestamp corresponding to that checkpoint from the Alpha leaders. This
commit fixes that.
  • Loading branch information
manishrjain committed Dec 10, 2020
1 parent 5efdfbf commit 55390aa
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 11 deletions.
14 changes: 9 additions & 5 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,12 +708,16 @@ func (n *node) calculateAndProposeSnapshot() error {
s := n.server
s.RLock()
if len(s.state.Groups) != len(s.checkpointPerGroup) {
glog.Infof("Skipping creating a snapshot. Num groups: %d, Num max assigned: %d",
log := fmt.Sprintf("Skipping creating a snapshot."+
" Num groups: %d, Num checkpoints: %d\n",
len(s.state.Groups), len(s.checkpointPerGroup))
s.RUnlock()
span.Annotatef(nil, log)
glog.Infof(log)
return nil
}
for _, ts := range s.checkpointPerGroup {
for gid, ts := range s.checkpointPerGroup {
span.Annotatef(nil, "Group: %d Checkpoint Ts: %d", gid, ts)
discardBelow = x.Min(discardBelow, ts)
}
s.RUnlock()
Expand All @@ -730,7 +734,7 @@ func (n *node) calculateAndProposeSnapshot() error {
return err
}

span.Annotatef(nil, "First index: %d. Last index: %d. Discard Below: %d",
span.Annotatef(nil, "First index: %d. Last index: %d. Discard Below Ts: %d",
first, last, discardBelow)

var snapshotIndex uint64
Expand Down Expand Up @@ -764,15 +768,15 @@ func (n *node) calculateAndProposeSnapshot() error {
if snapshotIndex == 0 {
return nil
}
span.Annotatef(nil, "Taking snapshot at: %d", snapshotIndex)
span.Annotatef(nil, "Taking snapshot at index: %d", snapshotIndex)
state := n.server.membershipState()

zs := &pb.ZeroSnapshot{
Index: snapshotIndex,
CheckpointTs: discardBelow,
State: state,
}
glog.V(2).Infof("Proposing snapshot at Index: %d Checkpoint Ts: %d\n",
glog.V(2).Infof("Proposing snapshot at index: %d, checkpoint ts: %d\n",
zs.Index, zs.CheckpointTs)
zp := &pb.ZeroProposal{Snapshot: zs}
if err = n.proposeAndWait(n.ctx, zp); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions raftwal/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ func (w *DiskStorage) HardState() (raftpb.HardState, error) {
}
return w.meta.HardState()
}

// Checkpoint returns the Raft index corresponding to the checkpoint.
func (w *DiskStorage) Checkpoint() (uint64, error) {
if w.meta == nil {
return 0, errors.Errorf("uninitialized meta file")
Expand Down
8 changes: 5 additions & 3 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ type node struct {
gid uint32
closer *z.Closer

streaming int32 // Used to avoid calculating snapshot
checkpointTs uint64 // Timestamp corresponding to checkpoint.
streaming int32 // Used to avoid calculating snapshot

// Used to track the ops going on in the system.
ops map[op]*z.Closer
Expand Down Expand Up @@ -600,7 +601,7 @@ func (n *node) applyCommitted(proposal *pb.Proposal) error {
return nil
}
n.elog.Printf("Creating snapshot: %+v", snap)
glog.Infof("Creating snapshot at index: %d. ReadTs: %d.\n", snap.Index, snap.ReadTs)
glog.Infof("Creating snapshot at Index: %d, ReadTs: %d\n", snap.Index, snap.ReadTs)

data, err := snap.Marshal()
x.Check(err)
Expand Down Expand Up @@ -936,9 +937,10 @@ func (n *node) updateRaftProgress() error {
if err != nil || snap == nil || snap.Index <= applied {
return err
}
atomic.StoreUint64(&n.checkpointTs, snap.ReadTs)

n.Store.SetUint(raftwal.CheckpointIndex, snap.GetIndex())
glog.V(2).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index)
glog.V(2).Infof("[%#x] Set Raft progress to index: %d, ts: %d.", n.Id, snap.Index, snap.ReadTs)
return nil
}

Expand Down
4 changes: 1 addition & 3 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,9 +712,7 @@ func (g *groupi) doSendMembership(tablets map[string]*pb.Tablet) error {
if snap, err := g.Node.Snapshot(); err == nil {
group.SnapshotTs = snap.ReadTs
}
if cidx, err := g.Node.Store.Checkpoint(); err == nil {
group.CheckpointTs = cidx
}
group.CheckpointTs = atomic.LoadUint64(&g.Node.checkpointTs)
}

pl := g.connToZeroLeader()
Expand Down

0 comments on commit 55390aa

Please sign in to comment.