From 4b66123afb8eab12b4843e9a237425f879f79e4d Mon Sep 17 00:00:00 2001 From: maskpp Date: Sun, 31 Mar 2024 12:55:20 +0800 Subject: [PATCH] simplify driver --- cmd/utils/sub_command.go | 15 +-- .../beaconsync/progress_tracker.go | 92 +++++++------------ .../beaconsync/progress_tracker_test.go | 18 ++-- driver/chain_syncer/beaconsync/syncer.go | 24 ++--- driver/chain_syncer/calldata/syncer.go | 9 +- driver/chain_syncer/calldata/syncer_test.go | 8 +- driver/chain_syncer/chain_syncer.go | 25 +++-- driver/chain_syncer/chain_syncer_test.go | 4 +- driver/config_test.go | 3 +- driver/driver.go | 70 ++++---------- driver/driver_test.go | 15 ++- driver/state/state.go | 16 ++-- integration_test/deploy_l1_contract.sh | 16 ++-- integration_test/l1_env.sh | 2 +- internal/metrics/metrics.go | 5 +- internal/testutils/helper.go | 4 +- internal/testutils/interfaces.go | 4 +- proposer/config_test.go | 3 +- proposer/proposer.go | 6 +- proposer/proposer_test.go | 2 +- prover/config_test.go | 3 +- prover/prover.go | 8 +- 22 files changed, 129 insertions(+), 223 deletions(-) diff --git a/cmd/utils/sub_command.go b/cmd/utils/sub_command.go index d4149a19e..787c6aac6 100644 --- a/cmd/utils/sub_command.go +++ b/cmd/utils/sub_command.go @@ -1,7 +1,6 @@ package utils import ( - "context" "os" "os/signal" "syscall" @@ -14,20 +13,17 @@ import ( ) type SubcommandApplication interface { - InitFromCli(context.Context, *cli.Context) error + InitFromCli(*cli.Context) error Name() string Start() error - Close(context.Context) + Close() } func SubcommandAction(app SubcommandApplication) cli.ActionFunc { return func(c *cli.Context) error { logger.InitLogger(c) - ctx, ctxClose := context.WithCancel(context.Background()) - defer ctxClose() - - if err := app.InitFromCli(ctx, c); err != nil { + if err := app.InitFromCli(c); err != nil { return err } @@ -38,14 +34,13 @@ func SubcommandAction(app SubcommandApplication) cli.ActionFunc { return err } - if err := metrics.Serve(ctx, c); err != nil { + if err := metrics.Serve(c); err != nil { log.Error("Starting metrics server error", "error", err) return err } defer func() { - ctxClose() - app.Close(ctx) + app.Close() log.Info("Application stopped", "name", app.Name()) }() diff --git a/driver/chain_syncer/beaconsync/progress_tracker.go b/driver/chain_syncer/beaconsync/progress_tracker.go index 1948b8ddf..e11feeb79 100644 --- a/driver/chain_syncer/beaconsync/progress_tracker.go +++ b/driver/chain_syncer/beaconsync/progress_tracker.go @@ -2,8 +2,7 @@ package beaconsync import ( "context" - "math/big" - "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum" @@ -25,19 +24,16 @@ type SyncProgressTracker struct { client *rpc.EthClient // Meta data - triggered bool - lastSyncedVerifiedBlockID *big.Int - lastSyncedVerifiedBlockHash common.Hash + triggered atomic.Bool + lastSyncedVerifiedBlockID atomic.Uint64 + lastSyncedVerifiedBlockHash atomic.Value // Out-of-sync check related lastSyncProgress *ethereum.SyncProgress lastProgressedTime time.Time timeout time.Duration - outOfSync bool + outOfSync atomic.Bool ticker *time.Ticker - - // Read-write mutex - mutex sync.RWMutex } // NewSyncProgressTracker creates a new SyncProgressTracker instance. @@ -60,15 +56,12 @@ func (t *SyncProgressTracker) Track(ctx context.Context) { // track is the internal implementation of MonitorSyncProgress, tries to // track the L2 execution engine's beacon sync progress. func (t *SyncProgressTracker) track(ctx context.Context) { - t.mutex.Lock() - defer t.mutex.Unlock() - - if !t.triggered { + if !t.triggered.Load() { log.Debug("Beacon sync not triggered") return } - if t.outOfSync { + if t.outOfSync.Load() { return } @@ -94,12 +87,12 @@ func (t *SyncProgressTracker) track(ctx context.Context) { return } - if new(big.Int).SetUint64(headHeight).Cmp(t.lastSyncedVerifiedBlockID) >= 0 { + if headHeight >= t.lastSyncedVerifiedBlockID.Load() { t.lastProgressedTime = time.Now() log.Info( "L2 execution engine has finished the P2P sync work, all verified blocks synced, "+ "will switch to insert pending blocks one by one", - "lastSyncedVerifiedBlockID", t.lastSyncedVerifiedBlockID, + "lastSyncedVerifiedBlockID", t.lastSyncedVerifiedBlockID.Load(), "lastSyncedVerifiedBlockHash", t.lastSyncedVerifiedBlockHash, ) return @@ -112,7 +105,7 @@ func (t *SyncProgressTracker) track(ctx context.Context) { // Check whether the L2 execution engine has synced any new block through P2P since last event loop. if syncProgressed(t.lastSyncProgress, progress) { - t.outOfSync = false + t.outOfSync.Store(false) t.lastProgressedTime = time.Now() return } @@ -120,7 +113,7 @@ func (t *SyncProgressTracker) track(ctx context.Context) { // Has not synced any new block since last loop, check whether reaching the timeout. if time.Since(t.lastProgressedTime) > t.timeout { // Mark the L2 execution engine out of sync. - t.outOfSync = true + t.outOfSync.Store(true) log.Warn( "L2 execution engine is not able to sync through P2P", @@ -131,80 +124,59 @@ func (t *SyncProgressTracker) track(ctx context.Context) { } // UpdateMeta updates the inner beacon sync metadata. -func (t *SyncProgressTracker) UpdateMeta(id *big.Int, blockHash common.Hash) { - t.mutex.Lock() - defer t.mutex.Unlock() - +func (t *SyncProgressTracker) UpdateMeta(id uint64, blockHash common.Hash) { log.Debug("Update sync progress tracker meta", "id", id, "hash", blockHash) - if !t.triggered { + if !t.triggered.Load() { t.lastProgressedTime = time.Now() } - t.triggered = true - t.lastSyncedVerifiedBlockID = id - t.lastSyncedVerifiedBlockHash = blockHash + t.triggered.Store(true) + t.lastSyncedVerifiedBlockID.Store(id) + t.lastSyncedVerifiedBlockHash.Store(blockHash) } // ClearMeta cleans the inner beacon sync metadata. func (t *SyncProgressTracker) ClearMeta() { - t.mutex.Lock() - defer t.mutex.Unlock() - log.Debug("Clear sync progress tracker meta") - t.triggered = false - t.lastSyncedVerifiedBlockID = nil - t.lastSyncedVerifiedBlockHash = common.Hash{} - t.outOfSync = false + t.triggered.Store(false) + t.lastSyncedVerifiedBlockID.Store(0) + t.lastSyncedVerifiedBlockHash.Store(common.Hash{}) + t.outOfSync.Store(false) } // HeadChanged checks if a new beacon sync request will be needed. -func (t *SyncProgressTracker) HeadChanged(newID *big.Int) bool { - t.mutex.RLock() - defer t.mutex.RUnlock() - - if !t.triggered { +func (t *SyncProgressTracker) HeadChanged(newID uint64) bool { + if !t.triggered.Load() { return true } - return t.lastSyncedVerifiedBlockID != nil && t.lastSyncedVerifiedBlockID != newID + return t.lastSyncedVerifiedBlockID.Load() != newID } // OutOfSync tells whether the L2 execution engine is marked as out of sync. func (t *SyncProgressTracker) OutOfSync() bool { - t.mutex.RLock() - defer t.mutex.RUnlock() - - return t.outOfSync + return t.outOfSync.Load() } // Triggered returns tracker.triggered. func (t *SyncProgressTracker) Triggered() bool { - t.mutex.RLock() - defer t.mutex.RUnlock() - - return t.triggered + return t.triggered.Load() } // LastSyncedVerifiedBlockID returns tracker.lastSyncedVerifiedBlockID. -func (t *SyncProgressTracker) LastSyncedVerifiedBlockID() *big.Int { - t.mutex.RLock() - defer t.mutex.RUnlock() - - if t.lastSyncedVerifiedBlockID == nil { - return nil - } - - return new(big.Int).Set(t.lastSyncedVerifiedBlockID) +func (t *SyncProgressTracker) LastSyncedVerifiedBlockID() uint64 { + return t.lastSyncedVerifiedBlockID.Load() } // LastSyncedVerifiedBlockHash returns tracker.lastSyncedVerifiedBlockHash. func (t *SyncProgressTracker) LastSyncedVerifiedBlockHash() common.Hash { - t.mutex.RLock() - defer t.mutex.RUnlock() - - return t.lastSyncedVerifiedBlockHash + val := t.lastSyncedVerifiedBlockHash.Load() + if val == nil { + return common.Hash{} + } + return val.(common.Hash) } // syncProgressed checks whether there is any new progress since last sync progress check. diff --git a/driver/chain_syncer/beaconsync/progress_tracker_test.go b/driver/chain_syncer/beaconsync/progress_tracker_test.go index 37d0a03a0..8ae8c4ba7 100644 --- a/driver/chain_syncer/beaconsync/progress_tracker_test.go +++ b/driver/chain_syncer/beaconsync/progress_tracker_test.go @@ -59,22 +59,22 @@ func (s *BeaconSyncProgressTrackerTestSuite) TestTrack() { // Triggered ctx, cancel = context.WithCancel(context.Background()) - s.t.UpdateMeta(common.Big256, testutils.RandomHash()) + s.t.UpdateMeta(common.Big256.Uint64(), testutils.RandomHash()) go s.t.Track(ctx) time.Sleep(syncProgressCheckInterval + 5*time.Second) cancel() } func (s *BeaconSyncProgressTrackerTestSuite) TestClearMeta() { - s.t.triggered = true + s.t.triggered.Store(true) s.t.ClearMeta() - s.False(s.t.triggered) + s.False(s.t.triggered.Load()) } func (s *BeaconSyncProgressTrackerTestSuite) TestHeadChanged() { - s.True(s.t.HeadChanged(common.Big256)) - s.t.triggered = true - s.False(s.t.HeadChanged(common.Big256)) + s.True(s.t.HeadChanged(common.Big256.Uint64())) + s.t.triggered.Store(true) + s.False(s.t.HeadChanged(common.Big256.Uint64())) } func (s *BeaconSyncProgressTrackerTestSuite) TestOutOfSync() { @@ -87,8 +87,8 @@ func (s *BeaconSyncProgressTrackerTestSuite) TestTriggered() { func (s *BeaconSyncProgressTrackerTestSuite) TestLastSyncedVerifiedBlockID() { s.Nil(s.t.LastSyncedVerifiedBlockID()) - s.t.lastSyncedVerifiedBlockID = common.Big1 - s.Equal(common.Big1.Uint64(), s.t.LastSyncedVerifiedBlockID().Uint64()) + s.t.lastSyncedVerifiedBlockID.Store(1) + s.Equal(common.Big1.Uint64(), s.t.LastSyncedVerifiedBlockID()) } func (s *BeaconSyncProgressTrackerTestSuite) TestLastSyncedVerifiedBlockHash() { @@ -97,7 +97,7 @@ func (s *BeaconSyncProgressTrackerTestSuite) TestLastSyncedVerifiedBlockHash() { s.t.LastSyncedVerifiedBlockHash(), ) randomHash := testutils.RandomHash() - s.t.lastSyncedVerifiedBlockHash = randomHash + s.t.lastSyncedVerifiedBlockHash.Store(randomHash) s.Equal(randomHash, s.t.LastSyncedVerifiedBlockHash()) } diff --git a/driver/chain_syncer/beaconsync/syncer.go b/driver/chain_syncer/beaconsync/syncer.go index d263edc7b..da3b9aa39 100644 --- a/driver/chain_syncer/beaconsync/syncer.go +++ b/driver/chain_syncer/beaconsync/syncer.go @@ -5,7 +5,6 @@ import ( "fmt" "math/big" - "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" @@ -36,8 +35,8 @@ func NewSyncer( // TriggerBeaconSync triggers the L2 execution engine to start performing a beacon sync, if the // latest verified block has changed. -func (s *Syncer) TriggerBeaconSync() error { - blockID, latestVerifiedHeadPayload, err := s.getVerifiedBlockPayload(s.ctx) +func (s *Syncer) TriggerBeaconSync(blockID uint64) error { + latestVerifiedHeadPayload, err := s.getVerifiedBlockPayload(s.ctx, blockID) if err != nil { return err } @@ -92,29 +91,24 @@ func (s *Syncer) TriggerBeaconSync() error { // getVerifiedBlockPayload fetches the latest verified block's header, and converts it to an Engine API executable data, // which will be used to let the node start beacon syncing. -func (s *Syncer) getVerifiedBlockPayload(ctx context.Context) (*big.Int, *engine.ExecutableData, error) { - stateVars, err := s.rpc.GetProtocolStateVariables(&bind.CallOpts{Context: ctx}) +func (s *Syncer) getVerifiedBlockPayload(ctx context.Context, blockID uint64) (*engine.ExecutableData, error) { + blockInfo, err := s.rpc.GetL2BlockInfo(ctx, new(big.Int).SetUint64(blockID)) if err != nil { - return nil, nil, err + return nil, err } - blockInfo, err := s.rpc.GetL2BlockInfo(ctx, new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId)) + header, err := s.rpc.L2CheckPoint.HeaderByNumber(s.ctx, new(big.Int).SetUint64(blockID)) if err != nil { - return nil, nil, err - } - - header, err := s.rpc.L2CheckPoint.HeaderByNumber(s.ctx, new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId)) - if err != nil { - return nil, nil, err + return nil, err } if header.Hash() != blockInfo.Ts.BlockHash { - return nil, nil, fmt.Errorf( + return nil, fmt.Errorf( "latest verified block hash mismatch: %s != %s", header.Hash(), common.BytesToHash(blockInfo.Ts.BlockHash[:]), ) } log.Info("Latest verified block header retrieved", "hash", header.Hash()) - return new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId), encoding.ToExecutableData(header), nil + return encoding.ToExecutableData(header), nil } diff --git a/driver/chain_syncer/calldata/syncer.go b/driver/chain_syncer/calldata/syncer.go index 6b678ffe2..258062f8d 100644 --- a/driver/chain_syncer/calldata/syncer.go +++ b/driver/chain_syncer/calldata/syncer.go @@ -79,9 +79,9 @@ func NewSyncer( // ProcessL1Blocks fetches all `TaikoL1.BlockProposed` events between given // L1 block heights, and then tries inserting them into L2 execution engine's blockchain. -func (s *Syncer) ProcessL1Blocks(ctx context.Context, l1End *types.Header) error { +func (s *Syncer) ProcessL1Blocks(ctx context.Context) error { for { - if err := s.processL1Blocks(ctx, l1End); err != nil { + if err := s.processL1Blocks(ctx); err != nil { return err } @@ -98,7 +98,8 @@ func (s *Syncer) ProcessL1Blocks(ctx context.Context, l1End *types.Header) error // processL1Blocks is the inner method which responsible for processing // all new L1 blocks. -func (s *Syncer) processL1Blocks(ctx context.Context, l1End *types.Header) error { +func (s *Syncer) processL1Blocks(ctx context.Context) error { + l1End := s.state.GetL1Head() startL1Current := s.state.GetL1Current() // If there is a L1 reorg, sometimes this will happen. if startL1Current.Number.Uint64() >= l1End.Number.Uint64() && startL1Current.Hash() != l1End.Hash() { @@ -211,7 +212,7 @@ func (s *Syncer) onBlockProposed( ) if s.progressTracker.Triggered() { // Already synced through beacon sync, just skip this event. - if event.BlockId.Cmp(s.progressTracker.LastSyncedVerifiedBlockID()) <= 0 { + if event.BlockId.Uint64() <= s.progressTracker.LastSyncedVerifiedBlockID() { return nil } diff --git a/driver/chain_syncer/calldata/syncer_test.go b/driver/chain_syncer/calldata/syncer_test.go index e1e670623..3706e26ee 100644 --- a/driver/chain_syncer/calldata/syncer_test.go +++ b/driver/chain_syncer/calldata/syncer_test.go @@ -61,16 +61,12 @@ func (s *CalldataSyncerTestSuite) TestCancelNewSyncer() { } func (s *CalldataSyncerTestSuite) TestProcessL1Blocks() { - head, err := s.s.rpc.L1.HeaderByNumber(context.Background(), nil) - s.Nil(err) - s.Nil(s.s.ProcessL1Blocks(context.Background(), head)) + s.Nil(s.s.ProcessL1Blocks(context.Background())) } func (s *CalldataSyncerTestSuite) TestProcessL1BlocksReorg() { - head, err := s.s.rpc.L1.HeaderByNumber(context.Background(), nil) s.ProposeAndInsertEmptyBlocks(s.p, s.s) - s.Nil(err) - s.Nil(s.s.ProcessL1Blocks(context.Background(), head)) + s.Nil(s.s.ProcessL1Blocks(context.Background())) } func (s *CalldataSyncerTestSuite) TestOnBlockProposed() { diff --git a/driver/chain_syncer/chain_syncer.go b/driver/chain_syncer/chain_syncer.go index 903175f6b..1f3113404 100644 --- a/driver/chain_syncer/chain_syncer.go +++ b/driver/chain_syncer/chain_syncer.go @@ -6,7 +6,6 @@ import ( "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/taikoxyz/taiko-client/driver/chain_syncer/beaconsync" @@ -64,8 +63,8 @@ func New( } // Sync performs a sync operation to L2 execution engine's local chain. -func (s *L2ChainSyncer) Sync(l1End *types.Header) error { - needNewBeaconSyncTriggered, err := s.needNewBeaconSyncTriggered() +func (s *L2ChainSyncer) Sync() error { + blockID, needNewBeaconSyncTriggered, err := s.needNewBeaconSyncTriggered() if err != nil { return err } @@ -73,7 +72,7 @@ func (s *L2ChainSyncer) Sync(l1End *types.Header) error { // `P2PSyncVerifiedBlocks` flag is set, try triggering a beacon sync in L2 execution engine to catch up the // latest verified block head. if needNewBeaconSyncTriggered { - if err := s.beaconSyncer.TriggerBeaconSync(); err != nil { + if err := s.beaconSyncer.TriggerBeaconSync(blockID); err != nil { return fmt.Errorf("trigger beacon sync error: %w", err) } @@ -109,11 +108,11 @@ func (s *L2ChainSyncer) Sync(l1End *types.Header) error { } // Reset to the latest L2 execution engine's chain status. - s.progressTracker.UpdateMeta(l2Head.Number, l2Head.Hash()) + s.progressTracker.UpdateMeta(l2Head.Number.Uint64(), l2Head.Hash()) } // Insert the proposed block one by one. - return s.calldataSyncer.ProcessL1Blocks(s.ctx, l1End) + return s.calldataSyncer.ProcessL1Blocks(s.ctx) } // AheadOfProtocolVerifiedHead checks whether the L2 chain is ahead of verified head in protocol. @@ -137,8 +136,8 @@ func (s *L2ChainSyncer) AheadOfProtocolVerifiedHead(verifiedHeightToCompare uint return false } - if s.progressTracker.LastSyncedVerifiedBlockID() != nil { - return s.state.GetL2Head().Number.Uint64() >= s.progressTracker.LastSyncedVerifiedBlockID().Uint64() + if s.progressTracker.LastSyncedVerifiedBlockID() != 0 { + return s.state.GetL2Head().Number.Uint64() >= s.progressTracker.LastSyncedVerifiedBlockID() } return true @@ -150,23 +149,23 @@ func (s *L2ChainSyncer) AheadOfProtocolVerifiedHead(verifiedHeightToCompare uint // 2. The protocol's latest verified block head is not zero. // 3. The L2 execution engine's chain is behind of the protocol's latest verified block head. // 4. The L2 execution engine's chain have met a sync timeout issue. -func (s *L2ChainSyncer) needNewBeaconSyncTriggered() (bool, error) { +func (s *L2ChainSyncer) needNewBeaconSyncTriggered() (uint64, bool, error) { // If the flag is not set, we simply return false. if !s.p2pSyncVerifiedBlocks { - return false, nil + return 0, false, nil } stateVars, err := s.rpc.GetProtocolStateVariables(&bind.CallOpts{Context: s.ctx}) if err != nil { - return false, err + return 0, false, err } // If the protocol's latest verified block head is zero, we simply return false. if stateVars.B.LastVerifiedBlockId == 0 { - return false, nil + return 0, false, nil } - return !s.AheadOfProtocolVerifiedHead(stateVars.B.LastVerifiedBlockId) && + return stateVars.B.LastVerifiedBlockId, !s.AheadOfProtocolVerifiedHead(stateVars.B.LastVerifiedBlockId) && !s.progressTracker.OutOfSync(), nil } diff --git a/driver/chain_syncer/chain_syncer_test.go b/driver/chain_syncer/chain_syncer_test.go index 200453652..1bae7c909 100644 --- a/driver/chain_syncer/chain_syncer_test.go +++ b/driver/chain_syncer/chain_syncer_test.go @@ -96,9 +96,7 @@ func (s *ChainSyncerTestSuite) TestGetInnerSyncers() { } func (s *ChainSyncerTestSuite) TestSync() { - head, err := s.RPCClient.L1.HeaderByNumber(context.Background(), nil) - s.Nil(err) - s.Nil(s.s.Sync(head)) + s.Nil(s.s.Sync()) } func (s *ChainSyncerTestSuite) TestAheadOfProtocolVerifiedHead2() { diff --git a/driver/config_test.go b/driver/config_test.go index 5232aa03f..888e0557f 100644 --- a/driver/config_test.go +++ b/driver/config_test.go @@ -1,7 +1,6 @@ package driver import ( - "context" "os" "time" @@ -38,7 +37,7 @@ func (s *DriverTestSuite) TestNewConfigFromCliContext() { s.NotEmpty(c.JwtSecret) s.True(c.P2PSyncVerifiedBlocks) s.Equal(l2CheckPoint, c.L2CheckPoint) - s.NotNil(new(Driver).InitFromCli(context.Background(), ctx)) + s.NotNil(new(Driver).InitFromCli(ctx)) return err } diff --git a/driver/driver.go b/driver/driver.go index 0f2561270..6157dfae5 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -33,28 +33,26 @@ type Driver struct { l2ChainSyncer *chainSyncer.L2ChainSyncer state *state.State - l1HeadCh chan *types.Header - l1HeadSub event.Subscription - syncNotify chan struct{} + l1HeadCh chan *types.Header + l1HeadSub event.Subscription ctx context.Context wg sync.WaitGroup } // InitFromCli initializes the given driver instance based on the command line flags. -func (d *Driver) InitFromCli(ctx context.Context, c *cli.Context) error { +func (d *Driver) InitFromCli(c *cli.Context) error { cfg, err := NewConfigFromCliContext(c) if err != nil { return err } - return d.InitFromConfig(ctx, cfg) + return d.InitFromConfig(c.Context, cfg) } // InitFromConfig initializes the driver instance based on the given configurations. func (d *Driver) InitFromConfig(ctx context.Context, cfg *Config) (err error) { d.l1HeadCh = make(chan *types.Header, 1024) - d.syncNotify = make(chan struct{}, 1) d.ctx = ctx d.Config = cfg @@ -101,7 +99,7 @@ func (d *Driver) Start() error { } // Close closes the driver instance. -func (d *Driver) Close(_ context.Context) { +func (d *Driver) Close() { d.l1HeadSub.Unsubscribe() d.state.Close() d.wg.Wait() @@ -112,21 +110,9 @@ func (d *Driver) eventLoop() { d.wg.Add(1) defer d.wg.Done() - // reqSync requests performing a synchronising operation, won't block - // if we are already synchronising. - reqSync := func() { - select { - case d.syncNotify <- struct{}{}: - default: - } - } - // doSyncWithBackoff performs a synchronising operation with a backoff strategy. doSyncWithBackoff := func() { - if err := backoff.Retry( - d.doSync, - backoff.WithContext(backoff.NewConstantBackOff(d.RetryInterval), d.ctx), - ); err != nil { + if err := d.l2ChainSyncer.Sync(); err != nil { log.Error("Sync L2 execution engine's block chain error", "error", err) } } @@ -138,32 +124,12 @@ func (d *Driver) eventLoop() { select { case <-d.ctx.Done(): return - case <-d.syncNotify: - doSyncWithBackoff() case <-d.l1HeadCh: - reqSync() + doSyncWithBackoff() } } } -// doSync fetches all `BlockProposed` events emitted from local -// L1 sync cursor to the L1 head, and then applies all corresponding -// L2 blocks into node's local blockchain. -func (d *Driver) doSync() error { - // Check whether the application is closing. - if d.ctx.Err() != nil { - log.Warn("Driver context error", "error", d.ctx.Err()) - return nil - } - - if err := d.l2ChainSyncer.Sync(d.state.GetL1Head()); err != nil { - log.Error("Process new L1 blocks error", "error", err) - return err - } - - return nil -} - // ChainSyncer returns the driver's chain syncer, this method // should only be used for testing. func (d *Driver) ChainSyncer() *chainSyncer.L2ChainSyncer { @@ -239,18 +205,16 @@ func (d *Driver) exchangeTransitionConfigLoop() { case <-d.ctx.Done(): return case <-ticker.C: - func() { - tc, err := d.rpc.L2Engine.ExchangeTransitionConfiguration(d.ctx, &engine.TransitionConfigurationV1{ - TerminalTotalDifficulty: (*hexutil.Big)(common.Big0), - TerminalBlockHash: common.Hash{}, - TerminalBlockNumber: 0, - }) - if err != nil { - log.Error("Failed to exchange Transition Configuration", "error", err) - } else { - log.Debug("Exchanged transition config", "transitionConfig", tc) - } - }() + tc, err := d.rpc.L2Engine.ExchangeTransitionConfiguration(d.ctx, &engine.TransitionConfigurationV1{ + TerminalTotalDifficulty: (*hexutil.Big)(common.Big0), + TerminalBlockHash: common.Hash{}, + TerminalBlockNumber: 0, + }) + if err != nil { + log.Error("Failed to exchange Transition Configuration", "error", err) + } else { + log.Debug("Exchanged transition config", "transitionConfig", tc) + } } } } diff --git a/driver/driver_test.go b/driver/driver_test.go index cb42d2ee7..629b1ffec 100644 --- a/driver/driver_test.go +++ b/driver/driver_test.go @@ -58,13 +58,10 @@ func (s *DriverTestSuite) TestName() { } func (s *DriverTestSuite) TestProcessL1Blocks() { - l1Head1, err := s.d.rpc.L1.HeaderByNumber(context.Background(), nil) - s.Nil(err) - l2Head1, err := s.d.rpc.L2.HeaderByNumber(context.Background(), nil) s.Nil(err) - s.Nil(s.d.ChainSyncer().CalldataSyncer().ProcessL1Blocks(context.Background(), l1Head1)) + s.Nil(s.d.ChainSyncer().CalldataSyncer().ProcessL1Blocks(context.Background())) // Propose a valid L2 block s.ProposeAndInsertValidBlock(s.p, s.d.ChainSyncer().CalldataSyncer()) @@ -148,7 +145,7 @@ func (s *DriverTestSuite) TestCheckL1ReorgToHigherFork() { s.Greater(l1Head4.Number.Uint64(), l1Head2.Number.Uint64()) - s.Nil(s.d.ChainSyncer().CalldataSyncer().ProcessL1Blocks(context.Background(), l1Head4)) + s.Nil(s.d.ChainSyncer().CalldataSyncer().ProcessL1Blocks(context.Background())) l2Head3, err := s.d.rpc.L2.HeaderByNumber(context.Background(), nil) s.Nil(err) @@ -206,7 +203,7 @@ func (s *DriverTestSuite) TestCheckL1ReorgToLowerFork() { s.Greater(l1Head4.Number.Uint64(), l1Head3.Number.Uint64()) s.Less(l1Head4.Number.Uint64(), l1Head2.Number.Uint64()) - s.Nil(s.d.ChainSyncer().CalldataSyncer().ProcessL1Blocks(context.Background(), l1Head4)) + s.Nil(s.d.ChainSyncer().CalldataSyncer().ProcessL1Blocks(context.Background())) l2Head3, err := s.d.rpc.L2.HeaderByNumber(context.Background(), nil) s.Nil(err) @@ -263,7 +260,7 @@ func (s *DriverTestSuite) TestCheckL1ReorgToSameHeightFork() { s.Greater(l1Head4.Number.Uint64(), l1Head3.Number.Uint64()) - s.Nil(s.d.ChainSyncer().CalldataSyncer().ProcessL1Blocks(context.Background(), l1Head4)) + s.Nil(s.d.ChainSyncer().CalldataSyncer().ProcessL1Blocks(context.Background())) l2Head3, err := s.d.rpc.L2.HeaderByNumber(context.Background(), nil) s.Nil(err) @@ -276,13 +273,13 @@ func (s *DriverTestSuite) TestCheckL1ReorgToSameHeightFork() { } func (s *DriverTestSuite) TestDoSyncNoNewL2Blocks() { - s.Nil(s.d.doSync()) + s.Nil(s.d.l2ChainSyncer.Sync()) } func (s *DriverTestSuite) TestStartClose() { s.Nil(s.d.Start()) s.cancel() - s.d.Close(context.Background()) + s.d.Close() } func (s *DriverTestSuite) TestL1Current() { diff --git a/driver/state/state.go b/driver/state/state.go index 6e8e632c7..48f0351e6 100644 --- a/driver/state/state.go +++ b/driver/state/state.go @@ -22,10 +22,10 @@ type State struct { // Feeds l1HeadsFeed event.Feed // L1 new heads notification feed - l1Head *atomic.Value // Latest known L1 head - l2Head *atomic.Value // Current L2 execution engine's local chain head - l2HeadBlockID *atomic.Value // Latest known L2 block ID in protocol - l1Current *atomic.Value // Current L1 block sync cursor + l1Head atomic.Value // Latest known L1 head + l2Head atomic.Value // Current L2 execution engine's local chain head + l2HeadBlockID atomic.Value // Latest known L2 block ID in protocol + l1Current atomic.Value // Current L1 block sync cursor // Constants GenesisL1Height *big.Int @@ -40,12 +40,8 @@ type State struct { // New creates a new driver state instance. func New(ctx context.Context, rpc *rpc.Client) (*State, error) { s := &State{ - rpc: rpc, - l1Head: new(atomic.Value), - l2Head: new(atomic.Value), - l2HeadBlockID: new(atomic.Value), - l1Current: new(atomic.Value), - stopCh: make(chan struct{}), + rpc: rpc, + stopCh: make(chan struct{}), } if err := s.init(ctx); err != nil { diff --git a/integration_test/deploy_l1_contract.sh b/integration_test/deploy_l1_contract.sh index 4e3f44942..694c57452 100755 --- a/integration_test/deploy_l1_contract.sh +++ b/integration_test/deploy_l1_contract.sh @@ -9,11 +9,11 @@ source integration_test/l1_env.sh check_env "TAIKO_MONO_DIR" cd "$TAIKO_MONO_DIR"/packages/protocol && - forge script script/DeployOnL1.s.sol:DeployOnL1 \ - --fork-url "$L1_NODE_HTTP_ENDPOINT" \ - --broadcast \ - --ffi \ - -vvvvv \ - --evm-version cancun \ - --private-key "$PRIVATE_KEY" \ - --block-gas-limit 100000000 + forge script script/DeployOnL1.s.sol:DeployOnL1 \ + --fork-url "$L1_NODE_HTTP_ENDPOINT" \ + --broadcast \ + --ffi \ + -vvvvv \ + --evm-version cancun \ + --private-key "$PRIVATE_KEY" \ + --block-gas-limit 100000000 diff --git a/integration_test/l1_env.sh b/integration_test/l1_env.sh index 76b1fc923..7ef412926 100755 --- a/integration_test/l1_env.sh +++ b/integration_test/l1_env.sh @@ -27,5 +27,5 @@ export GUARDIAN_PROVERS=${GUARDIAN_PROVERS_ADDRESSES:1} export MIN_GUARDIANS=${#GUARDIAN_PROVERS_ADDRESSES_LIST[@]} # Get the hash of L2 genesis. -export L2_GENESIS_HASH=$(cast block --rpc-url "$L2_EXECUTION_ENGINE_HTTP_ENDPOINT" 0x0 -f hash) +export L2_GENESIS_HASH=$(cast block --rpc-url "$L2_EXECUTION_ENGINE_WS_ENDPOINT" 0x0 -f hash) echo "L2_GENESIS_HASH: $L2_GENESIS_HASH" diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 002b1dc8a..1187398d5 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -1,7 +1,6 @@ package metrics import ( - "context" "net" "net/http" "strconv" @@ -55,7 +54,7 @@ var ( // Serve starts the metrics server on the given address, will be closed when the given // context is cancelled. -func Serve(ctx context.Context, c *cli.Context) error { +func Serve(c *cli.Context) error { if !c.Bool(flags.MetricsEnabled.Name) { return nil } @@ -72,7 +71,7 @@ func Serve(ctx context.Context, c *cli.Context) error { } go func() { - <-ctx.Done() + <-c.Context.Done() if err := server.Close(); err != nil { log.Error("Failed to close metrics server", "error", err) } diff --git a/internal/testutils/helper.go b/internal/testutils/helper.go index 0a72da2d6..d127ef853 100644 --- a/internal/testutils/helper.go +++ b/internal/testutils/helper.go @@ -74,7 +74,7 @@ func (s *ClientTestSuite) ProposeAndInsertEmptyBlocks( defer cancel() s.Nil(backoff.Retry(func() error { - return calldataSyncer.ProcessL1Blocks(ctx, newL1Head) + return calldataSyncer.ProcessL1Blocks(ctx) }, backoff.NewExponentialBackOff())) s.Nil(s.RPCClient.WaitTillL2ExecutionEngineSynced(context.Background())) @@ -142,7 +142,7 @@ func (s *ClientTestSuite) ProposeAndInsertValidBlock( defer cancel() s.Nil(backoff.Retry(func() error { - return calldataSyncer.ProcessL1Blocks(ctx, newL1Head) + return calldataSyncer.ProcessL1Blocks(ctx) }, backoff.NewExponentialBackOff())) s.Nil(s.RPCClient.WaitTillL2ExecutionEngineSynced(context.Background())) diff --git a/internal/testutils/interfaces.go b/internal/testutils/interfaces.go index a6be5ef48..08ea83d29 100644 --- a/internal/testutils/interfaces.go +++ b/internal/testutils/interfaces.go @@ -3,13 +3,11 @@ package testutils import ( "context" - "github.com/ethereum/go-ethereum/core/types" - "github.com/taikoxyz/taiko-client/cmd/utils" ) type CalldataSyncer interface { - ProcessL1Blocks(ctx context.Context, l1End *types.Header) error + ProcessL1Blocks(ctx context.Context) error } type Proposer interface { diff --git a/proposer/config_test.go b/proposer/config_test.go index e2c9fa858..d30213585 100644 --- a/proposer/config_test.go +++ b/proposer/config_test.go @@ -1,7 +1,6 @@ package proposer import ( - "context" "fmt" "os" "strings" @@ -58,7 +57,7 @@ func (s *ProposerTestSuite) TestNewConfigFromCliContext() { s.Equal(c.ProverEndpoints[i].String(), e) } - s.Nil(new(Proposer).InitFromCli(context.Background(), ctx)) + s.Nil(new(Proposer).InitFromCli(ctx)) return nil } diff --git a/proposer/proposer.go b/proposer/proposer.go index 52c26e6c5..9c03ac22c 100644 --- a/proposer/proposer.go +++ b/proposer/proposer.go @@ -70,13 +70,13 @@ type Proposer struct { } // InitFromCli New initializes the given proposer instance based on the command line flags. -func (p *Proposer) InitFromCli(ctx context.Context, c *cli.Context) error { +func (p *Proposer) InitFromCli(c *cli.Context) error { cfg, err := NewConfigFromCliContext(c) if err != nil { return err } - return p.InitFromConfig(ctx, cfg) + return p.InitFromConfig(c.Context, cfg) } // InitFromConfig initializes the proposer instance based on the given configurations. @@ -209,7 +209,7 @@ func (p *Proposer) eventLoop() { } // Close closes the proposer instance. -func (p *Proposer) Close(_ context.Context) { +func (p *Proposer) Close() { p.wg.Wait() } diff --git a/proposer/proposer_test.go b/proposer/proposer_test.go index 76d867782..7819fdce9 100644 --- a/proposer/proposer_test.go +++ b/proposer/proposer_test.go @@ -184,7 +184,7 @@ func (s *ProposerTestSuite) TestUpdateProposingTicker() { func (s *ProposerTestSuite) TestStartClose() { s.Nil(s.p.Start()) s.cancel() - s.NotPanics(func() { s.p.Close(context.Background()) }) + s.NotPanics(func() { s.p.Close() }) } func TestProposerTestSuite(t *testing.T) { diff --git a/prover/config_test.go b/prover/config_test.go index a6214060d..2d1e15296 100644 --- a/prover/config_test.go +++ b/prover/config_test.go @@ -1,7 +1,6 @@ package prover import ( - "context" "fmt" "os" "time" @@ -53,7 +52,7 @@ func (s *ProverTestSuite) TestNewConfigFromCliContextGuardianProver() { s.Equal(uint64(minTierFee), c.MinSgxTierFee.Uint64()) s.Equal(c.L1NodeVersion, l1NodeVersion) s.Equal(c.L2NodeVersion, l2NodeVersion) - s.Nil(new(Prover).InitFromCli(context.Background(), ctx)) + s.Nil(new(Prover).InitFromCli(ctx)) s.True(c.ProveUnassignedBlocks) s.Equal(uint64(100), c.MaxProposedIn) s.Equal(os.Getenv("ASSIGNMENT_HOOK_ADDRESS"), c.AssignmentHookAddress.String()) diff --git a/prover/prover.go b/prover/prover.go index 228415002..bce82c30f 100644 --- a/prover/prover.go +++ b/prover/prover.go @@ -78,13 +78,13 @@ type Prover struct { } // InitFromCli initializes the given prover instance based on the command line flags. -func (p *Prover) InitFromCli(ctx context.Context, c *cli.Context) error { +func (p *Prover) InitFromCli(c *cli.Context) error { cfg, err := NewConfigFromCliContext(c) if err != nil { return err } - return InitFromConfig(ctx, p, cfg) + return InitFromConfig(c.Context, p, cfg) } // InitFromConfig initializes the prover instance based on the given configurations. @@ -312,8 +312,8 @@ func (p *Prover) eventLoop() { } // Close closes the prover instance. -func (p *Prover) Close(ctx context.Context) { - if err := p.server.Shutdown(ctx); err != nil { +func (p *Prover) Close() { + if err := p.server.Shutdown(p.ctx); err != nil { log.Error("Failed to shut down prover server", "error", err) } p.wg.Wait()