From bde8009675f4efb75499ac2b3046120d8d278c23 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Mon, 19 Aug 2024 22:30:21 +0200 Subject: [PATCH] mempool async processing (#139) * Mempool async processing * Forgot to commit important part * Add changelog * Fix race --- CHANGELOG.md | 2 ++ mempool/reactor.go | 50 +++++++++++++++++++++++++++++++++++----------- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e648c749d8..c3b0882420e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ It also includes a few other bug fixes and performance improvements. point to their enclosing for loop label to exit ([\#3544](https://github.com/cometbft/cometbft/issues/3544)) - [#91](https://github.com/osmosis-labs/cometbft/pull/91) perf(consensus): Minor improvement by making add vote only do one peer set mutex call, not 3 (#3156) +* [#109](https://github.com/osmosis-labs/cometbft/pull/109) perf(p2p,mempool): Make mempool reactor receive not block. (Fixed by either #3209, #3230) + ## v0.38.10 diff --git a/mempool/reactor.go b/mempool/reactor.go index 5919b8cadd2..3f40181bc32 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -24,6 +24,8 @@ type Reactor struct { mempool *CListMempool ids *mempoolIDs + peerTxProcesserChan chan *peerIncomingTx + // Semaphores to keep track of how many connections to peers are active for broadcasting // transactions. Each semaphore has a capacity that puts an upper bound on the number of // connections for different groups of peers. @@ -41,6 +43,7 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor { memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR) memR.activePersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers)) memR.activeNonPersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers)) + memR.peerTxProcesserChan = make(chan *peerIncomingTx, 10000) return memR } @@ -62,9 +65,14 @@ func (memR *Reactor) OnStart() error { if !memR.config.Broadcast { memR.Logger.Info("Tx broadcasting is disabled") } + + go memR.incomingPacketProcessor() return nil } +func (memR *Reactor) OnStop() { +} + // GetChannels implements Reactor by returning the list of channels for this // reactor. func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { @@ -134,20 +142,44 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, _ interface{}) { // broadcast routine checks if peer is gone and returns } +type peerIncomingTx struct { + tx *protomem.Txs + peer p2p.Peer +} + // Receive implements Reactor. // It adds any received transactions to the mempool. func (memR *Reactor) Receive(e p2p.Envelope) { memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) switch msg := e.Message.(type) { case *protomem.Txs: - protoTxs := msg.GetTxs() + pit := &peerIncomingTx{ + tx: msg, + peer: e.Src, + } + memR.peerTxProcesserChan <- pit + default: + memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) + memR.Switch.StopPeerForError(e.Src, fmt.Errorf("mempool cannot handle message of type: %T", e.Message)) + return + } +} + +func (memR *Reactor) incomingPacketProcessor() { + for { + pit, chanOpen := <-memR.peerTxProcesserChan + if !chanOpen { + break + } + + protoTxs := pit.tx.GetTxs() if len(protoTxs) == 0 { - memR.Logger.Error("received empty txs from peer", "src", e.Src) - return + memR.Logger.Error("received empty txs from peer", "src", pit.peer) + continue } - txInfo := TxInfo{SenderID: memR.ids.GetForPeer(e.Src)} - if e.Src != nil { - txInfo.SenderP2PID = e.Src.ID() + txInfo := TxInfo{SenderID: memR.ids.GetForPeer(pit.peer)} + if pit.peer != nil { + txInfo.SenderP2PID = pit.peer.ID() } var err error @@ -160,13 +192,7 @@ func (memR *Reactor) Receive(e p2p.Envelope) { memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err) } } - default: - memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) - memR.Switch.StopPeerForError(e.Src, fmt.Errorf("mempool cannot handle message of type: %T", e.Message)) - return } - - // broadcasting happens from go routines per peer } // PeerState describes the state of a peer.