Skip to content

Commit

Permalink
Merge pull request #1 from osmosis-labs/adam/v0.37.2-app-hash-check
Browse files Browse the repository at this point in the history
feat: app hash error channel
  • Loading branch information
czarcas7ic committed Jan 6, 2024
1 parent 8acc13c commit 7a4e9b5
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 14 deletions.
45 changes: 33 additions & 12 deletions internal/blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blocksync
import (
"fmt"
"reflect"
"strings"
"time"

bcproto "github.com/cometbft/cometbft/api/cometbft/blocksync/v1"
Expand Down Expand Up @@ -61,8 +62,9 @@ type Reactor struct {
pool *BlockPool
blockSync bool

requestsCh <-chan BlockRequest
errorsCh <-chan peerError
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
appHashErrorsCh chan p2p.AppHashError

switchToConsensusMs int

Expand All @@ -89,8 +91,9 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS
}
requestsCh := make(chan BlockRequest, maxTotalRequesters)

const capacity = 1000 // must be bigger than peers count
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
const capacity = 1000 // must be bigger than peers count
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
appHashErrorsCh := make(chan p2p.AppHashError) // create an unbuffered channel to stream appHash errors

startHeight := storeHeight + 1
if startHeight == 1 {
Expand All @@ -99,14 +102,15 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS
pool := NewBlockPool(startHeight, requestsCh, errorsCh)

bcR := &Reactor{
initialState: state,
blockExec: blockExec,
store: store,
pool: pool,
blockSync: blockSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
metrics: metrics,
initialState: state,
blockExec: blockExec,
store: store,
pool: pool,
blockSync: blockSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
appHashErrorsCh: appHashErrorsCh,
metrics: metrics,
}
bcR.BaseReactor = *p2p.NewBaseReactor("Reactor", bcR)
return bcR
Expand Down Expand Up @@ -475,6 +479,19 @@ FOR_LOOP:
}
}
if err != nil {
// If this is an appHash or lastResultsHash error, also pass to the appHashError channel.
if strings.Contains(err.Error(), "wrong Block.Header.AppHash") {
bcR.BaseReactor.AppHashErrorChanBR <- p2p.AppHashError{
Err: err,
Height: uint64(first.Height),
}
} else if strings.Contains(err.Error(), "wrong Block.Header.LastResultsHash") {
bcR.BaseReactor.AppHashErrorChanBR <- p2p.AppHashError{
Err: err,
Height: uint64(first.Height - 1),
}
}

bcR.Logger.Error("Error in validation", "err", err)
peerID := bcR.pool.RedoRequest(first.Height)
peer := bcR.Switch.Peers().Get(peerID)
Expand Down Expand Up @@ -538,3 +555,7 @@ func (bcR *Reactor) BroadcastStatusRequest() {
Message: &bcproto.StatusRequest{},
})
}

// AppHashErrorsCh returns the channel on which this reactor streams
// AppHash / LastResultsHash mismatch errors detected during block sync.
// The channel is unbuffered (see NewReactor), so sends block until a
// consumer reads from it.
func (bcR *Reactor) AppHashErrorsCh() chan p2p.AppHashError {
	return bcR.appHashErrorsCh
}
2 changes: 2 additions & 0 deletions internal/consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,3 +593,5 @@ func (br *ByzantineReactor) Receive(e p2p.Envelope) {
br.reactor.Receive(e)
}
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }

// AppHashErrorsCh implements the p2p.Reactor interface; the byzantine
// test reactor never reports app hash errors, so it returns nil.
func (br *ByzantineReactor) AppHashErrorsCh() chan p2p.AppHashError { return nil }
3 changes: 3 additions & 0 deletions mempool/nop_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,6 @@ func (*NopMempoolReactor) Receive(p2p.Envelope) {}

// SetSwitch does nothing.
func (*NopMempoolReactor) SetSwitch(*p2p.Switch) {}

// AppHashErrorsCh always returns nil: the no-op mempool reactor never
// produces app hash errors. Present only to satisfy p2p.Reactor.
func (*NopMempoolReactor) AppHashErrorsCh() chan p2p.AppHashError { return nil }
5 changes: 5 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,11 @@ func (n *Node) ConsensusReactor() *cs.Reactor {
return n.consensusReactor
}

// BCReactor returns the Node's BlockchainReactor as a p2p.Reactor.
// Callers needing reactor-specific behavior (e.g. AppHashErrorsCh)
// must type-assert the result.
func (n *Node) BCReactor() p2p.Reactor {
	return n.bcReactor
}

// MempoolReactor returns the Node's mempool reactor.
func (n *Node) MempoolReactor() p2p.Reactor {
return n.mempoolReactor
Expand Down
22 changes: 20 additions & 2 deletions p2p/base_reactor.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
package p2p

import (
"fmt"

"github.com/cometbft/cometbft/internal/service"
"github.com/cometbft/cometbft/p2p/conn"
)

// AppHashError reports an AppHash (or LastResultsHash) mismatch detected
// while validating a block, together with the height the mismatch refers to.
// It is streamed to consumers via Reactor.AppHashErrorsCh.
type AppHashError struct {
	Err    error  // underlying validation error
	Height uint64 // block height the error refers to
}

// Error implements the error interface.
func (e AppHashError) Error() string {
	// Format the wrapped error with %v rather than calling Err.Error()
	// directly, so a zero-value AppHashError (nil Err) renders as "<nil>"
	// instead of panicking. Output is unchanged for non-nil errors.
	return fmt.Sprintf("app hash error at height %v: %v", e.Height, e.Err)
}

// Reactor is responsible for handling incoming messages on one or more
// Channel. Switch calls GetChannels when reactor is added to it. When a new
// peer joins our node, InitPeer and AddPeer are called. RemovePeer is called
Expand Down Expand Up @@ -41,19 +52,25 @@ type Reactor interface {
// Receive is called by the switch when an envelope is received from any connected
// peer on any of the channels registered by the reactor
Receive(e Envelope)

// AppHashErrorsCh is used to stream hash errors to the sdk, which is then used
// to provide further debugging information in logs to the user.
AppHashErrorsCh() chan AppHashError
}

//--------------------------------------

// BaseReactor provides a default, mostly no-op implementation of the
// Reactor interface for concrete reactors to embed.
type BaseReactor struct {
	service.BaseService // Provides Start, Stop, .Quit
	// Switch is left nil by NewBaseReactor; presumably assigned when the
	// reactor is registered with a Switch — confirm against Switch code.
	Switch *Switch
	// AppHashErrorChanBR holds the channel returned by the implementing
	// reactor's AppHashErrorsCh(); it may be nil for reactors that never
	// report app hash errors.
	AppHashErrorChanBR chan AppHashError
}

// NewBaseReactor builds a BaseReactor for the given implementation: it
// embeds a fresh BaseService named name and captures impl's app hash
// error channel. Switch is deliberately left nil here.
func NewBaseReactor(name string, impl Reactor) *BaseReactor {
	base := service.NewBaseService(nil, name, impl)
	br := &BaseReactor{
		BaseService:        *base,
		Switch:             nil,
		AppHashErrorChanBR: impl.AppHashErrorsCh(),
	}
	return br
}

Expand All @@ -65,3 +82,4 @@ func (*BaseReactor) AddPeer(Peer) {}
// Default no-op implementations of the Reactor interface; embedders
// override only the methods they need.
func (*BaseReactor) RemovePeer(Peer, interface{}) {}
func (*BaseReactor) Receive(Envelope)             {}
func (*BaseReactor) InitPeer(peer Peer) Peer      { return peer }

// AppHashErrorsCh returns nil by default: reactors that do not report
// app hash errors inherit this, and NewBaseReactor then stores a nil
// channel in AppHashErrorChanBR.
func (*BaseReactor) AppHashErrorsCh() chan AppHashError { return nil }

0 comments on commit 7a4e9b5

Please sign in to comment.