Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caplin: Look for peers if not avaiable within subnet #11057

Merged
merged 3 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cl/phase1/stages/stage_history_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co
}
}
isInElSnapshots := true
frozenBlocksInEL := cfg.engine.FrozenBlocks(ctx)
if blk.Version() >= clparams.BellatrixVersion && cfg.engine != nil && cfg.engine.SupportInsertion() {
frozenBlocksInEL := cfg.engine.FrozenBlocks(ctx)
isInElSnapshots = blk.Block.Body.ExecutionPayload.BlockNumber < frozenBlocksInEL
if cfg.engine.HasGapInSnapshots(ctx) && frozenBlocksInEL > 0 {
destinationSlotForEL = frozenBlocksInEL - 1
Expand Down
14 changes: 9 additions & 5 deletions cl/sentinel/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/ledgerwatch/erigon/p2p/enr"
)

const peerSubnetTarget = 4

// ConnectWithPeer is used to attempt to connect and add the peer to our pool
// it errors when if fail to connect with the peer, for instance, if it fails the handshake
// if it does not return an error, the peer is attempted to be added to the pool
Expand Down Expand Up @@ -96,16 +98,18 @@ func (s *Sentinel) listenForPeers() {
break
}

if s.HasTooManyPeers() {
log.Trace("[Sentinel] Not looking for peers, at peer limit")
time.Sleep(100 * time.Millisecond)
continue
}
exists := iterator.Next()
if !exists {
continue
}
node := iterator.Node()

needsPeersForSubnets := s.isPeerUsefulForAnySubnet(node)
if s.HasTooManyPeers() && !needsPeersForSubnets {
log.Trace("[Sentinel] Not looking for peers, at peer limit")
time.Sleep(100 * time.Millisecond)
continue
}
peerInfo, _, err := convertToAddrInfo(node)
if err != nil {
log.Error("[Sentinel] Could not convert to peer info", "err", err)
Expand Down
86 changes: 86 additions & 0 deletions cl/sentinel/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@ import (
"fmt"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"

"github.com/go-chi/chi/v5"
"github.com/prysmaticlabs/go-bitfield"

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cl/gossip"
"github.com/ledgerwatch/erigon/cl/persistence/blob_storage"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice"
"github.com/ledgerwatch/erigon/cl/sentinel/handlers"
Expand Down Expand Up @@ -325,6 +329,88 @@ func (s *Sentinel) HasTooManyPeers() bool {
return active >= peers.DefaultMaxPeers
}

func (s *Sentinel) isPeerUsefulForAnySubnet(node *enode.Node) bool {
ret := false
Copy link
Member

@domiwei domiwei Jul 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer a more meaningful naming here, like peerUseful


nodeAttnets := bitfield.NewBitvector64()
nodeSyncnets := bitfield.NewBitvector4()
if err := node.Load(enr.WithEntry(s.cfg.NetworkConfig.AttSubnetKey, &nodeAttnets)); err != nil {
log.Trace("Could not load att subnet", "err", err)
return false
}
if err := node.Load(enr.WithEntry(s.cfg.NetworkConfig.SyncCommsSubnetKey, &nodeSyncnets)); err != nil {
log.Trace("Could not load sync subnet", "err", err)
return false
}

s.subManager.subscriptions.Range(func(key, value any) bool {
sub := value.(*GossipSubscription)
if sub.sub == nil {
return true
}

if !sub.subscribed.Load() {
return true
}

if len(sub.topic.ListPeers()) > peerSubnetTarget {
return true
}
if gossip.IsTopicBeaconAttestation(sub.sub.Topic()) {
ret = s.isPeerUsefulForAttNet(sub, nodeAttnets)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel the logic can be more explicit so the code turns to be readable, but feel free to see if you'd like to take my advise.

if peerUseful = s.isPeerUsefulForAttNet(sub, nodeAttnets); peerUseful {
  return false
}
return true

return !ret
}

if gossip.IsTopicSyncCommittee(sub.sub.Topic()) {
ret = s.isPeerUsefulForSyncNet(sub, nodeSyncnets)
return !ret
}

return true
})
return ret
}

func (s *Sentinel) isPeerUsefulForAttNet(sub *GossipSubscription, nodeAttnets bitfield.Bitvector64) bool {
splitTopic := strings.Split(sub.sub.Topic(), "/")
if len(splitTopic) < 4 {
return false
}
subnetIdStr, found := strings.CutPrefix(splitTopic[3], "beacon_attestation_")
if !found {
return false
}
subnetId, err := strconv.Atoi(subnetIdStr)
if err != nil {
log.Warn("Could not parse subnet id", "subnet", subnetIdStr, "err", err)
return false
}
// check if subnetIdth bit is set in nodeAttnets
return nodeAttnets.BitAt(uint64(subnetId))

}

func (s *Sentinel) isPeerUsefulForSyncNet(sub *GossipSubscription, nodeSyncnets bitfield.Bitvector4) bool {
splitTopic := strings.Split(sub.sub.Topic(), "/")
if len(splitTopic) < 4 {
return false
}
syncnetIdStr, found := strings.CutPrefix(splitTopic[3], "sync_committee_")
if !found {
return false
}
syncnetId, err := strconv.Atoi(syncnetIdStr)
if err != nil {
log.Warn("Could not parse syncnet id", "syncnet", syncnetIdStr, "err", err)
return false
}
// check if syncnetIdth bit is set in nodeSyncnets
if nodeSyncnets.BitAt(uint64(syncnetId)) {
return true
}
return false
}

func (s *Sentinel) GetPeersCount() (active int, connected int, disconnected int) {
peers := s.host.Network().Peers()

Expand Down
Loading