Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: app hash error channel #1

Merged
merged 3 commits into from
Jan 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 32 additions & 11 deletions blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blocksync
import (
"fmt"
"reflect"
"strings"
"time"

"github.com/cometbft/cometbft/libs/log"
Expand Down Expand Up @@ -56,8 +57,9 @@ type Reactor struct {
pool *BlockPool
blockSync bool

requestsCh <-chan BlockRequest
errorsCh <-chan peerError
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
appHashErrorsCh chan p2p.AppHashError
}

// NewReactor returns new reactor instance.
Expand All @@ -71,8 +73,9 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS

requestsCh := make(chan BlockRequest, maxTotalRequesters)

const capacity = 1000 // must be bigger than peers count
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
const capacity = 1000 // must be bigger than peers count
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
appHashErrorsCh := make(chan p2p.AppHashError) // create an unbuffered channel to stream appHash errors

startHeight := store.Height() + 1
if startHeight == 1 {
Expand All @@ -81,13 +84,14 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS
pool := NewBlockPool(startHeight, requestsCh, errorsCh)

bcR := &Reactor{
initialState: state,
blockExec: blockExec,
store: store,
pool: pool,
blockSync: blockSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
initialState: state,
blockExec: blockExec,
store: store,
pool: pool,
blockSync: blockSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
appHashErrorsCh: appHashErrorsCh,
}
bcR.BaseReactor = *p2p.NewBaseReactor("Reactor", bcR)
return bcR
Expand Down Expand Up @@ -361,6 +365,19 @@ FOR_LOOP:
}

if err != nil {
// If this is an appHash or lastResultsHash error, also pass to the appHashError channel.
if strings.Contains(err.Error(), "wrong Block.Header.AppHash") {
bcR.BaseReactor.AppHashErrorChanBR <- p2p.AppHashError{
Err: err,
Height: uint64(first.Height),
}
} else if strings.Contains(err.Error(), "wrong Block.Header.LastResultsHash") {
bcR.BaseReactor.AppHashErrorChanBR <- p2p.AppHashError{
Err: err,
Height: uint64(first.Height - 1),
Comment on lines +369 to +377
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used height-1 for lastResultHash and height for AppHash. I think this is correct, but highlighting that I would like this double checked

}
}

bcR.Logger.Error("Error in validation", "err", err)
peerID := bcR.pool.RedoRequest(first.Height)
peer := bcR.Switch.Peers().Get(peerID)
Expand Down Expand Up @@ -415,3 +432,7 @@ func (bcR *Reactor) BroadcastStatusRequest() {
Message: &bcproto.StatusRequest{},
})
}

func (bcR *Reactor) AppHashErrorsCh() chan p2p.AppHashError {
return bcR.appHashErrorsCh
}
2 changes: 2 additions & 0 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,3 +604,5 @@ func (br *ByzantineReactor) ReceiveEnvelope(e p2p.Envelope) {
}

func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }

func (br *ByzantineReactor) AppHashErrorsCh() chan p2p.AppHashError { return nil }
11 changes: 8 additions & 3 deletions mempool/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
// peers you received it from.
type Reactor struct {
p2p.BaseReactor
config *cfg.MempoolConfig
mempool *TxMempool
ids *mempoolIDs
config *cfg.MempoolConfig
mempool *TxMempool
ids *mempoolIDs
appHashErrorsCh chan p2p.AppHashError
}

type mempoolIDs struct {
Expand Down Expand Up @@ -280,3 +281,7 @@ type TxsMessage struct {
func (m *TxsMessage) String() string {
return fmt.Sprintf("[TxsMessage %v]", m.Txs)
}

func (memR *Reactor) AppHashErrorsCh() chan p2p.AppHashError {
return memR.appHashErrorsCh
}
5 changes: 5 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,11 @@ func (n *Node) ConsensusReactor() *cs.Reactor {
return n.consensusReactor
}

// BCReactor returns the Node's BlockchainReactor.
func (n *Node) BCReactor() p2p.Reactor {
return n.bcReactor
}

// MempoolReactor returns the Node's mempool reactor.
func (n *Node) MempoolReactor() p2p.Reactor {
return n.mempoolReactor
Expand Down
22 changes: 20 additions & 2 deletions p2p/base_reactor.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
package p2p

import (
"fmt"

"github.com/cometbft/cometbft/libs/service"
"github.com/cometbft/cometbft/p2p/conn"
)

type AppHashError struct {
Err error
Height uint64
}

func (e AppHashError) Error() string {
return fmt.Sprintf("app hash error at height %v: %s", e.Height, e.Err.Error())
}

// Reactor is responsible for handling incoming messages on one or more
// Channel. Switch calls GetChannels when reactor is added to it. When a new
// peer joins our node, InitPeer and AddPeer are called. RemovePeer is called
Expand Down Expand Up @@ -41,19 +52,25 @@ type Reactor interface {
// ReceiveEnvelope is called by the switch when an envelope is received from any connected
// peer on any of the channels registered by the reactor.
ReceiveEnvelope(Envelope)

// AppHashErrorsCh is used to stream hash errors to the sdk, which is then used
// to provide further debugging information in logs to the user.
AppHashErrorsCh() chan AppHashError
}

//--------------------------------------

type BaseReactor struct {
service.BaseService // Provides Start, Stop, .Quit
Switch *Switch
AppHashErrorChanBR chan AppHashError
}

func NewBaseReactor(name string, impl Reactor) *BaseReactor {
return &BaseReactor{
BaseService: *service.NewBaseService(nil, name, impl),
Switch: nil,
BaseService: *service.NewBaseService(nil, name, impl),
Switch: nil,
AppHashErrorChanBR: impl.AppHashErrorsCh(),
}
}

Expand All @@ -66,3 +83,4 @@ func (*BaseReactor) AddPeer(peer Peer) {}
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (*BaseReactor) ReceiveEnvelope(e Envelope) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }
func (*BaseReactor) AppHashErrorsCh() chan AppHashError { return nil }
Loading