Skip to content

Commit

Permalink
feat(p2p): render HasChannel(chID) is a public p2p.Peer method (c…
Browse files Browse the repository at this point in the history
…ometbft#3510)

Closes: cometbft#3472

It also prevents reactors from starting routines intended to send
messages to a peer that does not implement/support a given channel.
Because all `Send()` or `TrySend()` calls from this routine will be
useless, always returning `false` and possibly producing some busy-wait
behavior (see cometbft#3414).

The changes are restricted to: mempool and evidence reactor, because
they use a single channel and have a sending routine peer peer, and two
of the consensus routines, for Data and Votes.

Block and State sync reactors have smarter ways to deal with
unresponsive peers, so probably this check is not needed.

---

- [x] Tests written/updated
- [x] Changelog entry added in `.changelog` (we use
[unclog](https://github.com/informalsystems/unclog) to manage our
changelog)
- [x] Updated relevant documentation (`docs/` or `spec/`) and code
comments
- [x] Title follows the [Conventional
Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec
  • Loading branch information
cason authored and ValarDragon committed Aug 19, 2024
1 parent 2be2073 commit 80f527f
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .changelog/v0.38.6/features/3472-p2p-has-channel-api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[p2p]` `HasChannel(chID)` method added to the `Peer` interface, used by
reactors to check whether a peer implements/supports a given channel.
([#3472](https://github.com/cometbft/cometbft/issues/3472))
8 changes: 8 additions & 0 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,10 @@ func (conR *Reactor) getRoundState() *cstypes.RoundState {

func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
if !peer.HasChannel(DataChannel) {
logger.Info("Peer does not implement DataChannel.")
return
}
rng := cmtrand.NewStdlibRand()

OUTER_LOOP:
Expand Down Expand Up @@ -729,6 +733,10 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt

func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
if !peer.HasChannel(VoteChannel) {
logger.Info("Peer does not implement VoteChannel.")
return
}
rng := cmtrand.NewStdlibRand()

// Simple hack to throttle logs upon sleep.
Expand Down
4 changes: 3 additions & 1 deletion evidence/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (evR *Reactor) GetChannels() []*p2p.ChannelDescriptor {

// AddPeer implements Reactor.
func (evR *Reactor) AddPeer(peer p2p.Peer) {
go evR.broadcastEvidenceRoutine(peer)
if peer.HasChannel(EvidenceChannel) {
go evR.broadcastEvidenceRoutine(peer)
}
}

// Receive implements Reactor.
Expand Down
1 change: 1 addition & 0 deletions evidence/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) {
e, ok := i.(p2p.Envelope)
return ok && e.ChannelID == evidence.EvidenceChannel
})).Return(false)
p.On("HasChannel", evidence.EvidenceChannel).Maybe().Return(true)
quitChan := make(<-chan struct{})
p.On("Quit").Return(quitChan)
ps := peerState{2}
Expand Down
2 changes: 1 addition & 1 deletion mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor.
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (memR *Reactor) AddPeer(peer p2p.Peer) {
if memR.config.Broadcast {
if memR.config.Broadcast && peer.HasChannel(mempool.MempoolChannel) {

Check failure on line 99 in mempool/reactor.go

View workflow job for this annotation

GitHub Actions / Build (arm, linux)

undefined: mempool

Check failure on line 99 in mempool/reactor.go

View workflow job for this annotation

GitHub Actions / govulncheck

undefined: mempool

Check failure on line 99 in mempool/reactor.go

View workflow job for this annotation

GitHub Actions / Build (amd64, linux)

undefined: mempool

Check failure on line 99 in mempool/reactor.go

View workflow job for this annotation

GitHub Actions / tests (00)

undefined: mempool

Check failure on line 99 in mempool/reactor.go

View workflow job for this annotation

GitHub Actions / tests (04)

undefined: mempool

Check failure on line 99 in mempool/reactor.go

View workflow job for this annotation

GitHub Actions / tests (03)

undefined: mempool

Check failure on line 99 in mempool/reactor.go

View workflow job for this annotation

GitHub Actions / tests (01)

undefined: mempool

Check failure on line 99 in mempool/reactor.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: mempool) (typecheck)

Check failure on line 99 in mempool/reactor.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: mempool) (typecheck)

Check failure on line 99 in mempool/reactor.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: mempool) (typecheck)

Check failure on line 99 in mempool/reactor.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: mempool (typecheck)

Check failure on line 99 in mempool/reactor.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: mempool) (typecheck)

Check failure on line 99 in mempool/reactor.go

View workflow job for this annotation

GitHub Actions / tests (05)

undefined: mempool
go func() {
// Always forward transactions to unconditional peers.
if !memR.Switch.IsPeerUnconditional(peer.ID()) {
Expand Down
1 change: 1 addition & 0 deletions p2p/mock/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func NewPeer(ip net.IP) *Peer {
func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
func (mp *Peer) TrySend(_ p2p.Envelope) bool { return true }
func (mp *Peer) TrySendMarshalled(e p2p.MarshalledEnvelope) bool { return true }
func (mp *Peer) HasChannel(_ byte) bool { return true }
func (mp *Peer) Send(_ p2p.Envelope) bool { return true }
func (mp *Peer) NodeInfo() p2p.NodeInfo {
return p2p.DefaultNodeInfo{
Expand Down
18 changes: 18 additions & 0 deletions p2p/mocks/peer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Peer interface {
Status() cmtconn.ConnectionStatus
SocketAddr() *NetAddress // actual address of the socket

HasChannel(chID byte) bool // Does the peer implement this channel?
Send(Envelope) bool
TrySend(Envelope) bool
TrySendMarshalled(MarshalledEnvelope) bool
Expand Down Expand Up @@ -117,7 +118,7 @@ type peer struct {

// peer's node info and the channel it knows about
// channels = nodeInfo.Channels
// cached to avoid copying nodeInfo in hasChannel
// cached to avoid copying nodeInfo in HasChannel
nodeInfo NodeInfo
channels []byte

Expand Down Expand Up @@ -289,7 +290,7 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo
func (p *peer) sendMarshalled(chID byte, msgType reflect.Type, msgBytes []byte, sendFunc func(byte, []byte) bool) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
} else if !p.HasChannel(chID) {
return false
}
res := sendFunc(chID, msgBytes)
Expand All @@ -309,9 +310,8 @@ func (p *peer) Set(key string, data interface{}) {
p.Data.Set(key, data)
}

// hasChannel returns true if the peer reported
// knowing about the given chID.
func (p *peer) hasChannel(chID byte) bool {
// HasChannel returns whether the peer reported implementing this channel.
func (p *peer) HasChannel(chID byte) bool {
for _, ch := range p.channels {
if ch == chID {
return true
Expand Down
1 change: 1 addition & 0 deletions p2p/peer_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type mockPeer struct {
}

func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
func (mp *mockPeer) HasChannel(byte) bool { return true }
func (mp *mockPeer) TrySend(Envelope) bool { return true }
func (mp *mockPeer) TrySendMarshalled(MarshalledEnvelope) bool { return true }
func (mp *mockPeer) Send(Envelope) bool { return true }
Expand Down
26 changes: 16 additions & 10 deletions spec/p2p/reactor-api/p2p-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,16 @@ From this point, reactors can use the methods of the new `Peer` instance.
The table below summarizes the interaction of the standard reactors with
connected peers, with the `Peer` methods used by them:

| `Peer` API method | consensus | block sync | state sync | mempool | evidence | PEX |
|--------------------------------------------|-----------|------------|------------|---------|-----------|-------|
| `ID() ID` | x | x | x | x | x | x |
| `IsRunning() bool` | x | | | x | x | |
| `Quit() <-chan struct{}` | | | | x | x | |
| `Get(string) interface{}` | x | | | x | x | |
| `Set(string, interface{})` | x | | | | | |
| `Send(Envelope) bool` | x | x | x | x | x | x |
| `TrySend(Envelope) bool` | x | x | | | | |
| `Peer` API method | consensus | block sync | state sync | mempool | evidence | PEX |
|----------------------------|-----------|------------|------------|---------|----------|-----|
| `ID() ID` | x | x | x | x | x | x |
| `IsRunning() bool` | x | | | x | x | |
| `Quit() <-chan struct{}` | | | | x | x | |
| `Get(string) interface{}` | x | | | x | x | |
| `Set(string, interface{})` | x | | | | | |
| `HasChannel(byte) bool` | x | | | x | x | |
| `Send(Envelope) bool` | x | x | x | x | x | x |
| `TrySend(Envelope) bool` | x | x | | | | |

The above list is not exhaustive as it does not include all the `Peer` methods
invoked by the PEX reactor, a special component that should be considered part
Expand Down Expand Up @@ -269,8 +270,10 @@ Finally, a `Peer` instance allows a reactor to send messages to companion
reactors running at that peer.
This is ultimately the goal of the switch when it provides `Peer` instances to
the registered reactors.
There are two methods for sending messages:
There are two methods for sending messages, and one auxiliary method to check
whether the peer supports a given channel:

func (p Peer) HasChannel(chID byte) bool
func (p Peer) Send(e Envelope) bool
func (p Peer) TrySend(e Envelope) bool

Expand All @@ -279,6 +282,9 @@ set as follows:

- `ChannelID`: the channel the message should be sent through, which defines
the reactor that will process the message;
- The auxiliary `HasChannel()` method allows testing whether the remote peer
implements a channel; if it does not, both message-sending methods will
immediately return `false`, as sending always fails.
- `Src`: this field represents the source of an incoming message, which is
irrelevant for outgoing messages;
- `Message`: the actual message's payload, which is marshalled using protocol buffers.
Expand Down

0 comments on commit 80f527f

Please sign in to comment.