Skip to content

Commit

Permalink
Proper Caplin's subscription listen (erigontech#9734)
Browse files Browse the repository at this point in the history
## Subscription system

Sentinel has a time-based subscription system, which means that
subscriptions persists only until a specific point in time is reached.


```go=
func (sub *GossipSubscription) OverwriteSubscriptionExpiry(expiry time.Time) {
	sub.expiration.Store(expiry)
}
```

All gossip topics are joined which means we can publish at all times,
but not all gossip topics are listened to, which means we are not
listening for all topic's subscriptions. calling the function above will
keep make Caplin listen to that topic until `expiry`is reached. once
rached, the listener will be closed **but we can still publish** if we
set the expiry, so that now we need to listen again, then we will revive
the subscription.
  • Loading branch information
Giulio2002 authored Mar 20, 2024
1 parent af429b8 commit ac1b139
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 29 deletions.
4 changes: 3 additions & 1 deletion cl/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ const (
TopicNameLightClientFinalityUpdate = "light_client_finality_update"
TopicNameLightClientOptimisticUpdate = "light_client_optimistic_update"

TopicNamePrefixBlobSidecar = "blob_sidecar_%d" // {id} is a placeholder for the blob id
TopicNamePrefixBlobSidecar = "blob_sidecar_%d" // {id} is a placeholder for the blob id
TopicNamePrefixBeaconAttestation = "beacon_attestation_%d"
TopicNamePrefixSyncCommittee = "sync_committee_%d"
)

func TopicNameBlobSidecar(d int) string {
Expand Down
61 changes: 46 additions & 15 deletions cl/sentinel/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ledgerwatch/erigon-lib/common"
Expand Down Expand Up @@ -165,7 +166,7 @@ func (s *Sentinel) forkWatcher() {
s.subManager.subscriptions.Range(func(key, value interface{}) bool {
sub := value.(*GossipSubscription)
s.subManager.unsubscribe(key.(string))
newSub, err := s.SubscribeGossip(sub.gossip_topic)
newSub, err := s.SubscribeGossip(sub.gossip_topic, sub.expiration.Load().(time.Time))
if err != nil {
log.Warn("[Gossip] Failed to resubscribe to topic", "err", err)
}
Expand All @@ -178,16 +179,19 @@ func (s *Sentinel) forkWatcher() {
}
}

func (s *Sentinel) SubscribeGossip(topic GossipTopic, opts ...pubsub.TopicOpt) (sub *GossipSubscription, err error) {
func (s *Sentinel) SubscribeGossip(topic GossipTopic, expiration time.Time, opts ...pubsub.TopicOpt) (sub *GossipSubscription, err error) {
digest, err := fork.ComputeForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig)
if err != nil {
log.Error("[Gossip] Failed to calculate fork choice", "err", err)
}
var exp atomic.Value
exp.Store(expiration)
sub = &GossipSubscription{
gossip_topic: topic,
ch: s.subManager.ch,
host: s.host.ID(),
ctx: s.ctx,
expiration: exp,
}
path := fmt.Sprintf("/eth2/%x/%s/%s", digest, topic.Name, topic.CodecStr)
sub.topic, err = s.pubsub.Join(path, opts...)
Expand Down Expand Up @@ -279,31 +283,55 @@ type GossipSubscription struct {
host peer.ID
ch chan *GossipMessage
ctx context.Context
expiration atomic.Value // Unix nano for how much we should listen to this topic
subscribed atomic.Bool

topic *pubsub.Topic
sub *pubsub.Subscription

cf context.CancelFunc
rf pubsub.RelayCancelFunc

setup sync.Once
stopCh chan struct{}
closeOnce sync.Once
}

func (sub *GossipSubscription) Listen() (err error) {
sub.setup.Do(func() {
sub.stopCh = make(chan struct{}, 3)
sub.sub, err = sub.topic.Subscribe()
if err != nil {
err = fmt.Errorf("failed to begin topic %s subscription, err=%w", sub.topic.String(), err)
return
func (sub *GossipSubscription) Listen() {
go func() {
var err error
checkingInterval := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-sub.ctx.Done():
return
case <-checkingInterval.C:
expirationTime := sub.expiration.Load().(time.Time)
if sub.subscribed.Load() && time.Now().After(expirationTime) {
sub.stopCh <- struct{}{}
sub.topic.Close()
sub.subscribed.Store(false)
continue
}
if !sub.subscribed.Load() && time.Now().Before(expirationTime) {
sub.stopCh = make(chan struct{}, 3)
sub.sub, err = sub.topic.Subscribe()
if err != nil {
log.Warn("[Gossip] failed to begin topic subscription", "err", err)
time.Sleep(30 * time.Second)
continue
}
var sctx context.Context
sctx, sub.cf = context.WithCancel(sub.ctx)
go sub.run(sctx, sub.sub, sub.sub.Topic())
sub.subscribed.Store(true)
}
}
}
var sctx context.Context
sctx, sub.cf = context.WithCancel(sub.ctx)
go sub.run(sctx, sub.sub, sub.sub.Topic())
})
return nil
}()
}

func (sub *GossipSubscription) OverwriteSubscriptionExpiry(expiry time.Time) {
sub.expiration.Store(expiry)
}

// calls the cancel func for the subscriber and closes the topic and sub
Expand Down Expand Up @@ -356,6 +384,9 @@ func (s *GossipSubscription) run(ctx context.Context, sub *pubsub.Subscription,
log.Warn("[Sentinel] fail to decode gossip packet", "err", err, "topicName", topicName)
return
}
if msg.Topic != nil {
fmt.Println(*msg.Topic)
}
if msg.ReceivedFrom == s.host {
continue
}
Expand Down
9 changes: 5 additions & 4 deletions cl/sentinel/sentinel_gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,17 @@ func TestSentinelGossipOnHardFork(t *testing.T) {
require.NoError(t, sentinel2.Start())
h2 := sentinel2.host

sub1, err := sentinel1.SubscribeGossip(BeaconBlockSsz)
sub1, err := sentinel1.SubscribeGossip(BeaconBlockSsz, time.Unix(0, math.MaxInt64))
require.NoError(t, err)
defer sub1.Close()

require.NoError(t, sub1.Listen())
sub1.Listen()

sub2, err := sentinel2.SubscribeGossip(BeaconBlockSsz)
sub2, err := sentinel2.SubscribeGossip(BeaconBlockSsz, time.Unix(0, math.MaxInt64))
require.NoError(t, err)
defer sub2.Close()
require.NoError(t, sub2.Listen())
sub2.Listen()
time.Sleep(200 * time.Millisecond)

err = h.Connect(ctx, peer.AddrInfo{
ID: h2.ID(),
Expand Down
22 changes: 16 additions & 6 deletions cl/sentinel/service/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"context"
"fmt"
"net"
"strings"
"time"

"github.com/ledgerwatch/erigon/cl/gossip"
"github.com/ledgerwatch/erigon/cl/persistence/blob_storage"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice"
"github.com/ledgerwatch/erigon/cl/sentinel"
"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks"

"github.com/ledgerwatch/erigon-lib/direct"
Expand Down Expand Up @@ -36,6 +39,14 @@ func generateSubnetsTopics(template string, maxIds int) []sentinel.GossipTopic {
return topics
}

func getExpirationForTopic(topic string) time.Time {
if strings.Contains(topic, "beacon_attestation") || strings.Contains(topic, "sync_committee") {
return time.Unix(0, 0)
}

return time.Unix(math.MaxInt64, math.MaxInt64)
}

func createSentinel(cfg *sentinel.SentinelConfig, blockReader freezeblocks.BeaconSnapshotReader, blobStorage blob_storage.BlobStorage, indiciesDB kv.RwDB, forkChoiceReader forkchoice.ForkChoiceStorageReader, logger log.Logger) (*sentinel.Sentinel, error) {
sent, err := sentinel.New(context.Background(), cfg, blockReader, blobStorage, indiciesDB, logger, forkChoiceReader)
if err != nil {
Expand All @@ -56,23 +67,22 @@ func createSentinel(cfg *sentinel.SentinelConfig, blockReader freezeblocks.Beaco
////sentinel.LightClientOptimisticUpdateSsz,
}
gossipTopics = append(gossipTopics, generateSubnetsTopics(gossip.TopicNamePrefixBlobSidecar, int(cfg.BeaconConfig.MaxBlobsPerBlock))...)
// gossipTopics = append(gossipTopics, sentinel.GossipSidecarTopics(chain.MaxBlobsPerBlock)...)
gossipTopics = append(gossipTopics, generateSubnetsTopics(gossip.TopicNamePrefixBeaconAttestation, int(cfg.NetworkConfig.AttestationSubnetCount))...)
gossipTopics = append(gossipTopics, generateSubnetsTopics(gossip.TopicNamePrefixSyncCommittee, int(cfg.BeaconConfig.SyncCommitteeSubnetCount))...)

for _, v := range gossipTopics {
if err := sent.Unsubscribe(v); err != nil {
logger.Error("[Sentinel] failed to start sentinel", "err", err)
continue
}

// now lets separately connect to the gossip topics. this joins the room
subscriber, err := sent.SubscribeGossip(v)
subscriber, err := sent.SubscribeGossip(v, getExpirationForTopic(v.Name)) // Listen forever.
if err != nil {
logger.Error("[Sentinel] failed to start sentinel", "err", err)
}
// actually start the subscription, aka listening and sending packets to the sentinel recv channel
err = subscriber.Listen()
if err != nil {
logger.Error("[Sentinel] failed to start sentinel", "err", err)
}
subscriber.Listen()
}
return sent, nil
}
Expand Down
9 changes: 6 additions & 3 deletions cl/validator/attestation_producer/attestation_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ func (ap *attestationProducer) ProduceAndCacheAttestationData(baseState *state.C
return solid.AttestationData{}, err
}
if baseAttestationData, ok := ap.attestationsCache.Get(epoch); ok {
beaconBlockRoot, err := baseState.GetBlockRootAtSlot(slot)
if err != nil {
return solid.AttestationData{}, err
beaconBlockRoot := baseStateBlockRoot
if baseState.Slot() > slot {
beaconBlockRoot, err = baseState.GetBlockRootAtSlot(slot)
if err != nil {
return solid.AttestationData{}, err
}
}
return solid.NewAttestionDataFromParameters(
slot,
Expand Down

0 comments on commit ac1b139

Please sign in to comment.