Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-node: Execution Layer Sync #8968

Merged
merged 4 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion op-e2e/actions/l2_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rpc"

"github.com/ethereum-optimism/optimism/op-node/rollup"
Expand Down Expand Up @@ -50,7 +51,7 @@ type EngineOption func(ethCfg *ethconfig.Config, nodeCfg *node.Config) error

func NewL2Engine(t Testing, log log.Logger, genesis *core.Genesis, rollupGenesisL1 eth.BlockID, jwtPath string, options ...EngineOption) *L2Engine {
n, ethBackend, apiBackend := newBackend(t, genesis, jwtPath, options)
engineApi := engineapi.NewL2EngineAPI(log, apiBackend)
engineApi := engineapi.NewL2EngineAPI(log, apiBackend, ethBackend.Downloader())
chain := ethBackend.BlockChain()
genesisBlock := chain.Genesis()
eng := &L2Engine{
Expand Down Expand Up @@ -131,6 +132,16 @@ func (e *engineApiBackend) Genesis() *core.Genesis {
return e.genesis
}

func (s *L2Engine) Enode() *enode.Node {
return s.node.Server().LocalNode().Node()
}

func (s *L2Engine) AddPeers(peers ...*enode.Node) {
for _, en := range peers {
s.node.Server().AddPeer(en)
}
}

func (s *L2Engine) EthClient() *ethclient.Client {
cl := s.node.Attach()
return ethclient.NewClient(cl)
Expand Down
22 changes: 21 additions & 1 deletion op-e2e/actions/l2_sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"testing"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/params"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -16,14 +20,30 @@ import (
"github.com/ethereum-optimism/optimism/op-service/testlog"
)

func EngineWithP2P() EngineOption {
return func(ethCfg *ethconfig.Config, nodeCfg *node.Config) error {
p2pKey, err := crypto.GenerateKey()
if err != nil {
return err
}
nodeCfg.P2P = p2p.Config{
MaxPeers: 100,
NoDiscovery: true,
ListenAddr: "127.0.0.1:0",
PrivateKey: p2pKey,
}
return nil
}
}

func setupSequencerTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1Miner, *L2Engine, *L2Sequencer) {
jwtPath := e2eutils.WriteDefaultJWT(t)

miner := NewL1Miner(t, log, sd.L1Cfg)

l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindStandard))
require.NoError(t, err)
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath)
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, EngineWithP2P())
l2Cl, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err)

Expand Down
10 changes: 10 additions & 0 deletions op-e2e/actions/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,13 @@ func (s *L2Verifier) ActL2UnsafeGossipReceive(payload *eth.ExecutionPayloadEnvel
s.derivation.AddUnsafePayload(payload)
}
}

// ActL2InsertUnsafePayload creates an action that can insert an unsafe execution payload
func (s *L2Verifier) ActL2InsertUnsafePayload(payload *eth.ExecutionPayloadEnvelope) Action {
return func(t Testing) {
ref, err := derive.PayloadToBlockRef(s.rollupCfg, payload.ExecutionPayload)
require.NoError(t, err)
err = s.engine.InsertUnsafePayload(t.Ctx(), payload, ref)
require.NoError(t, err)
}
}
2 changes: 1 addition & 1 deletion op-e2e/actions/l2_verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive.L1Fetcher, syncCfg *sync.Config) (*L2Engine, *L2Verifier) {
jwtPath := e2eutils.WriteDefaultJWT(t)
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath)
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, EngineWithP2P())
engCl := engine.EngineClient(t, sd.RollupCfg)
mockBlobFetcher := &emptyL1BlobsFetcher{t: t}
verifier := NewL2Verifier(t, log, l1F, mockBlobFetcher, engCl, sd.RollupCfg, syncCfg)
Expand Down
45 changes: 26 additions & 19 deletions op-e2e/actions/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/big"
"math/rand"
"testing"
"time"

"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
Expand Down Expand Up @@ -164,43 +165,49 @@ func TestUnsafeSync(gt *testing.T) {
// TestELSync tests that a verifier will have the EL import the full chain from the sequencer
// when passed a single unsafe block. op-geth can either snap sync or full sync here.
func TestELSync(gt *testing.T) {
gt.Skip("not implemented yet")
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlInfo)

miner, seqEng, sequencer := setupSequencerTest(t, sd, log)
// Enable engine P2P sync
_, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{SyncMode: sync.ELSync})
verEng, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{SyncMode: sync.ELSync})

seqEng.AddPeers(verEng.Enode())
verEng.AddPeers(seqEng.Enode())

seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err)

sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)

// Build a L2 block. This block will not be gossiped to verifier, so verifier can not advance chain by itself.
sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t)

// Build 10 L1 blocks on the sequencer
for i := 0; i < 10; i++ {
// Build a L2 block
sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t)
// Notify new L2 block to verifier by unsafe gossip
seqHead, err := seqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err)
verifier.ActL2UnsafeGossipReceive(seqHead)(t)
// Handle unsafe payload
verifier.ActL2PipelineFull(t)
// Verifier must advance unsafe head after unsafe gossip.
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash)
}
// Actual test flow should be as follows:
// 1. Build a chain on the sequencer.
// 2. Gossip only a single final L2 block from the sequencer to the verifier.
// 3. Assert that the verifier has the full chain.

// Insert it on the verifier
seqHead, err := seqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err)
seqStart, err := seqEngCl.PayloadByNumber(t.Ctx(), 1)
require.NoError(t, err)
verifier.ActL2InsertUnsafePayload(seqHead)(t)

// Expect snap sync to download & execute the entire chain
// Verify this by checking that the verifier has the correct value for block 1
require.Eventually(t,
func() bool {
block, err := verifier.eng.L2BlockRefByNumber(t.Ctx(), 1)
if err != nil {
return false
}
return seqStart.ExecutionPayload.BlockHash == block.Hash
},
60*time.Second, 1500*time.Millisecond,
)
}

func TestInvalidPayloadInSpanBatch(gt *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions op-e2e/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func TestSystemE2E(t *testing.T) {

sys, err := cfg.Start(t)
require.Nil(t, err, "Error starting up system")
runE2ESystemTest(t, sys)
defer sys.Close()
}

Expand Down
3 changes: 2 additions & 1 deletion op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,8 @@ func (n *OpNode) initPProf(cfg *Config) error {

func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
if cfg.P2P != nil {
p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, cfg.Sync.SyncMode == sync.ELSync)
// TODO(protocol-quest/97): Use EL Sync instead of CL Alt sync for fetching missing blocks in the payload queue.
p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, false)
if err != nil || p2pNode == nil {
return err
}
Expand Down
95 changes: 74 additions & 21 deletions op-node/rollup/derive/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,33 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)

type syncStatusEnum int

const (
syncStatusCL syncStatusEnum = iota
// We transition between the 4 EL states linearly. We spend the majority of the time in the second & fourth.
// We only want to EL sync if there is no finalized block & once we finish EL sync we need to mark the last block
// as finalized so we can switch to consolidation
// TODO(protocol-quest/91): We can restart EL sync & still consolidate if there finalized blocks on the execution client if the
// execution client is running in archive mode. In some cases we may want to switch back from CL to EL sync, but that is complicated.
syncStatusWillStartEL // First if we are directed to EL sync, check that nothing has been finalized yet
syncStatusStartedEL // Perform our EL sync
syncStatusFinishedELButNotFinalized // EL sync is done, but we need to mark the final sync block as finalized
syncStatusFinishedEL // EL sync is done & we should be performing consolidation
)

var errNoFCUNeeded = errors.New("no FCU call was needed")

var _ EngineControl = (*EngineController)(nil)
Expand All @@ -22,14 +40,18 @@ type ExecEngine interface {
GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayloadEnvelope, error)
ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error)
NewPayload(ctx context.Context, payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash) (*eth.PayloadStatusV1, error)
L2BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L2BlockRef, error)
}

type EngineController struct {
engine ExecEngine // Underlying execution engine RPC
log log.Logger
metrics Metrics
syncMode sync.Mode
rollupCfg *rollup.Config
engine ExecEngine // Underlying execution engine RPC
log log.Logger
metrics Metrics
syncMode sync.Mode
syncStatus syncStatusEnum
rollupCfg *rollup.Config
elStart time.Time
clock clock.Clock

// Block Head State
unsafeHead eth.L2BlockRef
Expand All @@ -46,11 +68,19 @@ type EngineController struct {
}

func NewEngineController(engine ExecEngine, log log.Logger, metrics Metrics, rollupCfg *rollup.Config, syncMode sync.Mode) *EngineController {
syncStatus := syncStatusCL
if syncMode == sync.ELSync {
syncStatus = syncStatusWillStartEL
}

return &EngineController{
engine: engine,
log: log,
metrics: metrics,
rollupCfg: rollupCfg,
engine: engine,
log: log,
metrics: metrics,
rollupCfg: rollupCfg,
syncMode: syncMode,
syncStatus: syncStatus,
clock: clock.SystemClock,
}
}

Expand All @@ -77,7 +107,7 @@ func (e *EngineController) BuildingPayload() (eth.L2BlockRef, eth.PayloadID, boo
}

func (e *EngineController) IsEngineSyncing() bool {
return false
return e.syncStatus == syncStatusWillStartEL || e.syncStatus == syncStatusStartedEL || e.syncStatus == syncStatusFinishedELButNotFinalized
}

// Setters
Expand Down Expand Up @@ -209,6 +239,9 @@ func (e *EngineController) resetBuildingState() {
// It returns true if the status is acceptable.
func (e *EngineController) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool {
if e.syncMode == sync.ELSync {
if status == eth.ExecutionValid && e.syncStatus == syncStatusStartedEL {
e.syncStatus = syncStatusFinishedELButNotFinalized
}
// Allow SYNCING and ACCEPTED if engine EL sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted
}
Expand All @@ -219,6 +252,9 @@ func (e *EngineController) checkNewPayloadStatus(status eth.ExecutePayloadStatus
// It returns true if the status is acceptable.
func (e *EngineController) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool {
if e.syncMode == sync.ELSync {
if status == eth.ExecutionValid && e.syncStatus == syncStatusStartedEL {
e.syncStatus = syncStatusFinishedELButNotFinalized
}
// Allow SYNCING if engine P2P sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing
}
Expand Down Expand Up @@ -258,6 +294,22 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error {
}

func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error {
// Check if there is a finalized head once when doing EL sync. If so, transition to CL sync
if e.syncStatus == syncStatusWillStartEL {
b, err := e.engine.L2BlockRefByLabel(ctx, eth.Finalized)
if errors.Is(err, ethereum.NotFound) {
e.syncStatus = syncStatusStartedEL
e.log.Info("Starting EL sync")
e.elStart = e.clock.Now()
} else if err == nil {
e.syncStatus = syncStatusFinishedEL
e.log.Info("Skipping EL sync and going straight to CL sync because there is a finalized block", "id", b.ID())
return nil
} else {
return NewTemporaryError(fmt.Errorf("failed to fetch finalized head: %w", err))
}
}
// Insert the payload & then call FCU
status, err := e.engine.NewPayload(ctx, envelope.ExecutionPayload, envelope.ParentBeaconBlockRoot)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err))
Expand All @@ -274,6 +326,12 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et
SafeBlockHash: e.safeHead.Hash,
FinalizedBlockHash: e.finalizedHead.Hash,
}
if e.syncStatus == syncStatusFinishedELButNotFinalized {
fc.SafeBlockHash = envelope.ExecutionPayload.BlockHash
fc.FinalizedBlockHash = envelope.ExecutionPayload.BlockHash
e.SetSafeHead(ref)
e.SetFinalizedHead(ref)
}
fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
var inputErr eth.InputError
Expand All @@ -293,23 +351,18 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et
return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w",
payload.ID(), payload.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
}
e.unsafeHead = ref
e.SetUnsafeHead(ref)
e.needFCUCall = false

if e.syncStatus == syncStatusFinishedELButNotFinalized {
e.log.Info("Finished EL sync", "sync_duration", e.clock.Since(e.elStart))
e.syncStatus = syncStatusFinishedEL
}

return nil
}

// ResetBuildingState implements LocalEngineControl.
func (e *EngineController) ResetBuildingState() {
e.resetBuildingState()
}

// ForkchoiceUpdate implements LocalEngineControl.
func (e *EngineController) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) {
return e.engine.ForkchoiceUpdate(ctx, state, attr)
}

// NewPayload implements LocalEngineControl.
func (e *EngineController) NewPayload(ctx context.Context, payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash) (*eth.PayloadStatusV1, error) {
return e.engine.NewPayload(ctx, payload, parentBeaconBlockRoot)
}
1 change: 1 addition & 0 deletions op-node/rollup/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1
sequencerActive: make(chan chan bool, 10),
sequencerNotifs: sequencerStateListener,
config: cfg,
syncCfg: syncCfg,
driverConfig: driverCfg,
driverCtx: driverCtx,
driverCancel: driverCancel,
Expand Down
Loading
Loading