From c0d08de96c15f1eb6e7e4bebef95448cca6ada21 Mon Sep 17 00:00:00 2001 From: Adam Tucker Date: Tue, 2 Jan 2024 15:27:46 -0700 Subject: [PATCH 1/3] app hash error channel --- blocksync/reactor.go | 38 ++++++++++++++++++++++++++++--------- consensus/byzantine_test.go | 2 ++ mempool/v1/reactor.go | 11 ++++++++--- p2p/base_reactor.go | 14 ++++++++++++++ 4 files changed, 53 insertions(+), 12 deletions(-) diff --git a/blocksync/reactor.go b/blocksync/reactor.go index cd5a811688e..004add37e73 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -3,6 +3,7 @@ package blocksync import ( "fmt" "reflect" + "strings" "time" "github.com/cometbft/cometbft/libs/log" @@ -56,8 +57,9 @@ type Reactor struct { pool *BlockPool blockSync bool - requestsCh <-chan BlockRequest - errorsCh <-chan peerError + requestsCh <-chan BlockRequest + errorsCh <-chan peerError + appHashErrorsCh chan p2p.AppHashError } // NewReactor returns new reactor instance. @@ -73,6 +75,7 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS const capacity = 1000 // must be bigger than peers count errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock + appHashErrorsCh := make(chan p2p.AppHashError, 5) startHeight := store.Height() + 1 if startHeight == 1 { @@ -81,13 +84,14 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS pool := NewBlockPool(startHeight, requestsCh, errorsCh) bcR := &Reactor{ - initialState: state, - blockExec: blockExec, - store: store, - pool: pool, - blockSync: blockSync, - requestsCh: requestsCh, - errorsCh: errorsCh, + initialState: state, + blockExec: blockExec, + store: store, + pool: pool, + blockSync: blockSync, + requestsCh: requestsCh, + errorsCh: errorsCh, + appHashErrorsCh: appHashErrorsCh, } bcR.BaseReactor = *p2p.NewBaseReactor("Reactor", bcR) return bcR @@ -360,6 +364,18 @@ FOR_LOOP: err = bcR.blockExec.ValidateBlock(state, first) } + if strings.Contains(err.Error(), "wrong Block.Header.AppHash") { + bcR.appHashErrorsCh <- p2p.AppHashError{ + Err: err, + Height: uint64(first.Height), + } + } else if strings.Contains(err.Error(), "wrong Block.Header.LastResultsHash") { + bcR.appHashErrorsCh <- p2p.AppHashError{ + Err: err, + Height: uint64(first.Height - 1), + } + } + if err != nil { bcR.Logger.Error("Error in validation", "err", err) peerID := bcR.pool.RedoRequest(first.Height) @@ -415,3 +431,7 @@ func (bcR *Reactor) BroadcastStatusRequest() { Message: &bcproto.StatusRequest{}, }) } + +func (r *Reactor) AppHashErrorsCh() <-chan p2p.AppHashError { + return r.appHashErrorsCh +} diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 861f5e4277f..5db3f42d78f 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -604,3 +604,5 @@ func (br *ByzantineReactor) ReceiveEnvelope(e p2p.Envelope) { } func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer } + +func (br *ByzantineReactor) AppHashErrorsCh() <-chan p2p.AppHashError { return nil } diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index 3a137a78668..3a061ab4956 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -21,9 +21,10 @@ import ( // peers you received it from. type Reactor struct { p2p.BaseReactor - config *cfg.MempoolConfig - mempool *TxMempool - ids *mempoolIDs + config *cfg.MempoolConfig + mempool *TxMempool + ids *mempoolIDs + appHashErrorsCh chan p2p.AppHashError } type mempoolIDs struct { @@ -280,3 +281,7 @@ type TxsMessage struct { func (m *TxsMessage) String() string { return fmt.Sprintf("[TxsMessage %v]", m.Txs) } + +func (memR *Reactor) AppHashErrorsCh() <-chan p2p.AppHashError { + return memR.appHashErrorsCh +} diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index fe56283728e..8ce393eebc1 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -1,10 +1,21 @@ package p2p import ( + "fmt" + "github.com/cometbft/cometbft/libs/service" "github.com/cometbft/cometbft/p2p/conn" ) +type AppHashError struct { + Err error + Height uint64 +} + +func (e AppHashError) Error() string { + return fmt.Sprintf("app hash error at height %v: %s", e.Height, e.Err.Error()) +} + // Reactor is responsible for handling incoming messages on one or more // Channel. Switch calls GetChannels when reactor is added to it. When a new // peer joins our node, InitPeer and AddPeer are called. RemovePeer is called @@ -41,6 +52,8 @@ type Reactor interface { // ReceiveEnvelope is called by the switch when an envelope is received from any connected // peer on any of the channels registered by the reactor. ReceiveEnvelope(Envelope) + + AppHashErrorsCh() <-chan AppHashError } //-------------------------------------- @@ -66,3 +79,4 @@ func (*BaseReactor) AddPeer(peer Peer) {} func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {} func (*BaseReactor) ReceiveEnvelope(e Envelope) {} func (*BaseReactor) InitPeer(peer Peer) Peer { return peer } +func (*BaseReactor) AppHashErrorsCh() <-chan AppHashError { return nil } From 7c374f174d56e7f805dccedf2820f34d5348b7ec Mon Sep 17 00:00:00 2001 From: Adam Tucker Date: Tue, 2 Jan 2024 22:50:11 -0700 Subject: [PATCH 2/3] implement app hash error channel --- blocksync/reactor.go | 31 ++++++++++++++++--------------- consensus/byzantine_test.go | 2 +- mempool/v1/reactor.go | 2 +- node/node.go | 5 +++++ p2p/base_reactor.go | 12 ++++++++---- 5 files changed, 31 insertions(+), 21 deletions(-) diff --git a/blocksync/reactor.go b/blocksync/reactor.go index 004add37e73..bae362b5c9a 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -73,9 +73,9 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS requestsCh := make(chan BlockRequest, maxTotalRequesters) - const capacity = 1000 // must be bigger than peers count - errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock - appHashErrorsCh := make(chan p2p.AppHashError, 5) + const capacity = 1000 // must be bigger than peers count + errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock + appHashErrorsCh := make(chan p2p.AppHashError) // create an unbuffered channel to stream appHash errors startHeight := store.Height() + 1 if startHeight == 1 { @@ -364,19 +364,20 @@ FOR_LOOP: err = bcR.blockExec.ValidateBlock(state, first) } - if strings.Contains(err.Error(), "wrong Block.Header.AppHash") { - bcR.appHashErrorsCh <- p2p.AppHashError{ - Err: err, - Height: uint64(first.Height), - } - } else if strings.Contains(err.Error(), "wrong Block.Header.LastResultsHash") { - bcR.appHashErrorsCh <- p2p.AppHashError{ - Err: err, - Height: uint64(first.Height - 1), + if err != nil { + // If this is an appHash or lastResultsHash error, also pass to the appHashError channel. + if strings.Contains(err.Error(), "wrong Block.Header.AppHash") { + bcR.BaseReactor.AppHashErrorChanBR <- p2p.AppHashError{ + Err: err, + Height: uint64(first.Height), + } + } else if strings.Contains(err.Error(), "wrong Block.Header.LastResultsHash") { + bcR.BaseReactor.AppHashErrorChanBR <- p2p.AppHashError{ + Err: err, + Height: uint64(first.Height - 1), + } } - } - if err != nil { bcR.Logger.Error("Error in validation", "err", err) peerID := bcR.pool.RedoRequest(first.Height) peer := bcR.Switch.Peers().Get(peerID) @@ -432,6 +433,6 @@ func (bcR *Reactor) BroadcastStatusRequest() { }) } -func (r *Reactor) AppHashErrorsCh() <-chan p2p.AppHashError { +func (r *Reactor) AppHashErrorsCh() chan p2p.AppHashError { return r.appHashErrorsCh } diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 5db3f42d78f..cc1fa39dd4a 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -605,4 +605,4 @@ func (br *ByzantineReactor) ReceiveEnvelope(e p2p.Envelope) { func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer } -func (br *ByzantineReactor) AppHashErrorsCh() <-chan p2p.AppHashError { return nil } +func (br *ByzantineReactor) AppHashErrorsCh() chan p2p.AppHashError { return nil } diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index 3a061ab4956..3918d1bb797 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -282,6 +282,6 @@ func (m *TxsMessage) String() string { return fmt.Sprintf("[TxsMessage %v]", m.Txs) } -func (memR *Reactor) AppHashErrorsCh() <-chan p2p.AppHashError { +func (memR *Reactor) AppHashErrorsCh() chan p2p.AppHashError { return memR.appHashErrorsCh } diff --git a/node/node.go b/node/node.go index 4e5238d1dad..ac3951b980f 100644 --- a/node/node.go +++ b/node/node.go @@ -1255,6 +1255,11 @@ func (n *Node) ConsensusReactor() *cs.Reactor { return n.consensusReactor } +// BCReactor returns the Node's BlockchainReactor. +func (n *Node) BCReactor() p2p.Reactor { + return n.bcReactor +} + // MempoolReactor returns the Node's mempool reactor. func (n *Node) MempoolReactor() p2p.Reactor { return n.mempoolReactor diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index 8ce393eebc1..d9e2f7b3396 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -53,7 +53,9 @@ type Reactor interface { // peer on any of the channels registered by the reactor. ReceiveEnvelope(Envelope) - AppHashErrorsCh() <-chan AppHashError + // AppHashErrorsCh is used to stream hash errors to the sdk, which is then used + // to provide further debugging information in logs to the user. + AppHashErrorsCh() chan AppHashError } //-------------------------------------- @@ -61,12 +63,14 @@ type Reactor interface { type BaseReactor struct { service.BaseService // Provides Start, Stop, .Quit Switch *Switch + AppHashErrorChanBR chan AppHashError } func NewBaseReactor(name string, impl Reactor) *BaseReactor { return &BaseReactor{ - BaseService: *service.NewBaseService(nil, name, impl), - Switch: nil, + BaseService: *service.NewBaseService(nil, name, impl), + Switch: nil, + AppHashErrorChanBR: impl.AppHashErrorsCh(), } } @@ -79,4 +83,4 @@ func (*BaseReactor) AddPeer(peer Peer) {} func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {} func (*BaseReactor) ReceiveEnvelope(e Envelope) {} func (*BaseReactor) InitPeer(peer Peer) Peer { return peer } -func (*BaseReactor) AppHashErrorsCh() <-chan AppHashError { return nil } +func (*BaseReactor) AppHashErrorsCh() chan AppHashError { return nil } From 28da358e3146b77e85df598e46fe53f78e9be855 Mon Sep 17 00:00:00 2001 From: Adam Tucker Date: Tue, 2 Jan 2024 22:58:22 -0700 Subject: [PATCH 3/3] lint --- blocksync/reactor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/blocksync/reactor.go b/blocksync/reactor.go index bae362b5c9a..86f36f3e16d 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -433,6 +433,6 @@ func (bcR *Reactor) BroadcastStatusRequest() { }) } -func (r *Reactor) AppHashErrorsCh() chan p2p.AppHashError { - return r.appHashErrorsCh +func (bcR *Reactor) AppHashErrorsCh() chan p2p.AppHashError { + return bcR.appHashErrorsCh }