Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move vote bubbling before poll termination #2100

Merged
merged 6 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions snow/engine/snowman/transitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,10 @@ func (t *Transitive) Chits(ctx context.Context, nodeID ids.NodeID, requestID uin

// Will record chits once [blkID] has been issued into consensus
v := &voter{
t: t,
vdr: nodeID,
requestID: requestID,
response: blkID,
t: t,
vdr: nodeID,
requestID: requestID,
responseOptions: []ids.ID{blkID},
}

// Wait until [blkID] has been issued to consensus before applying this chit.
Expand Down
142 changes: 52 additions & 90 deletions snow/engine/snowman/voter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (

// Voter records chits received from [vdr] once its dependencies are met.
type voter struct {
t *Transitive
vdr ids.NodeID
requestID uint32
response ids.ID
deps set.Set[ids.ID]
t *Transitive
vdr ids.NodeID
requestID uint32
responseOptions []ids.ID
deps set.Set[ids.ID]
}

func (v *voter) Dependencies() set.Set[ids.ID] {
Expand All @@ -42,11 +42,24 @@ func (v *voter) Update(ctx context.Context) {
return
}

var (
vote ids.ID
shouldVote bool
)
for _, voteOption := range v.responseOptions {
// To prevent any potential deadlocks with undisclosed dependencies,
// votes must be bubbled to the nearest valid block
vote, shouldVote = v.getProcessingAncestor(ctx, voteOption)
if shouldVote {
break
}
}

var results []bag.Bag[ids.ID]
if v.response == ids.Empty {
results = v.t.polls.Drop(v.requestID, v.vdr)
if shouldVote {
results = v.t.polls.Vote(v.requestID, v.vdr, vote)
} else {
results = v.t.polls.Vote(v.requestID, v.vdr, v.response)
results = v.t.polls.Drop(v.requestID, v.vdr)
}

if len(results) == 0 {
Expand All @@ -55,13 +68,6 @@ func (v *voter) Update(ctx context.Context) {

for _, result := range results {
result := result
danlaine marked this conversation as resolved.
Show resolved Hide resolved
v.t.Ctx.Log.Debug("filtering poll results",
zap.Stringer("result", &result),
)

// To prevent any potential deadlocks with un-disclosed dependencies,
// votes must be bubbled to the nearest valid block
result = v.bubbleVotes(ctx, result)
v.t.Ctx.Log.Debug("finishing poll",
zap.Stringer("result", &result),
)
Expand All @@ -88,92 +94,48 @@ func (v *voter) Update(ctx context.Context) {
v.t.repoll(ctx)
}

// bubbleVotes bubbles the [votes] a set of the number of votes for specific
// blkIDs that received votes in consensus, to their most recent ancestor that
// has been issued to consensus.
// getProcessingAncestor finds [initialVote]'s most recent ancestor that is
// processing in consensus. If no ancestor could be found, false is returned.
//
// Note: bubbleVotes does not bubbleVotes to all of the ancestors in consensus,
// just the most recent one. bubbling to the rest of the ancestors, which may
// also be in consensus is handled in RecordPoll.
func (v *voter) bubbleVotes(ctx context.Context, votes bag.Bag[ids.ID]) bag.Bag[ids.ID] {
bubbledVotes := bag.Bag[ids.ID]{}

votesLoop:
for _, vote := range votes.List() {
count := votes.Count(vote)
// use rootID in case of this is a non-verified block ID
rootID := v.t.nonVerifieds.GetAncestor(vote)
v.t.Ctx.Log.Verbo("bubbling vote(s) through unverified blocks",
zap.Int("numVotes", count),
zap.Stringer("voteID", vote),
zap.Stringer("parentID", rootID),
)

blk, err := v.t.GetBlock(ctx, rootID)
// Note: If [initialVote] is processing, then [initialVote] will be returned.
func (v *voter) getProcessingAncestor(ctx context.Context, initialVote ids.ID) (ids.ID, bool) {
// If [bubbledVote] != [initialVote], it is guaranteed that [bubbledVote] is
// in processing. Otherwise, we attempt to iterate through any blocks we
// have at our disposal as a best-effort mechanism to find a valid ancestor.
bubbledVote := v.t.nonVerifieds.GetAncestor(initialVote)
for {
blk, err := v.t.GetBlock(ctx, bubbledVote)
// If we cannot retrieve the block, drop [vote]
if err != nil {
v.t.Ctx.Log.Debug("dropping vote(s)",
zap.String("reason", "parent couldn't be fetched"),
zap.Stringer("parentID", rootID),
zap.Int("numVotes", count),
zap.Stringer("voteID", vote),
v.t.Ctx.Log.Debug("dropping vote",
zap.String("reason", "ancestor couldn't be fetched"),
zap.Stringer("initialVoteID", initialVote),
zap.Stringer("bubbledVoteID", bubbledVote),
zap.Error(err),
)
continue
return ids.Empty, false
}

status := blk.Status()
blkID := blk.ID()
// If we have not fetched [blkID] break from the loop. We will drop the
// vote below and move on to the next vote.
//
// If [blk] has already been decided, break from the loop, we will drop
// the vote below since there is no need to count the votes for a [blk]
// we've already finalized.
//
// If [blk] is currently in consensus, break from the loop, we have
// reached the first ancestor of the original [vote] that has been
// issued consensus. In this case, the votes will be bubbled further
// from [blk] to any of its ancestors that are also in consensus.
for status.Fetched() && !(v.t.Consensus.Decided(blk) || v.t.Consensus.Processing(blkID)) {
parentID := blk.Parent()
v.t.Ctx.Log.Verbo("pushing vote(s)",
zap.Int("numVotes", count),
zap.Stringer("voteID", vote),
zap.Stringer("parentID", rootID),
if v.t.Consensus.Decided(blk) {
v.t.Ctx.Log.Debug("dropping vote",
zap.String("reason", "bubbled vote already decided"),
zap.Stringer("initialVoteID", initialVote),
zap.Stringer("bubbledVoteID", bubbledVote),
zap.Stringer("status", blk.Status()),
zap.Uint64("height", blk.Height()),
)

blkID = parentID
blk, err = v.t.GetBlock(ctx, blkID)
// If we cannot retrieve the block, drop [vote]
if err != nil {
v.t.Ctx.Log.Debug("dropping vote(s)",
zap.String("reason", "block couldn't be fetched"),
zap.Stringer("blkID", blkID),
zap.Int("numVotes", count),
zap.Stringer("voteID", vote),
zap.Error(err),
)
continue votesLoop
}
status = blk.Status()
return ids.Empty, false
}

// If [blkID] is currently in consensus, count the votes
if v.t.Consensus.Processing(blkID) {
v.t.Ctx.Log.Verbo("applying vote(s)",
zap.Int("numVotes", count),
zap.Stringer("blkID", blkID),
zap.Stringer("status", status),
)
bubbledVotes.AddCount(blkID, count)
} else {
v.t.Ctx.Log.Verbo("dropping vote(s)",
zap.Int("numVotes", count),
zap.Stringer("blkID", blkID),
zap.Stringer("status", status),
if v.t.Consensus.Processing(bubbledVote) {
v.t.Ctx.Log.Verbo("applying vote",
zap.Stringer("initialVoteID", initialVote),
zap.Stringer("bubbledVoteID", bubbledVote),
zap.Uint64("height", blk.Height()),
)
return bubbledVote, true
}

bubbledVote = blk.Parent()
}
return bubbledVotes
}
Loading