From ef9d9fe7326c0beacf0c495a80d2382730b25856 Mon Sep 17 00:00:00 2001 From: Joshua Gutow Date: Fri, 12 Jan 2024 16:22:19 -0800 Subject: [PATCH] op-node: Execution Layer Sync This passes unsafe payloads directly the EngineController for immediate insertion when Execution Layer sync is active. This tells the execution client to sync to that target. Once the EL sync is complete, the last unsafe payload is marked as safe. This is required when doing snap sync because the EL does not have the pre-state required to do the engine consolidation until the sync is complete. --- op-node/rollup/derive/engine_controller.go | 63 +++++++++++++++------- op-node/rollup/driver/driver.go | 1 + op-node/rollup/driver/state.go | 32 +++++++++-- 3 files changed, 72 insertions(+), 24 deletions(-) diff --git a/op-node/rollup/derive/engine_controller.go b/op-node/rollup/derive/engine_controller.go index 00dd10c22d1f..deb6a90421e1 100644 --- a/op-node/rollup/derive/engine_controller.go +++ b/op-node/rollup/derive/engine_controller.go @@ -12,6 +12,15 @@ import ( "github.com/ethereum/go-ethereum/log" ) +type syncStatusEnum int + +const ( + syncStatusCL syncStatusEnum = iota + syncStatusStartedEL + syncStatusFinishedELButNotFinalized + syncStatusFinishedEL +) + var errNoFCUNeeded = errors.New("no FCU call was needed") var _ EngineControl = (*EngineController)(nil) @@ -24,11 +33,12 @@ type ExecEngine interface { } type EngineController struct { - engine ExecEngine // Underlying execution engine RPC - log log.Logger - metrics Metrics - syncMode sync.Mode - rollupCfg *rollup.Config + engine ExecEngine // Underlying execution engine RPC + log log.Logger + metrics Metrics + syncMode sync.Mode + syncStatus syncStatusEnum + rollupCfg *rollup.Config // Block Head State unsafeHead eth.L2BlockRef @@ -45,11 +55,18 @@ type EngineController struct { } func NewEngineController(engine ExecEngine, log log.Logger, metrics Metrics, rollupCfg *rollup.Config, syncMode sync.Mode) *EngineController { + syncStatus := syncStatusCL + if syncMode == sync.ELSync { + syncStatus = syncStatusStartedEL + } + return &EngineController{ - engine: engine, - log: log, - metrics: metrics, - rollupCfg: rollupCfg, + engine: engine, + log: log, + metrics: metrics, + rollupCfg: rollupCfg, + syncMode: syncMode, + syncStatus: syncStatus, } } @@ -76,7 +93,7 @@ func (e *EngineController) BuildingPayload() (eth.L2BlockRef, eth.PayloadID, boo } func (e *EngineController) IsEngineSyncing() bool { - return false + return e.syncStatus == syncStatusStartedEL } // Setters @@ -208,6 +225,9 @@ func (e *EngineController) resetBuildingState() { // It returns true if the status is acceptable. func (e *EngineController) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool { if e.syncMode == sync.ELSync { + if status == eth.ExecutionValid && e.syncStatus == syncStatusStartedEL { + e.syncStatus = syncStatusFinishedELButNotFinalized + } // Allow SYNCING and ACCEPTED if engine EL sync is enabled return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted } @@ -218,6 +238,9 @@ func (e *EngineController) checkNewPayloadStatus(status eth.ExecutePayloadStatus // It returns true if the status is acceptable. func (e *EngineController) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool { if e.syncMode == sync.ELSync { + if status == eth.ExecutionValid && e.syncStatus == syncStatusStartedEL { + e.syncStatus = syncStatusFinishedELButNotFinalized + } // Allow SYNCING if engine P2P sync is enabled return status == eth.ExecutionValid || status == eth.ExecutionSyncing } @@ -272,6 +295,12 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, payload *eth SafeBlockHash: e.safeHead.Hash, FinalizedBlockHash: e.finalizedHead.Hash, } + if e.syncStatus == syncStatusFinishedELButNotFinalized { + fc.SafeBlockHash = payload.BlockHash + fc.FinalizedBlockHash = payload.BlockHash + e.SetSafeHead(ref) + e.SetFinalizedHead(ref) + } fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil) if err != nil { var inputErr eth.InputError @@ -293,6 +322,10 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, payload *eth e.unsafeHead = ref e.needFCUCall = false + if e.syncStatus == syncStatusFinishedELButNotFinalized { + e.syncStatus = syncStatusFinishedEL + } + return nil } @@ -300,13 +333,3 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, payload *eth func (e *EngineController) ResetBuildingState() { e.resetBuildingState() } - -// ForkchoiceUpdate implements LocalEngineControl. -func (e *EngineController) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) { - return e.engine.ForkchoiceUpdate(ctx, state, attr) -} - -// NewPayload implements LocalEngineControl. -func (e *EngineController) NewPayload(ctx context.Context, payload *eth.ExecutionPayload) (*eth.PayloadStatusV1, error) { - return e.engine.NewPayload(ctx, payload) -} diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index c105c4c43550..3d53d33a7a5f 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -135,6 +135,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1 sequencerActive: make(chan chan bool, 10), sequencerNotifs: sequencerStateListener, config: cfg, + syncCfg: syncCfg, driverConfig: driverCfg, driverCtx: driverCtx, driverCancel: driverCancel, diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index d2ccfbb0c4e7..f6c7a3f5b2d7 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -15,6 +15,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/retry" ) @@ -65,6 +66,9 @@ type Driver struct { // Driver config: verifier and sequencer settings driverConfig *Config + // Sync Mod Config + syncCfg *sync.Config + // L1 Signals: // // Not all L1 blocks, or all changes, have to be signalled: @@ -302,10 +306,26 @@ func (s *Driver) eventLoop() { } case payload := <-s.unsafeL2Payloads: s.snapshot("New unsafe payload") - s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID()) - s.derivation.AddUnsafePayload(payload) - s.metrics.RecordReceivedUnsafePayload(payload) - reqStep() + if s.syncCfg.SyncMode == sync.CLSync || !s.engineController.IsEngineSyncing() { + s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID()) + s.derivation.AddUnsafePayload(payload) + s.metrics.RecordReceivedUnsafePayload(payload) + reqStep() + } else if s.syncCfg.SyncMode == sync.ELSync { + // TODO: always insert here, but log depending on if engine syncing or not? + ref, err := derive.PayloadToBlockRef(s.config, payload) + if err != nil { + s.log.Info("Failed to turn execution payload into a block ref", "id", payload.ID(), "err", err) + continue + } + if ref.Number <= s.engineController.UnsafeL2Head().Number { + continue + } + s.log.Info("Optimistically inserting unsafe L2 execution payload to drive EL sync", "id", payload.ID()) + if err := s.engineController.InsertUnsafePayload(s.driverCtx, payload, ref); err != nil { + s.log.Warn("Failed to insert unsafe payload for EL sync", "id", payload.ID(), "err", err) + } + } case newL1Head := <-s.l1HeadSig: s.l1State.HandleNewL1HeadBlock(newL1Head) @@ -321,6 +341,10 @@ func (s *Driver) eventLoop() { delayedStepReq = nil step() case <-stepReqCh: + // Don't start the derivation pipeline until we are done with EL sync + if s.engineController.IsEngineSyncing() { + continue + } s.metrics.SetDerivationIdle(false) s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts) err := s.derivation.Step(s.driverCtx)