diff --git a/cl/phase1/stages/stage_history_download.go b/cl/phase1/stages/stage_history_download.go index ad737973ec6..b2b4039d84f 100644 --- a/cl/phase1/stages/stage_history_download.go +++ b/cl/phase1/stages/stage_history_download.go @@ -144,8 +144,8 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co } } isInElSnapshots := true - frozenBlocksInEL := cfg.engine.FrozenBlocks(ctx) if blk.Version() >= clparams.BellatrixVersion && cfg.engine != nil && cfg.engine.SupportInsertion() { + frozenBlocksInEL := cfg.engine.FrozenBlocks(ctx) isInElSnapshots = blk.Block.Body.ExecutionPayload.BlockNumber < frozenBlocksInEL if cfg.engine.HasGapInSnapshots(ctx) && frozenBlocksInEL > 0 { destinationSlotForEL = frozenBlocksInEL - 1 diff --git a/cl/sentinel/discovery.go b/cl/sentinel/discovery.go index 2fd0e43197e..1f73ff59502 100644 --- a/cl/sentinel/discovery.go +++ b/cl/sentinel/discovery.go @@ -32,6 +32,8 @@ import ( "github.com/ledgerwatch/erigon/p2p/enr" ) +const peerSubnetTarget = 4 + // ConnectWithPeer is used to attempt to connect and add the peer to our pool // it errors when if fail to connect with the peer, for instance, if it fails the handshake // if it does not return an error, the peer is attempted to be added to the pool @@ -96,16 +98,18 @@ func (s *Sentinel) listenForPeers() { break } - if s.HasTooManyPeers() { - log.Trace("[Sentinel] Not looking for peers, at peer limit") - time.Sleep(100 * time.Millisecond) - continue - } exists := iterator.Next() if !exists { continue } node := iterator.Node() + + needsPeersForSubnets := s.isPeerUsefulForAnySubnet(node) + if s.HasTooManyPeers() && !needsPeersForSubnets { + log.Trace("[Sentinel] Not looking for peers, at peer limit") + time.Sleep(100 * time.Millisecond) + continue + } peerInfo, _, err := convertToAddrInfo(node) if err != nil { log.Error("[Sentinel] Could not convert to peer info", "err", err) diff --git a/cl/sentinel/sentinel.go b/cl/sentinel/sentinel.go index 5b77c60b322..5dc4e747757 100644 --- a/cl/sentinel/sentinel.go +++ b/cl/sentinel/sentinel.go @@ -22,12 +22,16 @@ import ( "fmt" "net" "net/http" + "strconv" + "strings" "sync" "time" "github.com/go-chi/chi/v5" + "github.com/prysmaticlabs/go-bitfield" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/cl/gossip" "github.com/ledgerwatch/erigon/cl/persistence/blob_storage" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" "github.com/ledgerwatch/erigon/cl/sentinel/handlers" @@ -325,6 +329,88 @@ func (s *Sentinel) HasTooManyPeers() bool { return active >= peers.DefaultMaxPeers } +func (s *Sentinel) isPeerUsefulForAnySubnet(node *enode.Node) bool { + ret := false + + nodeAttnets := bitfield.NewBitvector64() + nodeSyncnets := bitfield.NewBitvector4() + if err := node.Load(enr.WithEntry(s.cfg.NetworkConfig.AttSubnetKey, &nodeAttnets)); err != nil { + log.Trace("Could not load att subnet", "err", err) + return false + } + if err := node.Load(enr.WithEntry(s.cfg.NetworkConfig.SyncCommsSubnetKey, &nodeSyncnets)); err != nil { + log.Trace("Could not load sync subnet", "err", err) + return false + } + + s.subManager.subscriptions.Range(func(key, value any) bool { + sub := value.(*GossipSubscription) + if sub.sub == nil { + return true + } + + if !sub.subscribed.Load() { + return true + } + + if len(sub.topic.ListPeers()) > peerSubnetTarget { + return true + } + if gossip.IsTopicBeaconAttestation(sub.sub.Topic()) { + ret = s.isPeerUsefulForAttNet(sub, nodeAttnets) + return !ret + } + + if gossip.IsTopicSyncCommittee(sub.sub.Topic()) { + ret = s.isPeerUsefulForSyncNet(sub, nodeSyncnets) + return !ret + } + + return true + }) + return ret +} + +func (s *Sentinel) isPeerUsefulForAttNet(sub *GossipSubscription, nodeAttnets bitfield.Bitvector64) bool { + splitTopic := strings.Split(sub.sub.Topic(), "/") + if len(splitTopic) < 4 { + return false + } + subnetIdStr, found := strings.CutPrefix(splitTopic[3], "beacon_attestation_") + if !found { + return false + } + subnetId, err := strconv.Atoi(subnetIdStr) + if err != nil { + log.Warn("Could not parse subnet id", "subnet", subnetIdStr, "err", err) + return false + } + // check if subnetIdth bit is set in nodeAttnets + return nodeAttnets.BitAt(uint64(subnetId)) + +} + +func (s *Sentinel) isPeerUsefulForSyncNet(sub *GossipSubscription, nodeSyncnets bitfield.Bitvector4) bool { + splitTopic := strings.Split(sub.sub.Topic(), "/") + if len(splitTopic) < 4 { + return false + } + syncnetIdStr, found := strings.CutPrefix(splitTopic[3], "sync_committee_") + if !found { + return false + } + syncnetId, err := strconv.Atoi(syncnetIdStr) + if err != nil { + log.Warn("Could not parse syncnet id", "syncnet", syncnetIdStr, "err", err) + return false + } + // check if syncnetIdth bit is set in nodeSyncnets + if nodeSyncnets.BitAt(uint64(syncnetId)) { + return true + } + return false +} + func (s *Sentinel) GetPeersCount() (active int, connected int, disconnected int) { peers := s.host.Network().Peers()