From 74046dda4fcded1d70aec0fd10e4667c4f97d4af Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Wed, 29 Jan 2025 10:30:58 +0800 Subject: [PATCH] implement access to V1 and V2 messages and replace usage so that V1 is used before EuclidV2 fork and V2 afterward --- core/block_validator.go | 46 +++++++- core/blockchain.go | 12 ++ core/rawdb/accessors_l1_message.go | 140 +++++++++++++++++++++++- core/rawdb/accessors_l1_message_test.go | 6 +- core/rawdb/schema.go | 1 + core/types/block.go | 16 +++ miner/scroll_worker.go | 8 +- rollup/sync_service/bridge_client.go | 22 ++-- rollup/sync_service/sync_service.go | 10 +- 9 files changed, 238 insertions(+), 23 deletions(-) diff --git a/core/block_validator.go b/core/block_validator.go index 9eb3accacdb4..fab1c4060b73 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -139,8 +139,52 @@ func (v *BlockValidator) ValidateL1Messages(block *types.Block) error { queueIndex := *nextQueueIndex L1SectionOver := false - it := rawdb.IterateL1MessagesFrom(v.bc.db, queueIndex) + // From EuclidV2 onwards there can't be any skipped L1 messages, and we use a different L1MessageQueueV2. + if v.config.IsEuclidV2(block.Time()) { + it := rawdb.IterateL1MessagesV2From(v.bc.db, queueIndex) + for _, tx := range block.Transactions() { + if !tx.IsL1MessageTx() { + L1SectionOver = true + continue // we do not verify L2 transactions here + } + + // check that L1 messages are before L2 transactions + if L1SectionOver { + return consensus.ErrInvalidL1MessageOrder + } + + // queue index must be equal to the expected value + txQueueIndex := tx.AsL1MessageTx().QueueIndex + if txQueueIndex != queueIndex { + return consensus.ErrInvalidL1MessageOrder + } + + if exists := it.Next(); !exists { + if err := it.Error(); err != nil { + log.Error("Unexpected DB error in ValidateL1Messages", "err", err, "queueIndex", txQueueIndex) + } + // the message in this block is not available in our local db. 
+ // we'll reprocess this block at a later time. + return consensus.ErrMissingL1MessageData + } + + // check that the L1 message in the block is the same that we collected from L1 + msg := it.L1Message() + expectedHash := types.NewTx(&msg).Hash() + + if tx.Hash() != expectedHash { + return consensus.ErrUnknownL1Message + } + + // we expect L1 messages to be in order and contiguous + queueIndex++ + } + + return nil + } + + it := rawdb.IterateL1MessagesV1From(v.bc.db, queueIndex) for _, tx := range block.Transactions() { if !tx.IsL1MessageTx() { L1SectionOver = true diff --git a/core/blockchain.go b/core/blockchain.go index 57d82bc118a4..73e2a57fa21a 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1853,6 +1853,18 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types l.BlockHash = blockHash } + // Make sure the block body is valid e.g. ordering of L1 messages is correct and continuous. + if err = bc.validator.ValidateBody(fullBlock); err != nil { + bc.reportBlock(fullBlock, receipts, err) + return NonStatTy, fmt.Errorf("error validating block body %d: %w", fullBlock.Number().Uint64(), err) + } + + // Double check: even though we just built the block, make sure it is valid. 
+ if err = bc.validator.ValidateState(fullBlock, statedb, receipts, gasUsed); err != nil { + bc.reportBlock(fullBlock, receipts, err) + return NonStatTy, fmt.Errorf("error validating block %d: %w", fullBlock.Number().Uint64(), err) + } + return bc.writeBlockWithState(fullBlock, receipts, logs, statedb, false) } diff --git a/core/rawdb/accessors_l1_message.go b/core/rawdb/accessors_l1_message.go index 4ae27c7b8ff0..1cc2b535213d 100644 --- a/core/rawdb/accessors_l1_message.go +++ b/core/rawdb/accessors_l1_message.go @@ -141,9 +141,9 @@ type L1MessageIterator struct { maxQueueIndex uint64 } -// IterateL1MessagesFrom creates an L1MessageIterator that iterates over +// iterateL1MessagesFrom creates an L1MessageIterator that iterates over // all L1 message in the database starting at the provided enqueue index. -func IterateL1MessagesFrom(db ethdb.Database, fromQueueIndex uint64) L1MessageIterator { +func iterateL1MessagesFrom(db ethdb.Database, fromQueueIndex uint64) L1MessageIterator { start := encodeBigEndian(fromQueueIndex) it := db.NewIterator(l1MessagePrefix, start) keyLength := len(l1MessagePrefix) + 8 @@ -208,10 +208,64 @@ func (it *L1MessageIterator) Error() error { return it.inner.Error() } -// ReadL1MessagesFrom retrieves up to `maxCount` L1 messages starting at `startIndex`. -func ReadL1MessagesFrom(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx { +// L1MessageV1Iterator is a wrapper around L1MessageIterator that allows us to iterate over L1 messages V1. +type L1MessageV1Iterator struct { + db ethdb.Database + L1MessageIterator +} + +func IterateL1MessagesV1From(db ethdb.Database, fromQueueIndex uint64) L1MessageV1Iterator { + return L1MessageV1Iterator{ + db: db, + L1MessageIterator: iterateL1MessagesFrom(db, fromQueueIndex), + } +} + +func (it *L1MessageV1Iterator) Next() bool { + for it.L1MessageIterator.Next() { + // L1MessageV2StartIndex is the first queue index of L1 messages that are from L1MessageQueueV2. 
+ // Therefore, we stop reading L1 messages V1 when we reach this index. + // We need to check in every iteration as the start index can be set in the meantime when we are reading L1 messages. + v2StartIndex := ReadL1MessageV2StartIndex(it.db) + if v2StartIndex != nil && it.QueueIndex() >= *v2StartIndex { + return false + } + return true + } + return false +} + +// L1MessageV2Iterator is a wrapper around L1MessageIterator that allows us to iterate over L1 messages V2. +type L1MessageV2Iterator struct { + v2StartIndex *uint64 + L1MessageIterator +} + +func IterateL1MessagesV2From(db ethdb.Database, fromQueueIndex uint64) L1MessageV2Iterator { + v2StartIndex := ReadL1MessageV2StartIndex(db) + + return L1MessageV2Iterator{ + v2StartIndex: v2StartIndex, + L1MessageIterator: iterateL1MessagesFrom(db, fromQueueIndex), + } +} + +func (it *L1MessageV2Iterator) Next() bool { + if it.v2StartIndex == nil { + return false + } + + for it.L1MessageIterator.Next() { + return it.QueueIndex() >= *it.v2StartIndex + } + + return false +} + +// ReadL1MessagesV1From retrieves up to `maxCount` L1 messages V1 starting at `startIndex`. +func ReadL1MessagesV1From(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx { msgs := make([]types.L1MessageTx, 0, maxCount) - it := IterateL1MessagesFrom(db, startIndex) + it := IterateL1MessagesV1From(db, startIndex) defer it.Release() index := startIndex @@ -223,7 +277,50 @@ func ReadL1MessagesFrom(db ethdb.Database, startIndex, maxCount uint64) []types. 
// sanity check if msg.QueueIndex != index { log.Crit( - "Unexpected QueueIndex in ReadL1MessagesFrom", + "Unexpected QueueIndex in ReadL1MessagesV1From", + "expected", index, + "got", msg.QueueIndex, + "startIndex", startIndex, + "maxCount", maxCount, + ) + } + + msgs = append(msgs, msg) + index += 1 + count -= 1 + + iteratorL1MessageSizeGauge.Update(int64(unsafe.Sizeof(msg) + uintptr(cap(msg.Data)))) + + if msg.QueueIndex == it.maxQueueIndex { + break + } + } + + if err := it.Error(); err != nil { + log.Crit("Failed to read L1 messages", "err", err) + } + + return msgs +} + +// ReadL1MessagesV2From retrieves up to `maxCount` L1 messages V2 starting at `startIndex`. +// If startIndex is smaller than L1MessageV2StartIndex, this function will return an empty slice. +func ReadL1MessagesV2From(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx { + msgs := make([]types.L1MessageTx, 0, maxCount) + + it := IterateL1MessagesV2From(db, startIndex) + defer it.Release() + + index := startIndex + count := maxCount + + for count > 0 && it.Next() { + msg := it.L1Message() + + // sanity check + if msg.QueueIndex != index { + log.Crit( + "Unexpected QueueIndex in ReadL1MessagesV2From", "expected", index, "got", msg.QueueIndex, "startIndex", startIndex, "maxCount", maxCount, ) } @@ -275,3 +372,34 @@ func ReadFirstQueueIndexNotInL2Block(db ethdb.Reader, l2BlockHash common.Hash) * queueIndex := binary.BigEndian.Uint64(data) return &queueIndex } + +// WriteL1MessageV2StartIndex writes the start index of L1 messages that are from L1MessageQueueV2. +func WriteL1MessageV2StartIndex(db ethdb.KeyValueWriter, queueIndex uint64) { + value := big.NewInt(0).SetUint64(queueIndex).Bytes() + + if err := db.Put(l1MessageV2StartIndex, value); err != nil { + log.Crit("Failed to update L1MessageV2 start index", "err", err) + } +} + +// ReadL1MessageV2StartIndex retrieves the start index of L1 messages that are from L1MessageQueueV2. 
+func ReadL1MessageV2StartIndex(db ethdb.Reader) *uint64 { + data, err := db.Get(l1MessageV2StartIndex) + if err != nil && isNotFoundErr(err) { + return nil + } + if err != nil { + log.Crit("Failed to read L1MessageV2 start index from database", "err", err) + } + if len(data) == 0 { + return nil + } + + number := new(big.Int).SetBytes(data) + if !number.IsUint64() { + log.Crit("Unexpected number for L1MessageV2 start index", "number", number) + } + + res := number.Uint64() + return &res +} diff --git a/core/rawdb/accessors_l1_message_test.go b/core/rawdb/accessors_l1_message_test.go index 314d6a604a6e..0759ce3e56d1 100644 --- a/core/rawdb/accessors_l1_message_test.go +++ b/core/rawdb/accessors_l1_message_test.go @@ -72,7 +72,7 @@ func TestIterateL1Message(t *testing.T) { t.Fatal("max index mismatch", "expected", 1000, "got", max) } - it := IterateL1MessagesFrom(db, 103) + it := iterateL1MessagesFrom(db, 103) defer it.Release() for ii := 2; ii < len(msgs); ii++ { @@ -104,7 +104,7 @@ func TestReadL1MessageTxRange(t *testing.T) { db := NewMemoryDatabase() WriteL1Messages(db, msgs) - got := ReadL1MessagesFrom(db, 101, 3) + got := ReadL1MessagesV1From(db, 101, 3) if len(got) != 3 { t.Fatal("Invalid length", "expected", 3, "got", len(got)) @@ -151,7 +151,7 @@ func TestIterationStopsAtMaxQueueIndex(t *testing.T) { WriteHighestSyncedQueueIndex(db, 102) // iteration should terminate at 102 and not read 103 - got := ReadL1MessagesFrom(db, 100, 10) + got := ReadL1MessagesV1From(db, 100, 10) if len(got) != 3 { t.Fatal("Invalid length", "expected", 3, "got", len(got)) diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 47b29c77d840..f4a939bf09e1 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -109,6 +109,7 @@ var ( l1MessagePrefix = []byte("L1") // l1MessagePrefix + queueIndex (uint64 big endian) -> L1MessageTx firstQueueIndexNotInL2BlockPrefix = []byte("q") // firstQueueIndexNotInL2BlockPrefix + L2 block hash -> enqueue index 
highestSyncedQueueIndexKey = []byte("HighestSyncedQueueIndex") + l1MessageV2StartIndex = []byte("L1MessageV2StartIndex") // Scroll rollup event store rollupEventSyncedL1BlockNumberKey = []byte("R-LastRollupEventSyncedL1BlockNumber") diff --git a/core/types/block.go b/core/types/block.go index bee3dee5168c..56fa6deb6631 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -421,6 +421,22 @@ func (b *Block) ContainsL1Messages() bool { return false } +func (b *Block) NumL1Messages() int { + if l1MsgCount := b.l1MsgCount.Load(); l1MsgCount != nil { + return l1MsgCount.(int) + } + + count := 0 + for _, tx := range b.transactions { + if tx.IsL1MessageTx() { + count += 1 + } + } + + b.l1MsgCount.Store(count) + return count +} + // NumL1MessagesProcessed returns the number of L1 messages processed in this block. // This count includes both skipped and included messages. // `firstQueueIndex` is the first queue index available for this block to process. diff --git a/miner/scroll_worker.go b/miner/scroll_worker.go index a6aa7c21c207..af7cd9e238ce 100644 --- a/miner/scroll_worker.go +++ b/miner/scroll_worker.go @@ -438,7 +438,13 @@ func (w *worker) updateSnapshot() { func (w *worker) collectPendingL1Messages(startIndex uint64) []types.L1MessageTx { maxCount := w.chainConfig.Scroll.L1Config.NumL1MessagesPerBlock - return rawdb.ReadL1MessagesFrom(w.eth.ChainDb(), startIndex, maxCount) + + // If we are on EuclidV2, we need to read L1 messages from L1MessageQueueV2. 
+ if w.chainConfig.IsEuclidV2(w.current.header.Time) { + return rawdb.ReadL1MessagesV2From(w.eth.ChainDb(), startIndex, maxCount) + } + + return rawdb.ReadL1MessagesV1From(w.eth.ChainDb(), startIndex, maxCount) } // newWork diff --git a/rollup/sync_service/bridge_client.go b/rollup/sync_service/bridge_client.go index 16f2fdc807db..31f795c3c180 100644 --- a/rollup/sync_service/bridge_client.go +++ b/rollup/sync_service/bridge_client.go @@ -69,10 +69,10 @@ func newBridgeClient(ctx context.Context, l1Client EthClient, l1ChainId uint64, // fetchMessagesInRange retrieves and parses all L1 messages between the // provided from and to L1 block numbers (inclusive). -func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64) ([]types.L1MessageTx, error) { +func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64) ([]types.L1MessageTx, []types.L1MessageTx, error) { log.Trace("BridgeClient fetchMessagesInRange", "fromBlock", from, "toBlock", to) - var msgs []types.L1MessageTx + var msgsV1, msgsV2 []types.L1MessageTx opts := bind.FilterOpts{ Start: from, @@ -85,7 +85,7 @@ func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64 if !c.skipV1L1Messages { it, err := c.filtererV1.FilterQueueTransaction(&opts, nil, nil) if err != nil { - return nil, err + return nil, nil, err } for it.Next() { @@ -93,10 +93,10 @@ func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64 log.Trace("Received new L1 QueueTransaction event from L1MessageQueueV1", "event", event) if !event.GasLimit.IsUint64() { - return nil, fmt.Errorf("invalid QueueTransaction event: QueueIndex = %v, GasLimit = %v", event.QueueIndex, event.GasLimit) + return nil, nil, fmt.Errorf("invalid QueueTransaction event: QueueIndex = %v, GasLimit = %v", event.QueueIndex, event.GasLimit) } - msgs = append(msgs, types.L1MessageTx{ + msgsV1 = append(msgsV1, types.L1MessageTx{ QueueIndex: event.QueueIndex, Gas: event.GasLimit.Uint64(), To: 
&event.Target, @@ -107,7 +107,7 @@ func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64 } if err = it.Error(); err != nil { - return nil, err + return nil, nil, err } } @@ -115,7 +115,7 @@ func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64 // L1MessageQueueV2 to enqueue L1 messages before EuclidV2. it, err := c.filtererV2.FilterQueueTransaction(&opts, nil, nil) if err != nil { - return nil, err + return nil, nil, err } for it.Next() { @@ -123,10 +123,10 @@ func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64 log.Trace("Received new L1 QueueTransaction event from L1MessageQueueV2", "event", event) if !event.GasLimit.IsUint64() { - return nil, fmt.Errorf("invalid QueueTransaction event: QueueIndex = %v, GasLimit = %v", event.QueueIndex, event.GasLimit) + return nil, nil, fmt.Errorf("invalid QueueTransaction event: QueueIndex = %v, GasLimit = %v", event.QueueIndex, event.GasLimit) } - msgs = append(msgs, types.L1MessageTx{ + msgsV2 = append(msgsV2, types.L1MessageTx{ QueueIndex: event.QueueIndex, Gas: event.GasLimit.Uint64(), To: &event.Target, @@ -140,10 +140,10 @@ func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64 } if err = it.Error(); err != nil { - return nil, err + return nil, nil, err } - return msgs, nil + return msgsV1, msgsV2, nil } func (c *BridgeClient) getLatestConfirmedBlockNumber(ctx context.Context) (uint64, error) { diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index 75add85e7c5b..28b288e03a2e 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -227,7 +227,7 @@ func (s *SyncService) fetchMessages() { to = latestConfirmed } - msgs, err := s.client.fetchMessagesInRange(s.ctx, from, to) + msgsV1, msgsV2, err := s.client.fetchMessagesInRange(s.ctx, from, to) if err != nil { // flush pending writes to database if from > 0 { @@ -237,6 +237,14 @@ func (s 
*SyncService) fetchMessages() { return } + // write start index of very first L1MessageV2 to database + if len(msgsV2) > 0 && rawdb.ReadL1MessageV2StartIndex(s.db) == nil { + firstL1MessageV2 := msgsV2[0] + rawdb.WriteL1MessageV2StartIndex(batchWriter, firstL1MessageV2.QueueIndex) + } + + msgs := append(msgsV1, msgsV2...) + if len(msgs) > 0 { log.Debug("Received new L1 events", "fromBlock", from, "toBlock", to, "count", len(msgs)) rawdb.WriteL1Messages(batchWriter, msgs) // collect messages in memory