Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batcher: use abstract Queue type for blocks state #12180

Merged
merged 12 commits into from
Oct 2, 2024
45 changes: 24 additions & 21 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/queue"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -32,7 +33,7 @@ type channelManager struct {
rollupCfg *rollup.Config

// All blocks since the last request for new tx data.
blocks []*types.Block
blocks queue.Queue[*types.Block]
// The latest L1 block from all the L2 blocks in the most recently closed channel
l1OriginLastClosedChannel eth.BlockID
// The default ChannelConfig to use for the next channel
Expand Down Expand Up @@ -68,7 +69,7 @@ func (s *channelManager) Clear(l1OriginLastClosedChannel eth.BlockID) {
s.mu.Lock()
defer s.mu.Unlock()
s.log.Trace("clearing channel manager state")
s.blocks = s.blocks[:0]
s.blocks.Clear()
s.l1OriginLastClosedChannel = l1OriginLastClosedChannel
s.tip = common.Hash{}
s.closed = false
Expand Down Expand Up @@ -106,9 +107,11 @@ func (s *channelManager) TxConfirmed(_id txID, inclusionBlock eth.BlockID) {
if channel, ok := s.txChannels[id]; ok {
delete(s.txChannels, id)
done, blocks := channel.TxConfirmed(id, inclusionBlock)
s.blocks = append(blocks, s.blocks...)
if done {
s.removePendingChannel(channel)
if len(blocks) > 0 {
geoknee marked this conversation as resolved.
Show resolved Hide resolved
s.blocks.Prepend(blocks...)
}
}
} else {
s.log.Warn("transaction from unknown channel marked as confirmed", "id", id)
Expand Down Expand Up @@ -208,7 +211,7 @@ func (s *channelManager) getReadyChannel(l1Head eth.BlockID) (*channel, error) {
}

dataPending := firstWithTxData != nil
s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", len(s.blocks))
s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", s.blocks.Len())

// Short circuit if there is pending tx data or the channel manager is closed
if dataPending {
Expand All @@ -222,7 +225,7 @@ func (s *channelManager) getReadyChannel(l1Head eth.BlockID) (*channel, error) {
// No pending tx data, so we have to add new blocks to the channel

// If we have no saved blocks, we will not be able to create valid frames
if len(s.blocks) == 0 {
if s.blocks.Len() == 0 {
return nil, io.EOF
}

Expand Down Expand Up @@ -274,14 +277,14 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
"id", pc.ID(),
"l1Head", l1Head,
"l1OriginLastClosedChannel", s.l1OriginLastClosedChannel,
"blocks_pending", len(s.blocks),
"blocks_pending", s.blocks.Len(),
"batch_type", cfg.BatchType,
"compression_algo", cfg.CompressorConfig.CompressionAlgo,
"target_num_frames", cfg.TargetNumFrames,
"max_frame_size", cfg.MaxFrameSize,
"use_blobs", cfg.UseBlobs,
)
s.metr.RecordChannelOpened(pc.ID(), len(s.blocks))
s.metr.RecordChannelOpened(pc.ID(), s.blocks.Len())

return nil
}
Expand All @@ -304,7 +307,13 @@ func (s *channelManager) processBlocks() error {
_chFullErr *ChannelFullError // throw away, just for type checking
latestL2ref eth.L2BlockRef
)
for i, block := range s.blocks {

for i := 0; ; i++ {
block, ok := s.blocks.PeekN(i)
if !ok {
break
}

l1info, err := s.currentChannel.AddBlock(block)
if errors.As(err, &_chFullErr) {
// current block didn't get added because channel is already full
Expand All @@ -323,22 +332,16 @@ func (s *channelManager) processBlocks() error {
}
}

if blocksAdded == len(s.blocks) {
// all blocks processed, reuse slice
s.blocks = s.blocks[:0]
} else {
// remove processed blocks
s.blocks = s.blocks[blocksAdded:]
}
_, _ = s.blocks.DequeueN(blocksAdded)

s.metr.RecordL2BlocksAdded(latestL2ref,
blocksAdded,
len(s.blocks),
s.blocks.Len(),
s.currentChannel.InputBytes(),
s.currentChannel.ReadyBytes())
s.log.Debug("Added blocks to channel",
"blocks_added", blocksAdded,
"blocks_pending", len(s.blocks),
"blocks_pending", s.blocks.Len(),
"channel_full", s.currentChannel.IsFull(),
"input_bytes", s.currentChannel.InputBytes(),
"ready_bytes", s.currentChannel.ReadyBytes(),
Expand All @@ -363,7 +366,7 @@ func (s *channelManager) outputFrames() error {
inBytes, outBytes := s.currentChannel.InputBytes(), s.currentChannel.OutputBytes()
s.metr.RecordChannelClosed(
s.currentChannel.ID(),
len(s.blocks),
s.blocks.Len(),
s.currentChannel.TotalFrames(),
inBytes,
outBytes,
Expand All @@ -377,7 +380,7 @@ func (s *channelManager) outputFrames() error {

s.log.Info("Channel closed",
"id", s.currentChannel.ID(),
"blocks_pending", len(s.blocks),
"blocks_pending", s.blocks.Len(),
"num_frames", s.currentChannel.TotalFrames(),
"input_bytes", inBytes,
"output_bytes", outBytes,
Expand All @@ -404,7 +407,7 @@ func (s *channelManager) AddL2Block(block *types.Block) error {
}

s.metr.RecordL2BlockInPendingQueue(block)
s.blocks = append(s.blocks, block)
s.blocks.Enqueue(block)
s.tip = block.Hash()

return nil
Expand Down Expand Up @@ -489,7 +492,7 @@ func (s *channelManager) Requeue(newCfg ChannelConfig) {
}

// We put the blocks back at the front of the queue:
s.blocks = append(blocksToRequeue, s.blocks...)
s.blocks.Prepend(blocksToRequeue...)
// Channels which where already being submitted are put back
s.channelQueue = newChannelQueue
s.currentChannel = nil
Expand Down
6 changes: 4 additions & 2 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
derivetest "github.com/ethereum-optimism/optimism/op-node/rollup/derive/test"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/queue"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -87,7 +88,7 @@ func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint) {
require.NoError(t, m.AddL2Block(c))
require.ErrorIs(t, m.AddL2Block(x), ErrReorg)

require.Equal(t, []*types.Block{a, b, c}, m.blocks)
require.Equal(t, queue.Queue[*types.Block]{a, b, c}, m.blocks)
}

// ChannelManagerReturnsErrReorgWhenDrained ensures that the channel manager
Expand Down Expand Up @@ -626,7 +627,7 @@ func TestChannelManager_Requeue(t *testing.T) {

// This is the snapshot of channel manager state we want to reinstate
// when we requeue
stateSnapshot := []*types.Block{blockA, blockB}
stateSnapshot := queue.Queue[*types.Block]{blockA, blockB}
m.blocks = stateSnapshot
require.Empty(t, m.channelQueue)

Expand Down Expand Up @@ -664,5 +665,6 @@ func TestChannelManager_Requeue(t *testing.T) {

// The requeue shouldn't affect the pending channel
require.Contains(t, m.channelQueue, channel0)

require.NotContains(t, m.blocks, blockA)
}
74 changes: 74 additions & 0 deletions op-service/queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package queue
sebastianst marked this conversation as resolved.
Show resolved Hide resolved

// Queue implements a FIFO queue.
type Queue[T any] []T
geoknee marked this conversation as resolved.
Show resolved Hide resolved

// Enqueue adds the elements to the back of the queue.
func (q *Queue[T]) Enqueue(t ...T) {
if len(t) == 0 {
return
}
*q = append(*q, t...)
}

// Dequeue removes a single element from the front of the queue
// (if there is one) and returns it. Returns a zero value and false
// if there is no element to dequeue.
func (q *Queue[T]) Dequeue() (T, bool) {
el, ok := q.DequeueN(1)
if !ok {
var zeroValue T
return zeroValue, false
}
return el[0], true
}

// DequeueN removes N elements from the front of the queue
// (if there are enough) and returns a slice of those elements. Returns
// a nil slice and false if there are insufficient elements to dequeue.
func (q *Queue[T]) DequeueN(N int) ([]T, bool) {
if len(*q) < N {
return nil, false
}
t := (*q)[0:N]
*q = (*q)[N:]
return t, true
}

// Prepend inserts the elements at the front of the queue,
// preserving their order. A noop if t is empty.
func (q *Queue[T]) Prepend(t ...T) {
if len(t) == 0 {
return
}
*q = append(t, *q...)
}

// Clear removes all elements from the queue.
func (q *Queue[T]) Clear() {
*q = (*q)[:0]
}

// Len returns the number of elements in the queue.
func (q *Queue[T]) Len() int {
return len(*q)
}

// Peek returns the single element at the front of the queue
// (if there is one) without removing it. Returns a zero value and
// false if there is no element to peek at.
func (q *Queue[T]) Peek() (T, bool) {
return q.PeekN(0)
}

// PeekN returns the element in Nth position in the queue
// Returns a zero value and false if there are insufficient elements
// in the queue.
func (q *Queue[T]) PeekN(N int) (T, bool) {
if len(*q) <= N {
var zeroValue T
return zeroValue, false
}
t := (*q)[N]
return t, true
}
122 changes: 122 additions & 0 deletions op-service/queue/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package queue

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestQueue(t *testing.T) {
t.Run("enqueue amd dequeue", func(t *testing.T) {
q := Queue[int]{}
q.Enqueue(1, 2, 3, 4)

p, peekOk := q.Peek()
require.True(t, peekOk)
require.Equal(t, 1, p)

d, dequeueOk := q.Dequeue()
require.Equal(t, 1, d)
require.True(t, dequeueOk)
require.Equal(t, 3, q.Len())
p, peekOk = q.Peek()
require.True(t, peekOk)
require.Equal(t, 2, p)

d, dequeueOk = q.Dequeue()
require.Equal(t, 2, d)
require.True(t, dequeueOk)
require.Equal(t, 2, q.Len())
p, peekOk = q.Peek()
require.True(t, peekOk)
require.Equal(t, 3, p)

d, dequeueOk = q.Dequeue()
require.Equal(t, 3, d)
require.True(t, dequeueOk)
require.Equal(t, 1, q.Len())
p, peekOk = q.Peek()
require.True(t, peekOk)
require.Equal(t, 4, p)

d, dequeueOk = q.Dequeue()
require.Equal(t, 4, d)
require.True(t, dequeueOk)
require.Equal(t, 0, q.Len())
p, peekOk = q.Peek()
require.False(t, peekOk)
require.Equal(t, 0, p)

d, dequeueOk = q.Dequeue()
require.Equal(t, 0, d)
require.False(t, dequeueOk)
require.Equal(t, 0, q.Len())
p, peekOk = q.Peek()
require.False(t, peekOk)
require.Equal(t, 0, p)
p, peekOk = q.Peek()
require.False(t, peekOk)
require.Equal(t, 0, p)
})

t.Run("peekN and deqeueueN", func(t *testing.T) {
q := Queue[int]{}
q.Enqueue(1, 2, 3, 4)

p, peekOk := q.PeekN(1)
require.True(t, peekOk)
require.Equal(t, 2, p)

p, peekOk = q.PeekN(2)
require.Equal(t, 3, p)
require.True(t, peekOk)
require.Equal(t, 4, q.Len())

p, peekOk = q.PeekN(4)
require.Equal(t, 0, p)
require.False(t, peekOk)

d, dequeueOk := q.DequeueN(1)
require.Equal(t, []int{1}, d)
require.True(t, dequeueOk)
require.Equal(t, 3, q.Len())

d, dequeueOk = q.DequeueN(3)
require.Equal(t, []int{2, 3, 4}, d)
require.True(t, dequeueOk)
require.Equal(t, 0, q.Len())
})

t.Run("enqueue and clear", func(t *testing.T) {
q := Queue[int]{}
q.Enqueue(5, 6, 7)

q.Clear()
require.Equal(t, 0, q.Len())

d, ok := q.Dequeue()
require.Equal(t, 0, d)
require.False(t, ok)
})

t.Run("prepend", func(t *testing.T) {
var q, r Queue[int]
q.Enqueue(5, 6, 7)
r.Enqueue(8, 9)

q.Prepend(r...)
require.Equal(t, 5, q.Len())

d, ok := q.Dequeue()
require.Equal(t, 8, d)
require.True(t, ok)
require.Equal(t, 4, q.Len())

q.Prepend()
require.Equal(t, 4, q.Len())

d, ok = q.Dequeue()
require.Equal(t, 9, d)
require.True(t, ok)
})
}