Skip to content

Commit

Permalink
Move vote bubbling before poll termination (#2100)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored Sep 28, 2023
1 parent e4b7e82 commit 943f7d1
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 94 deletions.
8 changes: 4 additions & 4 deletions snow/engine/snowman/transitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,10 @@ func (t *Transitive) Chits(ctx context.Context, nodeID ids.NodeID, requestID uin

// Will record chits once [blkID] has been issued into consensus
v := &voter{
t: t,
vdr: nodeID,
requestID: requestID,
response: blkID,
t: t,
vdr: nodeID,
requestID: requestID,
responseOptions: []ids.ID{blkID},
}

// Wait until [blkID] has been issued to consensus before applying this chit.
Expand Down
142 changes: 52 additions & 90 deletions snow/engine/snowman/voter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (

// Voter records chits received from [vdr] once its dependencies are met.
type voter struct {
t *Transitive
vdr ids.NodeID
requestID uint32
response ids.ID
deps set.Set[ids.ID]
t *Transitive
vdr ids.NodeID
requestID uint32
responseOptions []ids.ID
deps set.Set[ids.ID]
}

func (v *voter) Dependencies() set.Set[ids.ID] {
Expand All @@ -42,11 +42,24 @@ func (v *voter) Update(ctx context.Context) {
return
}

var (
vote ids.ID
shouldVote bool
)
for _, voteOption := range v.responseOptions {
// To prevent any potential deadlocks with undisclosed dependencies,
// votes must be bubbled to the nearest valid block
vote, shouldVote = v.getProcessingAncestor(ctx, voteOption)
if shouldVote {
break
}
}

var results []bag.Bag[ids.ID]
if v.response == ids.Empty {
results = v.t.polls.Drop(v.requestID, v.vdr)
if shouldVote {
results = v.t.polls.Vote(v.requestID, v.vdr, vote)
} else {
results = v.t.polls.Vote(v.requestID, v.vdr, v.response)
results = v.t.polls.Drop(v.requestID, v.vdr)
}

if len(results) == 0 {
Expand All @@ -55,13 +68,6 @@ func (v *voter) Update(ctx context.Context) {

for _, result := range results {
result := result
v.t.Ctx.Log.Debug("filtering poll results",
zap.Stringer("result", &result),
)

// To prevent any potential deadlocks with un-disclosed dependencies,
// votes must be bubbled to the nearest valid block
result = v.bubbleVotes(ctx, result)
v.t.Ctx.Log.Debug("finishing poll",
zap.Stringer("result", &result),
)
Expand All @@ -88,92 +94,48 @@ func (v *voter) Update(ctx context.Context) {
v.t.repoll(ctx)
}

// bubbleVotes bubbles the [votes] a set of the number of votes for specific
// blkIDs that received votes in consensus, to their most recent ancestor that
// has been issued to consensus.
// getProcessingAncestor finds [initialVote]'s most recent ancestor that is
// processing in consensus. If no ancestor could be found, false is returned.
//
// Note: bubbleVotes does not bubbleVotes to all of the ancestors in consensus,
// just the most recent one. bubbling to the rest of the ancestors, which may
// also be in consensus is handled in RecordPoll.
func (v *voter) bubbleVotes(ctx context.Context, votes bag.Bag[ids.ID]) bag.Bag[ids.ID] {
bubbledVotes := bag.Bag[ids.ID]{}

votesLoop:
for _, vote := range votes.List() {
count := votes.Count(vote)
// use rootID in case of this is a non-verified block ID
rootID := v.t.nonVerifieds.GetAncestor(vote)
v.t.Ctx.Log.Verbo("bubbling vote(s) through unverified blocks",
zap.Int("numVotes", count),
zap.Stringer("voteID", vote),
zap.Stringer("parentID", rootID),
)

blk, err := v.t.GetBlock(ctx, rootID)
// Note: If [initialVote] is processing, then [initialVote] will be returned.
func (v *voter) getProcessingAncestor(ctx context.Context, initialVote ids.ID) (ids.ID, bool) {
// If [bubbledVote] != [initialVote], it is guaranteed that [bubbledVote] is
// in processing. Otherwise, we attempt to iterate through any blocks we
// have at our disposal as a best-effort mechanism to find a valid ancestor.
bubbledVote := v.t.nonVerifieds.GetAncestor(initialVote)
for {
blk, err := v.t.GetBlock(ctx, bubbledVote)
// If we cannot retrieve the block, drop [vote]
if err != nil {
v.t.Ctx.Log.Debug("dropping vote(s)",
zap.String("reason", "parent couldn't be fetched"),
zap.Stringer("parentID", rootID),
zap.Int("numVotes", count),
zap.Stringer("voteID", vote),
v.t.Ctx.Log.Debug("dropping vote",
zap.String("reason", "ancestor couldn't be fetched"),
zap.Stringer("initialVoteID", initialVote),
zap.Stringer("bubbledVoteID", bubbledVote),
zap.Error(err),
)
continue
return ids.Empty, false
}

status := blk.Status()
blkID := blk.ID()
// If we have not fetched [blkID] break from the loop. We will drop the
// vote below and move on to the next vote.
//
// If [blk] has already been decided, break from the loop, we will drop
// the vote below since there is no need to count the votes for a [blk]
// we've already finalized.
//
// If [blk] is currently in consensus, break from the loop, we have
// reached the first ancestor of the original [vote] that has been
// issued consensus. In this case, the votes will be bubbled further
// from [blk] to any of its ancestors that are also in consensus.
for status.Fetched() && !(v.t.Consensus.Decided(blk) || v.t.Consensus.Processing(blkID)) {
parentID := blk.Parent()
v.t.Ctx.Log.Verbo("pushing vote(s)",
zap.Int("numVotes", count),
zap.Stringer("voteID", vote),
zap.Stringer("parentID", rootID),
if v.t.Consensus.Decided(blk) {
v.t.Ctx.Log.Debug("dropping vote",
zap.String("reason", "bubbled vote already decided"),
zap.Stringer("initialVoteID", initialVote),
zap.Stringer("bubbledVoteID", bubbledVote),
zap.Stringer("status", blk.Status()),
zap.Uint64("height", blk.Height()),
)

blkID = parentID
blk, err = v.t.GetBlock(ctx, blkID)
// If we cannot retrieve the block, drop [vote]
if err != nil {
v.t.Ctx.Log.Debug("dropping vote(s)",
zap.String("reason", "block couldn't be fetched"),
zap.Stringer("blkID", blkID),
zap.Int("numVotes", count),
zap.Stringer("voteID", vote),
zap.Error(err),
)
continue votesLoop
}
status = blk.Status()
return ids.Empty, false
}

// If [blkID] is currently in consensus, count the votes
if v.t.Consensus.Processing(blkID) {
v.t.Ctx.Log.Verbo("applying vote(s)",
zap.Int("numVotes", count),
zap.Stringer("blkID", blkID),
zap.Stringer("status", status),
)
bubbledVotes.AddCount(blkID, count)
} else {
v.t.Ctx.Log.Verbo("dropping vote(s)",
zap.Int("numVotes", count),
zap.Stringer("blkID", blkID),
zap.Stringer("status", status),
if v.t.Consensus.Processing(bubbledVote) {
v.t.Ctx.Log.Verbo("applying vote",
zap.Stringer("initialVoteID", initialVote),
zap.Stringer("bubbledVoteID", bubbledVote),
zap.Uint64("height", blk.Height()),
)
return bubbledVote, true
}

bubbledVote = blk.Parent()
}
return bubbledVotes
}

0 comments on commit 943f7d1

Please sign in to comment.