Skip to content

Commit

Permalink
mempool async processing (#139)
Browse files Browse the repository at this point in the history
* Mempool async processing

* Forgot to commit important part

* Add changelog

* Fix race
  • Loading branch information
ValarDragon authored Aug 19, 2024
1 parent 18fa632 commit bde8009
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ It also includes a few other bug fixes and performance improvements.
point to their enclosing for loop label to exit
([\#3544](https://github.com/cometbft/cometbft/issues/3544))
- [#91](https://github.com/osmosis-labs/cometbft/pull/91) perf(consensus): Minor improvement by making add vote only do one peer set mutex call, not 3 (#3156)
* [#109](https://github.com/osmosis-labs/cometbft/pull/109) perf(p2p,mempool): Make mempool reactor receive not block. (Fixed by either #3209, #3230)


## v0.38.10

Expand Down
50 changes: 38 additions & 12 deletions mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type Reactor struct {
mempool *CListMempool
ids *mempoolIDs

peerTxProcesserChan chan *peerIncomingTx

// Semaphores to keep track of how many connections to peers are active for broadcasting
// transactions. Each semaphore has a capacity that puts an upper bound on the number of
// connections for different groups of peers.
Expand All @@ -41,6 +43,7 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor {
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
memR.activePersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers))
memR.activeNonPersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers))
memR.peerTxProcesserChan = make(chan *peerIncomingTx, 10000)

return memR
}
Expand All @@ -62,9 +65,14 @@ func (memR *Reactor) OnStart() error {
if !memR.config.Broadcast {
memR.Logger.Info("Tx broadcasting is disabled")
}

go memR.incomingPacketProcessor()
return nil
}

func (memR *Reactor) OnStop() {
}

// GetChannels implements Reactor by returning the list of channels for this
// reactor.
func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
Expand Down Expand Up @@ -134,20 +142,44 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, _ interface{}) {
// broadcast routine checks if peer is gone and returns
}

type peerIncomingTx struct {
tx *protomem.Txs
peer p2p.Peer
}

// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *Reactor) Receive(e p2p.Envelope) {
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
switch msg := e.Message.(type) {
case *protomem.Txs:
protoTxs := msg.GetTxs()
pit := &peerIncomingTx{
tx: msg,
peer: e.Src,
}
memR.peerTxProcesserChan <- pit
default:
memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
memR.Switch.StopPeerForError(e.Src, fmt.Errorf("mempool cannot handle message of type: %T", e.Message))
return
}
}

func (memR *Reactor) incomingPacketProcessor() {
for {
pit, chanOpen := <-memR.peerTxProcesserChan
if !chanOpen {
break
}

protoTxs := pit.tx.GetTxs()
if len(protoTxs) == 0 {
memR.Logger.Error("received empty txs from peer", "src", e.Src)
return
memR.Logger.Error("received empty txs from peer", "src", pit.peer)
continue
}
txInfo := TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
if e.Src != nil {
txInfo.SenderP2PID = e.Src.ID()
txInfo := TxInfo{SenderID: memR.ids.GetForPeer(pit.peer)}
if pit.peer != nil {
txInfo.SenderP2PID = pit.peer.ID()
}

var err error
Expand All @@ -160,13 +192,7 @@ func (memR *Reactor) Receive(e p2p.Envelope) {
memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err)
}
}
default:
memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
memR.Switch.StopPeerForError(e.Src, fmt.Errorf("mempool cannot handle message of type: %T", e.Message))
return
}

// broadcasting happens from go routines per peer
}

// PeerState describes the state of a peer.
Expand Down

0 comments on commit bde8009

Please sign in to comment.