From 80f527fb3bd57ccd9f15d32880c27126c7fb1a8f Mon Sep 17 00:00:00 2001 From: Daniel Date: Mon, 15 Jul 2024 09:04:39 +0200 Subject: [PATCH] feat(p2p): render `HasChannel(chID)` is a public `p2p.Peer` method (#3510) Closes: #3472 It also prevents reactors from starting routines intended to send messages to a peer that does not implement/support a given channel. Because all `Send()` or `TrySend()` calls from this routine will be useless, always returning `false` and possibly producing some busy-wait behavior (see https://github.com/cometbft/cometbft/issues/3414). The changes are restricted to: mempool and evidence reactor, because they use a single channel and have a sending routine peer peer, and two of the consensus routines, for Data and Votes. Block and State sync reactors have smarter ways to deal with unresponsive peers, so probably this check is not needed. --- - [x] Tests written/updated - [x] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog) - [x] Updated relevant documentation (`docs/` or `spec/`) and code comments - [x] Title follows the [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec --- .../features/3472-p2p-has-channel-api.md | 3 +++ consensus/reactor.go | 8 ++++++ evidence/reactor.go | 4 ++- evidence/reactor_test.go | 1 + mempool/reactor.go | 2 +- p2p/mock/peer.go | 1 + p2p/mocks/peer.go | 18 +++++++++++++ p2p/peer.go | 10 +++---- p2p/peer_set_test.go | 1 + spec/p2p/reactor-api/p2p-api.md | 26 ++++++++++++------- 10 files changed, 57 insertions(+), 17 deletions(-) create mode 100644 .changelog/v0.38.6/features/3472-p2p-has-channel-api.md diff --git a/.changelog/v0.38.6/features/3472-p2p-has-channel-api.md b/.changelog/v0.38.6/features/3472-p2p-has-channel-api.md new file mode 100644 index 00000000000..b554a29ce1d --- /dev/null +++ b/.changelog/v0.38.6/features/3472-p2p-has-channel-api.md @@ -0,0 +1,3 @@ +- `[p2p]` `HasChannel(chID)` method added to the `Peer` interface, used by + reactors to check whether a peer implements/supports a given channel. + ([#3472](https://github.com/cometbft/cometbft/issues/3472)) diff --git a/consensus/reactor.go b/consensus/reactor.go index 4fa1ca3484a..cd9cae4b0be 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -569,6 +569,10 @@ func (conR *Reactor) getRoundState() *cstypes.RoundState { func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) { logger := conR.Logger.With("peer", peer) + if !peer.HasChannel(DataChannel) { + logger.Info("Peer does not implement DataChannel.") + return + } rng := cmtrand.NewStdlibRand() OUTER_LOOP: @@ -729,6 +733,10 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) { logger := conR.Logger.With("peer", peer) + if !peer.HasChannel(VoteChannel) { + logger.Info("Peer does not implement VoteChannel.") + return + } rng := cmtrand.NewStdlibRand() // Simple hack to throttle logs upon sleep. diff --git a/evidence/reactor.go b/evidence/reactor.go index 10d3e53111b..fbc25f63ec5 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -64,7 +64,9 @@ func (evR *Reactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. func (evR *Reactor) AddPeer(peer p2p.Peer) { - go evR.broadcastEvidenceRoutine(peer) + if peer.HasChannel(EvidenceChannel) { + go evR.broadcastEvidenceRoutine(peer) + } } // Receive implements Reactor. diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index 620c8fa7d5e..d04c5f5d49f 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -212,6 +212,7 @@ func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) { e, ok := i.(p2p.Envelope) return ok && e.ChannelID == evidence.EvidenceChannel })).Return(false) + p.On("HasChannel", evidence.EvidenceChannel).Maybe().Return(true) quitChan := make(<-chan struct{}) p.On("Quit").Return(quitChan) ps := peerState{2} diff --git a/mempool/reactor.go b/mempool/reactor.go index 3f40181bc32..0e69fd02fec 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -96,7 +96,7 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. // It starts a broadcast routine ensuring all txs are forwarded to the given peer. func (memR *Reactor) AddPeer(peer p2p.Peer) { - if memR.config.Broadcast { + if memR.config.Broadcast && peer.HasChannel(mempool.MempoolChannel) { go func() { // Always forward transactions to unconditional peers. if !memR.Switch.IsPeerUnconditional(peer.ID()) { diff --git a/p2p/mock/peer.go b/p2p/mock/peer.go index 90acb65aa5b..4214550a122 100644 --- a/p2p/mock/peer.go +++ b/p2p/mock/peer.go @@ -45,6 +45,7 @@ func NewPeer(ip net.IP) *Peer { func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error func (mp *Peer) TrySend(_ p2p.Envelope) bool { return true } func (mp *Peer) TrySendMarshalled(e p2p.MarshalledEnvelope) bool { return true } +func (mp *Peer) HasChannel(_ byte) bool { return true } func (mp *Peer) Send(_ p2p.Envelope) bool { return true } func (mp *Peer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{ diff --git a/p2p/mocks/peer.go b/p2p/mocks/peer.go index bc0ed10470b..3358896967d 100644 --- a/p2p/mocks/peer.go +++ b/p2p/mocks/peer.go @@ -67,6 +67,24 @@ func (_m *Peer) GetRemovalFailed() bool { return r0 } +// HasChannel provides a mock function with given fields: chID +func (_m *Peer) HasChannel(chID byte) bool { + ret := _m.Called(chID) + + if len(ret) == 0 { + panic("no return value specified for HasChannel") + } + + var r0 bool + if rf, ok := ret.Get(0).(func(byte) bool); ok { + r0 = rf(chID) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + // ID provides a mock function with given fields: func (_m *Peer) ID() p2p.ID { ret := _m.Called() diff --git a/p2p/peer.go b/p2p/peer.go index fabecc3a82b..e1e371d8104 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -39,6 +39,7 @@ type Peer interface { Status() cmtconn.ConnectionStatus SocketAddr() *NetAddress // actual address of the socket + HasChannel(chID byte) bool // Does the peer implement this channel? Send(Envelope) bool TrySend(Envelope) bool TrySendMarshalled(MarshalledEnvelope) bool @@ -117,7 +118,7 @@ type peer struct { // peer's node info and the channel it knows about // channels = nodeInfo.Channels - // cached to avoid copying nodeInfo in hasChannel + // cached to avoid copying nodeInfo in HasChannel nodeInfo NodeInfo channels []byte @@ -289,7 +290,7 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo func (p *peer) sendMarshalled(chID byte, msgType reflect.Type, msgBytes []byte, sendFunc func(byte, []byte) bool) bool { if !p.IsRunning() { return false - } else if !p.hasChannel(chID) { + } else if !p.HasChannel(chID) { return false } res := sendFunc(chID, msgBytes) @@ -309,9 +310,8 @@ func (p *peer) Set(key string, data interface{}) { p.Data.Set(key, data) } -// hasChannel returns true if the peer reported -// knowing about the given chID. -func (p *peer) hasChannel(chID byte) bool { +// HasChannel returns whether the peer reported implementing this channel. +func (p *peer) HasChannel(chID byte) bool { for _, ch := range p.channels { if ch == chID { return true diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index 5a7701a55b2..58fe7520579 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -19,6 +19,7 @@ type mockPeer struct { } func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error +func (mp *mockPeer) HasChannel(byte) bool { return true } func (mp *mockPeer) TrySend(Envelope) bool { return true } func (mp *mockPeer) TrySendMarshalled(MarshalledEnvelope) bool { return true } func (mp *mockPeer) Send(Envelope) bool { return true } diff --git a/spec/p2p/reactor-api/p2p-api.md b/spec/p2p/reactor-api/p2p-api.md index ad1fbff3110..06386bdce5b 100644 --- a/spec/p2p/reactor-api/p2p-api.md +++ b/spec/p2p/reactor-api/p2p-api.md @@ -185,15 +185,16 @@ From this point, reactors can use the methods of the new `Peer` instance. The table below summarizes the interaction of the standard reactors with connected peers, with the `Peer` methods used by them: -| `Peer` API method | consensus | block sync | state sync | mempool | evidence | PEX | -|--------------------------------------------|-----------|------------|------------|---------|-----------|-------| -| `ID() ID` | x | x | x | x | x | x | -| `IsRunning() bool` | x | | | x | x | | -| `Quit() <-chan struct{}` | | | | x | x | | -| `Get(string) interface{}` | x | | | x | x | | -| `Set(string, interface{})` | x | | | | | | -| `Send(Envelope) bool` | x | x | x | x | x | x | -| `TrySend(Envelope) bool` | x | x | | | | | +| `Peer` API method | consensus | block sync | state sync | mempool | evidence | PEX | +|----------------------------|-----------|------------|------------|---------|----------|-----| +| `ID() ID` | x | x | x | x | x | x | +| `IsRunning() bool` | x | | | x | x | | +| `Quit() <-chan struct{}` | | | | x | x | | +| `Get(string) interface{}` | x | | | x | x | | +| `Set(string, interface{})` | x | | | | | | +| `HasChannel(byte) bool` | x | | | x | x | | +| `Send(Envelope) bool` | x | x | x | x | x | x | +| `TrySend(Envelope) bool` | x | x | | | | | The above list is not exhaustive as it does not include all the `Peer` methods invoked by the PEX reactor, a special component that should be considered part @@ -269,8 +270,10 @@ Finally, a `Peer` instance allows a reactor to send messages to companion reactors running at that peer. This is ultimately the goal of the switch when it provides `Peer` instances to the registered reactors. -There are two methods for sending messages: +There are two methods for sending messages, and one auxiliary method to check +whether the peer supports a given channel: + func (p Peer) HasChannel(chID byte) bool func (p Peer) Send(e Envelope) bool func (p Peer) TrySend(e Envelope) bool @@ -279,6 +282,9 @@ set as follows: - `ChannelID`: the channel the message should be sent through, which defines the reactor that will process the message; + - The auxiliary `HasChannel()` method allows testing whether the remote peer + implements a channel; if it does not, both message-sending methods will + immediately return `false`, as sending always fails. - `Src`: this field represents the source of an incoming message, which is irrelevant for outgoing messages; - `Message`: the actual message's payload, which is marshalled using protocol buffers.