Skip to content

Commit

Permalink
Merge pull request #285 from ethpandaops/feat/cleanup-blobs
Browse files Browse the repository at this point in the history
feat(cannon): cleanup and remove blob ttlcache
  • Loading branch information
Savid authored Mar 8, 2024
2 parents fcb948f + 8ec6231 commit 0c0cf1a
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 201 deletions.
42 changes: 3 additions & 39 deletions pkg/cannon/deriver/beacon/eth/v1/beacon_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,14 @@ func (b *BeaconBlobDeriver) run(rctx context.Context) {
span.AddEvent("Grabbing next location")

// Get the next slot
location, lookAhead, err := b.iterator.Next(ctx)
location, _, err := b.iterator.Next(ctx)
if err != nil {
span.SetStatus(codes.Error, err.Error())

return err
}

span.AddEvent("Obtained next location, looking ahead...", trace.WithAttributes(attribute.Int64("location", int64(location.GetEthV1BeaconBlobSidecar().GetEpoch()))))

// Look ahead
b.lookAheadAtLocation(ctx, lookAhead)

span.AddEvent("Look ahead complete. Processing epoch...")
span.AddEvent("Obtained next location. Processing epoch...", trace.WithAttributes(attribute.Int64("location", int64(location.GetEthV1BeaconBlobSidecar().GetEpoch()))))

// Process the epoch
events, err := b.processEpoch(ctx, phase0.Epoch(location.GetEthV1BeaconBlobSidecar().GetEpoch()))
Expand Down Expand Up @@ -179,37 +174,6 @@ func (b *BeaconBlobDeriver) run(rctx context.Context) {
}
}

// lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required.
func (b *BeaconBlobDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) {
_, span := observability.Tracer().Start(ctx,
"BeaconBlobDeriver.lookAheadAtLocations",
)
defer span.End()

if locations == nil {
return
}

for _, location := range locations {
// Get the next look ahead epoch
epoch := phase0.Epoch(location.GetEthV1BeaconBlobSidecar().GetEpoch())

sp, err := b.beacon.Node().Spec()
if err != nil {
b.log.WithError(err).WithField("epoch", epoch).Warn("Failed to look ahead at epoch")

return
}

for i := uint64(0); i <= uint64(sp.SlotsPerEpoch-1); i++ {
slot := phase0.Slot(i + uint64(epoch)*uint64(sp.SlotsPerEpoch))

// Add the block sidecars to the preload queue so it's available when we need it
b.beacon.LazyLoadBeaconBlobSidecars(xatuethv1.SlotAsString(slot))
}
}
}

func (b *BeaconBlobDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) {
ctx, span := observability.Tracer().Start(ctx,
"BeaconBlobDeriver.processEpoch",
Expand Down Expand Up @@ -245,7 +209,7 @@ func (b *BeaconBlobDeriver) processSlot(ctx context.Context, slot phase0.Slot) (
)
defer span.End()

blobs, err := b.beacon.GetBeaconBlobSidecars(ctx, xatuethv1.SlotAsString(slot))
blobs, err := b.beacon.Node().FetchBeaconBlockBlobs(ctx, xatuethv1.SlotAsString(slot))
if err != nil {
var apiErr *api.Error
if errors.As(err, &apiErr) {
Expand Down
5 changes: 1 addition & 4 deletions pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,6 @@ func (b *ExecutionTransactionDeriver) lookAheadAtLocation(ctx context.Context, l

// Add the block to the preload queue so it's available when we need it
b.beacon.LazyLoadBeaconBlock(xatuethv1.SlotAsString(slot))

// Add the blob sidecars to the preload queue so it's available when we need it
b.beacon.LazyLoadBeaconBlobSidecars(xatuethv1.SlotAsString(slot))
}
}
}
Expand All @@ -235,7 +232,7 @@ func (b *ExecutionTransactionDeriver) processSlot(ctx context.Context, slot phas
return nil, errors.Wrapf(err, "failed to get block identifier for slot %d", slot)
}

blobSidecars, err := b.beacon.GetBeaconBlobSidecars(ctx, xatuethv1.SlotAsString(slot))
blobSidecars, err := b.beacon.Node().FetchBeaconBlockBlobs(ctx, xatuethv1.SlotAsString(slot))
if err != nil {
var apiErr *api.Error
if errors.As(err, &apiErr) {
Expand Down
97 changes: 7 additions & 90 deletions pkg/cannon/ethereum/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/ethpandaops/beacon/pkg/beacon"
"github.com/ethpandaops/xatu/pkg/cannon/ethereum/services"
"github.com/ethpandaops/xatu/pkg/networks"
Expand All @@ -33,13 +32,10 @@ type BeaconNode struct {

onReadyCallbacks []func(ctx context.Context) error

sfGroup *singleflight.Group
blockCache *ttlcache.Cache[string, *spec.VersionedSignedBeaconBlock]
blockPreloadChan chan string
blockPreloadSem chan struct{}
blobSidecarsCache *ttlcache.Cache[string, []*deneb.BlobSidecar]
blobSidecarsPreloadChan chan string
blobSidecarsPreloadSem chan struct{}
sfGroup *singleflight.Group
blockCache *ttlcache.Cache[string, *spec.VersionedSignedBeaconBlock]
blockPreloadChan chan string
blockPreloadSem chan struct{}
}

func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.FieldLogger) (*BeaconNode, error) {
Expand Down Expand Up @@ -75,8 +71,7 @@ func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.
}

// Create a buffered channel (semaphore) to limit the number of concurrent goroutines.
blockSem := make(chan struct{}, config.BlockPreloadWorkers)
blobSidecarsSem := make(chan struct{}, config.BlobSidecarsPreloadWorkers)
sem := make(chan struct{}, config.BlockPreloadWorkers)

return &BeaconNode{
config: config,
Expand All @@ -89,14 +84,8 @@ func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.
ttlcache.WithCapacity[string, *spec.VersionedSignedBeaconBlock](config.BlockCacheSize),
),
blockPreloadChan: make(chan string, config.BlockPreloadQueueSize),
blockPreloadSem: blockSem,
blobSidecarsCache: ttlcache.New(
ttlcache.WithTTL[string, []*deneb.BlobSidecar](config.BlobSidecarsCacheTTL.Duration),
ttlcache.WithCapacity[string, []*deneb.BlobSidecar](config.BlobSidecarsCacheSize),
),
blobSidecarsPreloadChan: make(chan string, config.BlobSidecarsPreloadQueueSize),
blobSidecarsPreloadSem: blobSidecarsSem,
metrics: NewMetrics(namespace, name),
blockPreloadSem: sem,
metrics: NewMetrics(namespace, name),
}, nil
}

Expand Down Expand Up @@ -318,75 +307,3 @@ func (b *BeaconNode) LazyLoadBeaconBlock(identifier string) {

b.blockPreloadChan <- identifier
}

// GetBeaconBlobSidecars returns a block's blob sidecars.
func (b *BeaconNode) GetBeaconBlobSidecars(ctx context.Context, identifier string, ignoreMetrics ...bool) ([]*deneb.BlobSidecar, error) {
ctx, span := observability.Tracer().Start(ctx, "ethereum.beacon.GetBeaconBlobSidecars", trace.WithAttributes(attribute.String("identifier", identifier)))

defer span.End()

b.metrics.IncBlobSidecarsFetched(string(b.Metadata().Network.Name))

// Check the cache first.
if item := b.blobSidecarsCache.Get(identifier); item != nil {
if len(ignoreMetrics) != 0 && ignoreMetrics[0] {
b.metrics.IncBlobSidecarsCacheHit(string(b.Metadata().Network.Name))
}

span.SetAttributes(attribute.Bool("cached", true))

return item.Value(), nil
}

span.SetAttributes(attribute.Bool("cached", false))

if len(ignoreMetrics) != 0 && ignoreMetrics[0] {
b.metrics.IncBlobSidecarsCacheMiss(string(b.Metadata().Network.Name))
}

// Use singleflight to ensure we only make one request for a block at a time.
x, err, shared := b.sfGroup.Do(identifier, func() (interface{}, error) {
span.AddEvent("Acquiring semaphore...")

// Acquire a semaphore before proceeding.
b.blobSidecarsPreloadSem <- struct{}{}
defer func() { <-b.blobSidecarsPreloadSem }()

span.AddEvent("Semaphore acquired. Fetching blob sidecars from beacon api...")

// Not in the cache, so fetch it.
blobSidecars, err := b.beacon.FetchBeaconBlockBlobs(ctx, identifier)
if err != nil {
return nil, err
}

span.AddEvent("BlobSidecar fetched from beacon node.")

// Add it to the cache.
b.blobSidecarsCache.Set(identifier, blobSidecars, time.Hour)

return blobSidecars, nil
})
if err != nil {
span.SetStatus(codes.Error, err.Error())

if len(ignoreMetrics) != 0 && ignoreMetrics[0] {
b.metrics.IncBlobSidecarsFetchErrors(string(b.Metadata().Network.Name))
}

return nil, err
}

span.AddEvent("Block fetching complete.", trace.WithAttributes(attribute.Bool("shared", shared)))

return x.([]*deneb.BlobSidecar), nil
}

func (b *BeaconNode) LazyLoadBeaconBlobSidecars(identifier string) {
// Don't add the blob sidecars to the preload queue if it's already in the cache.
if item := b.blobSidecarsCache.Get(identifier); item != nil {
return
}

b.blobSidecarsPreloadChan <- identifier
}
8 changes: 0 additions & 8 deletions pkg/cannon/ethereum/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,6 @@ type Config struct {
BlockPreloadWorkers uint64 `yaml:"blockPreloadWorkers" default:"5"`
// BlockPreloadQueueSize is the size of the queue for preloading blocks.
BlockPreloadQueueSize uint64 `yaml:"blockPreloadQueueSize" default:"5000"`
// BlobSidecarsCacheSize is the number of blob sidecars to cache.
BlobSidecarsCacheSize uint64 `yaml:"blobSidecarsCacheSize" default:"500"`
// BlobSidecarsCacheTTL is the time to live for blob sidecars in the cache.
BlobSidecarsCacheTTL human.Duration `yaml:"blobSidecarsCacheTtl" default:"10m"`
// BlobSidecarsPreloadWorkers is the number of workers to use for preloading blob sidecars.
BlobSidecarsPreloadWorkers uint64 `yaml:"blobSidecarsPreloadWorkers" default:"5"`
// BlobSidecarsPreloadQueueSize is the size of the queue for preloading blob sidecars.
BlobSidecarsPreloadQueueSize uint64 `yaml:"blobSidecarsPreloadQueueSize" default:"5000"`
}

func (c *Config) Validate() error {
Expand Down
60 changes: 0 additions & 60 deletions pkg/cannon/ethereum/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,6 @@ type Metrics struct {
blockCacheMiss *prometheus.CounterVec
// PreloadBlockQueueSize is the number of blocks in the preload queue.
preloadBlockQueueSize *prometheus.GaugeVec
// The number of blob sidecars that have been fetched.
blobSidecarsFetched *prometheus.CounterVec
// The number of blob sidecars fetches that have failed.
blobSidecarsFetchErrors *prometheus.CounterVec
// blobSidecarsCacheHit is the number of times a blob sidecars was found in the cache.
blobSidecarsCacheHit *prometheus.CounterVec
// blobSidecarsCacheMiss is the number of times a blob sidecars was not found in the cache.
blobSidecarsCacheMiss *prometheus.CounterVec
// preloadBlobSidecarsQueueSize is the number of blob sidecars in the preload queue.
preloadBlobSidecarsQueueSize *prometheus.GaugeVec
}

func NewMetrics(namespace, beaconNodeName string) *Metrics {
Expand Down Expand Up @@ -56,43 +46,13 @@ func NewMetrics(namespace, beaconNodeName string) *Metrics {
Name: "preload_block_queue_size",
Help: "The number of blocks in the preload queue",
}, []string{"network", "beacon"}),
blobSidecarsFetched: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "blob_sidecars_fetched_total",
Help: "The number of blob sidecars that have been fetched",
}, []string{"network", "beacon"}),
blobSidecarsFetchErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "blob_sidecars_fetch_errors_total",
Help: "The number of blob sidecars that have failed to be fetched",
}, []string{"network", "beacon"}),
blobSidecarsCacheHit: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "blob_sidecars_cache_hit_total",
Help: "The number of times a blob sidecars from a block was found in the cache",
}, []string{"network", "beacon"}),
blobSidecarsCacheMiss: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "blob_sidecars_cache_miss_total",
Help: "The number of times a blob sidecars from a block was not found in the cache",
}, []string{"network", "beacon"}),
preloadBlobSidecarsQueueSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "preload_blob_sidecars_queue_size",
Help: "The number of blob sidecars in the preload queue",
}, []string{"network", "beacon"}),
}

prometheus.MustRegister(m.blocksFetched)
prometheus.MustRegister(m.blocksFetchErrors)
prometheus.MustRegister(m.blockCacheHit)
prometheus.MustRegister(m.blockCacheMiss)
prometheus.MustRegister(m.preloadBlockQueueSize)
prometheus.MustRegister(m.blobSidecarsFetched)
prometheus.MustRegister(m.blobSidecarsFetchErrors)
prometheus.MustRegister(m.blobSidecarsCacheHit)
prometheus.MustRegister(m.blobSidecarsCacheMiss)
prometheus.MustRegister(m.preloadBlobSidecarsQueueSize)

return m
}
Expand All @@ -116,23 +76,3 @@ func (m *Metrics) IncBlockCacheMiss(network string) {
func (m *Metrics) SetPreloadBlockQueueSize(network string, size int) {
m.preloadBlockQueueSize.WithLabelValues(network, m.beacon).Set(float64(size))
}

func (m *Metrics) IncBlobSidecarsFetched(network string) {
m.blobSidecarsFetched.WithLabelValues(network, m.beacon).Inc()
}

func (m *Metrics) IncBlobSidecarsFetchErrors(network string) {
m.blobSidecarsFetchErrors.WithLabelValues(network, m.beacon).Inc()
}

func (m *Metrics) IncBlobSidecarsCacheHit(network string) {
m.blobSidecarsCacheHit.WithLabelValues(network, m.beacon).Inc()
}

func (m *Metrics) IncBlobSidecarsCacheMiss(network string) {
m.blobSidecarsCacheMiss.WithLabelValues(network, m.beacon).Inc()
}

func (m *Metrics) SetPreloadBlobSidecarsQueueSize(network string, size int) {
m.preloadBlobSidecarsQueueSize.WithLabelValues(network, m.beacon).Set(float64(size))
}

0 comments on commit 0c0cf1a

Please sign in to comment.