diff --git a/.circleci/config.yml b/.circleci/config.yml index 0c582a18e9e8..d2fc56035922 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -203,6 +203,12 @@ jobs: - run: name: Copy FPAC allocs to .devnet-fpac command: cp -r .devnet/ .devnet-fault-proofs/ + - run: + name: Generate Plasma allocs + command: DEVNET_PLASMA="true" make devnet-allocs + - run: + name: Copy Plasma allocs to .devnet-plasma + command: cp -r .devnet/ .devnet-plasma/ - run: name: Generate non-FPAC allocs command: make devnet-allocs @@ -219,6 +225,8 @@ jobs: - ".devnet/addresses.json" - ".devnet-fault-proofs/allocs-l1.json" - ".devnet-fault-proofs/addresses.json" + - ".devnet-plasma/allocs-l1.json" + - ".devnet-plasma/addresses.json" - "packages/contracts-bedrock/deploy-config/devnetL1.json" - "packages/contracts-bedrock/deployments/devnetL1" @@ -896,6 +904,13 @@ jobs: - run: name: Set OP_E2E_USE_FPAC = true command: echo 'export OP_E2E_USE_FPAC=true' >> $BASH_ENV + - when: + condition: + equal: ['-plasma', <>] + steps: + - run: + name: Set OP_E2E_USE_PLASMA = true + command: echo 'export OP_E2E_USE_PLASMA=true' >> $BASH_ENV - check-changed: patterns: op-(.+),cannon,contracts-bedrock - run: @@ -1636,7 +1651,7 @@ workflows: name: op-e2e-action-tests<< matrix.fpac >> matrix: parameters: - fpac: ["", "-fault-proofs"] + fpac: ["", "-fault-proofs", "-plasma"] module: op-e2e target: test-actions parallelism: 1 diff --git a/bedrock-devnet/devnet/__init__.py b/bedrock-devnet/devnet/__init__.py index 4b06a1d76028..09766bdd9c37 100644 --- a/bedrock-devnet/devnet/__init__.py +++ b/bedrock-devnet/devnet/__init__.py @@ -29,6 +29,7 @@ # Global environment variables DEVNET_NO_BUILD = os.getenv('DEVNET_NO_BUILD') == "true" DEVNET_FPAC = os.getenv('DEVNET_FPAC') == "true" +DEVNET_PLASMA = os.getenv('DEVNET_PLASMA') == "true" class Bunch: def __init__(self, **kwds): @@ -130,6 +131,8 @@ def init_devnet_l1_deploy_config(paths, update_timestamp=False): if DEVNET_FPAC: deploy_config['useFaultProofs'] = True deploy_config['faultGameMaxDuration'] = 10 + if DEVNET_PLASMA: + deploy_config['usePlasma'] = True write_json(paths.devnet_config_path, deploy_config) def devnet_l1_genesis(paths): diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index a90912e9ade0..6b994ad708a2 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -393,13 +393,14 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que data := txdata.CallData() // if plasma DA is enabled we post the txdata to the DA Provider and replace it with the commitment. if l.Config.UsePlasma { - data, err = l.PlasmaDA.SetInput(ctx, data) + comm, err := l.PlasmaDA.SetInput(ctx, data) if err != nil { l.Log.Error("Failed to post input to Plasma DA", "error", err) // requeue frame if we fail to post to the DA Provider so it can be retried l.recordFailedTx(txdata, err) return nil } + data = comm.Encode() } candidate = l.calldataTxCandidate(data) } diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go index 4c7cab5be388..cf09f5e18c83 100644 --- a/op-batcher/batcher/service.go +++ b/op-batcher/batcher/service.go @@ -209,6 +209,11 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error { default: return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType) } + + if bs.UsePlasma && bs.ChannelConfig.MaxFrameSize > plasma.MaxInputSize { + return fmt.Errorf("max frame size %d exceeds plasma max input size %d", bs.ChannelConfig.MaxFrameSize, plasma.MaxInputSize) + } + bs.ChannelConfig.MaxFrameSize-- // subtract 1 byte for version if bs.ChannelConfig.CompressorConfig.Kind == compressor.ShadowKind { diff --git a/op-chain-ops/genesis/config.go b/op-chain-ops/genesis/config.go index 20f7a7a41871..badf461d21c2 100644 --- a/op-chain-ops/genesis/config.go +++ b/op-chain-ops/genesis/config.go @@ -243,15 +243,18 @@ type DeployConfig struct { // UsePlasma is a flag that indicates if the system is using op-plasma UsePlasma bool `json:"usePlasma"` - // DaChallengeWindow represents the block interval during which the availability of a data commitment can be challenged. - DaChallengeWindow uint64 `json:"daChallengeWindow"` - // DaResolveWindow represents the block interval during which a data availability challenge can be resolved. - DaResolveWindow uint64 `json:"daResolveWindow"` - // DaBondSize represents the required bond size to initiate a data availability challenge. - DaBondSize uint64 `json:"daBondSize"` - // DaResolverRefundPercentage represents the percentage of the resolving cost to be refunded to the resolver + // DAChallengeWindow represents the block interval during which the availability of a data commitment can be challenged. + DAChallengeWindow uint64 `json:"daChallengeWindow"` + // DAResolveWindow represents the block interval during which a data availability challenge can be resolved. + DAResolveWindow uint64 `json:"daResolveWindow"` + // DABondSize represents the required bond size to initiate a data availability challenge. + DABondSize uint64 `json:"daBondSize"` + // DAResolverRefundPercentage represents the percentage of the resolving cost to be refunded to the resolver // such as 100 means 100% refund. - DaResolverRefundPercentage uint64 `json:"daResolverRefundPercentage"` + DAResolverRefundPercentage uint64 `json:"daResolverRefundPercentage"` + + // DAChallengeProxy represents the L1 address of the DataAvailabilityChallenge contract. + DAChallengeProxy common.Address `json:"daChallengeProxy"` // When Cancun activates. Relative to L1 genesis. L1CancunTimeOffset *hexutil.Uint64 `json:"l1CancunTimeOffset,omitempty"` @@ -402,6 +405,17 @@ func (d *DeployConfig) Check() error { if d.DisputeGameFinalityDelaySeconds == 0 { log.Warn("DisputeGameFinalityDelaySeconds is 0") } + if d.UsePlasma { + if d.DAChallengeWindow == 0 { + return fmt.Errorf("%w: DAChallengeWindow cannot be 0 when using plasma mode", ErrInvalidDeployConfig) + } + if d.DAResolveWindow == 0 { + return fmt.Errorf("%w: DAResolveWindow cannot be 0 when using plasma mode", ErrInvalidDeployConfig) + } + if d.DAChallengeProxy == (common.Address{}) { + return fmt.Errorf("%w: DAChallengeContract cannot be empty when using plasma mode", ErrInvalidDeployConfig) + } + } // checkFork checks that fork A is before or at the same time as fork B checkFork := func(a, b *hexutil.Uint64, aName, bName string) error { if a == nil && b == nil { @@ -463,6 +477,7 @@ func (d *DeployConfig) SetDeployments(deployments *L1Deployments) { d.L1ERC721BridgeProxy = deployments.L1ERC721BridgeProxy d.SystemConfigProxy = deployments.SystemConfigProxy d.OptimismPortalProxy = deployments.OptimismPortalProxy + d.DAChallengeProxy = deployments.DataAvailabilityChallengeProxy } func (d *DeployConfig) GovernanceEnabled() bool { @@ -577,6 +592,10 @@ func (d *DeployConfig) RollupConfig(l1StartBlock *types.Block, l2GenesisBlockHas EcotoneTime: d.EcotoneTime(l1StartBlock.Time()), FjordTime: d.FjordTime(l1StartBlock.Time()), InteropTime: d.InteropTime(l1StartBlock.Time()), + UsePlasma: d.UsePlasma, + DAChallengeAddress: d.DAChallengeProxy, + DAChallengeWindow: d.DAChallengeWindow, + DAResolveWindow: d.DAResolveWindow, }, nil } diff --git a/op-chain-ops/genesis/testdata/test-deploy-config-full.json b/op-chain-ops/genesis/testdata/test-deploy-config-full.json index 39cff4b3caea..65390c661acc 100644 --- a/op-chain-ops/genesis/testdata/test-deploy-config-full.json +++ b/op-chain-ops/genesis/testdata/test-deploy-config-full.json @@ -83,6 +83,7 @@ "useFaultProofs": false, "usePlasma": false, "daBondSize": 0, + "daChallengeProxy": "0x0000000000000000000000000000000000000000", "daChallengeWindow": 0, "daResolveWindow": 0, "daResolverRefundPercentage": 0 diff --git a/op-e2e/actions/l2_batcher.go b/op-e2e/actions/l2_batcher.go index 7a0cb40c6aae..8427d33be862 100644 --- a/op-e2e/actions/l2_batcher.go +++ b/op-e2e/actions/l2_batcher.go @@ -24,6 +24,7 @@ import ( "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + plasma "github.com/ethereum-optimism/optimism/op-plasma" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/txmgr" ) @@ -42,6 +43,10 @@ type L1TxAPI interface { SendTransaction(ctx context.Context, tx *types.Transaction) error } +type PlasmaInputSetter interface { + SetInput(ctx context.Context, img []byte) (plasma.Keccak256Commitment, error) +} + type BatcherCfg struct { // Limit the size of txs MinL1TxSize uint64 @@ -53,8 +58,10 @@ type BatcherCfg struct { ForceSubmitSingularBatch bool ForceSubmitSpanBatch bool + UsePlasma bool DataAvailabilityType batcherFlags.DataAvailabilityType + PlasmaDA PlasmaInputSetter } func DefaultBatcherCfg(dp *e2eutils.DeployParams) *BatcherCfg { @@ -66,6 +73,17 @@ func DefaultBatcherCfg(dp *e2eutils.DeployParams) *BatcherCfg { } } +func PlasmaBatcherCfg(dp *e2eutils.DeployParams, plasmaDa PlasmaInputSetter) *BatcherCfg { + return &BatcherCfg{ + MinL1TxSize: 0, + MaxL1TxSize: 128_000, + BatcherKey: dp.Secrets.Batcher, + DataAvailabilityType: batcherFlags.CalldataType, + PlasmaDA: plasmaDa, + UsePlasma: true, + } +} + type L2BlockRefs interface { L2BlockRefByHash(ctx context.Context, hash common.Hash) (eth.L2BlockRef, error) } @@ -231,6 +249,13 @@ func (s *L2Batcher) ActL2BatchSubmit(t Testing, txOpts ...func(tx *types.Dynamic t.Fatalf("failed to output channel data to frame: %v", err) } + payload := data.Bytes() + if s.l2BatcherCfg.UsePlasma { + comm, err := s.l2BatcherCfg.PlasmaDA.SetInput(t.Ctx(), payload) + require.NoError(t, err, "failed to set input for plasma") + payload = comm.Encode() + } + nonce, err := s.l1.PendingNonceAt(t.Ctx(), s.batcherAddr) require.NoError(t, err, "need batcher nonce") @@ -247,7 +272,7 @@ func (s *L2Batcher) ActL2BatchSubmit(t Testing, txOpts ...func(tx *types.Dynamic To: &s.rollupCfg.BatchInboxAddress, GasTipCap: gasTipCap, GasFeeCap: gasFeeCap, - Data: data.Bytes(), + Data: payload, } for _, opt := range txOpts { opt(rawTx) @@ -259,7 +284,7 @@ func (s *L2Batcher) ActL2BatchSubmit(t Testing, txOpts ...func(tx *types.Dynamic txData = rawTx } else if s.l2BatcherCfg.DataAvailabilityType == batcherFlags.BlobsType { var b eth.Blob - require.NoError(t, b.FromData(data.Bytes()), "must turn data into blob") + require.NoError(t, b.FromData(payload), "must turn data into blob") sidecar, blobHashes, err := txmgr.MakeSidecar([]*eth.Blob{&b}) require.NoError(t, err) require.NotNil(t, pendingHeader.ExcessBlobGas, "need L1 header with 4844 properties") diff --git a/op-e2e/actions/l2_sequencer.go b/op-e2e/actions/l2_sequencer.go index c55dcfc45ba5..c1c93cf74dec 100644 --- a/op-e2e/actions/l2_sequencer.go +++ b/op-e2e/actions/l2_sequencer.go @@ -44,8 +44,8 @@ type L2Sequencer struct { } func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, blobSrc derive.L1BlobsFetcher, - eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer { - ver := NewL2Verifier(t, log, l1, blobSrc, eng, cfg, &sync.Config{}, safedb.Disabled) + plasmaSrc derive.PlasmaInputFetcher, eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer { + ver := NewL2Verifier(t, log, l1, blobSrc, plasmaSrc, eng, cfg, &sync.Config{}, safedb.Disabled) attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, eng) seqConfDepthL1 := driver.NewConfDepth(seqConfDepth, ver.l1State.L1Head, l1) l1OriginSelector := &MockL1OriginSelector{ diff --git a/op-e2e/actions/l2_sequencer_test.go b/op-e2e/actions/l2_sequencer_test.go index 9c1a12db738f..1bcf4c4d0cff 100644 --- a/op-e2e/actions/l2_sequencer_test.go +++ b/op-e2e/actions/l2_sequencer_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" + plasma "github.com/ethereum-optimism/optimism/op-plasma" "github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/testlog" ) @@ -47,7 +48,7 @@ func setupSequencerTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1M l2Cl, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) require.NoError(t, err) - sequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), l2Cl, sd.RollupCfg, 0) + sequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), plasma.Disabled, l2Cl, sd.RollupCfg, 0) return miner, engine, sequencer } diff --git a/op-e2e/actions/l2_verifier.go b/op-e2e/actions/l2_verifier.go index e98c0758cd6a..305f161e8d06 100644 --- a/op-e2e/actions/l2_verifier.go +++ b/op-e2e/actions/l2_verifier.go @@ -63,10 +63,10 @@ type safeDB interface { node.SafeDBReader } -func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc derive.L1BlobsFetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config, safeHeadListener safeDB) *L2Verifier { +func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc derive.L1BlobsFetcher, plasmaSrc derive.PlasmaInputFetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config, safeHeadListener safeDB) *L2Verifier { metrics := &testutils.TestDerivationMetrics{} engine := derive.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode) - pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, nil, eng, engine, metrics, syncCfg, safeHeadListener) + pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, engine, metrics, syncCfg, safeHeadListener) pipeline.Reset() rollupNode := &L2Verifier{ diff --git a/op-e2e/actions/l2_verifier_test.go b/op-e2e/actions/l2_verifier_test.go index 4819d4387cf8..6fe70a3e6fb4 100644 --- a/op-e2e/actions/l2_verifier_test.go +++ b/op-e2e/actions/l2_verifier_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/ethereum-optimism/optimism/op-node/node/safedb" + plasma "github.com/ethereum-optimism/optimism/op-plasma" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/stretchr/testify/require" @@ -40,7 +41,7 @@ func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive jwtPath := e2eutils.WriteDefaultJWT(t) engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, EngineWithP2P()) engCl := engine.EngineClient(t, sd.RollupCfg) - verifier := NewL2Verifier(t, log, l1F, blobSrc, engCl, sd.RollupCfg, syncCfg, cfg.safeHeadListener) + verifier := NewL2Verifier(t, log, l1F, blobSrc, plasma.Disabled, engCl, sd.RollupCfg, syncCfg, cfg.safeHeadListener) return engine, verifier } diff --git a/op-e2e/actions/plasma_test.go b/op-e2e/actions/plasma_test.go new file mode 100644 index 000000000000..0c1426b48a58 --- /dev/null +++ b/op-e2e/actions/plasma_test.go @@ -0,0 +1,440 @@ +package actions + +import ( + "math/big" + "math/rand" + "testing" + + "github.com/ethereum-optimism/optimism/op-bindings/bindings" + "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" + "github.com/ethereum-optimism/optimism/op-node/node/safedb" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" + plasma "github.com/ethereum-optimism/optimism/op-plasma" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/sources" + "github.com/ethereum-optimism/optimism/op-service/testlog" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" +) + +// Devnet allocs should have plasma mode enabled for these tests to pass + +// L2PlasmaDA is a test harness for manipulating plasma DA state. +type L2PlasmaDA struct { + log log.Logger + storage *plasma.DAErrFaker + daMgr *plasma.DA + plasmaCfg plasma.Config + contract *bindings.DataAvailabilityChallenge + batcher *L2Batcher + sequencer *L2Sequencer + engine *L2Engine + engCl *sources.EngineClient + sd *e2eutils.SetupData + dp *e2eutils.DeployParams + miner *L1Miner + alice *CrossLayerUser + lastComm []byte + lastCommBn uint64 +} + +type PlasmaParam func(p *e2eutils.TestParams) + +func NewL2PlasmaDA(t Testing, params ...PlasmaParam) *L2PlasmaDA { + p := &e2eutils.TestParams{ + MaxSequencerDrift: 2, + SequencerWindowSize: 4, + ChannelTimeout: 4, + L1BlockTime: 3, + UsePlasma: true, + } + for _, apply := range params { + apply(p) + } + log := testlog.Logger(t, log.LvlDebug) + + dp := e2eutils.MakeDeployParams(t, p) + sd := e2eutils.Setup(t, dp, defaultAlloc) + + require.True(t, sd.RollupCfg.UsePlasma) + + miner := NewL1Miner(t, log, sd.L1Cfg) + l1Client := miner.EthClient() + + jwtPath := e2eutils.WriteDefaultJWT(t) + engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath) + engCl := engine.EngineClient(t, sd.RollupCfg) + + storage := &plasma.DAErrFaker{Client: plasma.NewMockDAClient(log)} + + l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindBasic)) + require.NoError(t, err) + + plasmaCfg, err := sd.RollupCfg.PlasmaConfig() + require.NoError(t, err) + + daMgr := plasma.NewPlasmaDAWithStorage(log, plasmaCfg, storage, &plasma.NoopMetrics{}) + + sequencer := NewL2Sequencer(t, log, l1F, nil, daMgr, engCl, sd.RollupCfg, 0) + miner.ActL1SetFeeRecipient(common.Address{'A'}) + sequencer.ActL2PipelineFull(t) + + batcher := NewL2Batcher(log, sd.RollupCfg, PlasmaBatcherCfg(dp, storage), sequencer.RollupClient(), l1Client, engine.EthClient(), engCl) + + addresses := e2eutils.CollectAddresses(sd, dp) + cl := engine.EthClient() + l2UserEnv := &BasicUserEnv[*L2Bindings]{ + EthCl: cl, + Signer: types.LatestSigner(sd.L2Cfg.Config), + AddressCorpora: addresses, + Bindings: NewL2Bindings(t, cl, engine.GethClient()), + } + alice := NewCrossLayerUser(log, dp.Secrets.Alice, rand.New(rand.NewSource(0xa57b))) + alice.L2.SetUserEnv(l2UserEnv) + + contract, err := bindings.NewDataAvailabilityChallenge(sd.RollupCfg.DAChallengeAddress, l1Client) + require.NoError(t, err) + + challengeWindow, err := contract.ChallengeWindow(nil) + require.NoError(t, err) + require.Equal(t, plasmaCfg.ChallengeWindow, challengeWindow.Uint64()) + + resolveWindow, err := contract.ResolveWindow(nil) + require.NoError(t, err) + require.Equal(t, plasmaCfg.ResolveWindow, resolveWindow.Uint64()) + + return &L2PlasmaDA{ + log: log, + storage: storage, + daMgr: daMgr, + plasmaCfg: plasmaCfg, + contract: contract, + batcher: batcher, + sequencer: sequencer, + engine: engine, + engCl: engCl, + sd: sd, + dp: dp, + miner: miner, + alice: alice, + } +} + +func (a *L2PlasmaDA) StorageClient() *plasma.DAErrFaker { + return a.storage +} + +func (a *L2PlasmaDA) NewVerifier(t Testing) *L2Verifier { + jwtPath := e2eutils.WriteDefaultJWT(t) + engine := NewL2Engine(t, a.log, a.sd.L2Cfg, a.sd.RollupCfg.Genesis.L1, jwtPath) + engCl := engine.EngineClient(t, a.sd.RollupCfg) + l1F, err := sources.NewL1Client(a.miner.RPCClient(), a.log, nil, sources.L1ClientDefaultConfig(a.sd.RollupCfg, false, sources.RPCKindBasic)) + require.NoError(t, err) + + daMgr := plasma.NewPlasmaDAWithStorage(a.log, a.plasmaCfg, a.storage, &plasma.NoopMetrics{}) + + verifier := NewL2Verifier(t, a.log, l1F, nil, daMgr, engCl, a.sd.RollupCfg, &sync.Config{}, safedb.Disabled) + + return verifier +} + +func (a *L2PlasmaDA) ActSequencerIncludeTx(t Testing) { + a.alice.L2.ActResetTxOpts(t) + a.alice.L2.ActSetTxToAddr(&a.dp.Addresses.Bob)(t) + a.alice.L2.ActMakeTx(t) + + a.sequencer.ActL2PipelineFull(t) + + a.sequencer.ActL2StartBlock(t) + a.engine.ActL2IncludeTx(a.alice.Address())(t) + a.sequencer.ActL2EndBlock(t) +} + +func (a *L2PlasmaDA) ActNewL2Tx(t Testing) { + a.ActSequencerIncludeTx(t) + + a.batcher.ActL2BatchBuffer(t) + a.batcher.ActL2ChannelClose(t) + a.batcher.ActL2BatchSubmit(t, func(tx *types.DynamicFeeTx) { + a.lastComm = tx.Data + }) + + a.miner.ActL1StartBlock(3)(t) + a.miner.ActL1IncludeTx(a.dp.Addresses.Batcher)(t) + a.miner.ActL1EndBlock(t) + + a.lastCommBn = a.miner.l1Chain.CurrentBlock().Number.Uint64() +} + +func (a *L2PlasmaDA) ActDeleteLastInput(t Testing) { + require.NoError(t, a.storage.Client.DeleteData(a.lastComm)) +} + +func (a *L2PlasmaDA) ActChallengeLastInput(t Testing) { + a.ActChallengeInput(t, a.lastComm, a.lastCommBn) + + a.log.Info("challenged last input", "block", a.lastCommBn) +} + +func (a *L2PlasmaDA) ActChallengeInput(t Testing, comm []byte, bn uint64) { + bondValue, err := a.contract.BondSize(&bind.CallOpts{}) + require.NoError(t, err) + + txOpts, err := bind.NewKeyedTransactorWithChainID(a.dp.Secrets.Alice, a.sd.L1Cfg.Config.ChainID) + require.NoError(t, err) + + txOpts.Value = bondValue + _, err = a.contract.Deposit(txOpts) + require.NoError(t, err) + + a.miner.ActL1StartBlock(3)(t) + a.miner.ActL1IncludeTx(a.alice.Address())(t) + a.miner.ActL1EndBlock(t) + + txOpts, err = bind.NewKeyedTransactorWithChainID(a.dp.Secrets.Alice, a.sd.L1Cfg.Config.ChainID) + require.NoError(t, err) + + _, err = a.contract.Challenge(txOpts, big.NewInt(int64(bn)), comm) + require.NoError(t, err) + + a.miner.ActL1StartBlock(3)(t) + a.miner.ActL1IncludeTx(a.alice.Address())(t) + a.miner.ActL1EndBlock(t) +} + +func (a *L2PlasmaDA) ActExpireLastInput(t Testing) { + reorgWindow := a.plasmaCfg.ResolveWindow + a.plasmaCfg.ChallengeWindow + for a.miner.l1Chain.CurrentBlock().Number.Uint64() <= a.lastCommBn+reorgWindow { + a.miner.ActL1StartBlock(3)(t) + a.miner.ActL1EndBlock(t) + } +} + +func (a *L2PlasmaDA) ActResolveLastChallenge(t Testing) { + // remove commitment byte prefix + input, err := a.storage.GetInput(t.Ctx(), a.lastComm[1:]) + require.NoError(t, err) + + txOpts, err := bind.NewKeyedTransactorWithChainID(a.dp.Secrets.Alice, a.sd.L1Cfg.Config.ChainID) + require.NoError(t, err) + + _, err = a.contract.Resolve(txOpts, big.NewInt(int64(a.lastCommBn)), a.lastComm, input) + require.NoError(t, err) + + a.miner.ActL1StartBlock(3)(t) + a.miner.ActL1IncludeTx(a.alice.Address())(t) + a.miner.ActL1EndBlock(t) +} + +func (a *L2PlasmaDA) ActL1Blocks(t Testing, n uint64) { + for i := uint64(0); i < n; i++ { + a.miner.ActL1StartBlock(3)(t) + a.miner.ActL1EndBlock(t) + } +} + +func (a *L2PlasmaDA) GetLastTxBlock(t Testing) *types.Block { + rcpt, err := a.engine.EthClient().TransactionReceipt(t.Ctx(), a.alice.L2.lastTxHash) + require.NoError(t, err) + blk, err := a.engine.EthClient().BlockByHash(t.Ctx(), rcpt.BlockHash) + require.NoError(t, err) + return blk +} + +func (a *L2PlasmaDA) ActL1Finalized(t Testing) { + latest := a.miner.l1Chain.CurrentBlock().Number.Uint64() + a.miner.ActL1Safe(t, latest) + a.miner.ActL1Finalize(t, latest) + a.sequencer.ActL1FinalizedSignal(t) +} + +// Commitment is challenged but never resolved, chain reorgs when challenge window expires. +func TestPlasma_ChallengeExpired(gt *testing.T) { + if !e2eutils.UsePlasma() { + gt.Skip("Plasma is not enabled") + } + + t := NewDefaultTesting(gt) + harness := NewL2PlasmaDA(t) + + // generate enough initial l1 blocks to have a finalized head. + harness.ActL1Blocks(t, 5) + + // Include a new l2 transaction, submitting an input commitment to the l1. + harness.ActNewL2Tx(t) + + // Challenge the input commitment on the l1 challenge contract. + harness.ActChallengeLastInput(t) + + blk := harness.GetLastTxBlock(t) + + // catch up the sequencer derivation pipeline with the new l1 blocks. + harness.sequencer.ActL2PipelineFull(t) + + // create enough l1 blocks to expire the resolve window. + harness.ActExpireLastInput(t) + + // catch up the sequencer derivation pipeline with the new l1 blocks. + harness.sequencer.ActL2PipelineFull(t) + + // the L1 finalized signal should trigger plasma to finalize the engine queue. + harness.ActL1Finalized(t) + + // move one more block for engine controller to update. + harness.ActL1Blocks(t, 1) + harness.sequencer.ActL2PipelineFull(t) + + // make sure that the finalized head was correctly updated on the engine. + l2Finalized, err := harness.engCl.L2BlockRefByLabel(t.Ctx(), eth.Finalized) + require.NoError(t, err) + require.Equal(t, uint64(8), l2Finalized.Number) + + newBlk, err := harness.engine.EthClient().BlockByNumber(t.Ctx(), blk.Number()) + require.NoError(t, err) + + // reorg happened even though data was available + require.NotEqual(t, blk.Hash(), newBlk.Hash()) + + // now delete the data from the storage service so it is not available at all + // to the verifier derivation pipeline. + harness.ActDeleteLastInput(t) + + syncStatus := harness.sequencer.SyncStatus() + + // verifier is able to sync with expired missing data + verifier := harness.NewVerifier(t) + verifier.ActL2PipelineFull(t) + verifier.ActL1FinalizedSignal(t) + + verifSyncStatus := verifier.SyncStatus() + + require.Equal(t, syncStatus.FinalizedL2, verifSyncStatus.FinalizedL2) +} + +// Commitment is challenged after sequencer derived the chain but data disappears. A verifier +// derivation pipeline stalls until the challenge is resolved and then resumes with data from the contract. +func TestPlasma_ChallengeResolved(gt *testing.T) { + if !e2eutils.UsePlasma() { + gt.Skip("Plasma is not enabled") + } + + t := NewDefaultTesting(gt) + harness := NewL2PlasmaDA(t) + + // include a new l2 transaction, submitting an input commitment to the l1. + harness.ActNewL2Tx(t) + + // generate 3 l1 blocks. + harness.ActL1Blocks(t, 3) + + // challenge the input commitment for that l2 transaction on the l1 challenge contract. + harness.ActChallengeLastInput(t) + + // catch up sequencer derivation pipeline. + // this syncs the latest event within the AltDA manager. + harness.sequencer.ActL2PipelineFull(t) + + // resolve the challenge on the l1 challenge contract. + harness.ActResolveLastChallenge(t) + + // catch up the sequencer derivation pipeline with the new l1 blocks. + // this syncs the resolved status and input data within the AltDA manager. + harness.sequencer.ActL2PipelineFull(t) + + // finalize l1 + harness.ActL1Finalized(t) + + // delete the data from the storage service so it is not available at all + // to the verifier derivation pipeline. + harness.ActDeleteLastInput(t) + + syncStatus := harness.sequencer.SyncStatus() + + // new verifier is able to sync and resolve the input from calldata + verifier := harness.NewVerifier(t) + verifier.ActL2PipelineFull(t) + verifier.ActL1FinalizedSignal(t) + + verifSyncStatus := verifier.SyncStatus() + + require.Equal(t, syncStatus.SafeL2, verifSyncStatus.SafeL2) +} + +// DA storage service goes offline while sequencer keeps making blocks. When storage comes back online, it should be able to catch up. +func TestPlasma_StorageError(gt *testing.T) { + if !e2eutils.UsePlasma() { + gt.Skip("Plasma is not enabled") + } + + t := NewDefaultTesting(gt) + harness := NewL2PlasmaDA(t) + + // include a new l2 transaction, submitting an input commitment to the l1. + harness.ActNewL2Tx(t) + + txBlk := harness.GetLastTxBlock(t) + + // mock a storage client error when trying to get the pre-image. + // this simulates the storage service going offline for example. + harness.storage.ActGetPreImageFail() + + // try to derive the l2 chain from the submitted inputs commitments. + // the storage call will fail the first time then succeed. + harness.sequencer.ActL2PipelineFull(t) + + // sequencer derivation was able to sync to latest l1 origin + syncStatus := harness.sequencer.SyncStatus() + require.Equal(t, uint64(1), syncStatus.SafeL2.Number) + require.Equal(t, txBlk.Hash(), syncStatus.SafeL2.Hash) +} + +// L1 chain reorgs a resolved challenge so it expires instead causing +// the l2 chain to reorg as well. +func TestPlasma_ChallengeReorg(gt *testing.T) { + if !e2eutils.UsePlasma() { + gt.Skip("Plasma is not enabled") + } + + t := NewDefaultTesting(gt) + harness := NewL2PlasmaDA(t) + + // New L2 tx added to a batch and committed to L1 + harness.ActNewL2Tx(t) + + // add a buffer of L1 blocks + harness.ActL1Blocks(t, 3) + + // challenge the input commitment + harness.ActChallengeLastInput(t) + + // keep track of the block where the L2 tx was included + blk := harness.GetLastTxBlock(t) + + // progress derivation pipeline + harness.sequencer.ActL2PipelineFull(t) + + // resolve the challenge so pipeline can progress + harness.ActResolveLastChallenge(t) + + // derivation marks the challenge as resolve, chain is not impacted + harness.sequencer.ActL2PipelineFull(t) + + // Rewind the L1, essentially reorging the challenge resolution + harness.miner.ActL1RewindToParent(t) + + // Now the L1 chain advances without the challenge resolution + // so the challenge is expired. + harness.ActExpireLastInput(t) + + // derivation pipeline reorgs the commitment out of the chain + harness.sequencer.ActL2PipelineFull(t) + + newBlk, err := harness.engine.EthClient().BlockByNumber(t.Ctx(), blk.Number()) + require.NoError(t, err) + + // confirm the reorg did happen + require.NotEqual(t, blk.Hash(), newBlk.Hash()) +} diff --git a/op-e2e/actions/reorg_test.go b/op-e2e/actions/reorg_test.go index 262ca842c1d3..1b4edbaf9cfe 100644 --- a/op-e2e/actions/reorg_test.go +++ b/op-e2e/actions/reorg_test.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-node/rollup/sync" + plasma "github.com/ethereum-optimism/optimism/op-plasma" "github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/sources" @@ -617,7 +618,7 @@ func RestartOpGeth(gt *testing.T, deltaTimeOffset *hexutil.Uint64) { engRpc := &rpcWrapper{seqEng.RPCClient()} l2Cl, err := sources.NewEngineClient(engRpc, log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) require.NoError(t, err) - sequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), l2Cl, sd.RollupCfg, 0) + sequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), plasma.Disabled, l2Cl, sd.RollupCfg, 0) batcher := NewL2Batcher(log, sd.RollupCfg, DefaultBatcherCfg(dp), sequencer.RollupClient(), miner.EthClient(), seqEng.EthClient(), seqEng.EngineClient(t, sd.RollupCfg)) @@ -705,7 +706,7 @@ func ConflictingL2Blocks(gt *testing.T, deltaTimeOffset *hexutil.Uint64) { require.NoError(t, err) l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindStandard)) require.NoError(t, err) - altSequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), altSeqEngCl, sd.RollupCfg, 0) + altSequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), plasma.Disabled, altSeqEngCl, sd.RollupCfg, 0) altBatcher := NewL2Batcher(log, sd.RollupCfg, DefaultBatcherCfg(dp), altSequencer.RollupClient(), miner.EthClient(), altSeqEng.EthClient(), altSeqEng.EngineClient(t, sd.RollupCfg)) diff --git a/op-e2e/e2eutils/setup.go b/op-e2e/e2eutils/setup.go index b7e02ddd1feb..a417060a7399 100644 --- a/op-e2e/e2eutils/setup.go +++ b/op-e2e/e2eutils/setup.go @@ -44,6 +44,7 @@ type TestParams struct { SequencerWindowSize uint64 ChannelTimeout uint64 L1BlockTime uint64 + UsePlasma bool } func MakeDeployParams(t require.TestingT, tp *TestParams) *DeployParams { @@ -57,6 +58,7 @@ func MakeDeployParams(t require.TestingT, tp *TestParams) *DeployParams { deployConfig.SequencerWindowSize = tp.SequencerWindowSize deployConfig.ChannelTimeout = tp.ChannelTimeout deployConfig.L1BlockTime = tp.L1BlockTime + deployConfig.UsePlasma = tp.UsePlasma ApplyDeployConfigForks(deployConfig) require.NoError(t, deployConfig.Check()) @@ -161,6 +163,10 @@ func Setup(t require.TestingT, deployParams *DeployParams, alloc *AllocParams) * EcotoneTime: deployConf.EcotoneTime(uint64(deployConf.L1GenesisBlockTimestamp)), FjordTime: deployConf.FjordTime(uint64(deployConf.L1GenesisBlockTimestamp)), InteropTime: deployConf.InteropTime(uint64(deployConf.L1GenesisBlockTimestamp)), + DAChallengeAddress: l1Deployments.DataAvailabilityChallengeProxy, + DAChallengeWindow: deployConf.DAChallengeWindow, + DAResolveWindow: deployConf.DAResolveWindow, + UsePlasma: deployConf.UsePlasma, } require.NoError(t, rollupCfg.Check()) @@ -208,3 +214,7 @@ func ApplyDeployConfigForks(deployConfig *genesis.DeployConfig) { func UseFPAC() bool { return os.Getenv("OP_E2E_USE_FPAC") == "true" } + +func UsePlasma() bool { + return os.Getenv("OP_E2E_USE_PLASMA") == "true" +} diff --git a/op-node/metrics/metrics.go b/op-node/metrics/metrics.go index 999f1b3edae4..e44020f7894b 100644 --- a/op-node/metrics/metrics.go +++ b/op-node/metrics/metrics.go @@ -10,6 +10,8 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum-optimism/optimism/op-node/p2p/store" + plasma "github.com/ethereum-optimism/optimism/op-plasma" + ophttp "github.com/ethereum-optimism/optimism/op-service/httputil" "github.com/ethereum-optimism/optimism/op-service/metrics" @@ -122,6 +124,8 @@ type Metrics struct { TransactionsSequencedTotal prometheus.Counter + PlasmaMetrics plasma.Metricer + // Channel Bank Metrics headChannelOpenedEvent *metrics.Event channelTimedOutEvent *metrics.Event @@ -384,6 +388,8 @@ func NewMetrics(procName string) *Metrics { "required", }), + PlasmaMetrics: plasma.MakeMetrics(ns, factory), + registry: registry, factory: factory, } diff --git a/op-node/node/node.go b/op-node/node/node.go index 7f8fad5adddc..324a595f4ab2 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -386,10 +386,12 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger sequencerConductor = NewConductorClient(cfg, n.log, n.metrics) } - plasmaDA := plasma.NewPlasmaDA(n.log, cfg.Plasma) - if cfg.Plasma.Enabled { - n.log.Info("Plasma DA enabled", "da_server", cfg.Plasma.DAServerURL) + // if plasma is not explicitly activated in the node CLI, the config + any error will be ignored. + rpCfg, err := cfg.Rollup.PlasmaConfig() + if cfg.Plasma.Enabled && err != nil { + return fmt.Errorf("failed to get plasma config: %w", err) } + plasmaDA := plasma.NewPlasmaDA(n.log, cfg.Plasma, rpCfg, n.metrics.PlasmaMetrics) if cfg.SafeDBPath != "" { n.log.Info("Safe head database enabled", "path", cfg.SafeDBPath) safeDB, err := safedb.NewSafeDB(n.log, cfg.SafeDBPath) diff --git a/op-node/rollup/derive/data_source.go b/op-node/rollup/derive/data_source.go index 705601306c14..2a1dbd7c00de 100644 --- a/op-node/rollup/derive/data_source.go +++ b/op-node/rollup/derive/data_source.go @@ -28,7 +28,15 @@ type L1BlobsFetcher interface { type PlasmaInputFetcher interface { // GetInput fetches the input for the given commitment at the given block number from the DA storage service. - GetInput(ctx context.Context, commitment []byte, blockNumber uint64) (plasma.Input, error) + GetInput(ctx context.Context, l1 plasma.L1Fetcher, c plasma.Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) + // AdvanceL1Origin advances the L1 origin to the given block number, syncing the DA challenge events. + AdvanceL1Origin(ctx context.Context, l1 plasma.L1Fetcher, blockId eth.BlockID) error + // Reset the challenge origin in case of L1 reorg + Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error + // Notify L1 finalized head so plasma finality is always behind L1 + Finalize(ref eth.L1BlockRef) + // Set the engine finalization signal callback + OnFinalizedHeadSignal(f plasma.HeadSignalFn) } // DataSourceFactory reads raw transactions from a given block & then filters for @@ -37,17 +45,17 @@ type PlasmaInputFetcher interface { type DataSourceFactory struct { log log.Logger dsCfg DataSourceConfig - fetcher L1TransactionFetcher + fetcher L1Fetcher blobsFetcher L1BlobsFetcher plasmaFetcher PlasmaInputFetcher ecotoneTime *uint64 } -func NewDataSourceFactory(log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher, blobsFetcher L1BlobsFetcher, plasmaFetcher PlasmaInputFetcher) *DataSourceFactory { +func NewDataSourceFactory(log log.Logger, cfg *rollup.Config, fetcher L1Fetcher, blobsFetcher L1BlobsFetcher, plasmaFetcher PlasmaInputFetcher) *DataSourceFactory { config := DataSourceConfig{ l1Signer: cfg.L1Signer(), batchInboxAddress: cfg.BatchInboxAddress, - plasmaEnabled: cfg.IsPlasmaEnabled(), + plasmaEnabled: cfg.UsePlasma, } return &DataSourceFactory{ log: log, @@ -74,7 +82,7 @@ func (ds *DataSourceFactory) OpenData(ctx context.Context, ref eth.L1BlockRef, b } if ds.dsCfg.plasmaEnabled { // plasma([calldata | blobdata](l1Ref)) -> data - return NewPlasmaDataSource(ds.log, src, ds.plasmaFetcher, ref.ID()), nil + return NewPlasmaDataSource(ds.log, src, ds.fetcher, ds.plasmaFetcher, ref.ID()), nil } return src, nil } diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 6817ba96b3c9..4dd4bebec49a 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -136,6 +136,21 @@ const finalityLookback = 4*32 + 1 // We do not want to do this too often, since it requires fetching a L1 block by number, so no cache data. const finalityDelay = 64 +// calcFinalityLookback calculates the default finality lookback based on DA challenge window if plasma +// mode is activated or L1 finality lookback. +func calcFinalityLookback(cfg *rollup.Config) uint64 { + // in plasma mode the longest finality lookback is a commitment is challenged on the last block of + // the challenge window in which case it will be both challenge + resolve window. + if cfg.UsePlasma { + lkb := cfg.DAChallengeWindow + cfg.DAResolveWindow + 1 + // in the case only if the plasma windows are longer than the default finality lookback + if lkb > finalityLookback { + return lkb + } + } + return finalityLookback +} + type FinalityData struct { // The last L2 block that was fully derived and inserted into the L2 engine while processing this L1 block. L2Block eth.L2BlockRef @@ -188,7 +203,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, l2Source L2Source, engin ec: engine, engine: l2Source, metrics: metrics, - finalityData: make([]FinalityData, 0, finalityLookback), + finalityData: make([]FinalityData, 0, calcFinalityLookback(cfg)), unsafePayloads: NewPayloadsQueue(log, maxUnsafePayloadsMemory, payloadMemSize), prev: prev, l1Fetcher: l1Fetcher, @@ -424,8 +439,8 @@ func (eq *EngineQueue) postProcessSafeL2() error { return err } // prune finality data if necessary - if len(eq.finalityData) >= finalityLookback { - eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...) + if uint64(len(eq.finalityData)) >= calcFinalityLookback(eq.cfg) { + eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:calcFinalityLookback(eq.cfg)]...) } // remember the last L2 block that we fully derived from the given finality data if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.origin.Number { diff --git a/op-node/rollup/derive/engine_queue_test.go b/op-node/rollup/derive/engine_queue_test.go index 45bcc8ef558b..190e86140efb 100644 --- a/op-node/rollup/derive/engine_queue_test.go +++ b/op-node/rollup/derive/engine_queue_test.go @@ -1218,3 +1218,99 @@ func TestEngineQueue_StepPopOlderUnsafe(t *testing.T) { l1F.AssertExpectations(t) eng.AssertExpectations(t) } + +func TestPlasmaFinalityData(t *testing.T) { + logger := testlog.Logger(t, log.LevelInfo) + eng := &testutils.MockEngine{} + l1F := &testutils.MockL1Source{} + + rng := rand.New(rand.NewSource(1234)) + + refA := testutils.RandomBlockRef(rng) + refA0 := eth.L2BlockRef{ + Hash: testutils.RandomHash(rng), + Number: 0, + ParentHash: common.Hash{}, + Time: refA.Time, + L1Origin: refA.ID(), + SequenceNumber: 0, + } + + prev := &fakeAttributesQueue{origin: refA} + + cfg := &rollup.Config{ + Genesis: rollup.Genesis{ + L1: refA.ID(), + L2: refA0.ID(), + L2Time: refA0.Time, + SystemConfig: eth.SystemConfig{ + BatcherAddr: common.Address{42}, + Overhead: [32]byte{123}, + Scalar: [32]byte{42}, + GasLimit: 20_000_000, + }, + }, + BlockTime: 1, + SeqWindowSize: 2, + UsePlasma: false, + DAChallengeWindow: 90, + DAResolveWindow: 90, + } + // shoud return l1 finality if plasma is not enabled + require.Equal(t, uint64(finalityLookback), calcFinalityLookback(cfg)) + + cfg.UsePlasma = true + expFinalityLookback := 181 + require.Equal(t, uint64(expFinalityLookback), calcFinalityLookback(cfg)) + + refA1 := eth.L2BlockRef{ + Hash: testutils.RandomHash(rng), + Number: refA0.Number + 1, + ParentHash: refA0.Hash, + Time: refA0.Time + cfg.BlockTime, + L1Origin: refA.ID(), + SequenceNumber: 1, + } + + ec := NewEngineController(eng, logger, metrics.NoopMetrics, &rollup.Config{}, sync.CLSync) + + eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{}, safedb.Disabled) + require.Equal(t, expFinalityLookback, cap(eq.finalityData)) + + l1parent := refA + l2parent := refA1 + + ec.SetSafeHead(l2parent) + require.NoError(t, eq.postProcessSafeL2()) + + // advance over 200 l1 origins each time incrementing new l2 safe heads + // and post processing. + for i := uint64(0); i < 200; i++ { + require.NoError(t, eq.postProcessSafeL2()) + + l1parent = eth.L1BlockRef{ + Hash: testutils.RandomHash(rng), + Number: l1parent.Number + 1, + ParentHash: l1parent.Hash, + Time: l1parent.Time + 12, + } + eq.origin = l1parent + + for j := uint64(0); i < cfg.SeqWindowSize; i++ { + l2parent = eth.L2BlockRef{ + Hash: testutils.RandomHash(rng), + Number: l2parent.Number + 1, + ParentHash: l2parent.Hash, + Time: l2parent.Time + cfg.BlockTime, + L1Origin: l1parent.ID(), + SequenceNumber: j, + } + ec.SetSafeHead(l2parent) + require.NoError(t, eq.postProcessSafeL2()) + } + } + + // finality data does not go over challenge + resolve windows + 1 capacity + // (prunes down to 180 then adds the extra 1 each time) + require.Equal(t, expFinalityLookback, len(eq.finalityData)) +} diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 31e31f6266c9..405222bba565 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -53,6 +53,7 @@ type DerivationPipeline struct { log log.Logger rollupCfg *rollup.Config l1Fetcher L1Fetcher + plasma PlasmaInputFetcher // Index of the stage that is currently being reset. // >= len(stages) if no additional resetting is required @@ -68,11 +69,11 @@ type DerivationPipeline struct { // NewDerivationPipeline creates a derivation pipeline, which should be reset before use. -func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher, plasmaInputs PlasmaInputFetcher, l2Source L2Source, engine LocalEngineControl, metrics Metrics, syncCfg *sync.Config, safeHeadListener SafeHeadListener) *DerivationPipeline { +func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher, plasma PlasmaInputFetcher, l2Source L2Source, engine LocalEngineControl, metrics Metrics, syncCfg *sync.Config, safeHeadListener SafeHeadListener) *DerivationPipeline { // Pull stages l1Traversal := NewL1Traversal(log, rollupCfg, l1Fetcher) - dataSrc := NewDataSourceFactory(log, rollupCfg, l1Fetcher, l1Blobs, plasmaInputs) // auxiliary stage for L1Retrieval + dataSrc := NewDataSourceFactory(log, rollupCfg, l1Fetcher, l1Blobs, plasma) // auxiliary stage for L1Retrieval l1Src := NewL1Retrieval(log, dataSrc, l1Traversal) frameQueue := NewFrameQueue(log, l1Src) bank := NewChannelBank(log, rollupCfg, frameQueue, l1Fetcher, metrics) @@ -84,15 +85,21 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L // Step stages eng := NewEngineQueue(log, rollupCfg, l2Source, engine, metrics, attributesQueue, l1Fetcher, syncCfg, safeHeadListener) + // Plasma takes control of the engine finalization signal only when usePlasma is enabled. + plasma.OnFinalizedHeadSignal(func(ref eth.L1BlockRef) { + eng.Finalize(ref) + }) + // Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during // the reset, but after the engine queue, this is the order in which the stages could talk to each other. // Note: The engine queue stage is the only reset that can fail. - stages := []ResettableStage{eng, l1Traversal, l1Src, frameQueue, bank, chInReader, batchQueue, attributesQueue} + stages := []ResettableStage{eng, l1Traversal, l1Src, plasma, frameQueue, bank, chInReader, batchQueue, attributesQueue} return &DerivationPipeline{ log: log, rollupCfg: rollupCfg, l1Fetcher: l1Fetcher, + plasma: plasma, resetting: 0, stages: stages, eng: eng, @@ -118,7 +125,13 @@ func (dp *DerivationPipeline) Origin() eth.L1BlockRef { } func (dp *DerivationPipeline) Finalize(l1Origin eth.L1BlockRef) { - dp.eng.Finalize(l1Origin) + // In plasma mode, the finalization signal is proxied through the plasma manager. + // Finality signal will come from the DA contract or L1 finality whichever is last. + if dp.rollupCfg.UsePlasma { + dp.plasma.Finalize(l1Origin) + } else { + dp.eng.Finalize(l1Origin) + } } // FinalizedL1 is the L1 finalization of the inner-most stage of the derivation pipeline, diff --git a/op-node/rollup/derive/plasma_data_source.go b/op-node/rollup/derive/plasma_data_source.go index 9c4dec938110..c1b5121128bf 100644 --- a/op-node/rollup/derive/plasma_data_source.go +++ b/op-node/rollup/derive/plasma_data_source.go @@ -2,8 +2,10 @@ package derive import ( "context" + "errors" "fmt" + plasma "github.com/ethereum-optimism/optimism/op-plasma" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum/go-ethereum/log" ) @@ -14,36 +16,78 @@ type PlasmaDataSource struct { log log.Logger src DataIter fetcher PlasmaInputFetcher + l1 L1Fetcher id eth.BlockID // keep track of a pending commitment so we can keep trying to fetch the input. - comm []byte + comm plasma.Keccak256Commitment } -func NewPlasmaDataSource(log log.Logger, src DataIter, fetcher PlasmaInputFetcher, id eth.BlockID) *PlasmaDataSource { +func NewPlasmaDataSource(log log.Logger, src DataIter, l1 L1Fetcher, fetcher PlasmaInputFetcher, id eth.BlockID) *PlasmaDataSource { return &PlasmaDataSource{ log: log, src: src, fetcher: fetcher, + l1: l1, id: id, } } func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) { + // Process origin syncs the challenge contract events and updates the local challenge states + // before we can proceed to fetch the input data. This function can be called multiple times + // for the same origin and noop if the origin was already processed. It is also called if + // there is not commitment in the current origin. + if err := s.fetcher.AdvanceL1Origin(ctx, s.l1, s.id); err != nil { + if errors.Is(err, plasma.ErrReorgRequired) { + return nil, NewResetError(fmt.Errorf("new expired challenge")) + } + return nil, NewTemporaryError(fmt.Errorf("failed to advance plasma L1 origin: %w", err)) + } + if s.comm == nil { var err error // the l1 source returns the input commitment for the batch. - s.comm, err = s.src.Next(ctx) + data, err := s.src.Next(ctx) if err != nil { return nil, err } + // validate batcher inbox data is a commitment. + comm, err := plasma.DecodeKeccak256(data) + if err != nil { + s.log.Warn("invalid commitment", "commitment", data, "err", err) + return s.Next(ctx) + } + s.comm = comm } // use the commitment to fetch the input from the plasma DA provider. - resp, err := s.fetcher.GetInput(ctx, s.comm, s.id.Number) - if err != nil { + data, err := s.fetcher.GetInput(ctx, s.l1, s.comm, s.id) + // GetInput may call for a reorg if the pipeline is stalled and the plasma DA manager + // continued syncing origins detached from the pipeline origin. + if errors.Is(err, plasma.ErrReorgRequired) { + // challenge for a new previously derived commitment expired. + return nil, NewResetError(err) + } else if errors.Is(err, plasma.ErrExpiredChallenge) { + // this commitment was challenged and the challenge expired. + s.log.Warn("challenge expired, skipping batch", "comm", s.comm) + s.comm = nil + // skip the input + return s.Next(ctx) + } else if errors.Is(err, plasma.ErrMissingPastWindow) { + return nil, NewCriticalError(fmt.Errorf("data for comm %x not available: %w", s.comm, err)) + } else if errors.Is(err, plasma.ErrPendingChallenge) { + // continue stepping without slowing down. + return nil, NotEnoughData + } else if err != nil { // return temporary error so we can keep retrying. return nil, NewTemporaryError(fmt.Errorf("failed to fetch input data with comm %x from da service: %w", s.comm, err)) } + // inputs are limited to a max size to ensure they can be challenged in the DA contract. + if len(data) > plasma.MaxInputSize { + s.log.Warn("input data exceeds max size", "size", len(data), "max", plasma.MaxInputSize) + s.comm = nil + return s.Next(ctx) + } // reset the commitment so we can fetch the next one from the source at the next iteration. s.comm = nil - return resp.Data, nil + return data, nil } diff --git a/op-node/rollup/derive/plasma_data_source_test.go b/op-node/rollup/derive/plasma_data_source_test.go index dc96a4390b72..ba2e7c6f06ce 100644 --- a/op-node/rollup/derive/plasma_data_source_test.go +++ b/op-node/rollup/derive/plasma_data_source_test.go @@ -12,17 +12,35 @@ import ( "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testutils" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) +type MockFinalitySignal struct { + mock.Mock +} + +func (m *MockFinalitySignal) OnFinalized(blockRef eth.L1BlockRef) { + m.MethodCalled("OnFinalized", blockRef) +} + +func (m *MockFinalitySignal) ExpectFinalized(blockRef eth.L1BlockRef) { + m.On("OnFinalized", blockRef).Once() +} + // TestPlasmaDataSource verifies that commitments are correctly read from l1 and then // forwarded to the Plasma DA to return the correct inputs in the iterator. +// First it generates some L1 refs containing a random number of commitments, challenges +// the first 4 commitments then generates enough blocks to expire the challenge. +// Then it simulates rederiving while verifying it does skip the expired input until the next +// challenge expires. func TestPlasmaDataSource(t *testing.T) { logger := testlog.Logger(t, log.LevelDebug) ctx := context.Background() @@ -33,7 +51,17 @@ func TestPlasmaDataSource(t *testing.T) { storage := plasma.NewMockDAClient(logger) - da := plasma.NewPlasmaDAWithStorage(logger, storage) + pcfg := plasma.Config{ + ChallengeWindow: 90, ResolveWindow: 90, + } + metrics := &plasma.NoopMetrics{} + + daState := plasma.NewState(logger, metrics) + + da := plasma.NewPlasmaDAWithState(logger, pcfg, storage, metrics, daState) + + finalitySignal := &MockFinalitySignal{} + da.OnFinalizedHeadSignal(finalitySignal.OnFinalized) // Create rollup genesis and config l1Time := uint64(2) @@ -57,19 +85,23 @@ func TestPlasmaDataSource(t *testing.T) { L2: refA0.ID(), L2Time: refA0.Time, }, - BlockTime: 1, - SeqWindowSize: 20, - BatchInboxAddress: batcherInbox, - DAChallengeAddress: common.Address{43}, + BlockTime: 1, + SeqWindowSize: 20, + BatchInboxAddress: batcherInbox, + UsePlasma: true, } // keep track of random input data to validate against var inputs [][]byte + var comms []plasma.Keccak256Commitment signer := cfg.L1Signer() factory := NewDataSourceFactory(logger, cfg, l1F, nil, da) - for i := uint64(0); i <= 18; i++ { + nc := 0 + firstChallengeExpirationBlock := uint64(95) + + for i := uint64(0); i <= pcfg.ChallengeWindow+pcfg.ResolveWindow; i++ { parent := l1Refs[len(l1Refs)-1] // create a new mock l1 ref ref := eth.L1BlockRef{ @@ -80,6 +112,8 @@ func TestPlasmaDataSource(t *testing.T) { } l1Refs = append(l1Refs, ref) logger.Info("new l1 block", "ref", ref) + // called for each l1 block to sync challenges + l1F.ExpectFetchReceipts(ref.Hash, nil, types.Receipts{}, nil) // pick a random number of commitments to include in the l1 block c := rng.Intn(4) @@ -90,6 +124,7 @@ func TestPlasmaDataSource(t *testing.T) { input := testutils.RandomData(rng, 2000) comm, _ := storage.SetInput(ctx, input) inputs = append(inputs, input) + comms = append(comms, comm) tx, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{ ChainID: signer.ChainID(), @@ -99,18 +134,44 @@ func TestPlasmaDataSource(t *testing.T) { Gas: 100_000, To: &batcherInbox, Value: big.NewInt(int64(0)), - Data: comm, + Data: comm.Encode(), }) require.NoError(t, err) txs = append(txs, tx) + } logger.Info("included commitments", "count", c) l1F.ExpectInfoAndTxsByHash(ref.Hash, testutils.RandomBlockInfo(rng), txs, nil) + // called once per derivation + l1F.ExpectInfoAndTxsByHash(ref.Hash, testutils.RandomBlockInfo(rng), txs, nil) + + if ref.Number == 2 { + l1F.ExpectL1BlockRefByNumber(ref.Number, ref, nil) + finalitySignal.ExpectFinalized(ref) + } + + // challenge the first 4 commitments as soon as we have collected them all + if len(comms) >= 4 && nc < 7 { + // skip a block between each challenge transaction + if nc%2 == 0 { + daState.SetActiveChallenge(comms[nc/2].Encode(), ref.Number, pcfg.ResolveWindow) + logger.Info("setting active challenge", "comm", comms[nc/2]) + } + nc++ + } // create a new data source for each block src, err := factory.OpenData(ctx, ref, batcherAddr) require.NoError(t, err) + + // first challenge expires + if i == firstChallengeExpirationBlock { + _, err := src.Next(ctx) + require.ErrorIs(t, err, ErrReset) + break + } + for j := 0; j < c; j++ { data, err := src.Next(ctx) // check that each commitment is resolved @@ -121,4 +182,338 @@ func TestPlasmaDataSource(t *testing.T) { _, err = src.Next(ctx) require.ErrorIs(t, err, io.EOF) } + + logger.Info("pipeline reset ..................................") + + // start at 1 since first input should be skipped + nc = 1 + secondChallengeExpirationBlock := 98 + + for i := 1; i <= len(l1Refs)+2; i++ { + + var ref eth.L1BlockRef + // first we run through all the existing l1 blocks + if i < len(l1Refs) { + ref = l1Refs[i] + logger.Info("re deriving block", "ref", ref, "i", i) + + if i == len(l1Refs)-1 { + l1F.ExpectFetchReceipts(ref.Hash, nil, types.Receipts{}, nil) + } + // once past the l1 head, continue generating new l1 refs + } else { + parent := l1Refs[len(l1Refs)-1] + // create a new mock l1 ref + ref = eth.L1BlockRef{ + Hash: testutils.RandomHash(rng), + Number: parent.Number + 1, + ParentHash: parent.Hash, + Time: parent.Time + l1Time, + } + l1Refs = append(l1Refs, ref) + logger.Info("new l1 block", "ref", ref) + // called for each l1 block to sync challenges + l1F.ExpectFetchReceipts(ref.Hash, nil, types.Receipts{}, nil) + + // pick a random number of commitments to include in the l1 block + c := rng.Intn(4) + var txs []*types.Transaction + + for j := 0; j < c; j++ { + // mock input commitments in l1 transactions + input := testutils.RandomData(rng, 2000) + comm, _ := storage.SetInput(ctx, input) + inputs = append(inputs, input) + comms = append(comms, comm) + + tx, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{ + ChainID: signer.ChainID(), + Nonce: 0, + GasTipCap: big.NewInt(2 * params.GWei), + GasFeeCap: big.NewInt(30 * params.GWei), + Gas: 100_000, + To: &batcherInbox, + Value: big.NewInt(int64(0)), + Data: comm.Encode(), + }) + require.NoError(t, err) + + txs = append(txs, tx) + + } + logger.Info("included commitments", "count", c) + l1F.ExpectInfoAndTxsByHash(ref.Hash, testutils.RandomBlockInfo(rng), txs, nil) + } + + // create a new data source for each block + src, err := factory.OpenData(ctx, ref, batcherAddr) + require.NoError(t, err) + + // next challenge expires + if i == secondChallengeExpirationBlock { + _, err := src.Next(ctx) + require.ErrorIs(t, err, ErrReset) + break + } + + for data, err := src.Next(ctx); err != io.EOF; data, err = src.Next(ctx) { + logger.Info("yielding data") + // check that each commitment is resolved + require.NoError(t, err) + require.Equal(t, hexutil.Bytes(inputs[nc]), data) + + nc++ + } + + } + + // trigger l1 finalization signal + da.Finalize(l1Refs[len(l1Refs)-32]) + + finalitySignal.AssertExpectations(t) + l1F.AssertExpectations(t) +} + +// This tests makes sure the pipeline returns a temporary error if data is not found. +func TestPlasmaDataSourceStall(t *testing.T) { + logger := testlog.Logger(t, log.LevelDebug) + ctx := context.Background() + + rng := rand.New(rand.NewSource(1234)) + + l1F := &testutils.MockL1Source{} + + storage := plasma.NewMockDAClient(logger) + + pcfg := plasma.Config{ + ChallengeWindow: 90, ResolveWindow: 90, + } + + metrics := &plasma.NoopMetrics{} + + daState := plasma.NewState(logger, metrics) + + da := plasma.NewPlasmaDAWithState(logger, pcfg, storage, metrics, daState) + + finalitySignal := &MockFinalitySignal{} + da.OnFinalizedHeadSignal(finalitySignal.OnFinalized) + + // Create rollup genesis and config + l1Time := uint64(2) + refA := testutils.RandomBlockRef(rng) + refA.Number = 1 + l1Refs := []eth.L1BlockRef{refA} + refA0 := eth.L2BlockRef{ + Hash: testutils.RandomHash(rng), + Number: 0, + ParentHash: common.Hash{}, + Time: refA.Time, + L1Origin: refA.ID(), + SequenceNumber: 0, + } + batcherPriv := testutils.RandomKey() + batcherAddr := crypto.PubkeyToAddress(batcherPriv.PublicKey) + batcherInbox := common.Address{42} + cfg := &rollup.Config{ + Genesis: rollup.Genesis{ + L1: refA.ID(), + L2: refA0.ID(), + L2Time: refA0.Time, + }, + BlockTime: 1, + SeqWindowSize: 20, + BatchInboxAddress: batcherInbox, + UsePlasma: true, + } + + signer := cfg.L1Signer() + + factory := NewDataSourceFactory(logger, cfg, l1F, nil, da) + + parent := l1Refs[0] + // create a new mock l1 ref + ref := eth.L1BlockRef{ + Hash: testutils.RandomHash(rng), + Number: parent.Number + 1, + ParentHash: parent.Hash, + Time: parent.Time + l1Time, + } + l1F.ExpectFetchReceipts(ref.Hash, nil, types.Receipts{}, nil) + // mock input commitments in l1 transactions + input := testutils.RandomData(rng, 2000) + comm, _ := storage.SetInput(ctx, input) + + tx, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{ + ChainID: signer.ChainID(), + Nonce: 0, + GasTipCap: big.NewInt(2 * params.GWei), + GasFeeCap: big.NewInt(30 * params.GWei), + Gas: 100_000, + To: &batcherInbox, + Value: big.NewInt(int64(0)), + Data: comm.Encode(), + }) + require.NoError(t, err) + + txs := []*types.Transaction{tx} + + l1F.ExpectInfoAndTxsByHash(ref.Hash, testutils.RandomBlockInfo(rng), txs, nil) + + // delete the input from the DA provider so it returns not found + require.NoError(t, storage.DeleteData(comm.Encode())) + + // next block is fetched to look ahead challenges but is not yet available + l1F.ExpectL1BlockRefByNumber(ref.Number+1, eth.L1BlockRef{}, ethereum.NotFound) + + src, err := factory.OpenData(ctx, ref, batcherAddr) + require.NoError(t, err) + + // data is not found so we return a temporary error + _, err = src.Next(ctx) + require.ErrorIs(t, err, ErrTemporary) + + // next block is available with no challenge events + nextRef := eth.L1BlockRef{ + Number: ref.Number + 1, + Hash: testutils.RandomHash(rng), + } + l1F.ExpectL1BlockRefByNumber(nextRef.Number, nextRef, nil) + l1F.ExpectFetchReceipts(nextRef.Hash, nil, types.Receipts{}, nil) + + // not enough data + _, err = src.Next(ctx) + require.ErrorIs(t, err, NotEnoughData) + + // now challenge is resolved + daState.SetResolvedChallenge(comm.Encode(), input, ref.Number+2) + + // derivation can resume + data, err := src.Next(ctx) + require.NoError(t, err) + require.Equal(t, hexutil.Bytes(input), data) + + l1F.AssertExpectations(t) +} + +// TestPlasmaDataSourceInvalidData tests that the pipeline skips invalid data and continues +// this includes invalid commitments and oversized inputs. +func TestPlasmaDataSourceInvalidData(t *testing.T) { + logger := testlog.Logger(t, log.LevelDebug) + ctx := context.Background() + + rng := rand.New(rand.NewSource(1234)) + + l1F := &testutils.MockL1Source{} + + storage := plasma.NewMockDAClient(logger) + + pcfg := plasma.Config{ + ChallengeWindow: 90, ResolveWindow: 90, + } + + da := plasma.NewPlasmaDAWithStorage(logger, pcfg, storage, &plasma.NoopMetrics{}) + + // Create rollup genesis and config + l1Time := uint64(2) + refA := testutils.RandomBlockRef(rng) + refA.Number = 1 + l1Refs := []eth.L1BlockRef{refA} + refA0 := eth.L2BlockRef{ + Hash: testutils.RandomHash(rng), + Number: 0, + ParentHash: common.Hash{}, + Time: refA.Time, + L1Origin: refA.ID(), + SequenceNumber: 0, + } + batcherPriv := testutils.RandomKey() + batcherAddr := crypto.PubkeyToAddress(batcherPriv.PublicKey) + batcherInbox := common.Address{42} + cfg := &rollup.Config{ + Genesis: rollup.Genesis{ + L1: refA.ID(), + L2: refA0.ID(), + L2Time: refA0.Time, + }, + BlockTime: 1, + SeqWindowSize: 20, + BatchInboxAddress: batcherInbox, + UsePlasma: true, + } + + signer := cfg.L1Signer() + + factory := NewDataSourceFactory(logger, cfg, l1F, nil, da) + + parent := l1Refs[0] + // create a new mock l1 ref + ref := eth.L1BlockRef{ + Hash: testutils.RandomHash(rng), + Number: parent.Number + 1, + ParentHash: parent.Hash, + Time: parent.Time + l1Time, + } + l1F.ExpectFetchReceipts(ref.Hash, nil, types.Receipts{}, nil) + // mock input commitments in l1 transactions with an oversized input + input := testutils.RandomData(rng, plasma.MaxInputSize+1) + comm, _ := storage.SetInput(ctx, input) + + tx1, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{ + ChainID: signer.ChainID(), + Nonce: 0, + GasTipCap: big.NewInt(2 * params.GWei), + GasFeeCap: big.NewInt(30 * params.GWei), + Gas: 100_000, + To: &batcherInbox, + Value: big.NewInt(int64(0)), + Data: comm.Encode(), + }) + require.NoError(t, err) + + // valid data + input2 := testutils.RandomData(rng, 2000) + comm2, _ := storage.SetInput(ctx, input2) + tx2, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{ + ChainID: signer.ChainID(), + Nonce: 0, + GasTipCap: big.NewInt(2 * params.GWei), + GasFeeCap: big.NewInt(30 * params.GWei), + Gas: 100_000, + To: &batcherInbox, + Value: big.NewInt(int64(0)), + Data: comm2.Encode(), + }) + require.NoError(t, err) + + // invalid commitment + input3 := testutils.RandomData(rng, 32) + tx3, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{ + ChainID: signer.ChainID(), + Nonce: 0, + GasTipCap: big.NewInt(2 * params.GWei), + GasFeeCap: big.NewInt(30 * params.GWei), + Gas: 100_000, + To: &batcherInbox, + Value: big.NewInt(int64(0)), + Data: input3, + }) + require.NoError(t, err) + + txs := []*types.Transaction{tx1, tx2, tx3} + + l1F.ExpectInfoAndTxsByHash(ref.Hash, testutils.RandomBlockInfo(rng), txs, nil) + + src, err := factory.OpenData(ctx, ref, batcherAddr) + require.NoError(t, err) + + // oversized input should be skipped + data, err := src.Next(ctx) + require.NoError(t, err) + require.Equal(t, hexutil.Bytes(input2), data) + + // invalid commitment is skipped so should return an EOF + _, err = src.Next(ctx) + require.ErrorIs(t, err, io.EOF) + + l1F.AssertExpectations(t) } diff --git a/op-node/rollup/sync/start.go b/op-node/rollup/sync/start.go index 5023ed83c53e..34d650cec749 100644 --- a/op-node/rollup/sync/start.go +++ b/op-node/rollup/sync/start.go @@ -176,7 +176,7 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain if result.Unsafe == (eth.L2BlockRef{}) { result.Unsafe = n // Check we are not reorging L2 incredibly deep - if n.L1Origin.Number+(MaxReorgSeqWindows*cfg.SeqWindowSize) < prevUnsafe.L1Origin.Number { + if n.L1Origin.Number+(MaxReorgSeqWindows*cfg.SyncLookback()) < prevUnsafe.L1Origin.Number { // If the reorg depth is too large, something is fishy. // This can legitimately happen if L1 goes down for a while. But in that case, // restarting the L2 node with a bigger configured MaxReorgDepth is an acceptable @@ -201,7 +201,7 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain } // If the L2 block is at least as old as the previous safe head, and we have seen at least a full sequence window worth of L1 blocks to confirm - if n.Number <= result.Safe.Number && n.L1Origin.Number+cfg.SeqWindowSize < highestL2WithCanonicalL1Origin.L1Origin.Number && n.SequenceNumber == 0 { + if n.Number <= result.Safe.Number && n.L1Origin.Number+cfg.SyncLookback() < highestL2WithCanonicalL1Origin.L1Origin.Number && n.SequenceNumber == 0 { ready = true } diff --git a/op-node/rollup/types.go b/op-node/rollup/types.go index b4916492331a..1ad45272e8aa 100644 --- a/op-node/rollup/types.go +++ b/op-node/rollup/types.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" + plasma "github.com/ethereum-optimism/optimism/op-plasma" "github.com/ethereum-optimism/optimism/op-service/eth" ) @@ -110,6 +111,17 @@ type Config struct { // L1 DataAvailabilityChallenge contract proxy address DAChallengeAddress common.Address `json:"da_challenge_address,omitempty"` + + // DA challenge window value set on the DAC contract. Used in plasma mode + // to compute when a commitment can no longer be challenged. + DAChallengeWindow uint64 `json:"da_challenge_window"` + + // DA resolve window value set on the DAC contract. Used in plasma mode + // to compute when a challenge expires and trigger a reorg if needed. + DAResolveWindow uint64 `json:"da_resolve_window"` + + // UsePlasma is activated when the chain is in plasma mode. + UsePlasma bool `json:"use_plasma"` } // ValidateL1Config checks L1 config variables for errors. @@ -393,9 +405,33 @@ func (c *Config) GetPayloadVersion(timestamp uint64) eth.EngineAPIMethod { } } -// IsPlasmaEnabled returns true if a DA Challenge proxy Address is provided in the rollup config. -func (c *Config) IsPlasmaEnabled() bool { - return c.DAChallengeAddress != (common.Address{}) +// PlasmaConfig validates and returns the plasma config from the rollup config. +func (c *Config) PlasmaConfig() (plasma.Config, error) { + if c.DAChallengeAddress == (common.Address{}) { + return plasma.Config{}, fmt.Errorf("missing DAChallengeAddress") + } + if c.DAChallengeWindow == uint64(0) { + return plasma.Config{}, fmt.Errorf("missing DAChallengeWindow") + } + if c.DAResolveWindow == uint64(0) { + return plasma.Config{}, fmt.Errorf("missing DAResolveWindow") + } + return plasma.Config{ + DAChallengeContractAddress: c.DAChallengeAddress, + ChallengeWindow: c.DAChallengeWindow, + ResolveWindow: c.DAResolveWindow, + }, nil +} + +// SyncLookback computes the number of blocks to walk back in order to find the correct L1 origin. +// In plasma mode longest possible window is challenge + resolve windows. +func (c *Config) SyncLookback() uint64 { + if c.UsePlasma { + if win := (c.DAChallengeWindow + c.DAResolveWindow); win > c.SeqWindowSize { + return win + } + } + return c.SeqWindowSize } // Description outputs a banner describing the important parts of rollup configuration in a human-readable form. diff --git a/op-plasma/commitment.go b/op-plasma/commitment.go new file mode 100644 index 000000000000..e0f49e538425 --- /dev/null +++ b/op-plasma/commitment.go @@ -0,0 +1,56 @@ +package plasma + +import ( + "bytes" + "errors" + + "github.com/ethereum/go-ethereum/crypto" +) + +// ErrInvalidCommitment is returned when the commitment cannot be parsed into a known commitment type. +var ErrInvalidCommitment = errors.New("invalid commitment") + +// ErrCommitmentMismatch is returned when the commitment does not match the given input. +var ErrCommitmentMismatch = errors.New("commitment mismatch") + +// CommitmentType is the commitment type prefix. +type CommitmentType byte + +// KeccakCommitmentType is the default commitment type for the DA storage. +const Keccak256CommitmentType CommitmentType = 0 + +// Keccak256Commitment is the default commitment type for op-plasma. +type Keccak256Commitment []byte + +// Encode adds a commitment type prefix self describing the commitment. +func (c Keccak256Commitment) Encode() []byte { + return append([]byte{byte(Keccak256CommitmentType)}, c...) +} + +// Verify checks if the commitment matches the given input. +func (c Keccak256Commitment) Verify(input []byte) error { + if !bytes.Equal(c, crypto.Keccak256(input)) { + return ErrCommitmentMismatch + } + return nil +} + +// Keccak256 creates a new commitment from the given input. +func Keccak256(input []byte) Keccak256Commitment { + return Keccak256Commitment(crypto.Keccak256(input)) +} + +// DecodeKeccak256 validates and casts the commitment into a Keccak256Commitment. +func DecodeKeccak256(commitment []byte) (Keccak256Commitment, error) { + if len(commitment) == 0 { + return nil, ErrInvalidCommitment + } + if commitment[0] != byte(Keccak256CommitmentType) { + return nil, ErrInvalidCommitment + } + c := commitment[1:] + if len(c) != 32 { + return nil, ErrInvalidCommitment + } + return c, nil +} diff --git a/op-plasma/daclient.go b/op-plasma/daclient.go index 80fb5b2cda1b..e1c03509f69a 100644 --- a/op-plasma/daclient.go +++ b/op-plasma/daclient.go @@ -7,16 +7,11 @@ import ( "fmt" "io" "net/http" - - "github.com/ethereum/go-ethereum/crypto" ) // ErrNotFound is returned when the server could not find the input. var ErrNotFound = errors.New("not found") -// ErrCommitmentMismatch is returned when the server returns the wrong input for the given commitment. -var ErrCommitmentMismatch = errors.New("commitment mismatch") - // ErrInvalidInput is returned when the input is not valid for posting to the DA storage. var ErrInvalidInput = errors.New("invalid input") @@ -34,9 +29,9 @@ func NewDAClient(url string, verify bool) *DAClient { return &DAClient{url, verify} } -// GetInput returns the input data for the given commitment bytes. -func (c *DAClient) GetInput(ctx context.Context, key []byte) ([]byte, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/get/0x%x", c.url, key), nil) +// GetInput returns the input data for the given encoded commitment bytes. +func (c *DAClient) GetInput(ctx context.Context, comm Keccak256Commitment) ([]byte, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/get/0x%x", c.url, comm.Encode()), nil) if err != nil { return nil, fmt.Errorf("failed to create HTTP request: %w", err) } @@ -53,20 +48,22 @@ func (c *DAClient) GetInput(ctx context.Context, key []byte) ([]byte, error) { return nil, err } if c.verify { - exp := crypto.Keccak256(input) - if !bytes.Equal(exp, key) { - return nil, ErrCommitmentMismatch + if err := comm.Verify(input); err != nil { + return nil, err } + } return input, nil } // SetInput sets the input data and returns the keccak256 hash commitment. -func (c *DAClient) SetInput(ctx context.Context, img []byte) ([]byte, error) { +func (c *DAClient) SetInput(ctx context.Context, img []byte) (Keccak256Commitment, error) { if len(img) == 0 { return nil, ErrInvalidInput } - key := crypto.Keccak256(img) + comm := Keccak256(img) + // encode with commitment type prefix + key := comm.Encode() body := bytes.NewReader(img) url := fmt.Sprintf("%s/put/0x%x", c.url, key) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body) @@ -82,5 +79,5 @@ func (c *DAClient) SetInput(ctx context.Context, img []byte) ([]byte, error) { if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("failed to store preimage: %v", resp.StatusCode) } - return key, nil + return comm, nil } diff --git a/op-plasma/daclient_test.go b/op-plasma/daclient_test.go index c2afdfe38072..f605cda761ad 100644 --- a/op-plasma/daclient_test.go +++ b/op-plasma/daclient_test.go @@ -9,9 +9,7 @@ import ( "testing" "github.com/ethereum-optimism/optimism/op-service/testlog" - "github.com/ethereum-optimism/optimism/op-service/testutils" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/log" "github.com/stretchr/testify/require" @@ -80,12 +78,12 @@ func TestDAClient(t *testing.T) { rng := rand.New(rand.NewSource(1234)) - input := testutils.RandomData(rng, 2000) + input := RandomData(rng, 2000) comm, err := client.SetInput(ctx, input) require.NoError(t, err) - require.Equal(t, comm, crypto.Keccak256(input)) + require.Equal(t, comm, Keccak256(input)) stored, err := client.GetInput(ctx, comm) require.NoError(t, err) @@ -93,13 +91,13 @@ func TestDAClient(t *testing.T) { require.Equal(t, input, stored) // set a bad commitment in the store - require.NoError(t, store.Put(comm, []byte("bad data"))) + require.NoError(t, store.Put(comm.Encode(), []byte("bad data"))) _, err = client.GetInput(ctx, comm) require.ErrorIs(t, err, ErrCommitmentMismatch) // test not found error - comm = crypto.Keccak256(testutils.RandomData(rng, 32)) + comm = Keccak256(RandomData(rng, 32)) _, err = client.GetInput(ctx, comm) require.ErrorIs(t, err, ErrNotFound) @@ -112,6 +110,6 @@ func TestDAClient(t *testing.T) { _, err = client.SetInput(ctx, input) require.Error(t, err) - _, err = client.GetInput(ctx, crypto.Keccak256(input)) + _, err = client.GetInput(ctx, Keccak256(input)) require.Error(t, err) } diff --git a/op-plasma/damgr.go b/op-plasma/damgr.go index f206678b67be..13d67aa505ba 100644 --- a/op-plasma/damgr.go +++ b/op-plasma/damgr.go @@ -2,48 +2,423 @@ package plasma import ( "context" + "errors" + "fmt" + "io" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum-optimism/optimism/op-bindings/bindings" "github.com/ethereum-optimism/optimism/op-service/eth" ) +// ErrPendingChallenge is returned when data is not available but can still be challenged/resolved +// so derivation should halt temporarily. +var ErrPendingChallenge = errors.New("not found, pending challenge") + +// ErrExpiredChallenge is returned when a challenge was not resolved and derivation should skip this input. +var ErrExpiredChallenge = errors.New("challenge expired") + +// ErrMissingPastWindow is returned when the input data is MIA and cannot be challenged. +// This is a protocol fatal error. +var ErrMissingPastWindow = errors.New("data missing past window") + +// ErrInvalidChallenge is returned when a challenge event does is decoded but does not +// relate to the actual chain commitments. +var ErrInvalidChallenge = errors.New("invalid challenge") + +// L1Fetcher is the required interface for syncing the DA challenge contract state. +type L1Fetcher interface { + InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) + FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) + L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error) +} + +// DAStorage interface for calling the DA storage server. type DAStorage interface { - GetInput(ctx context.Context, key []byte) ([]byte, error) - SetInput(ctx context.Context, img []byte) ([]byte, error) + GetInput(ctx context.Context, key Keccak256Commitment) ([]byte, error) + SetInput(ctx context.Context, img []byte) (Keccak256Commitment, error) +} + +// HeadSignalFn is the callback function to accept head-signals without a context. +type HeadSignalFn func(eth.L1BlockRef) + +// Config is the relevant subset of rollup config for plasma DA. +type Config struct { + // Required for filtering contract events + DAChallengeContractAddress common.Address + // The number of l1 blocks after the input is committed during which one can challenge. + ChallengeWindow uint64 + // The number of l1 blocks after a commitment is challenged during which one can resolve. + ResolveWindow uint64 } type DA struct { log log.Logger + cfg Config + metrics Metricer + storage DAStorage -} -type Input struct { - Data eth.Data + // the DA state keeps track of all the commitments and their challenge status. + state *State + + // the latest l1 block we synced challenge contract events from + origin eth.BlockID + // the latest recorded finalized head as per the challenge contract + finalizedHead eth.L1BlockRef + // the latest recorded finalized head as per the l1 finalization signal + l1FinalizedHead eth.L1BlockRef + // flag the reset function we are resetting because of an expired challenge + resetting bool + + finalizedHeadSignalFunc HeadSignalFn } // NewPlasmaDA creates a new PlasmaDA instance with the given log and CLIConfig. -func NewPlasmaDA(log log.Logger, cfg CLIConfig) *DA { +func NewPlasmaDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) *DA { + return NewPlasmaDAWithStorage(log, cfg, cli.NewDAClient(), metrics) +} + +// NewPlasmaDAWithStorage creates a new PlasmaDA instance with the given log and DAStorage interface. +func NewPlasmaDAWithStorage(log log.Logger, cfg Config, storage DAStorage, metrics Metricer) *DA { return &DA{ log: log, - storage: cfg.NewDAClient(), + cfg: cfg, + storage: storage, + metrics: metrics, + state: NewState(log, metrics), } } -// NewPlasmaDAWithStorage creates a new PlasmaDA instance with the given log and DAStorage interface. -func NewPlasmaDAWithStorage(log log.Logger, storage DAStorage) *DA { +// NewPlasmaWithState creates a plasma storage from initial state used for testing in isolation. +// We pass the L1Fetcher to each method so it is kept in sync with the conf depth of the pipeline. +func NewPlasmaDAWithState(log log.Logger, cfg Config, storage DAStorage, metrics Metricer, state *State) *DA { return &DA{ log: log, + cfg: cfg, storage: storage, + metrics: metrics, + state: state, } } +// OnFinalizedHeadSignal sets the callback function to be called when the finalized head is updated. +// This will signal to the engine queue that will set the proper L2 block as finalized. +func (d *DA) OnFinalizedHeadSignal(f HeadSignalFn) { + d.finalizedHeadSignalFunc = f +} + +// Finalize takes the L1 finality signal, compares the plasma finalized block and forwards the finality +// signal to the engine queue based on whichever is most behind. +func (d *DA) Finalize(l1Finalized eth.L1BlockRef) { + ref := d.finalizedHead + d.log.Info("received l1 finalized signal, forwarding to engine queue", "l1", l1Finalized, "plasma", ref) + // if the l1 finalized head is behind it is the finalized head + if l1Finalized.Number < d.finalizedHead.Number { + ref = l1Finalized + } + // prune finalized state + d.state.Prune(ref.Number) + + if d.finalizedHeadSignalFunc == nil { + d.log.Warn("finalized head signal function not set") + return + } + + // signal the engine queue + d.finalizedHeadSignalFunc(ref) +} + +// LookAhead increments the challenges origin and process the new block if it exists. +// It is used when the derivation pipeline stalls due to missing data and we need to continue +// syncing challenge events until the challenge is resolved or expires. +func (d *DA) LookAhead(ctx context.Context, l1 L1Fetcher) error { + blkRef, err := l1.L1BlockRefByNumber(ctx, d.origin.Number+1) + // temporary error, will do a backoff + if err != nil { + return err + } + return d.AdvanceL1Origin(ctx, l1, blkRef.ID()) +} + +// Reset the challenge event derivation origin in case of L1 reorg +func (d *DA) Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error { + // resetting due to expired challenge, do not clear state. + // If the DA source returns ErrReset, the pipeline is forced to reset by the rollup driver. + // In that case the Reset function will be called immediately, BEFORE the pipeline can + // call any further stage to step. Thus the state will NOT be cleared if the reset originates + // from this stage of the pipeline. + if d.resetting { + d.resetting = false + } else { + // resetting due to L1 reorg, clear state + d.origin = base.ID() + d.state.Reset() + } + return io.EOF +} + // GetInput returns the input data for the given commitment bytes. blockNumber is required to lookup // the challenge status in the DataAvailabilityChallenge L1 contract. -func (d *DA) GetInput(ctx context.Context, commitment []byte, blockNumber uint64) (Input, error) { - data, err := d.storage.GetInput(ctx, commitment) +func (d *DA) GetInput(ctx context.Context, l1 L1Fetcher, comm Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) { + // If the challenge head is ahead in the case of a pipeline reset or stall, we might have synced a + // challenge event for this commitment. Otherwise we mark the commitment as part of the canonical + // chain so potential future challenge events can be selected. + ch := d.state.GetOrTrackChallenge(comm.Encode(), blockId.Number, d.cfg.ChallengeWindow) + + // Fetch the input from the DA storage. + data, err := d.storage.GetInput(ctx, comm) + + // data is not found in storage but may be available if the challenge was resolved. + notFound := errors.Is(ErrNotFound, err) + + if err != nil && !notFound { + d.log.Error("failed to get preimage", "err", err) + // the storage client request failed for some other reason + // in which case derivation pipeline should be retried + return nil, err + } + + switch ch.challengeStatus { + case ChallengeActive: + if d.isExpired(ch.expiresAt) { + // this challenge has expired, this input must be skipped + return nil, ErrExpiredChallenge + } else if notFound { + // data is missing and a challenge is active, we must wait for the challenge to resolve + // hence we continue syncing new origins to sync the new challenge events. + if err := d.LookAhead(ctx, l1); err != nil { + return nil, err + } + return nil, ErrPendingChallenge + } + case ChallengeExpired: + // challenge was marked as expired, skip + return nil, ErrExpiredChallenge + case ChallengeResolved: + // challenge was resolved, data is available in storage, return directly + if !notFound { + return data, nil + } + // data not found in storage, return from challenge resolved input + resolvedInput, err := d.state.GetResolvedInput(comm.Encode()) + if err != nil { + return nil, err + } + return resolvedInput, nil + default: + if notFound { + if d.isExpired(ch.expiresAt) { + // we're past the challenge window and the data is not available + return nil, ErrMissingPastWindow + } else { + // continue syncing challenges hoping it eventually is challenged and resolved + if err := d.LookAhead(ctx, l1); err != nil { + return nil, err + } + return nil, ErrPendingChallenge + } + } + } + + return data, nil +} + +// isExpired returns whether the given expiration block number is lower or equal to the current head +func (d *DA) isExpired(bn uint64) bool { + return d.origin.Number >= bn +} + +// AdvanceL1Origin syncs any challenge events included in the l1 block, expires any active challenges +// after the new resolveWindow, computes and signals the new finalized head and sets the l1 block +// as the new head for tracking challenges. If forwards an error if any new challenge have expired to +// trigger a derivation reset. +func (d *DA) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, block eth.BlockID) error { + // do not repeat for the same origin + if block.Number <= d.origin.Number { + return nil + } + // sync challenges for the given block ID + if err := d.LoadChallengeEvents(ctx, l1, block); err != nil { + return err + } + // advance challenge window, computing the finalized head + bn, err := d.state.ExpireChallenges(block.Number) + if err != nil { + // warn the reset function not to clear the state + d.resetting = true + return err + } + + // finalized head signal is called only when the finalized head number increases + // and the l1 finalized head ahead of the DA finalized head. + if bn > d.finalizedHead.Number { + ref, err := l1.L1BlockRefByNumber(ctx, bn) + if err != nil { + return err + } + d.metrics.RecordChallengesHead("finalized", bn) + + // keep track of finalized had so it can be picked up by the + // l1 finalization signal + d.finalizedHead = ref + } + d.origin = block + d.metrics.RecordChallengesHead("latest", d.origin.Number) + + d.log.Info("processed plasma l1 origin", "origin", block, "next-finalized", bn, "finalized", d.finalizedHead.Number, "l1-finalize", d.l1FinalizedHead.Number) + return nil +} + +// LoadChallengeEvents fetches the l1 block receipts and updates the challenge status +func (d *DA) LoadChallengeEvents(ctx context.Context, l1 L1Fetcher, block eth.BlockID) error { + // filter any challenge event logs in the block + logs, err := d.fetchChallengeLogs(ctx, l1, block) if err != nil { - return Input{}, err + return err + } + + for _, log := range logs { + i := log.TxIndex + status, comm, err := d.decodeChallengeStatus(log) + if err != nil { + d.log.Error("failed to decode challenge event", "block", block.Number, "tx", i, "log", log.Index, "err", err) + continue + } + switch status { + case ChallengeResolved: + // cached with input resolution call so not expensive + _, txs, err := l1.InfoAndTxsByHash(ctx, block.Hash) + if err != nil { + d.log.Error("failed to fetch l1 block", "block", block.Number, "err", err) + continue + } + // avoid panic in black swan case of faulty rpc + if uint(len(txs)) <= i { + d.log.Error("tx/receipt mismatch in InfoAndTxsByHash") + continue + } + // select the transaction corresponding to the receipt + tx := txs[i] + // txs and receipts must be in the same order + if tx.Hash() != log.TxHash { + d.log.Error("tx hash mismatch", "block", block.Number, "txIdx", i, "log", log.Index, "txHash", tx.Hash(), "receiptTxHash", log.TxHash) + continue + } + // Decode the input from resolver tx calldata + input, err := DecodeResolvedInput(tx.Data()) + if err != nil { + d.log.Error("failed to decode resolved input", "block", block.Number, "txIdx", i, "err", err) + continue + } + if err := comm.Verify(input); err != nil { + d.log.Error("failed to verify commitment", "block", block.Number, "txIdx", i, "err", err) + continue + } + d.log.Debug("challenge resolved", "block", block, "txIdx", i) + d.state.SetResolvedChallenge(comm.Encode(), input, log.BlockNumber) + case ChallengeActive: + d.log.Info("detected new active challenge", "block", block) + d.state.SetActiveChallenge(comm.Encode(), log.BlockNumber, d.cfg.ResolveWindow) + default: + d.log.Warn("skipping unknown challenge status", "block", block.Number, "tx", i, "log", log.Index, "status", status) + } + } + return nil +} + +// fetchChallengeLogs returns logs for challenge events if any for the given block +func (d *DA) fetchChallengeLogs(ctx context.Context, l1 L1Fetcher, block eth.BlockID) ([]*types.Log, error) { //cached with deposits events call so not expensive + var logs []*types.Log + _, receipts, err := l1.FetchReceipts(ctx, block.Hash) + if err != nil { + return nil, err + } + d.log.Info("loading challenges", "epoch", block.Number, "numReceipts", len(receipts)) + for _, rec := range receipts { + // skip error logs + if rec.Status != types.ReceiptStatusSuccessful { + continue + } + for _, log := range rec.Logs { + if log.Address == d.cfg.DAChallengeContractAddress && len(log.Topics) > 0 && log.Topics[0] == ChallengeStatusEventABIHash { + logs = append(logs, log) + } + } + } + + return logs, nil +} + +// decodeChallengeStatus decodes and validates a challenge event from a transaction log, returning the associated commitment bytes. +func (d *DA) decodeChallengeStatus(log *types.Log) (ChallengeStatus, Keccak256Commitment, error) { + event, err := DecodeChallengeStatusEvent(log) + if err != nil { + return 0, nil, err + } + d.log.Debug("decoded challenge status event", "log", log, "event", event) + comm, err := DecodeKeccak256(event.ChallengedCommitment) + if err != nil { + return 0, nil, err + } + + bn := event.ChallengedBlockNumber.Uint64() + // if we are not tracking the commitment from processing the l1 origin in derivation, + // i.e. someone challenged garbage data, this challenge is invalid. + if !d.state.IsTracking(comm.Encode(), bn) { + return 0, nil, fmt.Errorf("%w: %x at block %d", ErrInvalidChallenge, comm.Encode(), bn) + } + return ChallengeStatus(event.Status), comm, nil +} + +var ( + ChallengeStatusEventName = "ChallengeStatusChanged" + ChallengeStatusEventABI = "ChallengeStatusChanged(uint256,bytes,uint8)" + ChallengeStatusEventABIHash = crypto.Keccak256Hash([]byte(ChallengeStatusEventABI)) +) + +// DecodeChallengeStatusEvent decodes the challenge status event from the log data and the indexed challenged +// hash and block number from the topics. +func DecodeChallengeStatusEvent(log *types.Log) (*bindings.DataAvailabilityChallengeChallengeStatusChanged, error) { + // abi lazy loaded, cached after decoded once + dacAbi, err := bindings.DataAvailabilityChallengeMetaData.GetAbi() + if err != nil { + return nil, err + } + var event bindings.DataAvailabilityChallengeChallengeStatusChanged + err = dacAbi.UnpackIntoInterface(&event, ChallengeStatusEventName, log.Data) + if err != nil { + return nil, err + } + var indexed abi.Arguments + for _, arg := range dacAbi.Events[ChallengeStatusEventName].Inputs { + if arg.Indexed { + indexed = append(indexed, arg) + } + } + if err := abi.ParseTopics(&event, indexed, log.Topics[1:]); err != nil { + return nil, err + } + return &event, nil +} + +// DecodeResolvedInput decodes the preimage bytes from the tx input data. +func DecodeResolvedInput(data []byte) ([]byte, error) { + dacAbi, _ := bindings.DataAvailabilityChallengeMetaData.GetAbi() + + args := make(map[string]interface{}) + err := dacAbi.Methods["resolve"].Inputs.UnpackIntoMap(args, data[4:]) + if err != nil { + return nil, err + } + rd, ok := args["resolveData"].([]byte) + if !ok || len(rd) == 0 { + return nil, fmt.Errorf("invalid resolve data") } - return Input{Data: data}, nil + return rd, nil } diff --git a/op-plasma/damgr_test.go b/op-plasma/damgr_test.go new file mode 100644 index 000000000000..b80b2a33046f --- /dev/null +++ b/op-plasma/damgr_test.go @@ -0,0 +1,304 @@ +package plasma + +import ( + "context" + "math/big" + "math/rand" + "testing" + + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/testlog" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func RandomData(rng *rand.Rand, size int) []byte { + out := make([]byte, size) + rng.Read(out) + return out +} + +// TestDAChallengeState is a simple test with small values to verify the finalized head logic +func TestDAChallengeState(t *testing.T) { + logger := testlog.Logger(t, log.LvlDebug) + + rng := rand.New(rand.NewSource(1234)) + state := NewState(logger, &NoopMetrics{}) + + i := uint64(1) + + challengeWindow := uint64(6) + resolveWindow := uint64(6) + + // track commitments in the first 10 blocks + for ; i < 10; i++ { + // this is akin to stepping the derivation pipeline through a range a blocks each with a commitment + state.SetInputCommitment(RandomData(rng, 32), i, challengeWindow) + } + + // blocks are finalized after the challenge window expires + bn, err := state.ExpireChallenges(10) + require.NoError(t, err) + // finalized head = 10 - 6 = 4 + require.Equal(t, uint64(4), bn) + + // track the next commitment and mark it as challenged + c := RandomData(rng, 32) + // add input commitment at block i = 10 + state.SetInputCommitment(c, 10, challengeWindow) + // i+4 is the block at which it was challenged + state.SetActiveChallenge(c, 14, resolveWindow) + + for j := i + 1; j < 18; j++ { + // continue walking the pipeline through some more blocks with commitments + state.SetInputCommitment(RandomData(rng, 32), j, challengeWindow) + } + + // finalized l1 origin should not extend past the resolve window + bn, err = state.ExpireChallenges(18) + require.NoError(t, err) + // finalized is active_challenge_block - 1 = 10 - 1 and cannot move until the challenge expires + require.Equal(t, uint64(9), bn) + + // walk past the resolve window + for j := uint64(18); j < 22; j++ { + state.SetInputCommitment(RandomData(rng, 32), j, challengeWindow) + } + + // no more active challenges, the finalized head can catch up to the challenge window + bn, err = state.ExpireChallenges(22) + require.ErrorIs(t, err, ErrReorgRequired) + // finalized head is now 22 - 6 = 16 + require.Equal(t, uint64(16), bn) + + // cleanup state we don't need anymore + state.Prune(22) + // now if we expire the challenges again, it won't request a reorg again + bn, err = state.ExpireChallenges(22) + require.NoError(t, err) + // finalized head hasn't moved + require.Equal(t, uint64(16), bn) + + // add one more commitment and challenge it + c = RandomData(rng, 32) + state.SetInputCommitment(c, 22, challengeWindow) + // challenge 3 blocks after + state.SetActiveChallenge(c, 25, resolveWindow) + + // exceed the challenge window with more commitments + for j := uint64(23); j < 30; j++ { + state.SetInputCommitment(RandomData(rng, 32), j, challengeWindow) + } + + // finalized head should not extend past the resolve window + bn, err = state.ExpireChallenges(30) + require.NoError(t, err) + // finalized head is stuck waiting for resolve window + require.Equal(t, uint64(21), bn) + + input := RandomData(rng, 100) + // resolve the challenge + state.SetResolvedChallenge(c, input, 30) + + // finalized head catches up + bn, err = state.ExpireChallenges(31) + require.NoError(t, err) + // finalized head is now 31 - 6 = 25 + require.Equal(t, uint64(25), bn) + + // the resolved input is also stored + storedInput, err := state.GetResolvedInput(c) + require.NoError(t, err) + require.Equal(t, input, storedInput) +} + +// TestExpireChallenges expires challenges and prunes the state for longer windows +// with commitments every 6 blocks. +func TestExpireChallenges(t *testing.T) { + logger := testlog.Logger(t, log.LvlDebug) + + rng := rand.New(rand.NewSource(1234)) + state := NewState(logger, &NoopMetrics{}) + + comms := make(map[uint64][]byte) + + i := uint64(3713854) + + var finalized uint64 + + challengeWindow := uint64(90) + resolveWindow := uint64(90) + + // increment new commitments every 6 blocks + for ; i < 3713948; i += 6 { + comm := RandomData(rng, 32) + comms[i] = comm + logger.Info("set commitment", "block", i) + cm := state.GetOrTrackChallenge(comm, i, challengeWindow) + require.NotNil(t, cm) + + bn, err := state.ExpireChallenges(i) + logger.Info("expire challenges", "finalized head", bn, "err", err) + + // only update finalized head if it has moved + if bn > finalized { + finalized = bn + // prune unused state + state.Prune(bn) + } + } + + // activate a couple of subsequent challenges + state.SetActiveChallenge(comms[3713926], 3713948, resolveWindow) + + state.SetActiveChallenge(comms[3713932], 3713950, resolveWindow) + + // continue incrementing commitments + for ; i < 3714038; i += 6 { + comm := RandomData(rng, 32) + comms[i] = comm + logger.Info("set commitment", "block", i) + cm := state.GetOrTrackChallenge(comm, i, challengeWindow) + require.NotNil(t, cm) + + bn, err := state.ExpireChallenges(i) + logger.Info("expire challenges", "expired", bn, "err", err) + + if bn > finalized { + finalized = bn + state.Prune(bn) + } + + } + + // finalized head does not move as it expires previously seen blocks + bn, err := state.ExpireChallenges(3714034) + require.NoError(t, err) + require.Equal(t, uint64(3713920), bn) + + bn, err = state.ExpireChallenges(3714035) + require.NoError(t, err) + require.Equal(t, uint64(3713920), bn) + + bn, err = state.ExpireChallenges(3714036) + require.NoError(t, err) + require.Equal(t, uint64(3713920), bn) + + bn, err = state.ExpireChallenges(3714037) + require.NoError(t, err) + require.Equal(t, uint64(3713920), bn) + + // lastly we get to the resolve window and trigger a reorg + _, err = state.ExpireChallenges(3714038) + require.ErrorIs(t, err, ErrReorgRequired) + + // this is simulating a pipeline reset where it walks back challenge + resolve window + for i := uint64(3713854); i < 3714044; i += 6 { + cm := state.GetOrTrackChallenge(comms[i], i, challengeWindow) + require.NotNil(t, cm) + + // check that the challenge status was updated to expired + if i == 3713926 { + require.Equal(t, ChallengeExpired, cm.challengeStatus) + } + } + + bn, err = state.ExpireChallenges(3714038) + require.NoError(t, err) + + // finalized at last + require.Equal(t, uint64(3713926), bn) +} + +// cannot import from testutils at this time because of import cycle +type mockL1Fetcher struct { + mock.Mock +} + +func (m *mockL1Fetcher) InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) { + out := m.Mock.Called(hash) + return out.Get(0).(eth.BlockInfo), out.Get(1).(types.Transactions), out.Error(2) +} + +func (m *mockL1Fetcher) ExpectInfoAndTxsByHash(hash common.Hash, info eth.BlockInfo, transactions types.Transactions, err error) { + m.Mock.On("InfoAndTxsByHash", hash).Once().Return(info, transactions, err) +} + +func (m *mockL1Fetcher) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) { + out := m.Mock.Called(blockHash) + return *out.Get(0).(*eth.BlockInfo), out.Get(1).(types.Receipts), out.Error(2) +} + +func (m *mockL1Fetcher) ExpectFetchReceipts(hash common.Hash, info eth.BlockInfo, receipts types.Receipts, err error) { + m.Mock.On("FetchReceipts", hash).Once().Return(&info, receipts, err) +} + +func (m *mockL1Fetcher) L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1BlockRef, error) { + out := m.Mock.Called(num) + return out.Get(0).(eth.L1BlockRef), out.Error(1) +} + +func (m *mockL1Fetcher) ExpectL1BlockRefByNumber(num uint64, ref eth.L1BlockRef, err error) { + m.Mock.On("L1BlockRefByNumber", num).Once().Return(ref, err) +} + +func TestFilterInvalidBlockNumber(t *testing.T) { + logger := testlog.Logger(t, log.LevelDebug) + ctx := context.Background() + + l1F := &mockL1Fetcher{} + + storage := NewMockDAClient(logger) + + daddr := common.HexToAddress("0x978e3286eb805934215a88694d80b09aded68d90") + pcfg := Config{ + ChallengeWindow: 90, ResolveWindow: 90, DAChallengeContractAddress: daddr, + } + + bn := uint64(19) + bhash := common.HexToHash("0xd438144ffab918b1349e7cd06889c26800c26d8edc34d64f750e3e097166a09c") + + da := NewPlasmaDAWithStorage(logger, pcfg, storage, &NoopMetrics{}) + + receipts := types.Receipts{&types.Receipt{ + Type: 2, + Status: 1, + Logs: []*types.Log{ + { + BlockNumber: bn, + Address: daddr, + Topics: []common.Hash{ + common.HexToHash("0xa448afda7ea1e3a7a10fcab0c29fe9a9dd85791503bf0171f281521551c7ec05"), + }, + }, + { + BlockNumber: bn, + Address: daddr, + Topics: []common.Hash{ + common.HexToHash("0xc5d8c630ba2fdacb1db24c4599df78c7fb8cf97b5aecde34939597f6697bb1ad"), + common.HexToHash("0x000000000000000000000000000000000000000000000000000000000000000e"), + }, + Data: common.FromHex("0x00000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000002100eed82c1026bdd0f23461dd6ca515ef677624e63e6fc0ff91e3672af8eddf579d00000000000000000000000000000000000000000000000000000000000000"), + }, + }, + BlockNumber: big.NewInt(int64(bn)), + }} + id := eth.BlockID{ + Number: bn, + Hash: bhash, + } + l1F.ExpectFetchReceipts(bhash, nil, receipts, nil) + + // we get 1 logs successfully filtered as valid status updated contract event + logs, err := da.fetchChallengeLogs(ctx, l1F, id) + require.NoError(t, err) + require.Equal(t, len(logs), 1) + + _, _, err = da.decodeChallengeStatus(logs[0]) + // challenge was successfully decoded but is invalid because it does not belong + // to any known commitment previously submitted onchain. + require.ErrorIs(t, err, ErrInvalidChallenge) +} diff --git a/op-plasma/damock.go b/op-plasma/damock.go index 3b0f40fbff66..f21e80df25e0 100644 --- a/op-plasma/damock.go +++ b/op-plasma/damock.go @@ -3,8 +3,9 @@ package plasma import ( "context" "errors" + "io" - "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/log" @@ -24,17 +25,17 @@ func NewMockDAClient(log log.Logger) *MockDAClient { } } -func (c *MockDAClient) GetInput(ctx context.Context, key []byte) ([]byte, error) { - bytes, err := c.store.Get(key) +func (c *MockDAClient) GetInput(ctx context.Context, key Keccak256Commitment) ([]byte, error) { + bytes, err := c.store.Get(key.Encode()) if err != nil { return nil, ErrNotFound } return bytes, nil } -func (c *MockDAClient) SetInput(ctx context.Context, data []byte) ([]byte, error) { - key := crypto.Keccak256(data) - return key, c.store.Put(key, data) +func (c *MockDAClient) SetInput(ctx context.Context, data []byte) (Keccak256Commitment, error) { + key := Keccak256(data) + return key, c.store.Put(key.Encode(), data) } func (c *MockDAClient) DeleteData(key []byte) error { @@ -48,7 +49,7 @@ type DAErrFaker struct { setInputErr error } -func (f *DAErrFaker) GetInput(ctx context.Context, key []byte) ([]byte, error) { +func (f *DAErrFaker) GetInput(ctx context.Context, key Keccak256Commitment) ([]byte, error) { if err := f.getInputErr; err != nil { f.getInputErr = nil return nil, err @@ -56,7 +57,7 @@ func (f *DAErrFaker) GetInput(ctx context.Context, key []byte) ([]byte, error) { return f.Client.GetInput(ctx, key) } -func (f *DAErrFaker) SetPreImage(ctx context.Context, data []byte) ([]byte, error) { +func (f *DAErrFaker) SetInput(ctx context.Context, data []byte) (Keccak256Commitment, error) { if err := f.setInputErr; err != nil { f.setInputErr = nil return nil, err @@ -71,3 +72,28 @@ func (f *DAErrFaker) ActGetPreImageFail() { func (f *DAErrFaker) ActSetPreImageFail() { f.setInputErr = errors.New("set input failed") } + +var Disabled = &PlasmaDisabled{} + +var ErrNotEnabled = errors.New("plasma not enabled") + +// PlasmaDisabled is a noop plasma DA implementation for stubbing. +type PlasmaDisabled struct{} + +func (d *PlasmaDisabled) GetInput(ctx context.Context, l1 L1Fetcher, commitment Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) { + return nil, ErrNotEnabled +} + +func (d *PlasmaDisabled) Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error { + return io.EOF +} + +func (d *PlasmaDisabled) Finalize(ref eth.L1BlockRef) { +} + +func (d *PlasmaDisabled) OnFinalizedHeadSignal(f HeadSignalFn) { +} + +func (d *PlasmaDisabled) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, blockId eth.BlockID) error { + return ErrNotEnabled +} diff --git a/op-plasma/dastate.go b/op-plasma/dastate.go new file mode 100644 index 000000000000..465eefe9ff9f --- /dev/null +++ b/op-plasma/dastate.go @@ -0,0 +1,191 @@ +package plasma + +import ( + "container/heap" + "errors" + + "github.com/ethereum/go-ethereum/log" +) + +// ErrReorgRequired is returned when a commitment was derived but for which the challenge expired. +// This requires a reorg to rederive without the input even if the input was previously available. +var ErrReorgRequired = errors.New("reorg required") + +type ChallengeStatus uint8 + +const ( + ChallengeUnititialized ChallengeStatus = iota + ChallengeActive + ChallengeResolved + ChallengeExpired +) + +// Commitment keeps track of the onchain state of an input commitment. +type Commitment struct { + key []byte // the encoded commitment + input []byte // the input itself if it was resolved onchain + expiresAt uint64 // represents the block number after which the commitment can no longer be challenged or if challenged no longer be resolved. + blockNumber uint64 // block where the commitment is included as calldata to the batcher inbox + challengeStatus ChallengeStatus // latest known challenge status +} + +// CommQueue is a queue of commitments ordered by block number. +type CommQueue []*Commitment + +var _ heap.Interface = (*CommQueue)(nil) + +func (c CommQueue) Len() int { return len(c) } + +func (c CommQueue) Less(i, j int) bool { + return c[i].blockNumber < c[j].blockNumber +} + +func (c CommQueue) Swap(i, j int) { + c[i], c[j] = c[j], c[i] +} + +func (c *CommQueue) Push(x any) { + *c = append(*c, x.(*Commitment)) +} + +func (c *CommQueue) Pop() any { + old := *c + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + *c = old[0 : n-1] + return item +} + +// State tracks the commitment and their challenges in order of l1 inclusion. +type State struct { + comms CommQueue + commsByKey map[string]*Commitment + log log.Logger + metrics Metricer +} + +func NewState(log log.Logger, m Metricer) *State { + return &State{ + comms: make(CommQueue, 0), + commsByKey: make(map[string]*Commitment), + log: log, + metrics: m, + } +} + +// IsTracking returns whether we currently have a commitment for the given key. +func (s *State) IsTracking(key []byte, bn uint64) bool { + if c, ok := s.commsByKey[string(key)]; ok { + return c.blockNumber == bn + } + return false +} + +// SetActiveChallenge switches the state of a given commitment to active challenge. Noop if +// the commitment is not tracked as we don't want to track challenges for invalid commitments. +func (s *State) SetActiveChallenge(key []byte, challengedAt uint64, resolveWindow uint64) { + if c, ok := s.commsByKey[string(key)]; ok { + c.expiresAt = challengedAt + resolveWindow + c.challengeStatus = ChallengeActive + s.metrics.RecordActiveChallenge(c.blockNumber, challengedAt, key) + } +} + +// SetResolvedChallenge switches the state of a given commitment to resolved. Noop if +// the commitment is not tracked as we don't want to track challenges for invalid commitments. +// The input posted onchain is stored in the state for later retrieval. +func (s *State) SetResolvedChallenge(key []byte, input []byte, resolvedAt uint64) { + if c, ok := s.commsByKey[string(key)]; ok { + c.challengeStatus = ChallengeResolved + c.expiresAt = resolvedAt + c.input = input + s.metrics.RecordResolvedChallenge(key) + } +} + +// SetInputCommitment initializes a new commitment and adds it to the state. +// This is called when we see a commitment during derivation so we can refer to it later in +// challenges. +func (s *State) SetInputCommitment(key []byte, committedAt uint64, challengeWindow uint64) *Commitment { + c := &Commitment{ + key: key, + expiresAt: committedAt + challengeWindow, + blockNumber: committedAt, + } + s.log.Debug("append commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber) + heap.Push(&s.comms, c) + s.commsByKey[string(key)] = c + + return c +} + +// GetOrTrackChallenge returns the commitment for the given key if it is already tracked, or +// initializes a new commitment and adds it to the state. +func (s *State) GetOrTrackChallenge(key []byte, bn uint64, challengeWindow uint64) *Commitment { + if c, ok := s.commsByKey[string(key)]; ok { + return c + } + return s.SetInputCommitment(key, bn, challengeWindow) +} + +// GetResolvedInput returns the input bytes if the commitment was resolved onchain. +func (s *State) GetResolvedInput(key []byte) ([]byte, error) { + if c, ok := s.commsByKey[string(key)]; ok { + return c.input, nil + } + return nil, errors.New("commitment not found") +} + +// ExpireChallenges walks back from the oldest commitment to find the latest l1 origin +// for which input data can no longer be challenged. It also marks any active challenges +// as expired based on the new latest l1 origin. If any active challenges are expired +// it returns an error to signal that a derivation pipeline reset is required. +func (s *State) ExpireChallenges(bn uint64) (uint64, error) { + latest := uint64(0) + var err error + for i := 0; i < len(s.comms); i++ { + c := s.comms[i] + if c.expiresAt <= bn && c.blockNumber > latest { + latest = c.blockNumber + + if c.challengeStatus == ChallengeActive { + c.challengeStatus = ChallengeExpired + s.metrics.RecordExpiredChallenge(c.key) + err = ErrReorgRequired + } + } else { + break + } + } + return latest, err +} + +// safely prune in case reset is deeper than the finalized l1 +const commPruneMargin = 200 + +// Prune removes commitments once they can no longer be challenged or resolved. +func (s *State) Prune(bn uint64) { + if bn > commPruneMargin { + bn -= commPruneMargin + } else { + bn = 0 + } + if s.comms.Len() == 0 { + return + } + // only first element is the highest priority (lowest block number). + // next highest priority is swapped to the first position after a Pop. + for s.comms.Len() > 0 && s.comms[0].blockNumber < bn { + c := heap.Pop(&s.comms).(*Commitment) + s.log.Debug("prune commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber) + delete(s.commsByKey, string(c.key)) + } +} + +// In case of L1 reorg, state should be cleared so we can sync all the challenge events +// from scratch. +func (s *State) Reset() { + s.comms = s.comms[:0] + clear(s.commsByKey) +} diff --git a/op-plasma/metrics.go b/op-plasma/metrics.go new file mode 100644 index 000000000000..9843e91f3863 --- /dev/null +++ b/op-plasma/metrics.go @@ -0,0 +1,73 @@ +package plasma + +import ( + "github.com/ethereum-optimism/optimism/op-service/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +type Metricer interface { + RecordActiveChallenge(commBlock uint64, startBlock uint64, hash []byte) + RecordResolvedChallenge(hash []byte) + RecordExpiredChallenge(hash []byte) + RecordChallengesHead(name string, num uint64) + RecordStorageError() +} + +type Metrics struct { + ChallengesStatus *prometheus.GaugeVec + ChallengesHead *prometheus.GaugeVec + + StorageErrors *metrics.Event +} + +var _ Metricer = (*Metrics)(nil) + +func MakeMetrics(ns string, factory metrics.Factory) *Metrics { + return &Metrics{ + ChallengesStatus: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Name: "challenges_status", + Help: "Gauge representing the status of challenges synced", + }, []string{"status"}), + ChallengesHead: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Name: "challenges_head", + Help: "Gauge representing the l1 heads of challenges synced", + }, []string{"type"}), + StorageErrors: metrics.NewEvent(factory, ns, "", "storage_errors", "errors when fetching or uploading to storage service"), + } +} + +func (m *Metrics) RecordChallenge(status string) { + m.ChallengesStatus.WithLabelValues(status).Inc() +} + +// RecordActiveChallenge records when a commitment is challenged including the block where the commitment +// is included, the block where the commitment was challenged and the commitment hash. +func (m *Metrics) RecordActiveChallenge(commBlock uint64, startBlock uint64, hash []byte) { + m.RecordChallenge("active") +} + +func (m *Metrics) RecordResolvedChallenge(hash []byte) { + m.RecordChallenge("resolved") +} + +func (m *Metrics) RecordExpiredChallenge(hash []byte) { + m.RecordChallenge("expired") +} + +func (m *Metrics) RecordStorageError() { + m.StorageErrors.Record() +} + +func (m *Metrics) RecordChallengesHead(name string, num uint64) { + m.ChallengesHead.WithLabelValues(name).Set(float64(num)) +} + +type NoopMetrics struct{} + +func (m *NoopMetrics) RecordActiveChallenge(commBlock uint64, startBlock uint64, hash []byte) {} +func (m *NoopMetrics) RecordResolvedChallenge(hash []byte) {} +func (m *NoopMetrics) RecordExpiredChallenge(hash []byte) {} +func (m *NoopMetrics) RecordChallengesHead(name string, num uint64) {} +func (m *NoopMetrics) RecordStorageError() {} diff --git a/op-plasma/params.go b/op-plasma/params.go new file mode 100644 index 000000000000..3377d981cd00 --- /dev/null +++ b/op-plasma/params.go @@ -0,0 +1,6 @@ +package plasma + +// Max input size ensures the canonical chain cannot include input batches too large to +// challenge in the Data Availability Challenge contract. Value in number of bytes. +// This value can only be changed in a hard fork. +const MaxInputSize = 130672 diff --git a/op-program/client/driver/driver.go b/op-program/client/driver/driver.go index 105128ac861f..95cba25037bb 100644 --- a/op-program/client/driver/driver.go +++ b/op-program/client/driver/driver.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/sync" + plasma "github.com/ethereum-optimism/optimism/op-plasma" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum/go-ethereum/log" ) @@ -40,7 +41,7 @@ type Driver struct { func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l1BlobsSource derive.L1BlobsFetcher, l2Source L2Source, targetBlockNum uint64) *Driver { engine := derive.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync) - pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, nil, l2Source, engine, metrics.NoopMetrics, &sync.Config{}, safedb.Disabled) + pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, plasma.Disabled, l2Source, engine, metrics.NoopMetrics, &sync.Config{}, safedb.Disabled) pipeline.Reset() return &Driver{ logger: logger, diff --git a/packages/contracts-bedrock/deploy-config/devnetL1-template.json b/packages/contracts-bedrock/deploy-config/devnetL1-template.json index 371704514ac0..60b585d55ab2 100644 --- a/packages/contracts-bedrock/deploy-config/devnetL1-template.json +++ b/packages/contracts-bedrock/deploy-config/devnetL1-template.json @@ -63,8 +63,8 @@ "respectedGameType": 0, "useFaultProofs": false, "usePlasma": false, - "daChallengeWindow": 1000, - "daResolveWindow": 1000, + "daChallengeWindow": 6, + "daResolveWindow": 6, "daBondSize": 1000000, "daResolverRefundPercentage": 0 }