Skip to content

Commit

Permalink
op-node: Execution Layer Sync
Browse files Browse the repository at this point in the history
This passes unsafe payloads directly the EngineController for immediate
insertion when Execution Layer sync is active. This tells the execution
client to sync to that target. Once the EL sync is complete, the last
unsafe payload is marked as safe. This is required when doing snap sync
because the EL does not have the pre-state required to do the engine
consolidation until the sync is complete.
  • Loading branch information
trianglesphere committed Jan 13, 2024
1 parent 4077200 commit ef9d9fe
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 24 deletions.
63 changes: 43 additions & 20 deletions op-node/rollup/derive/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ import (
"github.com/ethereum/go-ethereum/log"
)

type syncStatusEnum int

const (
syncStatusCL syncStatusEnum = iota
syncStatusStartedEL
syncStatusFinishedELButNotFinalized
syncStatusFinishedEL
)

var errNoFCUNeeded = errors.New("no FCU call was needed")

var _ EngineControl = (*EngineController)(nil)
Expand All @@ -24,11 +33,12 @@ type ExecEngine interface {
}

type EngineController struct {
engine ExecEngine // Underlying execution engine RPC
log log.Logger
metrics Metrics
syncMode sync.Mode
rollupCfg *rollup.Config
engine ExecEngine // Underlying execution engine RPC
log log.Logger
metrics Metrics
syncMode sync.Mode
syncStatus syncStatusEnum
rollupCfg *rollup.Config

// Block Head State
unsafeHead eth.L2BlockRef
Expand All @@ -45,11 +55,18 @@ type EngineController struct {
}

func NewEngineController(engine ExecEngine, log log.Logger, metrics Metrics, rollupCfg *rollup.Config, syncMode sync.Mode) *EngineController {
syncStatus := syncStatusCL
if syncMode == sync.ELSync {
syncStatus = syncStatusStartedEL
}

return &EngineController{
engine: engine,
log: log,
metrics: metrics,
rollupCfg: rollupCfg,
engine: engine,
log: log,
metrics: metrics,
rollupCfg: rollupCfg,
syncMode: syncMode,
syncStatus: syncStatus,
}
}

Expand All @@ -76,7 +93,7 @@ func (e *EngineController) BuildingPayload() (eth.L2BlockRef, eth.PayloadID, boo
}

func (e *EngineController) IsEngineSyncing() bool {
return false
return e.syncStatus == syncStatusStartedEL
}

// Setters
Expand Down Expand Up @@ -208,6 +225,9 @@ func (e *EngineController) resetBuildingState() {
// It returns true if the status is acceptable.
func (e *EngineController) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool {
if e.syncMode == sync.ELSync {
if status == eth.ExecutionValid && e.syncStatus == syncStatusStartedEL {
e.syncStatus = syncStatusFinishedELButNotFinalized
}
// Allow SYNCING and ACCEPTED if engine EL sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted
}
Expand All @@ -218,6 +238,9 @@ func (e *EngineController) checkNewPayloadStatus(status eth.ExecutePayloadStatus
// It returns true if the status is acceptable.
func (e *EngineController) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool {
if e.syncMode == sync.ELSync {
if status == eth.ExecutionValid && e.syncStatus == syncStatusStartedEL {
e.syncStatus = syncStatusFinishedELButNotFinalized
}
// Allow SYNCING if engine P2P sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing
}
Expand Down Expand Up @@ -272,6 +295,12 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, payload *eth
SafeBlockHash: e.safeHead.Hash,
FinalizedBlockHash: e.finalizedHead.Hash,
}
if e.syncStatus == syncStatusFinishedELButNotFinalized {
fc.SafeBlockHash = payload.BlockHash
fc.FinalizedBlockHash = payload.BlockHash
e.SetSafeHead(ref)
e.SetFinalizedHead(ref)
}
fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
var inputErr eth.InputError
Expand All @@ -293,20 +322,14 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, payload *eth
e.unsafeHead = ref
e.needFCUCall = false

if e.syncStatus == syncStatusFinishedELButNotFinalized {
e.syncStatus = syncStatusFinishedEL
}

return nil
}

// ResetBuildingState implements LocalEngineControl.
func (e *EngineController) ResetBuildingState() {
e.resetBuildingState()
}

// ForkchoiceUpdate implements LocalEngineControl.
func (e *EngineController) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) {
return e.engine.ForkchoiceUpdate(ctx, state, attr)
}

// NewPayload implements LocalEngineControl.
func (e *EngineController) NewPayload(ctx context.Context, payload *eth.ExecutionPayload) (*eth.PayloadStatusV1, error) {
return e.engine.NewPayload(ctx, payload)
}
1 change: 1 addition & 0 deletions op-node/rollup/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1
sequencerActive: make(chan chan bool, 10),
sequencerNotifs: sequencerStateListener,
config: cfg,
syncCfg: syncCfg,
driverConfig: driverCfg,
driverCtx: driverCtx,
driverCancel: driverCancel,
Expand Down
32 changes: 28 additions & 4 deletions op-node/rollup/driver/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/retry"
)
Expand Down Expand Up @@ -65,6 +66,9 @@ type Driver struct {
// Driver config: verifier and sequencer settings
driverConfig *Config

// Sync Mod Config
syncCfg *sync.Config

// L1 Signals:
//
// Not all L1 blocks, or all changes, have to be signalled:
Expand Down Expand Up @@ -302,10 +306,26 @@ func (s *Driver) eventLoop() {
}
case payload := <-s.unsafeL2Payloads:
s.snapshot("New unsafe payload")
s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID())
s.derivation.AddUnsafePayload(payload)
s.metrics.RecordReceivedUnsafePayload(payload)
reqStep()
if s.syncCfg.SyncMode == sync.CLSync || !s.engineController.IsEngineSyncing() {
s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID())
s.derivation.AddUnsafePayload(payload)
s.metrics.RecordReceivedUnsafePayload(payload)
reqStep()
} else if s.syncCfg.SyncMode == sync.ELSync {
// TODO: always insert here, but log depending on if engine syncing or not?
ref, err := derive.PayloadToBlockRef(s.config, payload)
if err != nil {
s.log.Info("Failed to turn execution payload into a block ref", "id", payload.ID(), "err", err)
continue
}
if ref.Number <= s.engineController.UnsafeL2Head().Number {
continue
}
s.log.Info("Optimistically inserting unsafe L2 execution payload to drive EL sync", "id", payload.ID())
if err := s.engineController.InsertUnsafePayload(s.driverCtx, payload, ref); err != nil {
s.log.Warn("Failed to insert unsafe payload for EL sync", "id", payload.ID(), "err", err)
}
}

case newL1Head := <-s.l1HeadSig:
s.l1State.HandleNewL1HeadBlock(newL1Head)
Expand All @@ -321,6 +341,10 @@ func (s *Driver) eventLoop() {
delayedStepReq = nil
step()
case <-stepReqCh:
// Don't start the derivation pipeline until we are done with EL sync
if s.engineController.IsEngineSyncing() {
continue
}
s.metrics.SetDerivationIdle(false)
s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts)
err := s.derivation.Step(s.driverCtx)
Expand Down

0 comments on commit ef9d9fe

Please sign in to comment.