Skip to content

Commit

Permalink
fix(sentry): Beacon commitee duties cache overriding (#248)
Browse files Browse the repository at this point in the history
* fix(sentry): Override beacon commitee duties

* Refresh beacon committees when sync status changes

* feat: Add error checks for network name and wallclock availability

* refactor(duties): remove debug log statement
  • Loading branch information
samcm authored Oct 27, 2023
1 parent 0a8a595 commit c5b7238
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 24 deletions.
65 changes: 41 additions & 24 deletions pkg/sentry/ethereum/services/duties.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type DutiesService struct {
bootstrapped bool

onReadyCallbacks []func(context.Context) error

lastSyncState bool
}

func NewDutiesService(log logrus.FieldLogger, sbeacon beacon.Node, metadata *MetadataService) DutiesService {
Expand All @@ -47,13 +49,15 @@ func NewDutiesService(log logrus.FieldLogger, sbeacon beacon.Node, metadata *Met
metadata: metadata,

bootstrapped: false,

lastSyncState: false,
}
}

func (m *DutiesService) Start(ctx context.Context) error {
go func() {
operation := func() error {
if err := m.fetchRequiredEpochDuties(ctx); err != nil {
if err := m.fetchRequiredEpochDuties(ctx, false); err != nil {
return err
}

Expand All @@ -79,32 +83,46 @@ func (m *DutiesService) Start(ctx context.Context) error {
}()

m.metadata.Wallclock().OnEpochChanged(func(epoch ethwallclock.Epoch) {
if err := m.fetchRequiredEpochDuties(ctx); err != nil {
if err := m.fetchRequiredEpochDuties(ctx, true); err != nil {
m.log.WithError(err).Warn("Failed to fetch required epoch duties")
}

// Sleep for a bit to give the beacon node a chance to run its epoch transition.
// We don't really care about nice-to-have duties so the sleep here is fine.
// "Required" duties (aka the current epoch) will be refetched the moment that epoch
// starts.
time.Sleep(15 * time.Second)

//nolint:errcheck // We don't care about the error here
m.fetchNiceToHaveEpochDuties(ctx)
})

m.beacon.OnChainReOrg(ctx, func(ctx context.Context, ev *v1.ChainReorgEvent) error {
// Clear the cache for the reorged epoch and all future epochs.
for _, epoch := range m.beaconCommittees.Keys() {
if epoch >= ev.Epoch {
m.log.WithFields(logrus.Fields{
"epoch": epoch,
"event": ev,
}).Info("Clearing beacon committee after reorg event")

if err := m.fetchBeaconCommittee(ctx, epoch, true); err != nil {
m.log.WithError(err).WithFields(logrus.Fields{
"epoch": epoch,
"event": ev,
}).Error("Failed to fetch new beacon committee after reorg")
}
m.log.Info("Chain reorg detected - refetching beacon committees")

if err := m.fetchRequiredEpochDuties(ctx, true); err != nil {
m.log.WithError(err).Warn("Failed to fetch required epoch duties")
}

return nil
})

m.beacon.OnSyncStatus(ctx, func(ctx context.Context, ev *beacon.SyncStatusEvent) error {
if ev.State.IsSyncing != m.lastSyncState {
m.log.WithFields(logrus.Fields{
"is_syncing": ev.State.IsSyncing,
}).Info("Sync status changed - refetching beacon committees")

if err := m.fetchRequiredEpochDuties(ctx, true); err != nil {
m.log.
WithError(err).
WithField("is_syncing", ev.State.IsSyncing).
Warn("Failed to fetch required epoch duties after a sync status change")
}
}

m.lastSyncState = ev.State.IsSyncing

return nil
})

Expand Down Expand Up @@ -150,9 +168,6 @@ func (m *DutiesService) NiceToHaveEpochDuties(ctx context.Context) []phase0.Epoc

epochs := []phase0.Epoch{
phase0.Epoch(epochNumber - 1),
phase0.Epoch(epochNumber - 2),
phase0.Epoch(epochNumber - 3),

phase0.Epoch(epochNumber + 1),
}

Expand Down Expand Up @@ -181,14 +196,14 @@ func (m *DutiesService) Ready(ctx context.Context) error {
return nil
}

func (m *DutiesService) fetchRequiredEpochDuties(ctx context.Context) error {
func (m *DutiesService) fetchRequiredEpochDuties(ctx context.Context, overrideCache ...bool) error {
if m.metadata.Wallclock() == nil {
return fmt.Errorf("metadata service is not ready")
}

for _, epoch := range m.RequiredEpochDuties(ctx) {
if duties := m.beaconCommittees.Get(epoch); duties == nil {
if err := m.fetchBeaconCommittee(ctx, epoch); err != nil {
if duties := m.beaconCommittees.Get(epoch); duties == nil || len(overrideCache) != 0 && overrideCache[0] {
if err := m.fetchBeaconCommittee(ctx, epoch, overrideCache...); err != nil {
return err
}
}
Expand Down Expand Up @@ -221,8 +236,8 @@ func (m *DutiesService) fireOnBeaconCommitteeSubscriptions(epoch phase0.Epoch, c
}
}

func (m *DutiesService) fetchBeaconCommittee(ctx context.Context, epoch phase0.Epoch, skipCache ...bool) error {
if len(skipCache) != 0 && !skipCache[0] {
func (m *DutiesService) fetchBeaconCommittee(ctx context.Context, epoch phase0.Epoch, overrideCache ...bool) error {
if len(overrideCache) != 0 && !overrideCache[0] {
if duties := m.beaconCommittees.Get(epoch); duties != nil {
return nil
}
Expand All @@ -231,6 +246,8 @@ func (m *DutiesService) fetchBeaconCommittee(ctx context.Context, epoch phase0.E
m.mu.Lock()
defer m.mu.Unlock()

m.log.WithField("epoch", epoch).WithField("override_cache", overrideCache).Debug("Fetching beacon committee")

committees, err := m.beacon.FetchBeaconCommittees(ctx, "head", epoch)
if err != nil {
m.log.WithError(err).Error("Failed to fetch beacon committees")
Expand Down
4 changes: 4 additions & 0 deletions pkg/sentry/ethereum/services/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func (m *MetadataService) Ready(ctx context.Context) error {
return errors.New("network name is not available")
}

if m.wallclock == nil {
return errors.New("wallclock is not available")
}

return nil
}

Expand Down

0 comments on commit c5b7238

Please sign in to comment.