Skip to content

Commit

Permalink
rethink votes
Browse files Browse the repository at this point in the history
  • Loading branch information
charithabandi committed Feb 3, 2025
1 parent b21b731 commit 06294ea
Show file tree
Hide file tree
Showing 11 changed files with 259 additions and 223 deletions.
22 changes: 16 additions & 6 deletions node/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
Expand All @@ -22,8 +23,8 @@ import (
type (
ConsensusReset = types.ConsensusReset
AckRes = types.AckRes
DiscReq = types.DiscoveryRequest
DiscRes = types.DiscoveryResponse
// DiscReq = types.DiscoveryRequest
// DiscRes = types.DiscoveryResponse
)

type blockProp struct {
Expand Down Expand Up @@ -270,10 +271,6 @@ func (n *Node) blkPropStreamHandler(s network.Stream) {
// After then consensus engine executes the block, this is used to gossip the
// result back to the leader.
func (n *Node) sendACK(msg *AckRes) error {
if msg == nil {
return errors.New("nil ACK response")
}
// n.log.Debugln("sending ACK", height, ack, blkID, appHash)
n.ackChan <- *msg
return nil // actually gossip the nack
}
Expand Down Expand Up @@ -353,13 +350,25 @@ func (n *Node) startAckGossip(ctx context.Context, ps *pubsub.PubSub) error {
continue
}
pubkeyBytes := peerPubKey.Bytes() // does not error for secp256k1 or ed25519

if ack.Signature == nil {
n.log.Warnf("received ACK with nil signature from %s", fromPeerID)
continue
}

if !bytes.Equal(ack.Signature.PubKey, pubkeyBytes) {
n.log.Warnf("invalid ack msg source: sender mismatch %s, expected: %s", hex.EncodeToString(pubkeyBytes), hex.EncodeToString(ack.Signature.PubKey))
continue
}

go n.ce.NotifyACK(pubkeyBytes, ack)
}
}()

return nil
}

/*
func (n *Node) sendDiscoveryRequest() {
n.log.Debug("sending Discovery request")
n.discReq <- types.DiscoveryRequest{}
Expand Down Expand Up @@ -512,6 +521,7 @@ func (n *Node) startDiscoveryResponseGossip(ctx context.Context, ps *pubsub.PubS
return nil
}
*/

func (n *Node) sendReset(height int64, txIDs []ktypes.Hash) error {
n.resetMsg <- types.ConsensusReset{
Expand Down
6 changes: 3 additions & 3 deletions node/consensus/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (ce *ConsensusEngine) syncBlocksUntilHeight(ctx context.Context, startHeigh
func (ce *ConsensusEngine) syncBlockWithRetry(ctx context.Context, height int64) error {
_, rawblk, ci, err := ce.getBlock(ctx, height)
if err != nil {
return err
return fmt.Errorf("failed to get block from the network: %w", err)
}

return ce.applyBlock(ctx, rawblk, ci)
Expand All @@ -136,7 +136,7 @@ func (ce *ConsensusEngine) syncBlockWithRetry(ctx context.Context, height int64)
func (ce *ConsensusEngine) syncBlock(ctx context.Context, height int64) error {
_, rawblk, ci, err := ce.blkRequester(ctx, height)
if err != nil {
return err
return fmt.Errorf("failed to get block from the network: %w", err)
}

return ce.applyBlock(ctx, rawblk, ci)
Expand All @@ -152,7 +152,7 @@ func (ce *ConsensusEngine) applyBlock(ctx context.Context, rawBlk []byte, ci *ty
}

if err := ce.processAndCommit(ctx, blk, ci); err != nil {
return err
return fmt.Errorf("failed to apply block: %w", err)
}

return nil
Expand Down
34 changes: 17 additions & 17 deletions node/consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ type ConsensusEngine struct {
mempoolMtx sync.Mutex

// Broadcasters
proposalBroadcaster ProposalBroadcaster
blkAnnouncer BlkAnnouncer
ackBroadcaster AckBroadcaster
blkRequester BlkRequester
rstStateBroadcaster ResetStateBroadcaster
discoveryReqBroadcaster DiscoveryReqBroadcaster
txAnnouncer TxAnnouncer
proposalBroadcaster ProposalBroadcaster
blkAnnouncer BlkAnnouncer
ackBroadcaster AckBroadcaster
blkRequester BlkRequester
rstStateBroadcaster ResetStateBroadcaster
// discoveryReqBroadcaster DiscoveryReqBroadcaster
txAnnouncer TxAnnouncer

// TxSubscriber
subMtx sync.Mutex // protects access to txSubscribers
Expand Down Expand Up @@ -163,14 +163,14 @@ type Config struct {
}

type BroadcastFns struct {
ProposalBroadcaster ProposalBroadcaster
TxAnnouncer TxAnnouncer
BlkAnnouncer BlkAnnouncer
AckBroadcaster AckBroadcaster
BlkRequester BlkRequester
RstStateBroadcaster ResetStateBroadcaster
DiscoveryReqBroadcaster DiscoveryReqBroadcaster
TxBroadcaster blockprocessor.BroadcastTxFn
ProposalBroadcaster ProposalBroadcaster
TxAnnouncer TxAnnouncer
BlkAnnouncer BlkAnnouncer
AckBroadcaster AckBroadcaster
BlkRequester BlkRequester
RstStateBroadcaster ResetStateBroadcaster
// DiscoveryReqBroadcaster DiscoveryReqBroadcaster
TxBroadcaster blockprocessor.BroadcastTxFn
}

type WhitelistFns struct {
Expand Down Expand Up @@ -336,7 +336,7 @@ func (ce *ConsensusEngine) Start(ctx context.Context, fns BroadcastFns, peerFns
ce.ackBroadcaster = fns.AckBroadcaster
ce.blkRequester = fns.BlkRequester
ce.rstStateBroadcaster = fns.RstStateBroadcaster
ce.discoveryReqBroadcaster = fns.DiscoveryReqBroadcaster
// ce.discoveryReqBroadcaster = fns.DiscoveryReqBroadcaster
ce.txAnnouncer = fns.TxAnnouncer

ce.blockProcessor.SetCallbackFns(fns.TxBroadcaster, peerFns.AddPeer, peerFns.RemovePeer)
Expand Down Expand Up @@ -739,7 +739,7 @@ func (ce *ConsensusEngine) reannounceMsgs(ctx context.Context) {
!ce.state.blockRes.appHash.IsZero() {
ce.log.Info("Reannouncing ACK", "ack", ce.state.blockRes.ack, "height", ce.state.blkProp.height, "hash", ce.state.blkProp.blkHash)
vote := ce.state.blockRes.vote
go ce.ackBroadcaster(vote.ToAckRes())
go ce.ackBroadcaster(vote.msg)
}
}
}
Expand Down
48 changes: 25 additions & 23 deletions node/consensus/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ import (

var (
broadcastFns = BroadcastFns{
ProposalBroadcaster: mockBlockPropBroadcaster,
TxAnnouncer: mockTxAnnouncer,
BlkAnnouncer: mockBlkAnnouncer,
BlkRequester: mockBlkRequester,
AckBroadcaster: mockVoteBroadcaster,
RstStateBroadcaster: mockResetStateBroadcaster,
DiscoveryReqBroadcaster: mockDiscoveryBroadcaster,
TxBroadcaster: nil,
ProposalBroadcaster: mockBlockPropBroadcaster,
TxAnnouncer: mockTxAnnouncer,
BlkAnnouncer: mockBlkAnnouncer,
BlkRequester: mockBlkRequester,
AckBroadcaster: mockVoteBroadcaster,
RstStateBroadcaster: mockResetStateBroadcaster,
// DiscoveryReqBroadcaster: mockDiscoveryBroadcaster,
TxBroadcaster: nil,
}

peerFns = WhitelistFns{
Expand Down Expand Up @@ -237,15 +237,15 @@ func addVotes(t *testing.T, blkHash, appHash ktypes.Hash, n1, n2 *ConsensusEngin

ci.Votes = append(ci.Votes, &types.VoteInfo{
Signature: *sig1,
AckStatus: types.AckStatusAgree,
AckStatus: types.Agreed,
})

sig2, err := types.SignVote(blkHash, true, &appHash, n2.privKey)
require.NoError(t, err)

ci.Votes = append(ci.Votes, &types.VoteInfo{
Signature: *sig2,
AckStatus: types.AckStatusAgree,
AckStatus: types.Agreed,
})

return ci
Expand Down Expand Up @@ -816,11 +816,13 @@ func TestCELeaderTwoNodesMajorityAcks(t *testing.T) {
assert.NoError(t, err)

vote := &vote{
height: 1,
ack: true,
blkHash: blProp.blkHash,
appHash: &blockAppHash,
signature: sig,
msg: &types.AckRes{
Height: 1,
ACK: true,
BlkHash: blProp.blkHash,
AppHash: &blockAppHash,
Signature: sig,
},
}

// Invalid sender
Expand Down Expand Up @@ -882,11 +884,13 @@ func TestCELeaderTwoNodesMajorityNacks(t *testing.T) {

// node2 should send a vote to node1
vote := &vote{
height: 1,
ack: true,
blkHash: b.blkHash,
appHash: &nextAppHash,
signature: sig1,
msg: &types.AckRes{
Height: 1,
ACK: true,
BlkHash: b.blkHash,
AppHash: &nextAppHash,
Signature: sig1,
},
}

// Invalid sender -> vote ignored
Expand All @@ -897,7 +901,7 @@ func TestCELeaderTwoNodesMajorityNacks(t *testing.T) {
err = n1.addVote(ctx, vote, hex.EncodeToString(ceConfigs[1].PrivateKey.Public().Bytes()))
assert.NoError(t, err)

vote.signature = sig2
vote.msg.Signature = sig2
err = n1.addVote(ctx, vote, hex.EncodeToString(ceConfigs[2].PrivateKey.Public().Bytes()))
assert.NoError(t, err)

Expand Down Expand Up @@ -925,8 +929,6 @@ func mockResetStateBroadcaster(_ int64, _ []ktypes.Hash) error {
return nil
}

func mockDiscoveryBroadcaster() {}

func nextAppHash(prevHash types.Hash) types.Hash {
hasher := sha256.New()
txHash := types.Hash(hasher.Sum(nil))
Expand Down
27 changes: 14 additions & 13 deletions node/consensus/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (
)

// AcceptProposal determines if the node should download the block for the given proposal.
// This function should not be executed by the leader or sentry nodes.
// This should not be processed by the leader and the sentry nodes and must return false.
// Validators should only accept the proposal if they are not currently processing
// another block and the proposal is for the next block to be processed.
// If a new proposal for the same height is received, the current proposal execution
// should be aborted and the new proposal should be processed.
// If the leader proposes a new block for already committed heights, the validator should
// send a Nack to the leader with an OutOfSyncProof, indicating that the leader should
// sync to the correct height before proposing new blocks.
// send a Nack to the leader with an OutOfSyncProof, indicating the leader to
// catchup to the correct height before proposing new blocks.
func (ce *ConsensusEngine) AcceptProposal(height int64, blkID, prevBlockID types.Hash, leaderSig []byte, timestamp int64) bool {
if ce.role.Load() != types.RoleValidator {
return false
Expand Down Expand Up @@ -228,10 +228,9 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
ce.log.Info("Processing block proposal", "height", blkPropMsg.blk.Header.Height, "header", blkPropMsg.blk.Header)

if err := ce.validateBlock(blkPropMsg.blk); err != nil {
ce.log.Error("Error validating block, sending NACK", "error", err)
sig, err := types.SignVote(blkPropMsg.blkHash, false, nil, ce.privKey)
if err != nil {
ce.log.Error("Error signing the voteInfo", "error", err)
return fmt.Errorf("error signing the voteInfo: %w", err)
}
// go ce.ackBroadcaster(false, blkPropMsg.height, blkPropMsg.blkHash, nil, nil)
status := types.NackStatusInvalidBlock
Expand All @@ -243,7 +242,7 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
Signature: sig,
})

return err
return fmt.Errorf("error validating block: %w", err)
}
ce.state.blkProp = blkPropMsg
ce.state.blockRes = nil
Expand Down Expand Up @@ -282,15 +281,17 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
return err
}
voteInfo := &vote{
ack: true,
blkHash: blkPropMsg.blkHash,
height: blkPropMsg.height,
appHash: &ce.state.blockRes.appHash,
signature: signature,
msg: &types.AckRes{
ACK: true,
BlkHash: blkPropMsg.blkHash,
Height: blkPropMsg.height,
AppHash: &ce.state.blockRes.appHash,
Signature: signature,
},
}
ce.state.blockRes.vote = voteInfo

go ce.ackBroadcaster(voteInfo.ToAckRes())
go ce.ackBroadcaster(voteInfo.msg)

return nil
}
Expand Down Expand Up @@ -435,7 +436,7 @@ func (ce *ConsensusEngine) acceptCommitInfo(ci *types.CommitInfo, blkID ktypes.H
return fmt.Errorf("error verifying vote: %w", err)
}

if vote.AckStatus == types.AckStatusAgree {
if vote.AckStatus == types.Agreed {
acks++
}
}
Expand Down
Loading

0 comments on commit 06294ea

Please sign in to comment.