Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When calculating a snapshot, retrieve entries in batches. #3409

Merged
merged 4 commits into from
May 14, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 41 additions & 23 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,15 +1225,10 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
}
span.Annotatef(nil, "Found Raft entries: %d", last-first)

entries, err := n.Store.Entries(first, last+1, math.MaxUint64)
if err != nil {
span.Annotatef(nil, "Error: %v", err)
return nil, err
}

if num := posting.Oracle().NumPendingTxns(); num > 0 {
glog.V(2).Infof("Num pending txns: %d", num)
}

// We can't rely upon the Raft entries to determine the minPendingStart,
// because there are many cases during mutations where we don't commit or
// abort the transaction. This might happen due to an early error thrown.
Expand All @@ -1248,37 +1243,60 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
minPendingStart := posting.Oracle().MinPendingStartTs()
maxCommitTs := snap.ReadTs
var snapshotIdx uint64
for _, entry := range entries {
if entry.Type != raftpb.EntryNormal {
continue
}
var proposal pb.Proposal
if err := proposal.Unmarshal(entry.Data); err != nil {

// Trying to retrieve all entries at once might cause out-of-memory issues in
// cases where the raft log is too big to fit into memory. Instead of retrieving
// all entries at once, retrieve it in batches of 64MB.
var lastEntry raftpb.Entry
for batchFirst := first; batchFirst <= last; {
entries, err := n.Store.Entries(batchFirst, last+1, 64<<20)
if err != nil {
span.Annotatef(nil, "Error: %v", err)
return nil, err
}
if proposal.Mutations != nil {
start := proposal.Mutations.StartTs
if start >= minPendingStart && snapshotIdx == 0 {
snapshotIdx = entry.Index - 1
}

// Exit early from the loop if no entries were found.
if len(entries) == 0 {
break
}
if proposal.Delta != nil {
for _, txn := range proposal.Delta.GetTxns() {
maxCommitTs = x.Max(maxCommitTs, txn.CommitTs)

// Store the last entry (as it might be needed outside the loop) and set the
// start of the new batch at the entry following it. Also set foundEntries to
// true to indicate to the code outside the loop that entries were retrieved.
lastEntry = entries[len(entries)-1]
batchFirst = lastEntry.Index + 1

for _, entry := range entries {
if entry.Type != raftpb.EntryNormal {
continue
}
var proposal pb.Proposal
if err := proposal.Unmarshal(entry.Data); err != nil {
span.Annotatef(nil, "Error: %v", err)
return nil, err
}
if proposal.Mutations != nil {
start := proposal.Mutations.StartTs
if start >= minPendingStart && snapshotIdx == 0 {
snapshotIdx = entry.Index - 1
}
}
if proposal.Delta != nil {
for _, txn := range proposal.Delta.GetTxns() {
maxCommitTs = x.Max(maxCommitTs, txn.CommitTs)
}
}
}
}

if maxCommitTs == 0 {
span.Annotate(nil, "maxCommitTs is zero")
return nil, nil
}
if snapshotIdx <= 0 {
// It is possible that there are no pending transactions. In that case,
// snapshotIdx would be zero.
if len(entries) > 0 {
snapshotIdx = entries[len(entries)-1].Index
}
snapshotIdx = lastEntry.Index
span.Annotatef(nil, "snapshotIdx is zero. Using last entry's index: %d", snapshotIdx)
}

Expand Down