Skip to content

Commit

Permalink
Merge pull request #1 from osmosis-labs/adam/v0.37.2-app-hash-check
Browse files Browse the repository at this point in the history
feat: app hash error channel
  • Loading branch information
czarcas7ic authored Jan 6, 2024
2 parents fe45483 + 28da358 commit 28d7d17
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 16 deletions.
43 changes: 32 additions & 11 deletions blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blocksync
import (
"fmt"
"reflect"
"strings"
"time"

"github.com/cometbft/cometbft/libs/log"
Expand Down Expand Up @@ -56,8 +57,9 @@ type Reactor struct {
pool *BlockPool
blockSync bool

requestsCh <-chan BlockRequest
errorsCh <-chan peerError
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
appHashErrorsCh chan p2p.AppHashError
}

// NewReactor returns new reactor instance.
Expand All @@ -71,8 +73,9 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS

requestsCh := make(chan BlockRequest, maxTotalRequesters)

const capacity = 1000 // must be bigger than peers count
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
const capacity = 1000 // must be bigger than peers count
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
appHashErrorsCh := make(chan p2p.AppHashError) // create an unbuffered channel to stream appHash errors

startHeight := store.Height() + 1
if startHeight == 1 {
Expand All @@ -81,13 +84,14 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS
pool := NewBlockPool(startHeight, requestsCh, errorsCh)

bcR := &Reactor{
initialState: state,
blockExec: blockExec,
store: store,
pool: pool,
blockSync: blockSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
initialState: state,
blockExec: blockExec,
store: store,
pool: pool,
blockSync: blockSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
appHashErrorsCh: appHashErrorsCh,
}
bcR.BaseReactor = *p2p.NewBaseReactor("Reactor", bcR)
return bcR
Expand Down Expand Up @@ -361,6 +365,19 @@ FOR_LOOP:
}

if err != nil {
// If this is an appHash or lastResultsHash error, also pass to the appHashError channel.
if strings.Contains(err.Error(), "wrong Block.Header.AppHash") {
bcR.BaseReactor.AppHashErrorChanBR <- p2p.AppHashError{
Err: err,
Height: uint64(first.Height),
}
} else if strings.Contains(err.Error(), "wrong Block.Header.LastResultsHash") {
bcR.BaseReactor.AppHashErrorChanBR <- p2p.AppHashError{
Err: err,
Height: uint64(first.Height - 1),
}
}

bcR.Logger.Error("Error in validation", "err", err)
peerID := bcR.pool.RedoRequest(first.Height)
peer := bcR.Switch.Peers().Get(peerID)
Expand Down Expand Up @@ -415,3 +432,7 @@ func (bcR *Reactor) BroadcastStatusRequest() {
Message: &bcproto.StatusRequest{},
})
}

func (bcR *Reactor) AppHashErrorsCh() chan p2p.AppHashError {
return bcR.appHashErrorsCh
}
2 changes: 2 additions & 0 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,3 +604,5 @@ func (br *ByzantineReactor) ReceiveEnvelope(e p2p.Envelope) {
}

func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }

func (br *ByzantineReactor) AppHashErrorsCh() chan p2p.AppHashError { return nil }
11 changes: 8 additions & 3 deletions mempool/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
// peers you received it from.
type Reactor struct {
p2p.BaseReactor
config *cfg.MempoolConfig
mempool *TxMempool
ids *mempoolIDs
config *cfg.MempoolConfig
mempool *TxMempool
ids *mempoolIDs
appHashErrorsCh chan p2p.AppHashError
}

type mempoolIDs struct {
Expand Down Expand Up @@ -280,3 +281,7 @@ type TxsMessage struct {
func (m *TxsMessage) String() string {
return fmt.Sprintf("[TxsMessage %v]", m.Txs)
}

func (memR *Reactor) AppHashErrorsCh() chan p2p.AppHashError {
return memR.appHashErrorsCh
}
5 changes: 5 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,11 @@ func (n *Node) ConsensusReactor() *cs.Reactor {
return n.consensusReactor
}

// BCReactor returns the Node's BlockchainReactor.
func (n *Node) BCReactor() p2p.Reactor {
return n.bcReactor
}

// MempoolReactor returns the Node's mempool reactor.
func (n *Node) MempoolReactor() p2p.Reactor {
return n.mempoolReactor
Expand Down
22 changes: 20 additions & 2 deletions p2p/base_reactor.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
package p2p

import (
"fmt"

"github.com/cometbft/cometbft/libs/service"
"github.com/cometbft/cometbft/p2p/conn"
)

type AppHashError struct {
Err error
Height uint64
}

func (e AppHashError) Error() string {
return fmt.Sprintf("app hash error at height %v: %s", e.Height, e.Err.Error())
}

// Reactor is responsible for handling incoming messages on one or more
// Channel. Switch calls GetChannels when reactor is added to it. When a new
// peer joins our node, InitPeer and AddPeer are called. RemovePeer is called
Expand Down Expand Up @@ -41,19 +52,25 @@ type Reactor interface {
// ReceiveEnvelope is called by the switch when an envelope is received from any connected
// peer on any of the channels registered by the reactor.
ReceiveEnvelope(Envelope)

// AppHashErrorsCh is used to stream hash errors to the sdk, which is then used
// to provide further debugging information in logs to the user.
AppHashErrorsCh() chan AppHashError
}

//--------------------------------------

type BaseReactor struct {
service.BaseService // Provides Start, Stop, .Quit
Switch *Switch
AppHashErrorChanBR chan AppHashError
}

func NewBaseReactor(name string, impl Reactor) *BaseReactor {
return &BaseReactor{
BaseService: *service.NewBaseService(nil, name, impl),
Switch: nil,
BaseService: *service.NewBaseService(nil, name, impl),
Switch: nil,
AppHashErrorChanBR: impl.AppHashErrorsCh(),
}
}

Expand All @@ -66,3 +83,4 @@ func (*BaseReactor) AddPeer(peer Peer) {}
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (*BaseReactor) ReceiveEnvelope(e Envelope) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }
func (*BaseReactor) AppHashErrorsCh() chan AppHashError { return nil }

0 comments on commit 28d7d17

Please sign in to comment.