Skip to content

Commit

Permalink
op-batcher: Multi-blob Support (#9779)
Browse files Browse the repository at this point in the history
* op-batcher: Prepare multi-frame support

* op-batcher: adapt tests to multi-frame txData

* op-batcher: add multi-blob transaction support

The existing configuration parameter TargetNumFrames can be used to specify
the desired number of blobs per transaction.

* op-batcher: improve blobs configuration (for testing)

* op-e2e: add multi-blob batcher test

* op-batcher: consolidate txID String & TerminalString impls

and add a test for it.

* op-batcher: Fix config test

* op-e2e: Improve multi-blob test to assert full blobs

* op-batcher: resolve open TODOs & renames (multi-blob)

* op-batcher: Test channel.NextTxData for single and multi frame txs
  • Loading branch information
sebastianst authored Mar 12, 2024
1 parent 9c888f6 commit 25985c1
Show file tree
Hide file tree
Showing 16 changed files with 531 additions and 128 deletions.
43 changes: 28 additions & 15 deletions op-batcher/batcher/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ type channel struct {

// pending channel builder
channelBuilder *ChannelBuilder
// Set of unconfirmed txID -> frame data. For tx resubmission
pendingTransactions map[txID]txData
// Set of unconfirmed txID -> tx data. For tx resubmission
pendingTransactions map[string]txData
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[txID]eth.BlockID
confirmedTransactions map[string]eth.BlockID

// True if confirmed TX list is updated. Set to false after updated min/max inclusion blocks.
confirmedTxUpdated bool
Expand All @@ -44,20 +44,20 @@ func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollup
metr: metr,
cfg: cfg,
channelBuilder: cb,
pendingTransactions: make(map[txID]txData),
confirmedTransactions: make(map[txID]eth.BlockID),
pendingTransactions: make(map[string]txData),
confirmedTransactions: make(map[string]eth.BlockID),
}, nil
}

// TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction.
func (s *channel) TxFailed(id txID) {
func (s *channel) TxFailed(id string) {
if data, ok := s.pendingTransactions[id]; ok {
s.log.Trace("marked transaction as failed", "id", id)
// Note: when the batcher is changed to send multiple frames per tx,
// this needs to be changed to iterate over all frames of the tx data
// and re-queue them.
s.channelBuilder.PushFrame(data.Frame())
s.channelBuilder.PushFrames(data.Frames()...)
delete(s.pendingTransactions, id)
} else {
s.log.Warn("unknown transaction marked as failed", "id", id)
Expand All @@ -70,7 +70,7 @@ func (s *channel) TxFailed(id txID) {
// a channel have been marked as confirmed on L1 the channel may be invalid & need to be
// resubmitted.
// This function may reset the pending channel if the pending channel has timed out.
func (s *channel) TxConfirmed(id txID, inclusionBlock eth.BlockID) (bool, []*types.Block) {
func (s *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*types.Block) {
s.metr.RecordBatchTxSubmitted()
s.log.Debug("marked transaction as confirmed", "id", id, "block", inclusionBlock)
if _, ok := s.pendingTransactions[id]; !ok {
Expand Down Expand Up @@ -146,20 +146,33 @@ func (s *channel) ID() derive.ChannelID {
return s.channelBuilder.ID()
}

// NextTxData returns the next tx data packet.
// If cfg.MultiFrameTxs is false, it returns txData with a single frame.
// If cfg.MultiFrameTxs is true, it will read frames from its channel builder
// until it either doesn't have more frames or the target number of frames is reached.
//
// NextTxData should only be called after HasTxData returned true.
func (s *channel) NextTxData() txData {
frame := s.channelBuilder.NextFrame()

txdata := txData{frame}
id := txdata.ID()
nf := s.cfg.MaxFramesPerTx()
txdata := txData{frames: make([]frameData, 0, nf)}
for i := 0; i < nf && s.channelBuilder.HasFrame(); i++ {
frame := s.channelBuilder.NextFrame()
txdata.frames = append(txdata.frames, frame)
}

s.log.Trace("returning next tx data", "id", id)
id := txdata.ID().String()
s.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames))
s.pendingTransactions[id] = txdata

return txdata
}

func (s *channel) HasFrame() bool {
return s.channelBuilder.HasFrame()
func (s *channel) HasTxData() bool {
if s.IsFull() || !s.cfg.MultiFrameTxs {
return s.channelBuilder.HasFrame()
}
// collect enough frames if channel is not full yet
return s.channelBuilder.PendingFrames() >= int(s.cfg.MaxFramesPerTx())
}

func (s *channel) IsFull() bool {
Expand Down
27 changes: 22 additions & 5 deletions op-batcher/batcher/channel_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ type ChannelConfig struct {

// BatchType indicates whether the channel uses SingularBatch or SpanBatch.
BatchType uint

// Whether to put all frames of a channel inside a single tx.
// Should only be used for blob transactions.
MultiFrameTxs bool
}

func (cc *ChannelConfig) MaxFramesPerTx() int {
if !cc.MultiFrameTxs {
return 1
}
return cc.CompressorConfig.TargetNumFrames
}

// Check validates the [ChannelConfig] parameters.
Expand Down Expand Up @@ -91,6 +102,10 @@ func (cc *ChannelConfig) Check() error {
return fmt.Errorf("unrecognized batch type: %d", cc.BatchType)
}

if nf := cc.CompressorConfig.TargetNumFrames; nf < 1 {
return fmt.Errorf("invalid number of frames %d", nf)
}

return nil
}

Expand Down Expand Up @@ -449,11 +464,13 @@ func (c *ChannelBuilder) NextFrame() frameData {
return f
}

// PushFrame adds the frame back to the internal frames queue. Panics if not of
// PushFrames adds the frames back to the internal frames queue. Panics if not of
// the same channel.
func (c *ChannelBuilder) PushFrame(frame frameData) {
if frame.id.chID != c.ID() {
panic("wrong channel")
func (c *ChannelBuilder) PushFrames(frames ...frameData) {
for _, f := range frames {
if f.id.chID != c.ID() {
panic("wrong channel")
}
c.frames = append(c.frames, f)
}
c.frames = append(c.frames, frame)
}
8 changes: 4 additions & 4 deletions op-batcher/batcher/channel_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func TestChannelBuilder_NextFrame(t *testing.T) {
require.NoError(t, err)

// Push one frame into to the channel builder
expectedTx := txID{chID: co.ID(), frameNumber: fn}
expectedTx := txID{frameID{chID: co.ID(), frameNumber: fn}}
expectedBytes := buf.Bytes()
frameData := frameData{
id: frameID{
Expand All @@ -419,14 +419,14 @@ func TestChannelBuilder_NextFrame(t *testing.T) {
},
data: expectedBytes,
}
cb.PushFrame(frameData)
cb.PushFrames(frameData)

// There should only be 1 frame in the channel builder
require.Equal(t, 1, cb.PendingFrames())

// We should be able to increment to the next frame
constructedFrame := cb.NextFrame()
require.Equal(t, expectedTx, constructedFrame.id)
require.Equal(t, expectedTx[0], constructedFrame.id)
require.Equal(t, expectedBytes, constructedFrame.data)
require.Equal(t, 0, cb.PendingFrames())

Expand Down Expand Up @@ -462,7 +462,7 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) {
},
data: buf.Bytes(),
}
cb.PushFrame(frame)
cb.PushFrames(frame)
})
}

Expand Down
38 changes: 20 additions & 18 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type channelManager struct {
// channels to read frame data from, for writing batches onchain
channelQueue []*channel
// used to lookup channels by tx ID upon tx success / failure
txChannels map[txID]*channel
txChannels map[string]*channel

// if set to true, prevents production of any new channel frames
closed bool
Expand All @@ -53,7 +53,7 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig,
metr: metr,
cfg: cfg,
rollupCfg: rollupCfg,
txChannels: make(map[txID]*channel),
txChannels: make(map[string]*channel),
}
}

Expand All @@ -68,14 +68,15 @@ func (s *channelManager) Clear() {
s.closed = false
s.currentChannel = nil
s.channelQueue = nil
s.txChannels = make(map[txID]*channel)
s.txChannels = make(map[string]*channel)
}

// TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction.
func (s *channelManager) TxFailed(id txID) {
func (s *channelManager) TxFailed(_id txID) {
s.mu.Lock()
defer s.mu.Unlock()
id := _id.String()
if channel, ok := s.txChannels[id]; ok {
delete(s.txChannels, id)
channel.TxFailed(id)
Expand All @@ -92,9 +93,10 @@ func (s *channelManager) TxFailed(id txID) {
// a channel have been marked as confirmed on L1 the channel may be invalid & need to be
// resubmitted.
// This function may reset the pending channel if the pending channel has timed out.
func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
func (s *channelManager) TxConfirmed(_id txID, inclusionBlock eth.BlockID) {
s.mu.Lock()
defer s.mu.Unlock()
id := _id.String()
if channel, ok := s.txChannels[id]; ok {
delete(s.txChannels, id)
done, blocks := channel.TxConfirmed(id, inclusionBlock)
Expand Down Expand Up @@ -130,40 +132,40 @@ func (s *channelManager) removePendingChannel(channel *channel) {

// nextTxData pops off s.datas & handles updating the internal state
func (s *channelManager) nextTxData(channel *channel) (txData, error) {
if channel == nil || !channel.HasFrame() {
if channel == nil || !channel.HasTxData() {
s.log.Trace("no next tx data")
return txData{}, io.EOF // TODO: not enough data error instead
}
tx := channel.NextTxData()
s.txChannels[tx.ID()] = channel
s.txChannels[tx.ID().String()] = channel
return tx, nil
}

// TxData returns the next tx data that should be submitted to L1.
//
// It currently only uses one frame per transaction. If the pending channel is
// If the pending channel is
// full, it only returns the remaining frames of this channel until it got
// successfully fully sent to L1. It returns io.EOF if there's no pending frame.
// successfully fully sent to L1. It returns io.EOF if there's no pending tx data.
func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
s.mu.Lock()
defer s.mu.Unlock()
var firstWithFrame *channel
var firstWithTxData *channel
for _, ch := range s.channelQueue {
if ch.HasFrame() {
firstWithFrame = ch
if ch.HasTxData() {
firstWithTxData = ch
break
}
}

dataPending := firstWithFrame != nil && firstWithFrame.HasFrame()
s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))
dataPending := firstWithTxData != nil && firstWithTxData.HasTxData()
s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", len(s.blocks))

// Short circuit if there is a pending frame or the channel manager is closed.
// Short circuit if there is pending tx data or the channel manager is closed.
if dataPending || s.closed {
return s.nextTxData(firstWithFrame)
return s.nextTxData(firstWithTxData)
}

// No pending frame, so we have to add new blocks to the channel
// No pending tx data, so we have to add new blocks to the channel

// If we have no saved blocks, we will not be able to create valid frames
if len(s.blocks) == 0 {
Expand Down Expand Up @@ -385,7 +387,7 @@ func (s *channelManager) Close() error {
}
}

if s.currentChannel.HasFrame() {
if s.currentChannel.HasTxData() {
// Make it clear to the caller that there is remaining pending work.
return ErrPendingAfterClose
}
Expand Down
4 changes: 2 additions & 2 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) {

txdata0, err := m.TxData(eth.BlockID{})
require.NoError(err)
txdata0bytes := txdata0.Bytes()
txdata0bytes := txdata0.CallData()
data0 := make([]byte, len(txdata0bytes))
// make sure we have a clone for later comparison
copy(data0, txdata0bytes)
Expand All @@ -232,7 +232,7 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) {
txdata1, err := m.TxData(eth.BlockID{})
require.NoError(err)

data1 := txdata1.Bytes()
data1 := txdata1.CallData()
require.Equal(data1, data0)
fs, err := derive.ParseFrames(data1)
require.NoError(err)
Expand Down
Loading

0 comments on commit 25985c1

Please sign in to comment.