diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go
index 23e8f7843696..f33c9d3b5448 100644
--- a/op-batcher/batcher/channel_manager.go
+++ b/op-batcher/batcher/channel_manager.go
@@ -10,6 +10,7 @@ import (
 	"github.com/ethereum-optimism/optimism/op-node/rollup"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
 	"github.com/ethereum-optimism/optimism/op-service/eth"
+	"github.com/ethereum-optimism/optimism/op-service/queue"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/log"
@@ -32,7 +33,7 @@ type channelManager struct {
 	rollupCfg *rollup.Config
 
 	// All blocks since the last request for new tx data.
-	blocks []*types.Block
+	blocks queue.Queue[*types.Block]
 	// The latest L1 block from all the L2 blocks in the most recently closed channel
 	l1OriginLastClosedChannel eth.BlockID
 	// The default ChannelConfig to use for the next channel
@@ -68,7 +69,7 @@ func (s *channelManager) Clear(l1OriginLastClosedChannel eth.BlockID) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	s.log.Trace("clearing channel manager state")
-	s.blocks = s.blocks[:0]
+	s.blocks.Clear()
 	s.l1OriginLastClosedChannel = l1OriginLastClosedChannel
 	s.tip = common.Hash{}
 	s.closed = false
@@ -106,9 +107,11 @@ func (s *channelManager) TxConfirmed(_id txID, inclusionBlock eth.BlockID) {
 	if channel, ok := s.txChannels[id]; ok {
 		delete(s.txChannels, id)
 		done, blocks := channel.TxConfirmed(id, inclusionBlock)
-		s.blocks = append(blocks, s.blocks...)
 		if done {
 			s.removePendingChannel(channel)
+			if len(blocks) > 0 {
+				s.blocks.Prepend(blocks...)
+			}
 		}
 	} else {
 		s.log.Warn("transaction from unknown channel marked as confirmed", "id", id)
@@ -208,7 +211,7 @@ func (s *channelManager) getReadyChannel(l1Head eth.BlockID) (*channel, error) {
 	}
 
 	dataPending := firstWithTxData != nil
-	s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", len(s.blocks))
+	s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", s.blocks.Len())
 
 	// Short circuit if there is pending tx data or the channel manager is closed
 	if dataPending {
@@ -222,7 +225,7 @@ func (s *channelManager) getReadyChannel(l1Head eth.BlockID) (*channel, error) {
 
 	// No pending tx data, so we have to add new blocks to the channel
 
 	// If we have no saved blocks, we will not be able to create valid frames
-	if len(s.blocks) == 0 {
+	if s.blocks.Len() == 0 {
 		return nil, io.EOF
 	}
@@ -274,14 +277,14 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
 		"id", pc.ID(),
 		"l1Head", l1Head,
 		"l1OriginLastClosedChannel", s.l1OriginLastClosedChannel,
-		"blocks_pending", len(s.blocks),
+		"blocks_pending", s.blocks.Len(),
 		"batch_type", cfg.BatchType,
 		"compression_algo", cfg.CompressorConfig.CompressionAlgo,
 		"target_num_frames", cfg.TargetNumFrames,
 		"max_frame_size", cfg.MaxFrameSize,
 		"use_blobs", cfg.UseBlobs,
 	)
-	s.metr.RecordChannelOpened(pc.ID(), len(s.blocks))
+	s.metr.RecordChannelOpened(pc.ID(), s.blocks.Len())
 
 	return nil
 }
@@ -304,7 +307,13 @@ func (s *channelManager) processBlocks() error {
 		_chFullErr  *ChannelFullError // throw away, just for type checking
 		latestL2ref eth.L2BlockRef
 	)
-	for i, block := range s.blocks {
+
+	for i := 0; ; i++ {
+		block, ok := s.blocks.PeekN(i)
+		if !ok {
+			break
+		}
+
 		l1info, err := s.currentChannel.AddBlock(block)
 		if errors.As(err, &_chFullErr) {
 			// current block didn't get added because channel is already full
@@ -323,22 +332,16 @@ func (s *channelManager) processBlocks() error {
 		}
 	}
 
-	if blocksAdded == len(s.blocks) {
-		// all blocks processed, reuse slice
-		s.blocks = s.blocks[:0]
-	} else {
-		// remove processed blocks
-		s.blocks = s.blocks[blocksAdded:]
-	}
+	_, _ = s.blocks.DequeueN(blocksAdded)
 
 	s.metr.RecordL2BlocksAdded(latestL2ref,
 		blocksAdded,
-		len(s.blocks),
+		s.blocks.Len(),
 		s.currentChannel.InputBytes(),
 		s.currentChannel.ReadyBytes())
 	s.log.Debug("Added blocks to channel",
 		"blocks_added", blocksAdded,
-		"blocks_pending", len(s.blocks),
+		"blocks_pending", s.blocks.Len(),
 		"channel_full", s.currentChannel.IsFull(),
 		"input_bytes", s.currentChannel.InputBytes(),
 		"ready_bytes", s.currentChannel.ReadyBytes(),
@@ -363,7 +366,7 @@ func (s *channelManager) outputFrames() error {
 	inBytes, outBytes := s.currentChannel.InputBytes(), s.currentChannel.OutputBytes()
 	s.metr.RecordChannelClosed(
 		s.currentChannel.ID(),
-		len(s.blocks),
+		s.blocks.Len(),
 		s.currentChannel.TotalFrames(),
 		inBytes,
 		outBytes,
@@ -377,7 +380,7 @@ func (s *channelManager) outputFrames() error {
 
 	s.log.Info("Channel closed",
 		"id", s.currentChannel.ID(),
-		"blocks_pending", len(s.blocks),
+		"blocks_pending", s.blocks.Len(),
 		"num_frames", s.currentChannel.TotalFrames(),
 		"input_bytes", inBytes,
 		"output_bytes", outBytes,
@@ -404,7 +407,7 @@ func (s *channelManager) AddL2Block(block *types.Block) error {
 	}
 
 	s.metr.RecordL2BlockInPendingQueue(block)
-	s.blocks = append(s.blocks, block)
+	s.blocks.Enqueue(block)
 	s.tip = block.Hash()
 
 	return nil
@@ -489,7 +492,7 @@ func (s *channelManager) Requeue(newCfg ChannelConfig) {
 	}
 
 	// We put the blocks back at the front of the queue:
-	s.blocks = append(blocksToRequeue, s.blocks...)
+	s.blocks.Prepend(blocksToRequeue...)
 	// Channels which where already being submitted are put back
 	s.channelQueue = newChannelQueue
 	s.currentChannel = nil
diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go
index dc913505c05f..fac34f8c931e 100644
--- a/op-batcher/batcher/channel_manager_test.go
+++ b/op-batcher/batcher/channel_manager_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
 	derivetest "github.com/ethereum-optimism/optimism/op-node/rollup/derive/test"
 	"github.com/ethereum-optimism/optimism/op-service/eth"
+	"github.com/ethereum-optimism/optimism/op-service/queue"
 	"github.com/ethereum-optimism/optimism/op-service/testlog"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
@@ -87,7 +88,7 @@ func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint) {
 	require.NoError(t, m.AddL2Block(c))
 	require.ErrorIs(t, m.AddL2Block(x), ErrReorg)
 
-	require.Equal(t, []*types.Block{a, b, c}, m.blocks)
+	require.Equal(t, queue.Queue[*types.Block]{a, b, c}, m.blocks)
 }
 
 // ChannelManagerReturnsErrReorgWhenDrained ensures that the channel manager
@@ -626,7 +627,7 @@ func TestChannelManager_Requeue(t *testing.T) {
 
 	// This is the snapshot of channel manager state we want to reinstate
 	// when we requeue
-	stateSnapshot := []*types.Block{blockA, blockB}
+	stateSnapshot := queue.Queue[*types.Block]{blockA, blockB}
 	m.blocks = stateSnapshot
 
 	require.Empty(t, m.channelQueue)
@@ -664,5 +665,6 @@ func TestChannelManager_Requeue(t *testing.T) {
 
 	// The requeue shouldn't affect the pending channel
 	require.Contains(t, m.channelQueue, channel0)
+	require.NotContains(t, m.blocks, blockA)
 }
 
diff --git a/op-service/queue/queue.go b/op-service/queue/queue.go
new file mode 100644
index 000000000000..8dd0d5e16d0d
--- /dev/null
+++ b/op-service/queue/queue.go
@@ -0,0 +1,75 @@
+package queue
+
+// Queue implements a FIFO queue.
+type Queue[T any] []T
+
+// Enqueue adds the elements to the back of the queue.
+func (q *Queue[T]) Enqueue(t ...T) {
+	if len(t) == 0 {
+		return
+	}
+	*q = append(*q, t...)
+}
+
+// Dequeue removes a single element from the front of the queue
+// (if there is one) and returns it. Returns a zero value and false
+// if there is no element to dequeue.
+func (q *Queue[T]) Dequeue() (T, bool) {
+	if len(*q) == 0 {
+		var zeroValue T
+		return zeroValue, false
+	}
+	t := (*q)[0]
+	*q = (*q)[1:]
+	return t, true
+}
+
+// DequeueN removes N elements from the front of the queue
+// (if there are enough) and returns a slice of those elements. Returns
+// a nil slice and false if there are insufficient elements to dequeue.
+func (q *Queue[T]) DequeueN(N int) ([]T, bool) {
+	if len(*q) < N {
+		return nil, false
+	}
+	t := (*q)[0:N]
+	*q = (*q)[N:]
+	return t, true
+}
+
+// Prepend inserts the elements at the front of the queue,
+// preserving their order. A noop if t is empty.
+func (q *Queue[T]) Prepend(t ...T) {
+	if len(t) == 0 {
+		return
+	}
+	*q = append(t, *q...)
+}
+
+// Clear removes all elements from the queue.
+func (q *Queue[T]) Clear() {
+	*q = (*q)[:0]
+}
+
+// Len returns the number of elements in the queue.
+func (q *Queue[T]) Len() int {
+	return len(*q)
+}
+
+// Peek returns the single element at the front of the queue
+// (if there is one) without removing it. Returns a zero value and
+// false if there is no element to peek at.
+func (q *Queue[T]) Peek() (T, bool) {
+	return q.PeekN(0)
+}
+
+// PeekN returns the element in the Nth position in the queue.
+// Returns a zero value and false if there are insufficient elements
+// in the queue.
+func (q *Queue[T]) PeekN(N int) (T, bool) {
+	if len(*q) <= N {
+		var zeroValue T
+		return zeroValue, false
+	}
+	t := (*q)[N]
+	return t, true
+}
diff --git a/op-service/queue/queue_test.go b/op-service/queue/queue_test.go
new file mode 100644
index 000000000000..deca8ab411a5
--- /dev/null
+++ b/op-service/queue/queue_test.go
@@ -0,0 +1,122 @@
+package queue
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestQueue(t *testing.T) {
+	t.Run("enqueue and dequeue", func(t *testing.T) {
+		q := Queue[int]{}
+		q.Enqueue(1, 2, 3, 4)
+
+		p, peekOk := q.Peek()
+		require.True(t, peekOk)
+		require.Equal(t, 1, p)
+
+		d, dequeueOk := q.Dequeue()
+		require.Equal(t, 1, d)
+		require.True(t, dequeueOk)
+		require.Equal(t, 3, q.Len())
+		p, peekOk = q.Peek()
+		require.True(t, peekOk)
+		require.Equal(t, 2, p)
+
+		d, dequeueOk = q.Dequeue()
+		require.Equal(t, 2, d)
+		require.True(t, dequeueOk)
+		require.Equal(t, 2, q.Len())
+		p, peekOk = q.Peek()
+		require.True(t, peekOk)
+		require.Equal(t, 3, p)
+
+		d, dequeueOk = q.Dequeue()
+		require.Equal(t, 3, d)
+		require.True(t, dequeueOk)
+		require.Equal(t, 1, q.Len())
+		p, peekOk = q.Peek()
+		require.True(t, peekOk)
+		require.Equal(t, 4, p)
+
+		d, dequeueOk = q.Dequeue()
+		require.Equal(t, 4, d)
+		require.True(t, dequeueOk)
+		require.Equal(t, 0, q.Len())
+		p, peekOk = q.Peek()
+		require.False(t, peekOk)
+		require.Equal(t, 0, p)
+
+		d, dequeueOk = q.Dequeue()
+		require.Equal(t, 0, d)
+		require.False(t, dequeueOk)
+		require.Equal(t, 0, q.Len())
+		p, peekOk = q.Peek()
+		require.False(t, peekOk)
+		require.Equal(t, 0, p)
+		p, peekOk = q.Peek()
+		require.False(t, peekOk)
+		require.Equal(t, 0, p)
+	})
+
+	t.Run("peekN and dequeueN", func(t *testing.T) {
+		q := Queue[int]{}
+		q.Enqueue(1, 2, 3, 4)
+
+		p, peekOk := q.PeekN(1)
+		require.True(t, peekOk)
+		require.Equal(t, 2, p)
+
+		p, peekOk = q.PeekN(2)
+		require.Equal(t, 3, p)
+		require.True(t, peekOk)
+		require.Equal(t, 4, q.Len())
+
+		p, peekOk = q.PeekN(4)
+		require.Equal(t, 0, p)
+		require.False(t, peekOk)
+
+		d, dequeueOk := q.DequeueN(1)
+		require.Equal(t, []int{1}, d)
+		require.True(t, dequeueOk)
+		require.Equal(t, 3, q.Len())
+
+		d, dequeueOk = q.DequeueN(3)
+		require.Equal(t, []int{2, 3, 4}, d)
+		require.True(t, dequeueOk)
+		require.Equal(t, 0, q.Len())
+	})
+
+	t.Run("enqueue and clear", func(t *testing.T) {
+		q := Queue[int]{}
+		q.Enqueue(5, 6, 7)
+
+		q.Clear()
+		require.Equal(t, 0, q.Len())
+
+		d, ok := q.Dequeue()
+		require.Equal(t, 0, d)
+		require.False(t, ok)
+	})
+
+	t.Run("prepend", func(t *testing.T) {
+		var q, r Queue[int]
+		q.Enqueue(5, 6, 7)
+		r.Enqueue(8, 9)
+
+		q.Prepend(r...)
+		require.Equal(t, 5, q.Len())
+
+		d, ok := q.Dequeue()
+		require.Equal(t, 8, d)
+		require.True(t, ok)
+		require.Equal(t, 4, q.Len())
+
+		q.Prepend()
+		require.Equal(t, 4, q.Len())
+
+		d, ok = q.Dequeue()
+		require.Equal(t, 9, d)
+		require.True(t, ok)
+	})
+}
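
For context, a minimal sketch of how the new queue API composes with the channel-manager flow in this diff. This is a hypothetical standalone consumer, not part of the change; only the queue package and its import path are taken from the diff, and the block names are invented:

package main

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-service/queue"
)

func main() {
	var pending queue.Queue[string] // zero value is an empty, ready-to-use queue

	// AddL2Block-style producer: new items go on the back.
	pending.Enqueue("block-a", "block-b", "block-c")

	// processBlocks-style consumer: peek rather than dequeue, so items
	// stay in the queue until they have actually been consumed.
	added := 0
	for i := 0; ; i++ {
		b, ok := pending.PeekN(i)
		if !ok {
			break // ran out of items
		}
		fmt.Println("consuming", b)
		added++
	}

	// Remove exactly the items that were consumed.
	consumed, _ := pending.DequeueN(added)
	fmt.Println("consumed:", consumed, "still pending:", pending.Len())

	// Requeue-style recovery: put items back at the front, order preserved.
	pending.Prepend(consumed...)
	fmt.Println("pending again:", pending.Len())
}

The peek-then-DequeueN pattern mirrors the new processBlocks loop: if adding a block to the channel fails partway through, only the blocks that were actually added get dequeued, and the rest remain pending.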