Skip to content

Commit

Permalink
Do not block indefinitely on the semaphore (cometbft#1654)
Browse files Browse the repository at this point in the history
* Do not block indefinitely on the semaphore

* Cancel the context, irrespective of the flow followed

* Makes the code more readable

* Improving comment

* make linter happy

* Updating comments to match

* Commenting out `select` and leaving it as TODO for when Contexts are more widely used

* Cleaned up comments
  • Loading branch information
lasarojc authored Nov 24, 2023
1 parent 96abada commit 2679498
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 27 deletions.
9 changes: 5 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,12 +892,13 @@ type MempoolConfig struct {
// XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796
MaxBatchBytes int `mapstructure:"max_batch_bytes"`
// Experimental parameters to limit gossiping txs to up to the specified number of peers.
// We use two independent upper values for persistent peers and for non-persistent peers.
// We use two independent upper values for persistent and non-persistent peers.
// Unconditional peers are not affected by this feature.
// If we are connected to more than the specified number of persistent peers, only send txs to
// the first ExperimentalMaxGossipConnectionsToPersistentPeers of them. If one of those
// persistent peers disconnects, activate another persistent peer. Similarly for non-persistent
// peers, with an upper limit of ExperimentalMaxGossipConnectionsToNonPersistentPeers.
// ExperimentalMaxGossipConnectionsToPersistentPeers of them. If one of those
// persistent peers disconnects, activate another persistent peer.
// Similarly for non-persistent peers, with an upper limit of
// ExperimentalMaxGossipConnectionsToNonPersistentPeers.
// If set to 0, the feature is disabled for the corresponding group of peers, that is, the
// number of active connections to that group of peers is not bounded.
// For non-persistent peers, if enabled, a value of 10 is recommended based on experimental
Expand Down
9 changes: 5 additions & 4 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,12 +436,13 @@ max_tx_bytes = {{ .Mempool.MaxTxBytes }}
max_batch_bytes = {{ .Mempool.MaxBatchBytes }}
# Experimental parameters to limit gossiping txs to up to the specified number of peers.
# We use two independent upper values for persistent peers and for non-persistent peers.
# We use two independent upper values for persistent and non-persistent peers.
# Unconditional peers are not affected by this feature.
# If we are connected to more than the specified number of persistent peers, only send txs to
# the first experimental_max_gossip_connections_to_persistent_peers of them. If one of those
# persistent peers disconnects, activate another persistent peer. Similarly for non-persistent
# peers, with an upper limit of experimental_max_gossip_connections_to_non_persistent_peers.
# ExperimentalMaxGossipConnectionsToPersistentPeers of them. If one of those
# persistent peers disconnects, activate another persistent peer.
# Similarly for non-persistent peers, with an upper limit of
# ExperimentalMaxGossipConnectionsToNonPersistentPeers.
# If set to 0, the feature is disabled for the corresponding group of peers, that is, the
# number of active connections to that group of peers is not bounded.
# For non-persistent peers, if enabled, a value of 10 is recommended based on experimental
Expand Down
41 changes: 23 additions & 18 deletions mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,32 +107,37 @@ func (memR *Reactor) AddPeer(peer p2p.Peer) {
go func() {
// Always forward transactions to unconditional peers.
if !memR.Switch.IsPeerUnconditional(peer.ID()) {
// Depending on the type of peer, we choose a semaphore to limit the gossiping peers.
var peerSemaphore *semaphore.Weighted
if peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers > 0 {
// Block sending transactions to peer until one of the connections become
// available in the semaphore.
if err := memR.activePersistentPeersSemaphore.Acquire(context.TODO(), 1); err != nil {
memR.Logger.Error("Failed to acquire semaphore: %v", err)
return
}
// Release semaphore to allow other peer to start sending transactions.
defer memR.activePersistentPeersSemaphore.Release(1)
defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
peerSemaphore = memR.activePersistentPeersSemaphore
} else if !peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers > 0 {
peerSemaphore = memR.activeNonPersistentPeersSemaphore
}

if !peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers > 0 {
// Block sending transactions to peer until one of the connections become
// available in the semaphore.
if err := memR.activeNonPersistentPeersSemaphore.Acquire(context.TODO(), 1); err != nil {
memR.Logger.Error("Failed to acquire semaphore: %v", err)
return
if peerSemaphore != nil {
for peer.IsRunning() {
// Block on the semaphore until a slot is available to start gossiping with this peer.
// Do not block indefinitely, in case the peer is disconnected before gossiping starts.
ctxTimeout, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
// Block sending transactions to peer until one of the connections become
// available in the semaphore.
err := peerSemaphore.Acquire(ctxTimeout, 1)
cancel()

if err != nil {
continue
}

// Release semaphore to allow other peer to start sending transactions.
defer peerSemaphore.Release(1)
break
}
// Release semaphore to allow other peer to start sending transactions.
defer memR.activeNonPersistentPeersSemaphore.Release(1)
defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
}
}

memR.mempool.metrics.ActiveOutboundConnections.Add(1)
defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
memR.broadcastTxRoutine(peer)
}()
}
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/pkg/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type Manifest struct {
// Upper bound of sleep duration then gossipping votes and block parts
PeerGossipIntraloopSleepDuration time.Duration `toml:"peer_gossip_intraloop_sleep_duration"`

// Maximum number of peers to which the node gossip transactions
// Maximum number of peers to which the node gossips transactions
ExperimentalMaxGossipConnectionsToPersistentPeers uint `toml:"experimental_max_gossip_connections_to_persistent_peers"`
ExperimentalMaxGossipConnectionsToNonPersistentPeers uint `toml:"experimental_max_gossip_connections_to_non_persistent_peers"`

Expand Down

0 comments on commit 2679498

Please sign in to comment.