Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
simplify driver
Browse files Browse the repository at this point in the history
  • Loading branch information
mask-pp committed Apr 1, 2024
1 parent bd1c324 commit 4b66123
Show file tree
Hide file tree
Showing 22 changed files with 129 additions and 223 deletions.
15 changes: 5 additions & 10 deletions cmd/utils/sub_command.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package utils

import (
"context"
"os"
"os/signal"
"syscall"
Expand All @@ -14,20 +13,17 @@ import (
)

type SubcommandApplication interface {
InitFromCli(context.Context, *cli.Context) error
InitFromCli(*cli.Context) error
Name() string
Start() error
Close(context.Context)
Close()
}

func SubcommandAction(app SubcommandApplication) cli.ActionFunc {
return func(c *cli.Context) error {
logger.InitLogger(c)

ctx, ctxClose := context.WithCancel(context.Background())
defer ctxClose()

if err := app.InitFromCli(ctx, c); err != nil {
if err := app.InitFromCli(c); err != nil {
return err
}

Expand All @@ -38,14 +34,13 @@ func SubcommandAction(app SubcommandApplication) cli.ActionFunc {
return err
}

if err := metrics.Serve(ctx, c); err != nil {
if err := metrics.Serve(c); err != nil {
log.Error("Starting metrics server error", "error", err)
return err
}

defer func() {
ctxClose()
app.Close(ctx)
app.Close()
log.Info("Application stopped", "name", app.Name())
}()

Expand Down
92 changes: 32 additions & 60 deletions driver/chain_syncer/beaconsync/progress_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package beaconsync

import (
"context"
"math/big"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum"
Expand All @@ -25,19 +24,16 @@ type SyncProgressTracker struct {
client *rpc.EthClient

// Meta data
triggered bool
lastSyncedVerifiedBlockID *big.Int
lastSyncedVerifiedBlockHash common.Hash
triggered atomic.Bool
lastSyncedVerifiedBlockID atomic.Uint64
lastSyncedVerifiedBlockHash atomic.Value

// Out-of-sync check related
lastSyncProgress *ethereum.SyncProgress
lastProgressedTime time.Time
timeout time.Duration
outOfSync bool
outOfSync atomic.Bool
ticker *time.Ticker

// Read-write mutex
mutex sync.RWMutex
}

// NewSyncProgressTracker creates a new SyncProgressTracker instance.
Expand All @@ -60,15 +56,12 @@ func (t *SyncProgressTracker) Track(ctx context.Context) {
// track is the internal implementation of MonitorSyncProgress, tries to
// track the L2 execution engine's beacon sync progress.
func (t *SyncProgressTracker) track(ctx context.Context) {
t.mutex.Lock()
defer t.mutex.Unlock()

if !t.triggered {
if !t.triggered.Load() {
log.Debug("Beacon sync not triggered")
return
}

if t.outOfSync {
if t.outOfSync.Load() {
return
}

Expand All @@ -94,12 +87,12 @@ func (t *SyncProgressTracker) track(ctx context.Context) {
return
}

if new(big.Int).SetUint64(headHeight).Cmp(t.lastSyncedVerifiedBlockID) >= 0 {
if headHeight >= t.lastSyncedVerifiedBlockID.Load() {
t.lastProgressedTime = time.Now()
log.Info(
"L2 execution engine has finished the P2P sync work, all verified blocks synced, "+
"will switch to insert pending blocks one by one",
"lastSyncedVerifiedBlockID", t.lastSyncedVerifiedBlockID,
"lastSyncedVerifiedBlockID", t.lastSyncedVerifiedBlockID.Load(),
"lastSyncedVerifiedBlockHash", t.lastSyncedVerifiedBlockHash,
)
return
Expand All @@ -112,15 +105,15 @@ func (t *SyncProgressTracker) track(ctx context.Context) {

// Check whether the L2 execution engine has synced any new block through P2P since last event loop.
if syncProgressed(t.lastSyncProgress, progress) {
t.outOfSync = false
t.outOfSync.Store(false)
t.lastProgressedTime = time.Now()
return
}

// Has not synced any new block since last loop, check whether reaching the timeout.
if time.Since(t.lastProgressedTime) > t.timeout {
// Mark the L2 execution engine out of sync.
t.outOfSync = true
t.outOfSync.Store(true)

log.Warn(
"L2 execution engine is not able to sync through P2P",
Expand All @@ -131,80 +124,59 @@ func (t *SyncProgressTracker) track(ctx context.Context) {
}

// UpdateMeta updates the inner beacon sync metadata.
func (t *SyncProgressTracker) UpdateMeta(id *big.Int, blockHash common.Hash) {
t.mutex.Lock()
defer t.mutex.Unlock()

func (t *SyncProgressTracker) UpdateMeta(id uint64, blockHash common.Hash) {
log.Debug("Update sync progress tracker meta", "id", id, "hash", blockHash)

if !t.triggered {
if !t.triggered.Load() {
t.lastProgressedTime = time.Now()
}

t.triggered = true
t.lastSyncedVerifiedBlockID = id
t.lastSyncedVerifiedBlockHash = blockHash
t.triggered.Store(true)
t.lastSyncedVerifiedBlockID.Store(id)
t.lastSyncedVerifiedBlockHash.Store(blockHash)
}

// ClearMeta cleans the inner beacon sync metadata.
func (t *SyncProgressTracker) ClearMeta() {
t.mutex.Lock()
defer t.mutex.Unlock()

log.Debug("Clear sync progress tracker meta")

t.triggered = false
t.lastSyncedVerifiedBlockID = nil
t.lastSyncedVerifiedBlockHash = common.Hash{}
t.outOfSync = false
t.triggered.Store(false)
t.lastSyncedVerifiedBlockID.Store(0)
t.lastSyncedVerifiedBlockHash.Store(common.Hash{})
t.outOfSync.Store(false)
}

// HeadChanged checks if a new beacon sync request will be needed.
func (t *SyncProgressTracker) HeadChanged(newID *big.Int) bool {
t.mutex.RLock()
defer t.mutex.RUnlock()

if !t.triggered {
func (t *SyncProgressTracker) HeadChanged(newID uint64) bool {
if !t.triggered.Load() {
return true
}

return t.lastSyncedVerifiedBlockID != nil && t.lastSyncedVerifiedBlockID != newID
return t.lastSyncedVerifiedBlockID.Load() != newID
}

// OutOfSync tells whether the L2 execution engine is marked as out of sync.
func (t *SyncProgressTracker) OutOfSync() bool {
t.mutex.RLock()
defer t.mutex.RUnlock()

return t.outOfSync
return t.outOfSync.Load()
}

// Triggered returns tracker.triggered.
func (t *SyncProgressTracker) Triggered() bool {
t.mutex.RLock()
defer t.mutex.RUnlock()

return t.triggered
return t.triggered.Load()
}

// LastSyncedVerifiedBlockID returns tracker.lastSyncedVerifiedBlockID.
func (t *SyncProgressTracker) LastSyncedVerifiedBlockID() *big.Int {
t.mutex.RLock()
defer t.mutex.RUnlock()

if t.lastSyncedVerifiedBlockID == nil {
return nil
}

return new(big.Int).Set(t.lastSyncedVerifiedBlockID)
func (t *SyncProgressTracker) LastSyncedVerifiedBlockID() uint64 {
return t.lastSyncedVerifiedBlockID.Load()
}

// LastSyncedVerifiedBlockHash returns tracker.lastSyncedVerifiedBlockHash.
func (t *SyncProgressTracker) LastSyncedVerifiedBlockHash() common.Hash {
t.mutex.RLock()
defer t.mutex.RUnlock()

return t.lastSyncedVerifiedBlockHash
val := t.lastSyncedVerifiedBlockHash.Load()
if val == nil {
return common.Hash{}
}
return val.(common.Hash)
}

// syncProgressed checks whether there is any new progress since last sync progress check.
Expand Down
18 changes: 9 additions & 9 deletions driver/chain_syncer/beaconsync/progress_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,22 @@ func (s *BeaconSyncProgressTrackerTestSuite) TestTrack() {

// Triggered
ctx, cancel = context.WithCancel(context.Background())
s.t.UpdateMeta(common.Big256, testutils.RandomHash())
s.t.UpdateMeta(common.Big256.Uint64(), testutils.RandomHash())
go s.t.Track(ctx)
time.Sleep(syncProgressCheckInterval + 5*time.Second)
cancel()
}

func (s *BeaconSyncProgressTrackerTestSuite) TestClearMeta() {
s.t.triggered = true
s.t.triggered.Store(true)
s.t.ClearMeta()
s.False(s.t.triggered)
s.False(s.t.triggered.Load())
}

func (s *BeaconSyncProgressTrackerTestSuite) TestHeadChanged() {
s.True(s.t.HeadChanged(common.Big256))
s.t.triggered = true
s.False(s.t.HeadChanged(common.Big256))
s.True(s.t.HeadChanged(common.Big256.Uint64()))
s.t.triggered.Store(true)
s.False(s.t.HeadChanged(common.Big256.Uint64()))
}

func (s *BeaconSyncProgressTrackerTestSuite) TestOutOfSync() {
Expand All @@ -87,8 +87,8 @@ func (s *BeaconSyncProgressTrackerTestSuite) TestTriggered() {

func (s *BeaconSyncProgressTrackerTestSuite) TestLastSyncedVerifiedBlockID() {
s.Nil(s.t.LastSyncedVerifiedBlockID())
s.t.lastSyncedVerifiedBlockID = common.Big1
s.Equal(common.Big1.Uint64(), s.t.LastSyncedVerifiedBlockID().Uint64())
s.t.lastSyncedVerifiedBlockID.Store(1)
s.Equal(common.Big1.Uint64(), s.t.LastSyncedVerifiedBlockID())
}

func (s *BeaconSyncProgressTrackerTestSuite) TestLastSyncedVerifiedBlockHash() {
Expand All @@ -97,7 +97,7 @@ func (s *BeaconSyncProgressTrackerTestSuite) TestLastSyncedVerifiedBlockHash() {
s.t.LastSyncedVerifiedBlockHash(),
)
randomHash := testutils.RandomHash()
s.t.lastSyncedVerifiedBlockHash = randomHash
s.t.lastSyncedVerifiedBlockHash.Store(randomHash)
s.Equal(randomHash, s.t.LastSyncedVerifiedBlockHash())
}

Expand Down
24 changes: 9 additions & 15 deletions driver/chain_syncer/beaconsync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -36,8 +35,8 @@ func NewSyncer(

// TriggerBeaconSync triggers the L2 execution engine to start performing a beacon sync, if the
// latest verified block has changed.
func (s *Syncer) TriggerBeaconSync() error {
blockID, latestVerifiedHeadPayload, err := s.getVerifiedBlockPayload(s.ctx)
func (s *Syncer) TriggerBeaconSync(blockID uint64) error {
latestVerifiedHeadPayload, err := s.getVerifiedBlockPayload(s.ctx, blockID)
if err != nil {
return err
}
Expand Down Expand Up @@ -92,29 +91,24 @@ func (s *Syncer) TriggerBeaconSync() error {

// getVerifiedBlockPayload fetches the latest verified block's header, and converts it to an Engine API executable data,
// which will be used to let the node start beacon syncing.
func (s *Syncer) getVerifiedBlockPayload(ctx context.Context) (*big.Int, *engine.ExecutableData, error) {
stateVars, err := s.rpc.GetProtocolStateVariables(&bind.CallOpts{Context: ctx})
func (s *Syncer) getVerifiedBlockPayload(ctx context.Context, blockID uint64) (*engine.ExecutableData, error) {
blockInfo, err := s.rpc.GetL2BlockInfo(ctx, new(big.Int).SetUint64(blockID))
if err != nil {
return nil, nil, err
return nil, err
}

blockInfo, err := s.rpc.GetL2BlockInfo(ctx, new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId))
header, err := s.rpc.L2CheckPoint.HeaderByNumber(s.ctx, new(big.Int).SetUint64(blockID))
if err != nil {
return nil, nil, err
}

header, err := s.rpc.L2CheckPoint.HeaderByNumber(s.ctx, new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId))
if err != nil {
return nil, nil, err
return nil, err
}

if header.Hash() != blockInfo.Ts.BlockHash {
return nil, nil, fmt.Errorf(
return nil, fmt.Errorf(
"latest verified block hash mismatch: %s != %s", header.Hash(), common.BytesToHash(blockInfo.Ts.BlockHash[:]),
)
}

log.Info("Latest verified block header retrieved", "hash", header.Hash())

return new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId), encoding.ToExecutableData(header), nil
return encoding.ToExecutableData(header), nil
}
9 changes: 5 additions & 4 deletions driver/chain_syncer/calldata/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ func NewSyncer(

// ProcessL1Blocks fetches all `TaikoL1.BlockProposed` events between given
// L1 block heights, and then tries inserting them into L2 execution engine's blockchain.
func (s *Syncer) ProcessL1Blocks(ctx context.Context, l1End *types.Header) error {
func (s *Syncer) ProcessL1Blocks(ctx context.Context) error {
for {
if err := s.processL1Blocks(ctx, l1End); err != nil {
if err := s.processL1Blocks(ctx); err != nil {
return err
}

Expand All @@ -98,7 +98,8 @@ func (s *Syncer) ProcessL1Blocks(ctx context.Context, l1End *types.Header) error

// processL1Blocks is the inner method which responsible for processing
// all new L1 blocks.
func (s *Syncer) processL1Blocks(ctx context.Context, l1End *types.Header) error {
func (s *Syncer) processL1Blocks(ctx context.Context) error {
l1End := s.state.GetL1Head()
startL1Current := s.state.GetL1Current()
// If there is a L1 reorg, sometimes this will happen.
if startL1Current.Number.Uint64() >= l1End.Number.Uint64() && startL1Current.Hash() != l1End.Hash() {
Expand Down Expand Up @@ -211,7 +212,7 @@ func (s *Syncer) onBlockProposed(
)
if s.progressTracker.Triggered() {
// Already synced through beacon sync, just skip this event.
if event.BlockId.Cmp(s.progressTracker.LastSyncedVerifiedBlockID()) <= 0 {
if event.BlockId.Uint64() <= s.progressTracker.LastSyncedVerifiedBlockID() {
return nil
}

Expand Down
8 changes: 2 additions & 6 deletions driver/chain_syncer/calldata/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,12 @@ func (s *CalldataSyncerTestSuite) TestCancelNewSyncer() {
}

func (s *CalldataSyncerTestSuite) TestProcessL1Blocks() {
head, err := s.s.rpc.L1.HeaderByNumber(context.Background(), nil)
s.Nil(err)
s.Nil(s.s.ProcessL1Blocks(context.Background(), head))
s.Nil(s.s.ProcessL1Blocks(context.Background()))
}

func (s *CalldataSyncerTestSuite) TestProcessL1BlocksReorg() {
head, err := s.s.rpc.L1.HeaderByNumber(context.Background(), nil)
s.ProposeAndInsertEmptyBlocks(s.p, s.s)
s.Nil(err)
s.Nil(s.s.ProcessL1Blocks(context.Background(), head))
s.Nil(s.s.ProcessL1Blocks(context.Background()))
}

func (s *CalldataSyncerTestSuite) TestOnBlockProposed() {
Expand Down
Loading

0 comments on commit 4b66123

Please sign in to comment.