diff --git a/op-e2e/actions/l2_sequencer.go b/op-e2e/actions/l2_sequencer.go index 625356f4bca68..8f7cb6154db85 100644 --- a/op-e2e/actions/l2_sequencer.go +++ b/op-e2e/actions/l2_sequencer.go @@ -49,7 +49,7 @@ func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, c } return &L2Sequencer{ L2Verifier: *ver, - sequencer: driver.NewSequencer(log, cfg, ver.derivation, attrBuilder, l1OriginSelector, metrics.NoopMetrics), + sequencer: driver.NewSequencer(log, cfg, ver.engine, attrBuilder, l1OriginSelector, metrics.NoopMetrics), mockL1OriginSelector: l1OriginSelector, failL2GossipUnsafeBlock: nil, } @@ -104,7 +104,7 @@ func (s *L2Sequencer) ActL2EndBlock(t Testing) { // ActL2KeepL1Origin makes the sequencer use the current L1 origin, even if the next origin is available. func (s *L2Sequencer) ActL2KeepL1Origin(t Testing) { - parent := s.derivation.UnsafeL2Head() + parent := s.engine.UnsafeL2Head() // force old origin, for testing purposes oldOrigin, err := s.l1.L1BlockRefByHash(t.Ctx(), parent.L1Origin.Hash) require.NoError(t, err, "failed to get current origin: %s", parent.L1Origin) @@ -113,7 +113,7 @@ func (s *L2Sequencer) ActL2KeepL1Origin(t Testing) { // ActBuildToL1Head builds empty blocks until (incl.) the L1 head becomes the L2 origin func (s *L2Sequencer) ActBuildToL1Head(t Testing) { - for s.derivation.UnsafeL2Head().L1Origin.Number < s.l1State.L1Head().Number { + for s.engine.UnsafeL2Head().L1Origin.Number < s.l1State.L1Head().Number { s.ActL2PipelineFull(t) s.ActL2StartBlock(t) s.ActL2EndBlock(t) @@ -122,7 +122,7 @@ func (s *L2Sequencer) ActBuildToL1Head(t Testing) { // ActBuildToL1HeadUnsafe builds empty blocks until (incl.) the L1 head becomes the L1 origin of the L2 head func (s *L2Sequencer) ActBuildToL1HeadUnsafe(t Testing) { - for s.derivation.UnsafeL2Head().L1Origin.Number < s.l1State.L1Head().Number { + for s.engine.UnsafeL2Head().L1Origin.Number < s.l1State.L1Head().Number { // Note: the derivation pipeline does not run, we are just sequencing a block on top of the existing L2 chain. s.ActL2StartBlock(t) s.ActL2EndBlock(t) @@ -133,7 +133,7 @@ func (s *L2Sequencer) ActBuildToL1HeadUnsafe(t Testing) { func (s *L2Sequencer) ActBuildToL1HeadExcl(t Testing) { for { s.ActL2PipelineFull(t) - nextOrigin, err := s.mockL1OriginSelector.FindL1Origin(t.Ctx(), s.derivation.UnsafeL2Head()) + nextOrigin, err := s.mockL1OriginSelector.FindL1Origin(t.Ctx(), s.engine.UnsafeL2Head()) require.NoError(t, err) if nextOrigin.Number >= s.l1State.L1Head().Number { break @@ -147,7 +147,7 @@ func (s *L2Sequencer) ActBuildToL1HeadExcl(t Testing) { func (s *L2Sequencer) ActBuildToL1HeadExclUnsafe(t Testing) { for { // Note: the derivation pipeline does not run, we are just sequencing a block on top of the existing L2 chain. - nextOrigin, err := s.mockL1OriginSelector.FindL1Origin(t.Ctx(), s.derivation.UnsafeL2Head()) + nextOrigin, err := s.mockL1OriginSelector.FindL1Origin(t.Ctx(), s.engine.UnsafeL2Head()) require.NoError(t, err) if nextOrigin.Number >= s.l1State.L1Head().Number { break diff --git a/op-e2e/actions/l2_verifier.go b/op-e2e/actions/l2_verifier.go index d0f6bb21c0625..2cb8a688d6f10 100644 --- a/op-e2e/actions/l2_verifier.go +++ b/op-e2e/actions/l2_verifier.go @@ -33,6 +33,7 @@ type L2Verifier struct { } // L2 rollup + engine *derive.EngineController derivation *derive.DerivationPipeline l1 derive.L1Fetcher @@ -59,12 +60,14 @@ type L2API interface { func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config) *L2Verifier { metrics := &testutils.TestDerivationMetrics{} - pipeline := derive.NewDerivationPipeline(log, cfg, l1, nil, eng, metrics, syncCfg) + engine := derive.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode) + pipeline := derive.NewDerivationPipeline(log, cfg, l1, nil, eng, engine, metrics, syncCfg) pipeline.Reset() rollupNode := &L2Verifier{ log: log, eng: eng, + engine: engine, derivation: pipeline, l1: l1, l1State: driver.NewL1State(log, metrics), @@ -132,19 +135,19 @@ func (s *l2VerifierBackend) OnUnsafeL2Payload(ctx context.Context, payload *eth. } func (s *L2Verifier) L2Finalized() eth.L2BlockRef { - return s.derivation.Finalized() + return s.engine.Finalized() } func (s *L2Verifier) L2Safe() eth.L2BlockRef { - return s.derivation.SafeL2Head() + return s.engine.SafeL2Head() } func (s *L2Verifier) L2PendingSafe() eth.L2BlockRef { - return s.derivation.PendingSafeL2Head() + return s.engine.PendingSafeL2Head() } func (s *L2Verifier) L2Unsafe() eth.L2BlockRef { - return s.derivation.UnsafeL2Head() + return s.engine.UnsafeL2Head() } func (s *L2Verifier) SyncStatus() *eth.SyncStatus { @@ -158,7 +161,6 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus { SafeL2: s.L2Safe(), FinalizedL2: s.L2Finalized(), PendingSafeL2: s.L2PendingSafe(), - UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(), } } diff --git a/op-e2e/actions/reorg_test.go b/op-e2e/actions/reorg_test.go index 3ebbe7ecffe58..25804e59f0fcf 100644 --- a/op-e2e/actions/reorg_test.go +++ b/op-e2e/actions/reorg_test.go @@ -833,11 +833,11 @@ func SyncAfterReorg(gt *testing.T, deltaTimeOffset *hexutil.Uint64) { miner.ActL1SetFeeRecipient(common.Address{'A', 0}) miner.ActEmptyBlock(t) sequencer.ActL1HeadSignal(t) - for sequencer.derivation.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number { + for sequencer.engine.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number { // build L2 blocks until the L1 origin is the current L1 head(A0) sequencer.ActL2PipelineFull(t) sequencer.ActL2StartBlock(t) - if sequencer.derivation.UnsafeL2Head().Number == 11 { + if sequencer.engine.UnsafeL2Head().Number == 11 { // include a user tx at L2 block #12 to make a state transition alice.L2.ActResetTxOpts(t) alice.L2.ActSetTxToAddr(&dp.Addresses.Bob)(t) diff --git a/op-e2e/actions/span_batch_test.go b/op-e2e/actions/span_batch_test.go index 65312b949652d..07bdd91ee3b9d 100644 --- a/op-e2e/actions/span_batch_test.go +++ b/op-e2e/actions/span_batch_test.go @@ -509,7 +509,7 @@ func TestSpanBatchLowThroughputChain(gt *testing.T) { totalTxCount := 0 // Make 600 L2 blocks (L1BlockTime / L2BlockTime * 50) including 1~3 txs for i := 0; i < 50; i++ { - for sequencer.derivation.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number { + for sequencer.engine.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number { sequencer.ActL2StartBlock(t) // fill the block with random number of L2 txs for j := 0; j < rand.Intn(3); j++ { @@ -646,7 +646,7 @@ func TestBatchEquivalence(gt *testing.T) { sequencer.ActL2PipelineFull(t) totalTxCount := 0 // Build random blocks - for sequencer.derivation.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number { + for sequencer.engine.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number { sequencer.ActL2StartBlock(t) // fill the block with random number of L2 txs for j := 0; j < rand.Intn(3); j++ { diff --git a/op-node/node/server_test.go b/op-node/node/server_test.go index 51d34c11f625f..a029cafab3e8a 100644 --- a/op-node/node/server_test.go +++ b/op-node/node/server_test.go @@ -161,7 +161,6 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus { SafeL2: testutils.RandomL2BlockRef(rng), FinalizedL2: testutils.RandomL2BlockRef(rng), PendingSafeL2: testutils.RandomL2BlockRef(rng), - UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng), } } diff --git a/op-node/rollup/derive/engine_controller.go b/op-node/rollup/derive/engine_controller.go index 94e734a2c4bb6..00dd10c22d1fb 100644 --- a/op-node/rollup/derive/engine_controller.go +++ b/op-node/rollup/derive/engine_controller.go @@ -32,7 +32,7 @@ type EngineController struct { // Block Head State unsafeHead eth.L2BlockRef - pendingSafeHead eth.L2BlockRef + pendingSafeHead eth.L2BlockRef // L2 block processed from the middle of a span batch, but not marked as the safe block yet. safeHead eth.L2BlockRef finalizedHead eth.L2BlockRef needFCUCall bool @@ -165,7 +165,6 @@ func (e *EngineController) ConfirmPayload(ctx context.Context) (out *eth.Executi e.unsafeHead = ref e.metrics.RecordL2Ref("l2_unsafe", ref) - e.metrics.RecordL2Ref("l2_engineSyncTarget", ref) if e.buildingSafe { e.metrics.RecordL2Ref("l2_pending_safe", ref) e.pendingSafeHead = ref diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 08eeb71531827..a6c15d859abda 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -153,15 +153,13 @@ type EngineQueue struct { syncCfg *sync.Config } -var _ EngineControl = (*EngineQueue)(nil) - // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. -func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher, syncCfg *sync.Config) *EngineQueue { +func NewEngineQueue(log log.Logger, cfg *rollup.Config, l2Source L2Source, engine LocalEngineControl, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher, syncCfg *sync.Config) *EngineQueue { return &EngineQueue{ log: log, cfg: cfg, - ec: NewEngineController(engine, log, metrics, cfg, syncCfg.SyncMode), - engine: engine, + ec: engine, + engine: l2Source, metrics: metrics, finalityData: make([]FinalityData, 0, finalityLookback), unsafePayloads: NewPayloadsQueue(maxUnsafePayloadsMemory, payloadMemSize), @@ -225,20 +223,12 @@ func (eq *EngineQueue) FinalizedL1() eth.L1BlockRef { return eq.finalizedL1 } -func (eq *EngineQueue) Finalized() eth.L2BlockRef { - return eq.ec.Finalized() -} - -func (eq *EngineQueue) UnsafeL2Head() eth.L2BlockRef { - return eq.ec.UnsafeL2Head() -} - -func (eq *EngineQueue) SafeL2Head() eth.L2BlockRef { - return eq.ec.SafeL2Head() -} - -func (eq *EngineQueue) PendingSafeL2Head() eth.L2BlockRef { - return eq.ec.PendingSafeL2Head() +func (eq *EngineQueue) HighestUnsafeBlock() eth.L2BlockRef { + ref, err := eq.unsafePayloads.HighestUnsafeBlock(eq.cfg) + if err != nil { + return eth.L2BlockRef{} + } + return ref } // Determine if the engine is syncing to the target block @@ -261,7 +251,7 @@ func (eq *EngineQueue) Step(ctx context.Context) error { // EOF error means we can't process the next unsafe payload. Then we should process next safe attributes. } if eq.isEngineSyncing() { - // Make pipeline first focus to sync unsafe blocks to engineSyncTarget + // The pipeline cannot move forwards if doing EL sync. return EngineELSyncing } if eq.safeAttributes != nil { diff --git a/op-node/rollup/derive/engine_queue_test.go b/op-node/rollup/derive/engine_queue_test.go index c8f84b8416549..204b56d33f054 100644 --- a/op-node/rollup/derive/engine_queue_test.go +++ b/op-node/rollup/derive/engine_queue_test.go @@ -248,12 +248,13 @@ func TestEngineQueue_Finalize(t *testing.T) { prev := &fakeAttributesQueue{} - eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{}) + ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync) + eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) - require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") + require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB, eq.Origin(), "Expecting to be set back derivation L1 progress to B") - require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps") + require.Equal(t, refA1, ec.Finalized(), "A1 is recognized as finalized before we run any steps") // now say C1 was included in D and became the new safe head eq.origin = refD @@ -270,7 +271,7 @@ func TestEngineQueue_Finalize(t *testing.T) { // let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E) eq.Finalize(refD) - require.Equal(t, refC1, eq.Finalized(), "C1 was included in finalized D, and should now be finalized") + require.Equal(t, refC1, ec.Finalized(), "C1 was included in finalized D, and should now be finalized") l1F.AssertExpectations(t) eng.AssertExpectations(t) @@ -483,12 +484,13 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) { prev := &fakeAttributesQueue{origin: refE} - eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{}) + ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync) + eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) - require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") + require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB, eq.Origin(), "Expecting to be set back derivation L1 progress to B") - require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps") + require.Equal(t, refA1, ec.Finalized(), "A1 is recognized as finalized before we run any steps") // First step after reset will do a fork choice update eng.ExpectForkchoiceUpdate(ð.ForkchoiceState{ @@ -508,7 +510,6 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) { ParentHash: refE.Hash, Time: refF.Time, } - eq.UnsafeL2Head() err = eq.Step(context.Background()) require.ErrorIs(t, err, ErrReset, "should reset pipeline due to mismatched origin") @@ -813,12 +814,13 @@ func TestVerifyNewL1Origin(t *testing.T) { }, nil) prev := &fakeAttributesQueue{origin: refE} - eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{}) + ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync) + eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) - require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") + require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB, eq.Origin(), "Expecting to be set back derivation L1 progress to B") - require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps") + require.Equal(t, refA1, ec.Finalized(), "A1 is recognized as finalized before we run any steps") // First step after reset will do a fork choice update eng.ExpectForkchoiceUpdate(ð.ForkchoiceState{ @@ -833,7 +835,6 @@ func TestVerifyNewL1Origin(t *testing.T) { // L1 chain reorgs so new origin is at same slot as refF but on a different fork prev.origin = test.newOrigin - eq.UnsafeL2Head() err = eq.Step(context.Background()) if test.expectReset { require.ErrorIs(t, err, ErrReset, "should reset pipeline due to mismatched origin") @@ -910,7 +911,8 @@ func TestBlockBuildingRace(t *testing.T) { } prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true} - eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{}) + ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync) + eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) id := eth.PayloadID{0xff} @@ -1001,7 +1003,7 @@ func TestBlockBuildingRace(t *testing.T) { // Now complete the job, as external user of the engine _, _, err = eq.ConfirmPayload(context.Background()) require.NoError(t, err) - require.Equal(t, refA1, eq.SafeL2Head(), "safe head should have changed") + require.Equal(t, refA1, ec.SafeL2Head(), "safe head should have changed") require.NoError(t, eq.Step(context.Background())) require.Nil(t, eq.safeAttributes, "attributes should now be invalidated") @@ -1080,7 +1082,8 @@ func TestResetLoop(t *testing.T) { prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true} - eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F, &sync.Config{}) + ec := NewEngineController(eng, logger, metrics.NoopMetrics, &rollup.Config{}, sync.CLSync) + eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{}) eq.ec.SetUnsafeHead(refA2) eq.ec.SetSafeHead(refA1) eq.ec.SetFinalizedHead(refA0) @@ -1185,7 +1188,8 @@ func TestEngineQueue_StepPopOlderUnsafe(t *testing.T) { prev := &fakeAttributesQueue{origin: refA} - eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F, &sync.Config{}) + ec := NewEngineController(eng, logger, metrics.NoopMetrics, &rollup.Config{}, sync.CLSync) + eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{}) eq.ec.SetUnsafeHead(refA2) eq.ec.SetSafeHead(refA0) eq.ec.SetFinalizedHead(refA0) diff --git a/op-node/rollup/derive/payloads_queue.go b/op-node/rollup/derive/payloads_queue.go index 78b1ffbb57e4d..3551c5c775646 100644 --- a/op-node/rollup/derive/payloads_queue.go +++ b/op-node/rollup/derive/payloads_queue.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" + "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-service/eth" ) @@ -76,11 +77,12 @@ func payloadMemSize(p *eth.ExecutionPayload) uint64 { // When the size grows too large, the first (lowest block-number) payload is removed from the queue. // PayloadsQueue allows entries with same block number, but does not allow duplicate blocks type PayloadsQueue struct { - pq payloadsByNumber - currentSize uint64 - MaxSize uint64 - blockHashes map[common.Hash]struct{} - SizeFn func(p *eth.ExecutionPayload) uint64 + pq payloadsByNumber + currentSize uint64 + MaxSize uint64 + blockHashes map[common.Hash]struct{} + SizeFn func(p *eth.ExecutionPayload) uint64 + highestBlock *eth.ExecutionPayload } func NewPayloadsQueue(maxSize uint64, sizeFn func(p *eth.ExecutionPayload) uint64) *PayloadsQueue { @@ -128,9 +130,19 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error { upq.Pop() } upq.blockHashes[p.BlockHash] = struct{}{} + if upq.highestBlock == nil || upq.highestBlock.BlockNumber < p.BlockNumber { + upq.highestBlock = p + } return nil } +func (upq *PayloadsQueue) HighestUnsafeBlock(rollupCfg *rollup.Config) (eth.L2BlockRef, error) { + if upq.highestBlock == nil { + return eth.L2BlockRef{}, nil + } + return PayloadToBlockRef(rollupCfg, upq.highestBlock) +} + // Peek retrieves the payload with the lowest block number from the queue in O(1), or nil if the queue is empty. func (upq *PayloadsQueue) Peek() *eth.ExecutionPayload { if len(upq.pq) == 0 { diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index cceab5e446558..d4d6ca89962c7 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -32,33 +32,19 @@ type L1Fetcher interface { L1TransactionFetcher } -// ResettableEngineControl wraps EngineControl with reset-functionality, -// which handles reorgs like the derivation pipeline: -// by determining the last valid block references to continue from. -type ResettableEngineControl interface { - EngineControl - Reset() -} - type ResettableStage interface { // Reset resets a pull stage. `base` refers to the L1 Block Reference to reset to, with corresponding configuration. Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error } type EngineQueueStage interface { - EngineControl - + HighestUnsafeBlock() eth.L2BlockRef FinalizedL1() eth.L1BlockRef - Finalized() eth.L2BlockRef - UnsafeL2Head() eth.L2BlockRef - SafeL2Head() eth.L2BlockRef - PendingSafeL2Head() eth.L2BlockRef Origin() eth.L1BlockRef SystemConfig() eth.SystemConfig Finalize(l1Origin eth.L1BlockRef) AddUnsafePayload(payload *eth.ExecutionPayload) - UnsafeL2SyncTarget() eth.L2BlockRef Step(context.Context) error } @@ -81,7 +67,8 @@ type DerivationPipeline struct { } // NewDerivationPipeline creates a derivation pipeline, which should be reset before use. -func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher, engine Engine, metrics Metrics, syncCfg *sync.Config) *DerivationPipeline { + +func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher, l2Source L2Source, engine LocalEngineControl, metrics Metrics, syncCfg *sync.Config) *DerivationPipeline { // Pull stages l1Traversal := NewL1Traversal(log, rollupCfg, l1Fetcher) @@ -90,12 +77,12 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L frameQueue := NewFrameQueue(log, l1Src) bank := NewChannelBank(log, rollupCfg, frameQueue, l1Fetcher, metrics) chInReader := NewChannelInReader(rollupCfg, log, bank, metrics) - batchQueue := NewBatchQueue(log, rollupCfg, chInReader, engine) - attrBuilder := NewFetchingAttributesBuilder(rollupCfg, l1Fetcher, engine) + batchQueue := NewBatchQueue(log, rollupCfg, chInReader, l2Source) + attrBuilder := NewFetchingAttributesBuilder(rollupCfg, l1Fetcher, l2Source) attributesQueue := NewAttributesQueue(log, rollupCfg, attrBuilder, batchQueue) // Step stages - eng := NewEngineQueue(log, rollupCfg, engine, metrics, attributesQueue, l1Fetcher, syncCfg) + eng := NewEngineQueue(log, rollupCfg, l2Source, engine, metrics, attributesQueue, l1Fetcher, syncCfg) // Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during // the reset, but after the engine queue, this is the order in which the stages could talk to each other. @@ -140,47 +127,14 @@ func (dp *DerivationPipeline) FinalizedL1() eth.L1BlockRef { return dp.eng.FinalizedL1() } -func (dp *DerivationPipeline) Finalized() eth.L2BlockRef { - return dp.eng.Finalized() -} - -func (dp *DerivationPipeline) SafeL2Head() eth.L2BlockRef { - return dp.eng.SafeL2Head() -} - -func (dp *DerivationPipeline) PendingSafeL2Head() eth.L2BlockRef { - return dp.eng.PendingSafeL2Head() -} - -// UnsafeL2Head returns the head of the L2 chain that we are deriving for, this may be past what we derived from L1 -func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef { - return dp.eng.UnsafeL2Head() -} - -func (dp *DerivationPipeline) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) { - return dp.eng.StartPayload(ctx, parent, attrs, updateSafe) -} - -func (dp *DerivationPipeline) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) { - return dp.eng.ConfirmPayload(ctx) -} - -func (dp *DerivationPipeline) CancelPayload(ctx context.Context, force bool) error { - return dp.eng.CancelPayload(ctx, force) -} - -func (dp *DerivationPipeline) BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool) { - return dp.eng.BuildingPayload() -} - // AddUnsafePayload schedules an execution payload to be processed, ahead of deriving it from L1 func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) { dp.eng.AddUnsafePayload(payload) } -// UnsafeL2SyncTarget retrieves the first queued-up L2 unsafe payload, or a zeroed reference if there is none. -func (dp *DerivationPipeline) UnsafeL2SyncTarget() eth.L2BlockRef { - return dp.eng.UnsafeL2SyncTarget() +// HighestUnsafeBlock returns the highest queued unsafe block +func (dp *DerivationPipeline) HighestUnsafeBlock() eth.L2BlockRef { + return dp.eng.HighestUnsafeBlock() } // Step tries to progress the buffer. diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index a88d54d8900f8..c105c4c43550f 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -56,15 +56,11 @@ type DerivationPipeline interface { Reset() Step(ctx context.Context) error AddUnsafePayload(payload *eth.ExecutionPayload) - UnsafeL2SyncTarget() eth.L2BlockRef Finalize(ref eth.L1BlockRef) FinalizedL1() eth.L1BlockRef - Finalized() eth.L2BlockRef - SafeL2Head() eth.L2BlockRef - UnsafeL2Head() eth.L2BlockRef - PendingSafeL2Head() eth.L2BlockRef Origin() eth.L1BlockRef EngineReady() bool + HighestUnsafeBlock() eth.L2BlockRef } type L1StateIface interface { @@ -122,15 +118,16 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1 sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1) findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth) verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1) - derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, l2, metrics, syncCfg) + engine := derive.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode) + derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, l2, engine, metrics, syncCfg) attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2) - engine := derivationPipeline - meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) + meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics. sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics) driverCtx, driverCancel := context.WithCancel(context.Background()) return &Driver{ l1State: l1State, derivation: derivationPipeline, + engineController: engine, stateReq: make(chan chan struct{}), forceReset: make(chan chan struct{}, 10), startSequencer: make(chan hashAndErrorChannel, 10), diff --git a/op-node/rollup/driver/metered_engine.go b/op-node/rollup/driver/metered_engine.go index 9cd5c8c907df7..e4c56c35b5e08 100644 --- a/op-node/rollup/driver/metered_engine.go +++ b/op-node/rollup/driver/metered_engine.go @@ -21,7 +21,7 @@ type EngineMetrics interface { // MeteredEngine wraps an EngineControl and adds metrics such as block building time diff and sealing time type MeteredEngine struct { - inner derive.ResettableEngineControl + inner derive.EngineControl cfg *rollup.Config metrics EngineMetrics @@ -30,10 +30,7 @@ type MeteredEngine struct { buildingStartTime time.Time } -// MeteredEngine implements derive.ResettableEngineControl -var _ derive.ResettableEngineControl = (*MeteredEngine)(nil) - -func NewMeteredEngine(cfg *rollup.Config, inner derive.ResettableEngineControl, metrics EngineMetrics, log log.Logger) *MeteredEngine { +func NewMeteredEngine(cfg *rollup.Config, inner derive.EngineControl, metrics EngineMetrics, log log.Logger) *MeteredEngine { return &MeteredEngine{ inner: inner, cfg: cfg, @@ -93,7 +90,3 @@ func (m *MeteredEngine) CancelPayload(ctx context.Context, force bool) error { func (m *MeteredEngine) BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool) { return m.inner.BuildingPayload() } - -func (m *MeteredEngine) Reset() { - m.inner.Reset() -} diff --git a/op-node/rollup/driver/sequencer.go b/op-node/rollup/driver/sequencer.go index 4cdc91bf51b02..fbd707e282fec 100644 --- a/op-node/rollup/driver/sequencer.go +++ b/op-node/rollup/driver/sequencer.go @@ -34,7 +34,7 @@ type Sequencer struct { log log.Logger rollupCfg *rollup.Config - engine derive.ResettableEngineControl + engine derive.EngineControl attrBuilder derive.AttributesBuilder l1OriginSelector L1OriginSelectorIface @@ -47,7 +47,7 @@ type Sequencer struct { nextAction time.Time } -func NewSequencer(log log.Logger, rollupCfg *rollup.Config, engine derive.ResettableEngineControl, attributesBuilder derive.AttributesBuilder, l1OriginSelector L1OriginSelectorIface, metrics SequencerMetrics) *Sequencer { +func NewSequencer(log log.Logger, rollupCfg *rollup.Config, engine derive.EngineControl, attributesBuilder derive.AttributesBuilder, l1OriginSelector L1OriginSelectorIface, metrics SequencerMetrics) *Sequencer { return &Sequencer{ log: log, rollupCfg: rollupCfg, @@ -214,7 +214,7 @@ func (d *Sequencer) RunNextSequencerAction(ctx context.Context) (*eth.ExecutionP d.metrics.RecordSequencerReset() d.nextAction = d.timeNow().Add(time.Second * time.Duration(d.rollupCfg.BlockTime)) // hold off from sequencing for a full block d.CancelBuildingBlock(ctx) - d.engine.Reset() + return nil, err } else if errors.Is(err, derive.ErrTemporary) { d.log.Error("sequencer failed temporarily to seal new block", "err", err) d.nextAction = d.timeNow().Add(time.Second) @@ -239,7 +239,7 @@ func (d *Sequencer) RunNextSequencerAction(ctx context.Context) (*eth.ExecutionP d.log.Error("sequencer failed to seal new block, requiring derivation reset", "err", err) d.metrics.RecordSequencerReset() d.nextAction = d.timeNow().Add(time.Second * time.Duration(d.rollupCfg.BlockTime)) // hold off from sequencing for a full block - d.engine.Reset() + return nil, err } else if errors.Is(err, derive.ErrTemporary) { d.log.Error("sequencer temporarily failed to start building new block", "err", err) d.nextAction = d.timeNow().Add(time.Second) diff --git a/op-node/rollup/driver/sequencer_test.go b/op-node/rollup/driver/sequencer_test.go index e8026619f53c9..7fd23e832faf8 100644 --- a/op-node/rollup/driver/sequencer_test.go +++ b/op-node/rollup/driver/sequencer_test.go @@ -125,11 +125,7 @@ func (m *FakeEngineControl) resetBuildingState() { m.buildingAttrs = nil } -func (m *FakeEngineControl) Reset() { - m.err = nil -} - -var _ derive.ResettableEngineControl = (*FakeEngineControl)(nil) +var _ derive.EngineControl = (*FakeEngineControl)(nil) type testAttrBuilderFn func(ctx context.Context, l2Parent eth.L2BlockRef, epoch eth.BlockID) (attrs *eth.PayloadAttributes, err error) @@ -349,7 +345,11 @@ func TestSequencerChaosMonkey(t *testing.T) { // no error } payload, err := seq.RunNextSequencerAction(context.Background()) - require.NoError(t, err) + // RunNextSequencerAction passes ErrReset & ErrCritical through. + // Only supress ErrReset, not ErrCritical + if !errors.Is(err, derive.ErrReset) { + require.NoError(t, err) + } if payload != nil { require.Equal(t, engControl.UnsafeL2Head().ID(), payload.ID(), "head must stay in sync with emitted payloads") var tx types.Transaction diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index d91a06e506f5e..d2ccfbb0c4e7a 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -32,6 +32,10 @@ type Driver struct { // The derivation pipeline determines the new l2Safe. derivation DerivationPipeline + // The engine controller is used by the sequencer & derivation components. + // We will also use it for EL sync in a future PR. + engineController *derive.EngineController + // Requests to block the event loop for synchronous execution to avoid reading an inconsistent state stateReq chan chan struct{} @@ -229,7 +233,7 @@ func (s *Driver) eventLoop() { syncCheckInterval := time.Duration(s.config.BlockTime) * time.Second * 2 altSyncTicker := time.NewTicker(syncCheckInterval) defer altSyncTicker.Stop() - lastUnsafeL2 := s.derivation.UnsafeL2Head() + lastUnsafeL2 := s.engineController.UnsafeL2Head() for { if s.driverCtx.Err() != nil { // don't try to schedule/handle more work when we are closing. @@ -241,18 +245,18 @@ func (s *Driver) eventLoop() { // And avoid sequencing if the derivation pipeline indicates the engine is not ready. if s.driverConfig.SequencerEnabled && !s.driverConfig.SequencerStopped && s.l1State.L1Head() != (eth.L1BlockRef{}) && s.derivation.EngineReady() { - if s.driverConfig.SequencerMaxSafeLag > 0 && s.derivation.SafeL2Head().Number+s.driverConfig.SequencerMaxSafeLag <= s.derivation.UnsafeL2Head().Number { + if s.driverConfig.SequencerMaxSafeLag > 0 && s.engineController.SafeL2Head().Number+s.driverConfig.SequencerMaxSafeLag <= s.engineController.UnsafeL2Head().Number { // If the safe head has fallen behind by a significant number of blocks, delay creating new blocks // until the safe lag is below SequencerMaxSafeLag. if sequencerCh != nil { s.log.Warn( "Delay creating new block since safe lag exceeds limit", - "safe_l2", s.derivation.SafeL2Head(), - "unsafe_l2", s.derivation.UnsafeL2Head(), + "safe_l2", s.engineController.SafeL2Head(), + "unsafe_l2", s.engineController.UnsafeL2Head(), ) sequencerCh = nil } - } else if s.sequencer.BuildingOnto().ID() != s.derivation.UnsafeL2Head().ID() { + } else if s.sequencer.BuildingOnto().ID() != s.engineController.UnsafeL2Head().ID() { // If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action. // This may adjust at any time based on fork-choice changes or previous errors. // @@ -265,7 +269,7 @@ func (s *Driver) eventLoop() { // If the engine is not ready, or if the L2 head is actively changing, then reset the alt-sync: // there is no need to request L2 blocks when we are syncing already. - if head := s.derivation.UnsafeL2Head(); head != lastUnsafeL2 || !s.derivation.EngineReady() { + if head := s.engineController.UnsafeL2Head(); head != lastUnsafeL2 || !s.derivation.EngineReady() { lastUnsafeL2 = head altSyncTicker.Reset(syncCheckInterval) } @@ -273,7 +277,9 @@ func (s *Driver) eventLoop() { select { case <-sequencerCh: payload, err := s.sequencer.RunNextSequencerAction(s.driverCtx) - if err != nil { + if errors.Is(err, derive.ErrReset) { + s.derivation.Reset() + } else if err != nil { s.log.Error("Sequencer critical error", "err", err) return } @@ -325,7 +331,7 @@ func (s *Driver) eventLoop() { s.metrics.SetDerivationIdle(true) continue } else if err != nil && errors.Is(err, derive.EngineELSyncing) { - s.log.Debug("Derivation process went idle because the engine is syncing", "progress", s.derivation.Origin(), "unsafe_head", s.derivation.UnsafeL2Head(), "err", err) + s.log.Debug("Derivation process went idle because the engine is syncing", "progress", s.derivation.Origin(), "unsafe_head", s.engineController.UnsafeL2Head(), "err", err) stepAttempts = 0 s.metrics.SetDerivationIdle(true) continue @@ -362,7 +368,7 @@ func (s *Driver) eventLoop() { s.metrics.RecordPipelineReset() close(respCh) case resp := <-s.startSequencer: - unsafeHead := s.derivation.UnsafeL2Head().Hash + unsafeHead := s.engineController.UnsafeL2Head().Hash if !s.driverConfig.SequencerStopped { resp.err <- errors.New("sequencer already running") } else if !bytes.Equal(unsafeHead[:], resp.hash[:]) { @@ -390,7 +396,7 @@ func (s *Driver) eventLoop() { // Cancel any inflight block building. If we don't cancel this, we can resume sequencing an old block // even if we've received new unsafe heads in the interim, causing us to introduce a re-org. s.sequencer.CancelBuildingBlock(s.driverCtx) - respCh <- hashAndError{hash: s.derivation.UnsafeL2Head().Hash} + respCh <- hashAndError{hash: s.engineController.UnsafeL2Head().Hash} } case respCh := <-s.sequencerActive: respCh <- !s.driverConfig.SequencerStopped @@ -484,11 +490,10 @@ func (s *Driver) syncStatus() *eth.SyncStatus { HeadL1: s.l1State.L1Head(), SafeL1: s.l1State.L1Safe(), FinalizedL1: s.l1State.L1Finalized(), - UnsafeL2: s.derivation.UnsafeL2Head(), - SafeL2: s.derivation.SafeL2Head(), - FinalizedL2: s.derivation.Finalized(), - PendingSafeL2: s.derivation.PendingSafeL2Head(), - UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(), + UnsafeL2: s.engineController.UnsafeL2Head(), + SafeL2: s.engineController.SafeL2Head(), + FinalizedL2: s.engineController.Finalized(), + PendingSafeL2: s.engineController.PendingSafeL2Head(), } } @@ -537,9 +542,9 @@ func (s *Driver) snapshot(event string) { "event", event, "l1Head", deferJSONString{s.l1State.L1Head()}, "l1Current", deferJSONString{s.derivation.Origin()}, - "l2Head", deferJSONString{s.derivation.UnsafeL2Head()}, - "l2Safe", deferJSONString{s.derivation.SafeL2Head()}, - "l2FinalizedHead", deferJSONString{s.derivation.Finalized()}) + "l2Head", deferJSONString{s.engineController.UnsafeL2Head()}, + "l2Safe", deferJSONString{s.engineController.SafeL2Head()}, + "l2FinalizedHead", deferJSONString{s.engineController.Finalized()}) } type hashAndError struct { @@ -556,8 +561,8 @@ type hashAndErrorChannel struct { // WARNING: This is only an outgoing signal, the blocks are not guaranteed to be retrieved. // Results are received through OnUnsafeL2Payload. func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) error { - start := s.derivation.UnsafeL2Head() - end := s.derivation.UnsafeL2SyncTarget() + start := s.engineController.UnsafeL2Head() + end := s.derivation.HighestUnsafeBlock() // Check if we have missing blocks between the start and end. Request them if we do. if end == (eth.L2BlockRef{}) { s.log.Debug("requesting sync with open-end range", "start", start) diff --git a/op-program/client/driver/driver.go b/op-program/client/driver/driver.go index 298587b407f60..2f98dfb7adb2f 100644 --- a/op-program/client/driver/driver.go +++ b/op-program/client/driver/driver.go @@ -20,6 +20,9 @@ var ( type Derivation interface { Step(ctx context.Context) error +} + +type EngineState interface { SafeL2Head() eth.L2BlockRef } @@ -31,16 +34,19 @@ type L2Source interface { type Driver struct { logger log.Logger pipeline Derivation + engine EngineState l2OutputRoot func(uint64) (eth.Bytes32, error) targetBlockNum uint64 } func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l2Source L2Source, targetBlockNum uint64) *Driver { - pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, nil, l2Source, metrics.NoopMetrics, &sync.Config{}) + engine := derive.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync) + pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, nil, l2Source, engine, metrics.NoopMetrics, &sync.Config{}) pipeline.Reset() return &Driver{ logger: logger, pipeline: pipeline, + engine: engine, l2OutputRoot: l2Source.L2OutputRoot, targetBlockNum: targetBlockNum, } @@ -52,10 +58,10 @@ func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, // Returns a non-EOF error if the derivation failed func (d *Driver) Step(ctx context.Context) error { if err := d.pipeline.Step(ctx); errors.Is(err, io.EOF) { - d.logger.Info("Derivation complete: reached L1 head", "head", d.pipeline.SafeL2Head()) + d.logger.Info("Derivation complete: reached L1 head", "head", d.engine.SafeL2Head()) return io.EOF } else if errors.Is(err, derive.NotEnoughData) { - head := d.pipeline.SafeL2Head() + head := d.engine.SafeL2Head() if head.Number >= d.targetBlockNum { d.logger.Info("Derivation complete: reached L2 block", "head", head) return io.EOF @@ -74,7 +80,7 @@ func (d *Driver) Step(ctx context.Context) error { } func (d *Driver) SafeHead() eth.L2BlockRef { - return d.pipeline.SafeL2Head() + return d.engine.SafeL2Head() } func (d *Driver) ValidateClaim(l2ClaimBlockNum uint64, claimedOutputRoot eth.Bytes32) error { diff --git a/op-service/eth/sync_status.go b/op-service/eth/sync_status.go index 55618952f7f20..d6ff393a61d75 100644 --- a/op-service/eth/sync_status.go +++ b/op-service/eth/sync_status.go @@ -34,7 +34,4 @@ type SyncStatus struct { FinalizedL2 L2BlockRef `json:"finalized_l2"` // PendingSafeL2 points to the L2 block processed from the batch, but not consolidated to the safe block yet. PendingSafeL2 L2BlockRef `json:"pending_safe_l2"` - // UnsafeL2SyncTarget points to the first unprocessed unsafe L2 block. - // It may be zeroed if there is no targeted block. - UnsafeL2SyncTarget L2BlockRef `json:"queued_unsafe_l2"` }