Skip to content

Commit

Permalink
Caplin: Look for peers if not avaiable within subnet (#11057)
Browse files Browse the repository at this point in the history
Actually look for peers of needed subnet for better performance on
holesky and mainnet

---------

Co-authored-by: Kewei <[email protected]>
  • Loading branch information
Giulio2002 and domiwei authored Jul 16, 2024
1 parent e1ff9c1 commit b52d5ef
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 5 deletions.
14 changes: 9 additions & 5 deletions cl/sentinel/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/ledgerwatch/erigon/p2p/enr"
)

const peerSubnetTarget = 4

// ConnectWithPeer is used to attempt to connect and add the peer to our pool
// it errors when if fail to connect with the peer, for instance, if it fails the handshake
// if it does not return an error, the peer is attempted to be added to the pool
Expand Down Expand Up @@ -96,16 +98,18 @@ func (s *Sentinel) listenForPeers() {
break
}

if s.HasTooManyPeers() {
log.Trace("[Sentinel] Not looking for peers, at peer limit")
time.Sleep(100 * time.Millisecond)
continue
}
exists := iterator.Next()
if !exists {
continue
}
node := iterator.Node()

needsPeersForSubnets := s.isPeerUsefulForAnySubnet(node)
if s.HasTooManyPeers() && !needsPeersForSubnets {
log.Trace("[Sentinel] Not looking for peers, at peer limit")
time.Sleep(100 * time.Millisecond)
continue
}
peerInfo, _, err := convertToAddrInfo(node)
if err != nil {
log.Error("[Sentinel] Could not convert to peer info", "err", err)
Expand Down
86 changes: 86 additions & 0 deletions cl/sentinel/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@ import (
"fmt"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"

"github.com/go-chi/chi/v5"
"github.com/prysmaticlabs/go-bitfield"

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cl/gossip"
"github.com/ledgerwatch/erigon/cl/persistence/blob_storage"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice"
"github.com/ledgerwatch/erigon/cl/sentinel/handlers"
Expand Down Expand Up @@ -326,6 +330,88 @@ func (s *Sentinel) HasTooManyPeers() bool {
return active >= peers.DefaultMaxPeers
}

func (s *Sentinel) isPeerUsefulForAnySubnet(node *enode.Node) bool {
ret := false

nodeAttnets := bitfield.NewBitvector64()
nodeSyncnets := bitfield.NewBitvector4()
if err := node.Load(enr.WithEntry(s.cfg.NetworkConfig.AttSubnetKey, &nodeAttnets)); err != nil {
log.Trace("Could not load att subnet", "err", err)
return false
}
if err := node.Load(enr.WithEntry(s.cfg.NetworkConfig.SyncCommsSubnetKey, &nodeSyncnets)); err != nil {
log.Trace("Could not load sync subnet", "err", err)
return false
}

s.subManager.subscriptions.Range(func(key, value any) bool {
sub := value.(*GossipSubscription)
if sub.sub == nil {
return true
}

if !sub.subscribed.Load() {
return true
}

if len(sub.topic.ListPeers()) > peerSubnetTarget {
return true
}
if gossip.IsTopicBeaconAttestation(sub.sub.Topic()) {
ret = s.isPeerUsefulForAttNet(sub, nodeAttnets)
return !ret
}

if gossip.IsTopicSyncCommittee(sub.sub.Topic()) {
ret = s.isPeerUsefulForSyncNet(sub, nodeSyncnets)
return !ret
}

return true
})
return ret
}

func (s *Sentinel) isPeerUsefulForAttNet(sub *GossipSubscription, nodeAttnets bitfield.Bitvector64) bool {
splitTopic := strings.Split(sub.sub.Topic(), "/")
if len(splitTopic) < 4 {
return false
}
subnetIdStr, found := strings.CutPrefix(splitTopic[3], "beacon_attestation_")
if !found {
return false
}
subnetId, err := strconv.Atoi(subnetIdStr)
if err != nil {
log.Warn("Could not parse subnet id", "subnet", subnetIdStr, "err", err)
return false
}
// check if subnetIdth bit is set in nodeAttnets
return nodeAttnets.BitAt(uint64(subnetId))

}

func (s *Sentinel) isPeerUsefulForSyncNet(sub *GossipSubscription, nodeSyncnets bitfield.Bitvector4) bool {
splitTopic := strings.Split(sub.sub.Topic(), "/")
if len(splitTopic) < 4 {
return false
}
syncnetIdStr, found := strings.CutPrefix(splitTopic[3], "sync_committee_")
if !found {
return false
}
syncnetId, err := strconv.Atoi(syncnetIdStr)
if err != nil {
log.Warn("Could not parse syncnet id", "syncnet", syncnetIdStr, "err", err)
return false
}
// check if syncnetIdth bit is set in nodeSyncnets
if nodeSyncnets.BitAt(uint64(syncnetId)) {
return true
}
return false
}

func (s *Sentinel) GetPeersCount() (active int, connected int, disconnected int) {
peers := s.host.Network().Peers()

Expand Down

0 comments on commit b52d5ef

Please sign in to comment.