Skip to content

Commit

Permalink
feat(p2p): render HasChannel(chID) is a public p2p.Peer method (#… (
Browse files Browse the repository at this point in the history
#142)

* feat(p2p): render `HasChannel(chID)` is a public `p2p.Peer` method (cometbft#3510)

Closes: cometbft#3472

It also prevents reactors from starting routines intended to send
messages to a peer that does not implement/support a given channel.
Because all `Send()` or `TrySend()` calls from this routine will be
useless, always returning `false` and possibly producing some busy-wait
behavior (see cometbft#3414).

The changes are restricted to: mempool and evidence reactor, because
they use a single channel and have a sending routine peer peer, and two
of the consensus routines, for Data and Votes.

Block and State sync reactors have smarter ways to deal with
unresponsive peers, so probably this check is not needed.

---

- [x] Tests written/updated
- [x] Changelog entry added in `.changelog` (we use
[unclog](https://github.com/informalsystems/unclog) to manage our
changelog)
- [x] Updated relevant documentation (`docs/` or `spec/`) and code
comments
- [x] Title follows the [Conventional
Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec

* add changelog

* Fix build issue

* Fix lint

---------

Co-authored-by: Daniel <[email protected]>
  • Loading branch information
ValarDragon and cason authored Aug 19, 2024
1 parent 2be2073 commit f2748e9
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 18 deletions.
3 changes: 3 additions & 0 deletions .changelog/v0.38.6/features/3472-p2p-has-channel-api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[p2p]` `HasChannel(chID)` method added to the `Peer` interface, used by
reactors to check whether a peer implements/supports a given channel.
([#3472](https://github.com/cometbft/cometbft/issues/3472))
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ It also includes a few other bug fixes and performance improvements.
* [#109](https://github.com/osmosis-labs/cometbft/pull/109) perf(p2p,mempool): Make mempool reactor receive not block. (Fixed by either #3209, #3230)
* [#105](https://github.com/osmosis-labs/cometbft/pull/105) perf(p2p)!: Remove PeerSendBytesTotal metric #3184
* [#95](https://github.com/osmosis-labs/cometbft/pull/95) perf(types) Make a new method `GetByAddressMut` for `ValSet`, which does not copy the returned validator. (#3129)
* [#128](https://github.com/osmosis-labs/cometbft/pull/128) feat(p2p): render HasChannel(chID) is a public p2p.Peer method (#3510)
* [#126]() Remove p2p allocations for wrapping outbound packets
* [#125]() Fix marshalling and concurrency overhead within broadcast routines
* perf(p2p): Only update send monitor once per batch packet msg send (#3382)
* [#124]() Secret connection read buffer

## v0.38.10

Expand Down
8 changes: 8 additions & 0 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,10 @@ func (conR *Reactor) getRoundState() *cstypes.RoundState {

func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
if !peer.HasChannel(DataChannel) {
logger.Info("Peer does not implement DataChannel.")
return
}
rng := cmtrand.NewStdlibRand()

OUTER_LOOP:
Expand Down Expand Up @@ -729,6 +733,10 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt

func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
if !peer.HasChannel(VoteChannel) {
logger.Info("Peer does not implement VoteChannel.")
return
}
rng := cmtrand.NewStdlibRand()

// Simple hack to throttle logs upon sleep.
Expand Down
4 changes: 3 additions & 1 deletion evidence/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (evR *Reactor) GetChannels() []*p2p.ChannelDescriptor {

// AddPeer implements Reactor.
func (evR *Reactor) AddPeer(peer p2p.Peer) {
go evR.broadcastEvidenceRoutine(peer)
if peer.HasChannel(EvidenceChannel) {
go evR.broadcastEvidenceRoutine(peer)
}
}

// Receive implements Reactor.
Expand Down
1 change: 1 addition & 0 deletions evidence/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) {
e, ok := i.(p2p.Envelope)
return ok && e.ChannelID == evidence.EvidenceChannel
})).Return(false)
p.On("HasChannel", evidence.EvidenceChannel).Maybe().Return(true)
quitChan := make(<-chan struct{})
p.On("Quit").Return(quitChan)
ps := peerState{2}
Expand Down
2 changes: 1 addition & 1 deletion mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor.
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (memR *Reactor) AddPeer(peer p2p.Peer) {
if memR.config.Broadcast {
if memR.config.Broadcast && peer.HasChannel(MempoolChannel) {
go func() {
// Always forward transactions to unconditional peers.
if !memR.Switch.IsPeerUnconditional(peer.ID()) {
Expand Down
1 change: 1 addition & 0 deletions p2p/mock/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func NewPeer(ip net.IP) *Peer {
func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
func (mp *Peer) TrySend(_ p2p.Envelope) bool { return true }
func (mp *Peer) TrySendMarshalled(e p2p.MarshalledEnvelope) bool { return true }
func (mp *Peer) HasChannel(_ byte) bool { return true }
func (mp *Peer) Send(_ p2p.Envelope) bool { return true }
func (mp *Peer) NodeInfo() p2p.NodeInfo {
return p2p.DefaultNodeInfo{
Expand Down
18 changes: 18 additions & 0 deletions p2p/mocks/peer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Peer interface {
Status() cmtconn.ConnectionStatus
SocketAddr() *NetAddress // actual address of the socket

HasChannel(chID byte) bool // Does the peer implement this channel?
Send(Envelope) bool
TrySend(Envelope) bool
TrySendMarshalled(MarshalledEnvelope) bool
Expand Down Expand Up @@ -117,7 +118,7 @@ type peer struct {

// peer's node info and the channel it knows about
// channels = nodeInfo.Channels
// cached to avoid copying nodeInfo in hasChannel
// cached to avoid copying nodeInfo in HasChannel
nodeInfo NodeInfo
channels []byte

Expand Down Expand Up @@ -289,7 +290,7 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo
func (p *peer) sendMarshalled(chID byte, msgType reflect.Type, msgBytes []byte, sendFunc func(byte, []byte) bool) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
} else if !p.HasChannel(chID) {
return false
}
res := sendFunc(chID, msgBytes)
Expand All @@ -309,9 +310,8 @@ func (p *peer) Set(key string, data interface{}) {
p.Data.Set(key, data)
}

// hasChannel returns true if the peer reported
// knowing about the given chID.
func (p *peer) hasChannel(chID byte) bool {
// HasChannel returns whether the peer reported implementing this channel.
func (p *peer) HasChannel(chID byte) bool {
for _, ch := range p.channels {
if ch == chID {
return true
Expand Down
1 change: 1 addition & 0 deletions p2p/peer_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type mockPeer struct {
}

func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
func (mp *mockPeer) HasChannel(byte) bool { return true }
func (mp *mockPeer) TrySend(Envelope) bool { return true }
func (mp *mockPeer) TrySendMarshalled(MarshalledEnvelope) bool { return true }
func (mp *mockPeer) Send(Envelope) bool { return true }
Expand Down
2 changes: 1 addition & 1 deletion p2p/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Envelope struct {
ChannelID byte
}

// MarshalledEnvelope contains a proto message, its marshalled message, with sender routing info.
// MarshalledEnvelope contains a proto message, its marshaled message, with sender routing info.
type MarshalledEnvelope struct {
Envelope
MarshalledMessage []byte
Expand Down
26 changes: 16 additions & 10 deletions spec/p2p/reactor-api/p2p-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,16 @@ From this point, reactors can use the methods of the new `Peer` instance.
The table below summarizes the interaction of the standard reactors with
connected peers, with the `Peer` methods used by them:

| `Peer` API method | consensus | block sync | state sync | mempool | evidence | PEX |
|--------------------------------------------|-----------|------------|------------|---------|-----------|-------|
| `ID() ID` | x | x | x | x | x | x |
| `IsRunning() bool` | x | | | x | x | |
| `Quit() <-chan struct{}` | | | | x | x | |
| `Get(string) interface{}` | x | | | x | x | |
| `Set(string, interface{})` | x | | | | | |
| `Send(Envelope) bool` | x | x | x | x | x | x |
| `TrySend(Envelope) bool` | x | x | | | | |
| `Peer` API method | consensus | block sync | state sync | mempool | evidence | PEX |
|----------------------------|-----------|------------|------------|---------|----------|-----|
| `ID() ID` | x | x | x | x | x | x |
| `IsRunning() bool` | x | | | x | x | |
| `Quit() <-chan struct{}` | | | | x | x | |
| `Get(string) interface{}` | x | | | x | x | |
| `Set(string, interface{})` | x | | | | | |
| `HasChannel(byte) bool` | x | | | x | x | |
| `Send(Envelope) bool` | x | x | x | x | x | x |
| `TrySend(Envelope) bool` | x | x | | | | |

The above list is not exhaustive as it does not include all the `Peer` methods
invoked by the PEX reactor, a special component that should be considered part
Expand Down Expand Up @@ -269,8 +270,10 @@ Finally, a `Peer` instance allows a reactor to send messages to companion
reactors running at that peer.
This is ultimately the goal of the switch when it provides `Peer` instances to
the registered reactors.
There are two methods for sending messages:
There are two methods for sending messages, and one auxiliary method to check
whether the peer supports a given channel:

func (p Peer) HasChannel(chID byte) bool
func (p Peer) Send(e Envelope) bool
func (p Peer) TrySend(e Envelope) bool

Expand All @@ -279,6 +282,9 @@ set as follows:

- `ChannelID`: the channel the message should be sent through, which defines
the reactor that will process the message;
- The auxiliary `HasChannel()` method allows testing whether the remote peer
implements a channel; if it does not, both message-sending methods will
immediately return `false`, as sending always fails.
- `Src`: this field represents the source of an incoming message, which is
irrelevant for outgoing messages;
- `Message`: the actual message's payload, which is marshalled using protocol buffers.
Expand Down

0 comments on commit f2748e9

Please sign in to comment.