Skip to content

Commit

Permalink
op-node: Initialize EngineController inside the Driver
Browse files Browse the repository at this point in the history
This moves the EngineController up to be able to use it without having
to intialize the Derivation Pipeline.
  • Loading branch information
trianglesphere committed Jan 13, 2024
1 parent 0a22f8b commit fe72b65
Show file tree
Hide file tree
Showing 17 changed files with 127 additions and 169 deletions.
12 changes: 6 additions & 6 deletions op-e2e/actions/l2_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, c
}
return &L2Sequencer{
L2Verifier: *ver,
sequencer: driver.NewSequencer(log, cfg, ver.derivation, attrBuilder, l1OriginSelector, metrics.NoopMetrics),
sequencer: driver.NewSequencer(log, cfg, ver.engine, attrBuilder, l1OriginSelector, metrics.NoopMetrics),
mockL1OriginSelector: l1OriginSelector,
failL2GossipUnsafeBlock: nil,
}
Expand Down Expand Up @@ -104,7 +104,7 @@ func (s *L2Sequencer) ActL2EndBlock(t Testing) {

// ActL2KeepL1Origin makes the sequencer use the current L1 origin, even if the next origin is available.
func (s *L2Sequencer) ActL2KeepL1Origin(t Testing) {
parent := s.derivation.UnsafeL2Head()
parent := s.engine.UnsafeL2Head()
// force old origin, for testing purposes
oldOrigin, err := s.l1.L1BlockRefByHash(t.Ctx(), parent.L1Origin.Hash)
require.NoError(t, err, "failed to get current origin: %s", parent.L1Origin)
Expand All @@ -113,7 +113,7 @@ func (s *L2Sequencer) ActL2KeepL1Origin(t Testing) {

// ActBuildToL1Head builds empty blocks until (incl.) the L1 head becomes the L2 origin
func (s *L2Sequencer) ActBuildToL1Head(t Testing) {
for s.derivation.UnsafeL2Head().L1Origin.Number < s.l1State.L1Head().Number {
for s.engine.UnsafeL2Head().L1Origin.Number < s.l1State.L1Head().Number {
s.ActL2PipelineFull(t)
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
Expand All @@ -122,7 +122,7 @@ func (s *L2Sequencer) ActBuildToL1Head(t Testing) {

// ActBuildToL1HeadUnsafe builds empty blocks until (incl.) the L1 head becomes the L1 origin of the L2 head
func (s *L2Sequencer) ActBuildToL1HeadUnsafe(t Testing) {
for s.derivation.UnsafeL2Head().L1Origin.Number < s.l1State.L1Head().Number {
for s.engine.UnsafeL2Head().L1Origin.Number < s.l1State.L1Head().Number {
// Note: the derivation pipeline does not run, we are just sequencing a block on top of the existing L2 chain.
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
Expand All @@ -133,7 +133,7 @@ func (s *L2Sequencer) ActBuildToL1HeadUnsafe(t Testing) {
func (s *L2Sequencer) ActBuildToL1HeadExcl(t Testing) {
for {
s.ActL2PipelineFull(t)
nextOrigin, err := s.mockL1OriginSelector.FindL1Origin(t.Ctx(), s.derivation.UnsafeL2Head())
nextOrigin, err := s.mockL1OriginSelector.FindL1Origin(t.Ctx(), s.engine.UnsafeL2Head())
require.NoError(t, err)
if nextOrigin.Number >= s.l1State.L1Head().Number {
break
Expand All @@ -147,7 +147,7 @@ func (s *L2Sequencer) ActBuildToL1HeadExcl(t Testing) {
func (s *L2Sequencer) ActBuildToL1HeadExclUnsafe(t Testing) {
for {
// Note: the derivation pipeline does not run, we are just sequencing a block on top of the existing L2 chain.
nextOrigin, err := s.mockL1OriginSelector.FindL1Origin(t.Ctx(), s.derivation.UnsafeL2Head())
nextOrigin, err := s.mockL1OriginSelector.FindL1Origin(t.Ctx(), s.engine.UnsafeL2Head())
require.NoError(t, err)
if nextOrigin.Number >= s.l1State.L1Head().Number {
break
Expand Down
14 changes: 8 additions & 6 deletions op-e2e/actions/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type L2Verifier struct {
}

// L2 rollup
engine *derive.EngineController
derivation *derive.DerivationPipeline

l1 derive.L1Fetcher
Expand All @@ -59,12 +60,14 @@ type L2API interface {

func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config) *L2Verifier {
metrics := &testutils.TestDerivationMetrics{}
pipeline := derive.NewDerivationPipeline(log, cfg, l1, nil, eng, metrics, syncCfg)
engine := derive.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode)
pipeline := derive.NewDerivationPipeline(log, cfg, l1, nil, eng, engine, metrics, syncCfg)
pipeline.Reset()

rollupNode := &L2Verifier{
log: log,
eng: eng,
engine: engine,
derivation: pipeline,
l1: l1,
l1State: driver.NewL1State(log, metrics),
Expand Down Expand Up @@ -132,19 +135,19 @@ func (s *l2VerifierBackend) OnUnsafeL2Payload(ctx context.Context, payload *eth.
}

func (s *L2Verifier) L2Finalized() eth.L2BlockRef {
return s.derivation.Finalized()
return s.engine.Finalized()
}

func (s *L2Verifier) L2Safe() eth.L2BlockRef {
return s.derivation.SafeL2Head()
return s.engine.SafeL2Head()
}

func (s *L2Verifier) L2PendingSafe() eth.L2BlockRef {
return s.derivation.PendingSafeL2Head()
return s.engine.PendingSafeL2Head()
}

func (s *L2Verifier) L2Unsafe() eth.L2BlockRef {
return s.derivation.UnsafeL2Head()
return s.engine.UnsafeL2Head()
}

func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
Expand All @@ -158,7 +161,6 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
SafeL2: s.L2Safe(),
FinalizedL2: s.L2Finalized(),
PendingSafeL2: s.L2PendingSafe(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
}
}

Expand Down
4 changes: 2 additions & 2 deletions op-e2e/actions/reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,11 +833,11 @@ func SyncAfterReorg(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
miner.ActL1SetFeeRecipient(common.Address{'A', 0})
miner.ActEmptyBlock(t)
sequencer.ActL1HeadSignal(t)
for sequencer.derivation.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
for sequencer.engine.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
// build L2 blocks until the L1 origin is the current L1 head(A0)
sequencer.ActL2PipelineFull(t)
sequencer.ActL2StartBlock(t)
if sequencer.derivation.UnsafeL2Head().Number == 11 {
if sequencer.engine.UnsafeL2Head().Number == 11 {
// include a user tx at L2 block #12 to make a state transition
alice.L2.ActResetTxOpts(t)
alice.L2.ActSetTxToAddr(&dp.Addresses.Bob)(t)
Expand Down
4 changes: 2 additions & 2 deletions op-e2e/actions/span_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ func TestSpanBatchLowThroughputChain(gt *testing.T) {
totalTxCount := 0
// Make 600 L2 blocks (L1BlockTime / L2BlockTime * 50) including 1~3 txs
for i := 0; i < 50; i++ {
for sequencer.derivation.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
for sequencer.engine.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
sequencer.ActL2StartBlock(t)
// fill the block with random number of L2 txs
for j := 0; j < rand.Intn(3); j++ {
Expand Down Expand Up @@ -646,7 +646,7 @@ func TestBatchEquivalence(gt *testing.T) {
sequencer.ActL2PipelineFull(t)
totalTxCount := 0
// Build random blocks
for sequencer.derivation.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
for sequencer.engine.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
sequencer.ActL2StartBlock(t)
// fill the block with random number of L2 txs
for j := 0; j < rand.Intn(3); j++ {
Expand Down
1 change: 0 additions & 1 deletion op-node/node/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus {
SafeL2: testutils.RandomL2BlockRef(rng),
FinalizedL2: testutils.RandomL2BlockRef(rng),
PendingSafeL2: testutils.RandomL2BlockRef(rng),
UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng),
}
}

Expand Down
3 changes: 1 addition & 2 deletions op-node/rollup/derive/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type EngineController struct {

// Block Head State
unsafeHead eth.L2BlockRef
pendingSafeHead eth.L2BlockRef
pendingSafeHead eth.L2BlockRef // L2 block processed from the middle of a span batch, but not marked as the safe block yet.
safeHead eth.L2BlockRef
finalizedHead eth.L2BlockRef
needFCUCall bool
Expand Down Expand Up @@ -165,7 +165,6 @@ func (e *EngineController) ConfirmPayload(ctx context.Context) (out *eth.Executi
e.unsafeHead = ref

e.metrics.RecordL2Ref("l2_unsafe", ref)
e.metrics.RecordL2Ref("l2_engineSyncTarget", ref)
if e.buildingSafe {
e.metrics.RecordL2Ref("l2_pending_safe", ref)
e.pendingSafeHead = ref
Expand Down
30 changes: 10 additions & 20 deletions op-node/rollup/derive/engine_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,13 @@ type EngineQueue struct {
syncCfg *sync.Config
}

var _ EngineControl = (*EngineQueue)(nil)

// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher, syncCfg *sync.Config) *EngineQueue {
func NewEngineQueue(log log.Logger, cfg *rollup.Config, l2Source L2Source, engine LocalEngineControl, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher, syncCfg *sync.Config) *EngineQueue {
return &EngineQueue{
log: log,
cfg: cfg,
ec: NewEngineController(engine, log, metrics, cfg, syncCfg.SyncMode),
engine: engine,
ec: engine,
engine: l2Source,
metrics: metrics,
finalityData: make([]FinalityData, 0, finalityLookback),
unsafePayloads: NewPayloadsQueue(maxUnsafePayloadsMemory, payloadMemSize),
Expand Down Expand Up @@ -225,20 +223,12 @@ func (eq *EngineQueue) FinalizedL1() eth.L1BlockRef {
return eq.finalizedL1
}

func (eq *EngineQueue) Finalized() eth.L2BlockRef {
return eq.ec.Finalized()
}

func (eq *EngineQueue) UnsafeL2Head() eth.L2BlockRef {
return eq.ec.UnsafeL2Head()
}

func (eq *EngineQueue) SafeL2Head() eth.L2BlockRef {
return eq.ec.SafeL2Head()
}

func (eq *EngineQueue) PendingSafeL2Head() eth.L2BlockRef {
return eq.ec.PendingSafeL2Head()
func (eq *EngineQueue) HighestUnsafeBlock() eth.L2BlockRef {
ref, err := eq.unsafePayloads.HighestUnsafeBlock(eq.cfg)
if err != nil {
return eth.L2BlockRef{}
}
return ref
}

// Determine if the engine is syncing to the target block
Expand All @@ -261,7 +251,7 @@ func (eq *EngineQueue) Step(ctx context.Context) error {
// EOF error means we can't process the next unsafe payload. Then we should process next safe attributes.
}
if eq.isEngineSyncing() {
// Make pipeline first focus to sync unsafe blocks to engineSyncTarget
// The pipeline cannot move forwards if doing EL sync.
return EngineELSyncing
}
if eq.safeAttributes != nil {
Expand Down
36 changes: 20 additions & 16 deletions op-node/rollup/derive/engine_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,13 @@ func TestEngineQueue_Finalize(t *testing.T) {

prev := &fakeAttributesQueue{}

eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)

require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB, eq.Origin(), "Expecting to be set back derivation L1 progress to B")
require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps")
require.Equal(t, refA1, ec.Finalized(), "A1 is recognized as finalized before we run any steps")

// now say C1 was included in D and became the new safe head
eq.origin = refD
Expand All @@ -270,7 +271,7 @@ func TestEngineQueue_Finalize(t *testing.T) {
// let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E)
eq.Finalize(refD)

require.Equal(t, refC1, eq.Finalized(), "C1 was included in finalized D, and should now be finalized")
require.Equal(t, refC1, ec.Finalized(), "C1 was included in finalized D, and should now be finalized")

l1F.AssertExpectations(t)
eng.AssertExpectations(t)
Expand Down Expand Up @@ -483,12 +484,13 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) {

prev := &fakeAttributesQueue{origin: refE}

eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)

require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB, eq.Origin(), "Expecting to be set back derivation L1 progress to B")
require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps")
require.Equal(t, refA1, ec.Finalized(), "A1 is recognized as finalized before we run any steps")

// First step after reset will do a fork choice update
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
Expand All @@ -508,7 +510,6 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) {
ParentHash: refE.Hash,
Time: refF.Time,
}
eq.UnsafeL2Head()
err = eq.Step(context.Background())
require.ErrorIs(t, err, ErrReset, "should reset pipeline due to mismatched origin")

Expand Down Expand Up @@ -813,12 +814,13 @@ func TestVerifyNewL1Origin(t *testing.T) {
}, nil)

prev := &fakeAttributesQueue{origin: refE}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)

require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB, eq.Origin(), "Expecting to be set back derivation L1 progress to B")
require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps")
require.Equal(t, refA1, ec.Finalized(), "A1 is recognized as finalized before we run any steps")

// First step after reset will do a fork choice update
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
Expand All @@ -833,7 +835,6 @@ func TestVerifyNewL1Origin(t *testing.T) {

// L1 chain reorgs so new origin is at same slot as refF but on a different fork
prev.origin = test.newOrigin
eq.UnsafeL2Head()
err = eq.Step(context.Background())
if test.expectReset {
require.ErrorIs(t, err, ErrReset, "should reset pipeline due to mismatched origin")
Expand Down Expand Up @@ -910,7 +911,8 @@ func TestBlockBuildingRace(t *testing.T) {
}

prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)

id := eth.PayloadID{0xff}
Expand Down Expand Up @@ -1001,7 +1003,7 @@ func TestBlockBuildingRace(t *testing.T) {
// Now complete the job, as external user of the engine
_, _, err = eq.ConfirmPayload(context.Background())
require.NoError(t, err)
require.Equal(t, refA1, eq.SafeL2Head(), "safe head should have changed")
require.Equal(t, refA1, ec.SafeL2Head(), "safe head should have changed")

require.NoError(t, eq.Step(context.Background()))
require.Nil(t, eq.safeAttributes, "attributes should now be invalidated")
Expand Down Expand Up @@ -1080,7 +1082,8 @@ func TestResetLoop(t *testing.T) {

prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true}

eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F, &sync.Config{})
ec := NewEngineController(eng, logger, metrics.NoopMetrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{})
eq.ec.SetUnsafeHead(refA2)
eq.ec.SetSafeHead(refA1)
eq.ec.SetFinalizedHead(refA0)
Expand Down Expand Up @@ -1185,7 +1188,8 @@ func TestEngineQueue_StepPopOlderUnsafe(t *testing.T) {

prev := &fakeAttributesQueue{origin: refA}

eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F, &sync.Config{})
ec := NewEngineController(eng, logger, metrics.NoopMetrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{})
eq.ec.SetUnsafeHead(refA2)
eq.ec.SetSafeHead(refA0)
eq.ec.SetFinalizedHead(refA0)
Expand Down
22 changes: 17 additions & 5 deletions op-node/rollup/derive/payloads_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ethereum/go-ethereum/common"

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
)

Expand Down Expand Up @@ -76,11 +77,12 @@ func payloadMemSize(p *eth.ExecutionPayload) uint64 {
// When the size grows too large, the first (lowest block-number) payload is removed from the queue.
// PayloadsQueue allows entries with same block number, but does not allow duplicate blocks
type PayloadsQueue struct {
pq payloadsByNumber
currentSize uint64
MaxSize uint64
blockHashes map[common.Hash]struct{}
SizeFn func(p *eth.ExecutionPayload) uint64
pq payloadsByNumber
currentSize uint64
MaxSize uint64
blockHashes map[common.Hash]struct{}
SizeFn func(p *eth.ExecutionPayload) uint64
highestBlock *eth.ExecutionPayload
}

func NewPayloadsQueue(maxSize uint64, sizeFn func(p *eth.ExecutionPayload) uint64) *PayloadsQueue {
Expand Down Expand Up @@ -128,9 +130,19 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error {
upq.Pop()
}
upq.blockHashes[p.BlockHash] = struct{}{}
if upq.highestBlock == nil || upq.highestBlock.BlockNumber < p.BlockNumber {
upq.highestBlock = p
}
return nil
}

func (upq *PayloadsQueue) HighestUnsafeBlock(rollupCfg *rollup.Config) (eth.L2BlockRef, error) {
if upq.highestBlock == nil {
return eth.L2BlockRef{}, nil
}
return PayloadToBlockRef(rollupCfg, upq.highestBlock)
}

// Peek retrieves the payload with the lowest block number from the queue in O(1), or nil if the queue is empty.
func (upq *PayloadsQueue) Peek() *eth.ExecutionPayload {
if len(upq.pq) == 0 {
Expand Down
Loading

0 comments on commit fe72b65

Please sign in to comment.