diff --git a/op-e2e/actions/l2_engine.go b/op-e2e/actions/l2_engine.go index d1230ec8b7c5..b8647c382c4e 100644 --- a/op-e2e/actions/l2_engine.go +++ b/op-e2e/actions/l2_engine.go @@ -18,6 +18,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum-optimism/optimism/op-node/rollup" @@ -50,7 +51,7 @@ type EngineOption func(ethCfg *ethconfig.Config, nodeCfg *node.Config) error func NewL2Engine(t Testing, log log.Logger, genesis *core.Genesis, rollupGenesisL1 eth.BlockID, jwtPath string, options ...EngineOption) *L2Engine { n, ethBackend, apiBackend := newBackend(t, genesis, jwtPath, options) - engineApi := engineapi.NewL2EngineAPI(log, apiBackend) + engineApi := engineapi.NewL2EngineAPI(log, apiBackend, ethBackend.Downloader()) chain := ethBackend.BlockChain() genesisBlock := chain.Genesis() eng := &L2Engine{ @@ -131,6 +132,16 @@ func (e *engineApiBackend) Genesis() *core.Genesis { return e.genesis } +func (s *L2Engine) Enode() *enode.Node { + return s.node.Server().LocalNode().Node() +} + +func (s *L2Engine) AddPeers(peers ...*enode.Node) { + for _, en := range peers { + s.node.Server().AddPeer(en) + } +} + func (s *L2Engine) EthClient() *ethclient.Client { cl := s.node.Attach() return ethclient.NewClient(cl) diff --git a/op-e2e/actions/l2_sequencer_test.go b/op-e2e/actions/l2_sequencer_test.go index 654787cd8268..2bfd8d6cb0dc 100644 --- a/op-e2e/actions/l2_sequencer_test.go +++ b/op-e2e/actions/l2_sequencer_test.go @@ -5,6 +5,10 @@ import ( "testing" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/common" @@ -16,6 +20,22 @@ import ( "github.com/ethereum-optimism/optimism/op-service/testlog" ) +func EngineWithP2P() EngineOption { + return func(ethCfg *ethconfig.Config, nodeCfg *node.Config) error { + p2pKey, err := crypto.GenerateKey() + if err != nil { + return err + } + nodeCfg.P2P = p2p.Config{ + MaxPeers: 100, + NoDiscovery: true, + ListenAddr: "127.0.0.1:0", + PrivateKey: p2pKey, + } + return nil + } +} + func setupSequencerTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1Miner, *L2Engine, *L2Sequencer) { jwtPath := e2eutils.WriteDefaultJWT(t) @@ -23,7 +43,7 @@ func setupSequencerTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1M l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindStandard)) require.NoError(t, err) - engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath) + engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, EngineWithP2P()) l2Cl, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) require.NoError(t, err) diff --git a/op-e2e/actions/l2_verifier.go b/op-e2e/actions/l2_verifier.go index 29f250737cea..3e8095ae182a 100644 --- a/op-e2e/actions/l2_verifier.go +++ b/op-e2e/actions/l2_verifier.go @@ -254,3 +254,13 @@ func (s *L2Verifier) ActL2UnsafeGossipReceive(payload *eth.ExecutionPayloadEnvel s.derivation.AddUnsafePayload(payload) } } + +// ActL2InsertUnsafePayload creates an action that can insert an unsafe execution payload +func (s *L2Verifier) ActL2InsertUnsafePayload(payload *eth.ExecutionPayloadEnvelope) Action { + return func(t Testing) { + ref, err := derive.PayloadToBlockRef(s.rollupCfg, payload.ExecutionPayload) + require.NoError(t, err) + err = s.engine.InsertUnsafePayload(t.Ctx(), payload, ref) + require.NoError(t, err) + } +} diff --git a/op-e2e/actions/l2_verifier_test.go b/op-e2e/actions/l2_verifier_test.go index 2552775aaad2..4150c984f695 100644 --- a/op-e2e/actions/l2_verifier_test.go +++ b/op-e2e/actions/l2_verifier_test.go @@ -15,7 +15,7 @@ import ( func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive.L1Fetcher, syncCfg *sync.Config) (*L2Engine, *L2Verifier) { jwtPath := e2eutils.WriteDefaultJWT(t) - engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath) + engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, EngineWithP2P()) engCl := engine.EngineClient(t, sd.RollupCfg) mockBlobFetcher := &emptyL1BlobsFetcher{t: t} verifier := NewL2Verifier(t, log, l1F, mockBlobFetcher, engCl, sd.RollupCfg, syncCfg) diff --git a/op-e2e/actions/sync_test.go b/op-e2e/actions/sync_test.go index 62d706912b1c..efecb376ca51 100644 --- a/op-e2e/actions/sync_test.go +++ b/op-e2e/actions/sync_test.go @@ -5,6 +5,7 @@ import ( "math/big" "math/rand" "testing" + "time" "github.com/ethereum-optimism/optimism/op-batcher/compressor" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" @@ -164,7 +165,6 @@ func TestUnsafeSync(gt *testing.T) { // TestELSync tests that a verifier will have the EL import the full chain from the sequencer // when passed a single unsafe block. op-geth can either snap sync or full sync here. func TestELSync(gt *testing.T) { - gt.Skip("not implemented yet") t := NewDefaultTesting(gt) dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams) sd := e2eutils.Setup(t, dp, defaultAlloc) @@ -172,35 +172,42 @@ func TestELSync(gt *testing.T) { miner, seqEng, sequencer := setupSequencerTest(t, sd, log) // Enable engine P2P sync - _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{SyncMode: sync.ELSync}) + verEng, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{SyncMode: sync.ELSync}) + + seqEng.AddPeers(verEng.Enode()) + verEng.AddPeers(seqEng.Enode()) seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) require.NoError(t, err) sequencer.ActL2PipelineFull(t) - verifier.ActL2PipelineFull(t) - - // Build a L2 block. This block will not be gossiped to verifier, so verifier can not advance chain by itself. - sequencer.ActL2StartBlock(t) - sequencer.ActL2EndBlock(t) + // Build 10 L1 blocks on the sequencer for i := 0; i < 10; i++ { // Build a L2 block sequencer.ActL2StartBlock(t) sequencer.ActL2EndBlock(t) - // Notify new L2 block to verifier by unsafe gossip - seqHead, err := seqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe) - require.NoError(t, err) - verifier.ActL2UnsafeGossipReceive(seqHead)(t) - // Handle unsafe payload - verifier.ActL2PipelineFull(t) - // Verifier must advance unsafe head after unsafe gossip. - require.Equal(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash) } - // Actual test flow should be as follows: - // 1. Build a chain on the sequencer. - // 2. Gossip only a single final L2 block from the sequencer to the verifier. - // 3. Assert that the verifier has the full chain. + + // Insert it on the verifier + seqHead, err := seqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe) + require.NoError(t, err) + seqStart, err := seqEngCl.PayloadByNumber(t.Ctx(), 1) + require.NoError(t, err) + verifier.ActL2InsertUnsafePayload(seqHead)(t) + + // Expect snap sync to download & execute the entire chain + // Verify this by checking that the verifier has the correct value for block 1 + require.Eventually(t, + func() bool { + block, err := verifier.eng.L2BlockRefByNumber(t.Ctx(), 1) + if err != nil { + return false + } + return seqStart.ExecutionPayload.BlockHash == block.Hash + }, + 60*time.Second, 1500*time.Millisecond, + ) } func TestInvalidPayloadInSpanBatch(gt *testing.T) { diff --git a/op-e2e/system_test.go b/op-e2e/system_test.go index c36cb28ada8e..2c6225cb1625 100644 --- a/op-e2e/system_test.go +++ b/op-e2e/system_test.go @@ -218,6 +218,7 @@ func TestSystemE2E(t *testing.T) { sys, err := cfg.Start(t) require.Nil(t, err, "Error starting up system") + runE2ESystemTest(t, sys) defer sys.Close() } diff --git a/op-node/node/node.go b/op-node/node/node.go index 6f5891381548..418b60380a4a 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -409,7 +409,8 @@ func (n *OpNode) initPProf(cfg *Config) error { func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error { if cfg.P2P != nil { - p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, cfg.Sync.SyncMode == sync.ELSync) + // TODO(protocol-quest/97): Use EL Sync instead of CL Alt sync for fetching missing blocks in the payload queue. + p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, false) if err != nil || p2pNode == nil { return err } diff --git a/op-node/rollup/derive/engine_controller.go b/op-node/rollup/derive/engine_controller.go index 362c9a810a29..a77ee8a3876c 100644 --- a/op-node/rollup/derive/engine_controller.go +++ b/op-node/rollup/derive/engine_controller.go @@ -4,15 +4,33 @@ import ( "context" "errors" "fmt" + "time" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/sync" + "github.com/ethereum-optimism/optimism/op-service/clock" "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" ) +type syncStatusEnum int + +const ( + syncStatusCL syncStatusEnum = iota + // We transition between the 4 EL states linearly. We spend the majority of the time in the second & fourth. + // We only want to EL sync if there is no finalized block & once we finish EL sync we need to mark the last block + // as finalized so we can switch to consolidation + // TODO(protocol-quest/91): We can restart EL sync & still consolidate if there finalized blocks on the execution client if the + // execution client is running in archive mode. In some cases we may want to switch back from CL to EL sync, but that is complicated. + syncStatusWillStartEL // First if we are directed to EL sync, check that nothing has been finalized yet + syncStatusStartedEL // Perform our EL sync + syncStatusFinishedELButNotFinalized // EL sync is done, but we need to mark the final sync block as finalized + syncStatusFinishedEL // EL sync is done & we should be performing consolidation +) + var errNoFCUNeeded = errors.New("no FCU call was needed") var _ EngineControl = (*EngineController)(nil) @@ -22,14 +40,18 @@ type ExecEngine interface { GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayloadEnvelope, error) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) NewPayload(ctx context.Context, payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash) (*eth.PayloadStatusV1, error) + L2BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L2BlockRef, error) } type EngineController struct { - engine ExecEngine // Underlying execution engine RPC - log log.Logger - metrics Metrics - syncMode sync.Mode - rollupCfg *rollup.Config + engine ExecEngine // Underlying execution engine RPC + log log.Logger + metrics Metrics + syncMode sync.Mode + syncStatus syncStatusEnum + rollupCfg *rollup.Config + elStart time.Time + clock clock.Clock // Block Head State unsafeHead eth.L2BlockRef @@ -46,11 +68,19 @@ type EngineController struct { } func NewEngineController(engine ExecEngine, log log.Logger, metrics Metrics, rollupCfg *rollup.Config, syncMode sync.Mode) *EngineController { + syncStatus := syncStatusCL + if syncMode == sync.ELSync { + syncStatus = syncStatusWillStartEL + } + return &EngineController{ - engine: engine, - log: log, - metrics: metrics, - rollupCfg: rollupCfg, + engine: engine, + log: log, + metrics: metrics, + rollupCfg: rollupCfg, + syncMode: syncMode, + syncStatus: syncStatus, + clock: clock.SystemClock, } } @@ -77,7 +107,7 @@ func (e *EngineController) BuildingPayload() (eth.L2BlockRef, eth.PayloadID, boo } func (e *EngineController) IsEngineSyncing() bool { - return false + return e.syncStatus == syncStatusWillStartEL || e.syncStatus == syncStatusStartedEL || e.syncStatus == syncStatusFinishedELButNotFinalized } // Setters @@ -209,6 +239,9 @@ func (e *EngineController) resetBuildingState() { // It returns true if the status is acceptable. func (e *EngineController) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool { if e.syncMode == sync.ELSync { + if status == eth.ExecutionValid && e.syncStatus == syncStatusStartedEL { + e.syncStatus = syncStatusFinishedELButNotFinalized + } // Allow SYNCING and ACCEPTED if engine EL sync is enabled return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted } @@ -219,6 +252,9 @@ func (e *EngineController) checkNewPayloadStatus(status eth.ExecutePayloadStatus // It returns true if the status is acceptable. func (e *EngineController) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool { if e.syncMode == sync.ELSync { + if status == eth.ExecutionValid && e.syncStatus == syncStatusStartedEL { + e.syncStatus = syncStatusFinishedELButNotFinalized + } // Allow SYNCING if engine P2P sync is enabled return status == eth.ExecutionValid || status == eth.ExecutionSyncing } @@ -258,6 +294,22 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error { } func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error { + // Check if there is a finalized head once when doing EL sync. If so, transition to CL sync + if e.syncStatus == syncStatusWillStartEL { + b, err := e.engine.L2BlockRefByLabel(ctx, eth.Finalized) + if errors.Is(err, ethereum.NotFound) { + e.syncStatus = syncStatusStartedEL + e.log.Info("Starting EL sync") + e.elStart = e.clock.Now() + } else if err == nil { + e.syncStatus = syncStatusFinishedEL + e.log.Info("Skipping EL sync and going straight to CL sync because there is a finalized block", "id", b.ID()) + return nil + } else { + return NewTemporaryError(fmt.Errorf("failed to fetch finalized head: %w", err)) + } + } + // Insert the payload & then call FCU status, err := e.engine.NewPayload(ctx, envelope.ExecutionPayload, envelope.ParentBeaconBlockRoot) if err != nil { return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err)) @@ -274,6 +326,12 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et SafeBlockHash: e.safeHead.Hash, FinalizedBlockHash: e.finalizedHead.Hash, } + if e.syncStatus == syncStatusFinishedELButNotFinalized { + fc.SafeBlockHash = envelope.ExecutionPayload.BlockHash + fc.FinalizedBlockHash = envelope.ExecutionPayload.BlockHash + e.SetSafeHead(ref) + e.SetFinalizedHead(ref) + } fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil) if err != nil { var inputErr eth.InputError @@ -293,9 +351,14 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w", payload.ID(), payload.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus))) } - e.unsafeHead = ref + e.SetUnsafeHead(ref) e.needFCUCall = false + if e.syncStatus == syncStatusFinishedELButNotFinalized { + e.log.Info("Finished EL sync", "sync_duration", e.clock.Since(e.elStart)) + e.syncStatus = syncStatusFinishedEL + } + return nil } @@ -303,13 +366,3 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et func (e *EngineController) ResetBuildingState() { e.resetBuildingState() } - -// ForkchoiceUpdate implements LocalEngineControl. -func (e *EngineController) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) { - return e.engine.ForkchoiceUpdate(ctx, state, attr) -} - -// NewPayload implements LocalEngineControl. -func (e *EngineController) NewPayload(ctx context.Context, payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash) (*eth.PayloadStatusV1, error) { - return e.engine.NewPayload(ctx, payload, parentBeaconBlockRoot) -} diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index 205a5586bba2..7c21a63976ee 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -137,6 +137,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1 sequencerActive: make(chan chan bool, 10), sequencerNotifs: sequencerStateListener, config: cfg, + syncCfg: syncCfg, driverConfig: driverCfg, driverCtx: driverCtx, driverCancel: driverCancel, diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index b31f354291eb..73cb771d78cd 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/retry" ) @@ -66,6 +67,9 @@ type Driver struct { // Driver config: verifier and sequencer settings driverConfig *Config + // Sync Mod Config + syncCfg *sync.Config + // L1 Signals: // // Not all L1 blocks, or all changes, have to be signalled: @@ -177,6 +181,18 @@ func (s *Driver) OnUnsafeL2Payload(ctx context.Context, envelope *eth.ExecutionP } } +func (s *Driver) logSyncProgress(reason string) { + s.log.Info("Sync progress", + "reason", reason, + "l2_finalized", s.engineController.Finalized(), + "l2_safe", s.engineController.SafeL2Head(), + "l2_pending_safe", s.engineController.PendingSafeL2Head(), + "l2_unsafe", s.engineController.UnsafeL2Head(), + "l2_time", s.engineController.UnsafeL2Head().Time, + "l1_derived", s.derivation.Origin(), + ) +} + // the eventLoop responds to L1 changes and internal timers to produce L2 blocks. func (s *Driver) eventLoop() { defer s.wg.Done() @@ -304,11 +320,27 @@ func (s *Driver) eventLoop() { } case envelope := <-s.unsafeL2Payloads: s.snapshot("New unsafe payload") - s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", envelope.ExecutionPayload.ID()) - s.derivation.AddUnsafePayload(envelope) - s.metrics.RecordReceivedUnsafePayload(envelope) - reqStep() - + // If we are doing CL sync or done with engine syncing, fallback to the unsafe payload queue & CL P2P sync. + if s.syncCfg.SyncMode == sync.CLSync || !s.engineController.IsEngineSyncing() { + s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", envelope.ExecutionPayload.ID()) + s.derivation.AddUnsafePayload(envelope) + s.metrics.RecordReceivedUnsafePayload(envelope) + reqStep() + } else if s.syncCfg.SyncMode == sync.ELSync { + ref, err := derive.PayloadToBlockRef(s.config, envelope.ExecutionPayload) + if err != nil { + s.log.Info("Failed to turn execution payload into a block ref", "id", envelope.ExecutionPayload.ID(), "err", err) + continue + } + if ref.Number <= s.engineController.UnsafeL2Head().Number { + continue + } + s.log.Info("Optimistically inserting unsafe L2 execution payload to drive EL sync", "id", envelope.ExecutionPayload.ID()) + if err := s.engineController.InsertUnsafePayload(s.driverCtx, envelope, ref); err != nil { + s.log.Warn("Failed to insert unsafe payload for EL sync", "id", envelope.ExecutionPayload.ID(), "err", err) + } + s.logSyncProgress("unsafe payload from sequencer") + } case newL1Head := <-s.l1HeadSig: s.l1State.HandleNewL1HeadBlock(newL1Head) reqStep() // a new L1 head may mean we have the data to not get an EOF again. @@ -323,6 +355,10 @@ func (s *Driver) eventLoop() { delayedStepReq = nil step() case <-stepReqCh: + // Don't start the derivation pipeline until we are done with EL sync + if s.engineController.IsEngineSyncing() { + continue + } s.metrics.SetDerivationIdle(false) s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts) err := s.derivation.Step(s.driverCtx) diff --git a/op-program/client/l2/engine.go b/op-program/client/l2/engine.go index 67dfd94d0b5c..260561bccee3 100644 --- a/op-program/client/l2/engine.go +++ b/op-program/client/l2/engine.go @@ -26,7 +26,7 @@ type OracleEngine struct { } func NewOracleEngine(rollupCfg *rollup.Config, logger log.Logger, backend engineapi.EngineBackend) *OracleEngine { - engineAPI := engineapi.NewL2EngineAPI(logger, backend) + engineAPI := engineapi.NewL2EngineAPI(logger, backend, nil) return &OracleEngine{ api: engineAPI, backend: backend, diff --git a/op-program/client/l2/engineapi/l2_engine_api.go b/op-program/client/l2/engineapi/l2_engine_api.go index 27b1420fc3d5..184494a84b8f 100644 --- a/op-program/client/l2/engineapi/l2_engine_api.go +++ b/op-program/client/l2/engineapi/l2_engine_api.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" ) @@ -50,6 +51,10 @@ type L2EngineAPI struct { log log.Logger backend EngineBackend + // Functionality for snap sync + remotes map[common.Hash]*types.Block + downloader *downloader.Downloader + // L2 block building data blockProcessor *BlockProcessor pendingIndices map[common.Address]uint64 // per account, how many txs from the pool were already included in the block, since the pool is lagging behind block mining. @@ -59,10 +64,12 @@ type L2EngineAPI struct { payloadID engine.PayloadID // ID of payload that is currently being built } -func NewL2EngineAPI(log log.Logger, backend EngineBackend) *L2EngineAPI { +func NewL2EngineAPI(log log.Logger, backend EngineBackend, downloader *downloader.Downloader) *L2EngineAPI { return &L2EngineAPI{ - log: log, - backend: backend, + log: log, + backend: backend, + remotes: make(map[common.Hash]*types.Block), + downloader: downloader, } } @@ -329,7 +336,24 @@ func (ea *L2EngineAPI) forkchoiceUpdated(ctx context.Context, state *eth.Forkcho // reason. block := ea.backend.GetBlockByHash(state.HeadBlockHash) if block == nil { - // TODO: syncing not supported yet + if ea.downloader == nil { + ea.log.Warn("Must register downloader to be able to snap sync") + return STATUS_SYNCING, nil + } + // If the head hash is unknown (was not given to us in a newPayload request), + // we cannot resolve the header, so not much to do. This could be extended in + // the future to resolve from the `eth` network, but it's an unexpected case + // that should be fixed, not papered over. + header := ea.remotes[state.HeadBlockHash] + if header == nil { + ea.log.Warn("Forkchoice requested unknown head", "hash", state.HeadBlockHash) + return STATUS_SYNCING, nil + } + + ea.log.Info("Forkchoice requested sync to new head", "number", header.Number, "hash", header.Hash()) + if err := ea.downloader.BeaconSync(downloader.SnapSync, header.Header(), nil); err != nil { + return STATUS_SYNCING, err + } return STATUS_SYNCING, nil } // Block is known locally, just sanity check that the beacon client does not @@ -462,6 +486,7 @@ func (ea *L2EngineAPI) newPayload(ctx context.Context, payload *eth.ExecutionPay parent := ea.backend.GetBlock(block.ParentHash(), block.NumberU64()-1) if parent == nil { + ea.remotes[block.Hash()] = block // TODO: hack, saying we accepted if we don't know the parent block. Might want to return critical error if we can't actually sync. return ð.PayloadStatusV1{Status: eth.ExecutionAccepted, LatestValidHash: nil}, nil } diff --git a/op-program/client/l2/engineapi/test/l2_engine_api_tests.go b/op-program/client/l2/engineapi/test/l2_engine_api_tests.go index f68b33f8b3ae..c8a0fb706283 100644 --- a/op-program/client/l2/engineapi/test/l2_engine_api_tests.go +++ b/op-program/client/l2/engineapi/test/l2_engine_api_tests.go @@ -320,7 +320,7 @@ func newTestHelper(t *testing.T, createBackend func(t *testing.T) engineapi.Engi logger := testlog.Logger(t, log.LvlDebug) ctx := context.Background() backend := createBackend(t) - api := engineapi.NewL2EngineAPI(logger, backend) + api := engineapi.NewL2EngineAPI(logger, backend, nil) test := &testHelper{ t: t, ctx: ctx, diff --git a/op-service/clock/clock.go b/op-service/clock/clock.go index 834d2984c911..d38eaa256908 100644 --- a/op-service/clock/clock.go +++ b/op-service/clock/clock.go @@ -13,6 +13,9 @@ type Clock interface { // Now provides the current local time. Equivalent to time.Now Now() time.Time + // Since returns the time elapsed since t. It is shorthand for time.Now().Sub(t). + Since(time.Time) time.Duration + // After waits for the duration to elapse and then sends the current time on the returned channel. // It is equivalent to time.After After(d time.Duration) <-chan time.Time @@ -81,6 +84,10 @@ func (s systemClock) Now() time.Time { return time.Now() } +func (s systemClock) Since(t time.Time) time.Duration { + return time.Since(t) +} + func (s systemClock) After(d time.Duration) <-chan time.Time { return time.After(d) } diff --git a/op-service/clock/deterministic.go b/op-service/clock/deterministic.go index 8d28fb917340..ce11773232ce 100644 --- a/op-service/clock/deterministic.go +++ b/op-service/clock/deterministic.go @@ -138,6 +138,12 @@ func (s *DeterministicClock) Now() time.Time { return s.now } +func (s *DeterministicClock) Since(t time.Time) time.Duration { + s.lock.Lock() + defer s.lock.Unlock() + return s.now.Sub(t) +} + func (s *DeterministicClock) After(d time.Duration) <-chan time.Time { s.lock.Lock() defer s.lock.Unlock()