Skip to content

Commit

Permalink
implement access to V1 and V2 messages and replace usage so that V1 i…
Browse files Browse the repository at this point in the history
…s used before EuclidV2 fork and V2 afterward
  • Loading branch information
jonastheis committed Jan 29, 2025
1 parent 882fb38 commit 74046dd
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 23 deletions.
46 changes: 45 additions & 1 deletion core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,52 @@ func (v *BlockValidator) ValidateL1Messages(block *types.Block) error {
queueIndex := *nextQueueIndex

L1SectionOver := false
it := rawdb.IterateL1MessagesFrom(v.bc.db, queueIndex)

// From EuclidV2 onwards there can't be any skipped L1 messages, and we use a different L1MessageQueueV2.
if v.config.IsEuclidV2(block.Time()) {
it := rawdb.IterateL1MessagesV2From(v.bc.db, queueIndex)
for _, tx := range block.Transactions() {
if !tx.IsL1MessageTx() {
L1SectionOver = true
continue // we do not verify L2 transactions here
}

// check that L1 messages are before L2 transactions
if L1SectionOver {
return consensus.ErrInvalidL1MessageOrder
}

// queue index must be equal to the expected value
txQueueIndex := tx.AsL1MessageTx().QueueIndex
if txQueueIndex != queueIndex {
return consensus.ErrInvalidL1MessageOrder
}

if exists := it.Next(); !exists {
if err := it.Error(); err != nil {
log.Error("Unexpected DB error in ValidateL1Messages", "err", err, "queueIndex", txQueueIndex)
}
// the message in this block is not available in our local db.
// we'll reprocess this block at a later time.
return consensus.ErrMissingL1MessageData
}

// check that the L1 message in the block is the same that we collected from L1
msg := it.L1Message()
expectedHash := types.NewTx(&msg).Hash()

if tx.Hash() != expectedHash {
return consensus.ErrUnknownL1Message
}

// we expect L1 messages to be in order and contiguous
queueIndex++
}

return nil
}

it := rawdb.IterateL1MessagesV1From(v.bc.db, queueIndex)
for _, tx := range block.Transactions() {
if !tx.IsL1MessageTx() {
L1SectionOver = true
Expand Down
12 changes: 12 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,18 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types
l.BlockHash = blockHash
}

// Make sure the block body is valid e.g. ordering of L1 messages is correct and continuous.
if err = bc.validator.ValidateBody(fullBlock); err != nil {
bc.reportBlock(fullBlock, receipts, err)
return NonStatTy, fmt.Errorf("error validating block body %d: %w", fullBlock.Number().Uint64(), err)
}

// Double check: even though we just built the block, make sure it is valid.
if err = bc.validator.ValidateState(fullBlock, statedb, receipts, gasUsed); err != nil {
bc.reportBlock(fullBlock, receipts, err)
return NonStatTy, fmt.Errorf("error validating block %d: %w", fullBlock.Number().Uint64(), err)
}

return bc.writeBlockWithState(fullBlock, receipts, logs, statedb, false)
}

Expand Down
140 changes: 134 additions & 6 deletions core/rawdb/accessors_l1_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ type L1MessageIterator struct {
maxQueueIndex uint64
}

// IterateL1MessagesFrom creates an L1MessageIterator that iterates over
// iterateL1MessagesFrom creates an L1MessageIterator that iterates over
// all L1 message in the database starting at the provided enqueue index.
func IterateL1MessagesFrom(db ethdb.Database, fromQueueIndex uint64) L1MessageIterator {
func iterateL1MessagesFrom(db ethdb.Database, fromQueueIndex uint64) L1MessageIterator {
start := encodeBigEndian(fromQueueIndex)
it := db.NewIterator(l1MessagePrefix, start)
keyLength := len(l1MessagePrefix) + 8
Expand Down Expand Up @@ -208,10 +208,64 @@ func (it *L1MessageIterator) Error() error {
return it.inner.Error()
}

// ReadL1MessagesFrom retrieves up to `maxCount` L1 messages starting at `startIndex`.
func ReadL1MessagesFrom(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx {
// L1MessageV1Iterator is a wrapper around L1MessageIterator that allows us to iterate over L1 messages V1.
type L1MessageV1Iterator struct {
db ethdb.Database
L1MessageIterator
}

func IterateL1MessagesV1From(db ethdb.Database, fromQueueIndex uint64) L1MessageV1Iterator {
return L1MessageV1Iterator{
db: db,
L1MessageIterator: iterateL1MessagesFrom(db, fromQueueIndex),
}
}

func (it *L1MessageV1Iterator) Next() bool {
for it.L1MessageIterator.Next() {
// L1MessageV2StartIndex is the first queue index of L1 messages that are from L1MessageQueueV2.
// Therefore, we stop reading L1 messages V1 when we reach this index.
// We need to check in every iteration as the start index can be set in the meantime when we are reading L1 messages.
v2StartIndex := ReadL1MessageV2StartIndex(it.db)
if v2StartIndex != nil && it.QueueIndex() >= *v2StartIndex {
return false
}
return true
}
return false
}

// L1MessageV2Iterator is a wrapper around L1MessageIterator that allows us to iterate over L1 messages V2.
type L1MessageV2Iterator struct {
v2StartIndex *uint64
L1MessageIterator
}

func IterateL1MessagesV2From(db ethdb.Database, fromQueueIndex uint64) L1MessageV2Iterator {
v2StartIndex := ReadL1MessageV2StartIndex(db)

return L1MessageV2Iterator{
v2StartIndex: v2StartIndex,
L1MessageIterator: iterateL1MessagesFrom(db, fromQueueIndex),
}
}

func (it *L1MessageV2Iterator) Next() bool {
if it.v2StartIndex == nil {
return false
}

for it.L1MessageIterator.Next() {
return it.QueueIndex() >= *it.v2StartIndex
}

return false
}

// ReadL1MessagesV1From retrieves up to `maxCount` L1 messages V1 starting at `startIndex`.
func ReadL1MessagesV1From(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx {
msgs := make([]types.L1MessageTx, 0, maxCount)
it := IterateL1MessagesFrom(db, startIndex)
it := IterateL1MessagesV1From(db, startIndex)
defer it.Release()

index := startIndex
Expand All @@ -223,7 +277,50 @@ func ReadL1MessagesFrom(db ethdb.Database, startIndex, maxCount uint64) []types.
// sanity check
if msg.QueueIndex != index {
log.Crit(
"Unexpected QueueIndex in ReadL1MessagesFrom",
"Unexpected QueueIndex in ReadL1MessagesV1From",
"expected", index,
"got", msg.QueueIndex,
"startIndex", startIndex,
"maxCount", maxCount,
)
}

msgs = append(msgs, msg)
index += 1
count -= 1

iteratorL1MessageSizeGauge.Update(int64(unsafe.Sizeof(msg) + uintptr(cap(msg.Data))))

if msg.QueueIndex == it.maxQueueIndex {
break
}
}

if err := it.Error(); err != nil {
log.Crit("Failed to read L1 messages", "err", err)
}

return msgs
}

// ReadL1MessagesV2From retrieves up to `maxCount` L1 messages V2 starting at `startIndex`.
// If startIndex is smaller than L1MessageV2StartIndex, this function will return an empty slice.
func ReadL1MessagesV2From(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx {
msgs := make([]types.L1MessageTx, 0, maxCount)

it := IterateL1MessagesV2From(db, startIndex)
defer it.Release()

index := startIndex
count := maxCount

for count > 0 && it.Next() {
msg := it.L1Message()

// sanity check
if msg.QueueIndex != index {
log.Crit(
"Unexpected QueueIndex in ReadL1MessagesV1From",
"expected", index,
"got", msg.QueueIndex,
"startIndex", startIndex,
Expand Down Expand Up @@ -275,3 +372,34 @@ func ReadFirstQueueIndexNotInL2Block(db ethdb.Reader, l2BlockHash common.Hash) *
queueIndex := binary.BigEndian.Uint64(data)
return &queueIndex
}

// WriteL1MessageV2StartIndex writes the start index of L1 messages that are from L1MessageQueueV2.
func WriteL1MessageV2StartIndex(db ethdb.KeyValueWriter, queueIndex uint64) {
value := big.NewInt(0).SetUint64(queueIndex).Bytes()

if err := db.Put(l1MessageV2StartIndex, value); err != nil {
log.Crit("Failed to update L1MessageV2 start index", "err", err)
}
}

// ReadL1MessageV2StartIndex retrieves the start index of L1 messages that are from L1MessageQueueV2.
func ReadL1MessageV2StartIndex(db ethdb.Reader) *uint64 {
data, err := db.Get(l1MessageV2StartIndex)
if err != nil && isNotFoundErr(err) {
return nil
}
if err != nil {
log.Crit("Failed to read L1MessageV2 start index from database", "err", err)
}
if len(data) == 0 {
return nil
}

number := new(big.Int).SetBytes(data)
if !number.IsUint64() {
log.Crit("Unexpected number for L1MessageV2 start index", "number", number)
}

res := number.Uint64()
return &res
}
6 changes: 3 additions & 3 deletions core/rawdb/accessors_l1_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestIterateL1Message(t *testing.T) {
t.Fatal("max index mismatch", "expected", 1000, "got", max)
}

it := IterateL1MessagesFrom(db, 103)
it := iterateL1MessagesFrom(db, 103)
defer it.Release()

for ii := 2; ii < len(msgs); ii++ {
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestReadL1MessageTxRange(t *testing.T) {
db := NewMemoryDatabase()
WriteL1Messages(db, msgs)

got := ReadL1MessagesFrom(db, 101, 3)
got := ReadL1MessagesV1From(db, 101, 3)

if len(got) != 3 {
t.Fatal("Invalid length", "expected", 3, "got", len(got))
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestIterationStopsAtMaxQueueIndex(t *testing.T) {
WriteHighestSyncedQueueIndex(db, 102)

// iteration should terminate at 102 and not read 103
got := ReadL1MessagesFrom(db, 100, 10)
got := ReadL1MessagesV1From(db, 100, 10)

if len(got) != 3 {
t.Fatal("Invalid length", "expected", 3, "got", len(got))
Expand Down
1 change: 1 addition & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ var (
l1MessagePrefix = []byte("L1") // l1MessagePrefix + queueIndex (uint64 big endian) -> L1MessageTx
firstQueueIndexNotInL2BlockPrefix = []byte("q") // firstQueueIndexNotInL2BlockPrefix + L2 block hash -> enqueue index
highestSyncedQueueIndexKey = []byte("HighestSyncedQueueIndex")
l1MessageV2StartIndex = []byte("L1MessageV2StartIndex")

// Scroll rollup event store
rollupEventSyncedL1BlockNumberKey = []byte("R-LastRollupEventSyncedL1BlockNumber")
Expand Down
16 changes: 16 additions & 0 deletions core/types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,22 @@ func (b *Block) ContainsL1Messages() bool {
return false
}

func (b *Block) NumL1Messages() int {
if l1MsgCount := b.l1MsgCount.Load(); l1MsgCount != nil {
return l1MsgCount.(int)
}

count := 0
for _, tx := range b.transactions {
if tx.IsL1MessageTx() {
count += 1
}
}

b.l1MsgCount.Store(count)
return count
}

// NumL1MessagesProcessed returns the number of L1 messages processed in this block.
// This count includes both skipped and included messages.
// `firstQueueIndex` is the first queue index available for this block to process.
Expand Down
8 changes: 7 additions & 1 deletion miner/scroll_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,13 @@ func (w *worker) updateSnapshot() {

func (w *worker) collectPendingL1Messages(startIndex uint64) []types.L1MessageTx {
maxCount := w.chainConfig.Scroll.L1Config.NumL1MessagesPerBlock
return rawdb.ReadL1MessagesFrom(w.eth.ChainDb(), startIndex, maxCount)

// If we are on EuclidV2, we need to read L1 messages from L1MessageQueueV2.
if w.chainConfig.IsEuclidV2(w.current.header.Time) {
return rawdb.ReadL1MessagesV2From(w.eth.ChainDb(), startIndex, maxCount)
}

return rawdb.ReadL1MessagesV1From(w.eth.ChainDb(), startIndex, maxCount)
}

// newWork
Expand Down
22 changes: 11 additions & 11 deletions rollup/sync_service/bridge_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ func newBridgeClient(ctx context.Context, l1Client EthClient, l1ChainId uint64,

// fetchMessagesInRange retrieves and parses all L1 messages between the
// provided from and to L1 block numbers (inclusive).
func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64) ([]types.L1MessageTx, error) {
func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64) ([]types.L1MessageTx, []types.L1MessageTx, error) {
log.Trace("BridgeClient fetchMessagesInRange", "fromBlock", from, "toBlock", to)

var msgs []types.L1MessageTx
var msgsV1, msgsV2 []types.L1MessageTx

opts := bind.FilterOpts{
Start: from,
Expand All @@ -85,18 +85,18 @@ func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64
if !c.skipV1L1Messages {
it, err := c.filtererV1.FilterQueueTransaction(&opts, nil, nil)
if err != nil {
return nil, err
return nil, nil, err
}

for it.Next() {
event := it.Event
log.Trace("Received new L1 QueueTransaction event from L1MessageQueueV1", "event", event)

if !event.GasLimit.IsUint64() {
return nil, fmt.Errorf("invalid QueueTransaction event: QueueIndex = %v, GasLimit = %v", event.QueueIndex, event.GasLimit)
return nil, nil, fmt.Errorf("invalid QueueTransaction event: QueueIndex = %v, GasLimit = %v", event.QueueIndex, event.GasLimit)
}

msgs = append(msgs, types.L1MessageTx{
msgsV1 = append(msgsV1, types.L1MessageTx{
QueueIndex: event.QueueIndex,
Gas: event.GasLimit.Uint64(),
To: &event.Target,
Expand All @@ -107,26 +107,26 @@ func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64
}

if err = it.Error(); err != nil {
return nil, err
return nil, nil, err
}
}

// We always query L1MessageQueueV2. Before EuclidV2 L1 upgrade tx this will return an empty list as we don't use
// L1MessageQueueV2 to enqueue L1 messages before EuclidV2.
it, err := c.filtererV2.FilterQueueTransaction(&opts, nil, nil)
if err != nil {
return nil, err
return nil, nil, err
}

for it.Next() {
event := it.Event
log.Trace("Received new L1 QueueTransaction event from L1MessageQueueV2", "event", event)

if !event.GasLimit.IsUint64() {
return nil, fmt.Errorf("invalid QueueTransaction event: QueueIndex = %v, GasLimit = %v", event.QueueIndex, event.GasLimit)
return nil, nil, fmt.Errorf("invalid QueueTransaction event: QueueIndex = %v, GasLimit = %v", event.QueueIndex, event.GasLimit)
}

msgs = append(msgs, types.L1MessageTx{
msgsV2 = append(msgsV2, types.L1MessageTx{
QueueIndex: event.QueueIndex,
Gas: event.GasLimit.Uint64(),
To: &event.Target,
Expand All @@ -140,10 +140,10 @@ func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64
}

if err = it.Error(); err != nil {
return nil, err
return nil, nil, err
}

return msgs, nil
return msgsV1, msgsV2, nil
}

func (c *BridgeClient) getLatestConfirmedBlockNumber(ctx context.Context) (uint64, error) {
Expand Down
Loading

0 comments on commit 74046dd

Please sign in to comment.