op-batcher: fix channel duration timeout management #12916

Merged · 2 commits · Nov 18, 2024

op-batcher: fix channel duration timeout management
Previously, we used L1 data to help track channel durations. For example, the batcher might be configured to post data every hour. Whenever a channel closed, we updated a global state variable with that channel's latest L1 origin, and computed the deadline for the next channel as a duration delta starting from that L1 origin's timestamp.

Since we changed the way autoDA switching works, a channel can be _closed_ (due to a duration timeout or another reason), which moves the L1 origin state variable forward and extends the deadline for the next channel. Crucially, with autoDA switching, a closed channel is no longer always submitted on chain: it can be discarded and its blocks requeued. When that happens, the channel duration timeout has already been extended, even though no data was posted.

The fix is to update the global state variable at channel submission time, not at channel closing time.
geoknee committed Nov 13, 2024
commit fdd331f7d4d306e3a5f9fd9bf558b18c81e744c6
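
To make the failure mode concrete, here is a minimal Go sketch of the old versus new update point. Every name in it (blockID, manager, channel, advanceOrigin, deadline) is a simplified stand-in for illustration, not the actual op-batcher API:

package main

import "fmt"

// Hypothetical stand-ins for illustration only; not the real op-batcher types.
type blockID struct{ Number uint64 }

type channel struct{ latestL1Origin blockID }

type manager struct {
	l1Origin           blockID // seeds the next channel's duration timeout
	maxChannelDuration uint64  // measured in L1 blocks
}

// deadline is the L1 block number by which the next channel must be closed.
func (m *manager) deadline() uint64 {
	return m.l1Origin.Number + m.maxChannelDuration
}

// advanceOrigin moves the timeout origin up to the channel's latest L1 origin.
func (m *manager) advanceOrigin(c *channel) {
	if c.latestL1Origin.Number > m.l1Origin.Number {
		m.l1Origin = c.latestL1Origin
	}
}

func main() {
	m := &manager{maxChannelDuration: 300} // roughly an hour of 12s L1 blocks
	c := &channel{latestL1Origin: blockID{Number: 1000}}

	// Old behavior: advanceOrigin ran at channel *close* time. If autoDA
	// switching then discarded the channel and requeued its blocks, the
	// deadline had already been pushed out, delaying the requeued data.
	m.advanceOrigin(c)
	fmt.Println("deadline extended at close time:", m.deadline()) // 1300

	// New behavior: advanceOrigin runs in nextTxData, i.e. only when the
	// channel's data is actually handed out for submission. A discarded
	// channel never reaches that point, so the deadline stays put.
}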
op-batcher/batcher/channel_manager.go (27 changes: 14 additions & 13 deletions)
@@ -39,8 +39,9 @@ type channelManager struct {
 
 	// All blocks since the last request for new tx data.
 	blocks queue.Queue[*types.Block]
-	// The latest L1 block from all the L2 blocks in the most recently closed channel
-	l1OriginLastClosedChannel eth.BlockID
+	// The latest L1 block from all the L2 blocks in the most recently submitted channel.
+	// Used to track channel duration timeouts.
+	l1OriginLastSubmittedChannel eth.BlockID
 	// The default ChannelConfig to use for the next channel
 	defaultCfg ChannelConfig
 	// last block hash - for reorg detection
@@ -75,12 +76,12 @@ func (s *channelManager) SetChannelOutFactory(outFactory ChannelOutFactory) {
 
 // Clear clears the entire state of the channel manager.
 // It is intended to be used before launching op-batcher and after an L2 reorg.
-func (s *channelManager) Clear(l1OriginLastClosedChannel eth.BlockID) {
+func (s *channelManager) Clear(l1OriginLastSubmittedChannel eth.BlockID) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	s.log.Trace("clearing channel manager state")
 	s.blocks.Clear()
-	s.l1OriginLastClosedChannel = l1OriginLastClosedChannel
+	s.l1OriginLastSubmittedChannel = l1OriginLastSubmittedChannel
 	s.tip = common.Hash{}
 	s.closed = false
 	s.currentChannel = nil
@@ -160,6 +161,12 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
 		return txData{}, io.EOF // TODO: not enough data error instead
 	}
 	tx := channel.NextTxData()
+
+	// update s.l1OriginLastSubmittedChannel so that the next
+	// channel's duration timeout will trigger properly
+	if channel.LatestL1Origin().Number > s.l1OriginLastSubmittedChannel.Number {
+		s.l1OriginLastSubmittedChannel = channel.LatestL1Origin()
+	}
 	s.txChannels[tx.ID().String()] = channel
 	return tx, nil
 }
@@ -284,15 +291,15 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
 		return fmt.Errorf("creating channel out: %w", err)
 	}
 
-	pc := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number, channelOut)
+	pc := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastSubmittedChannel.Number, channelOut)
 
 	s.currentChannel = pc
 	s.channelQueue = append(s.channelQueue, pc)
 
 	s.log.Info("Created channel",
 		"id", pc.ID(),
 		"l1Head", l1Head,
-		"l1OriginLastClosedChannel", s.l1OriginLastClosedChannel,
+		"l1OriginLastSubmittedChannel", s.l1OriginLastSubmittedChannel,
 		"blocks_pending", s.blocks.Len(),
 		"batch_type", cfg.BatchType,
 		"compression_algo", cfg.CompressorConfig.CompressionAlgo,
@@ -374,11 +381,6 @@ func (s *channelManager) outputFrames() error {
 		return nil
 	}
 
-	lastClosedL1Origin := s.currentChannel.LatestL1Origin()
-	if lastClosedL1Origin.Number > s.l1OriginLastClosedChannel.Number {
-		s.l1OriginLastClosedChannel = lastClosedL1Origin
-	}
-
 	inBytes, outBytes := s.currentChannel.InputBytes(), s.currentChannel.OutputBytes()
 	s.metr.RecordChannelClosed(
 		s.currentChannel.ID(),
@@ -401,12 +403,11 @@
 		"input_bytes", inBytes,
 		"output_bytes", outBytes,
 		"oldest_l1_origin", s.currentChannel.OldestL1Origin(),
-		"l1_origin", lastClosedL1Origin,
+		"l1_origin", s.currentChannel.LatestL1Origin(),
 		"oldest_l2", s.currentChannel.OldestL2(),
 		"latest_l2", s.currentChannel.LatestL2(),
 		"full_reason", s.currentChannel.FullErr(),
 		"compr_ratio", comprRatio,
-		"latest_l1_origin", s.l1OriginLastClosedChannel,
 	)
 	return nil
 }
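
For context on the hunk above: the origin number passed to newChannel seeds the new channel's duration timeout. A rough sketch of that relationship, under the simplifying assumption that the timeout is just the seed plus the configured MaxChannelDuration in L1 blocks (the real channel builder tracks this alongside other timeout types):

// Simplified sketch, not the actual channel builder logic.
type durationTimeout struct {
	seedL1Origin       uint64 // s.l1OriginLastSubmittedChannel.Number at creation
	maxChannelDuration uint64 // configured MaxChannelDuration, in L1 blocks
}

// timedOut reports whether the channel should close at the given L1 head.
// With this PR, the clock starts from the last *submitted* channel's origin.
func (d durationTimeout) timedOut(l1HeadNum uint64) bool {
	return l1HeadNum >= d.seedL1Origin+d.maxChannelDuration
}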
op-batcher/batcher/channel_manager_test.go (8 changes: 4 additions & 4 deletions)
@@ -130,7 +130,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
 
 	// Channel Manager state should be empty by default
 	require.Empty(m.blocks)
-	require.Equal(eth.BlockID{}, m.l1OriginLastClosedChannel)
+	require.Equal(eth.BlockID{}, m.l1OriginLastSubmittedChannel)
 	require.Equal(common.Hash{}, m.tip)
 	require.Nil(m.currentChannel)
 	require.Empty(m.channelQueue)
@@ -161,8 +161,8 @@
 	require.NoError(m.outputFrames())
 	_, err := m.nextTxData(m.currentChannel)
 	require.NoError(err)
-	require.NotNil(m.l1OriginLastClosedChannel)
 	require.Len(m.blocks, 0)
+	require.NotNil(m.l1OriginLastSubmittedChannel)
 	require.Equal(newL1Tip, m.tip)
 	require.Len(m.currentChannel.pendingTransactions, 1)
 
@@ -184,7 +184,7 @@
 
 	// Check that the entire channel manager state cleared
 	require.Empty(m.blocks)
-	require.Equal(uint64(123), m.l1OriginLastClosedChannel.Number)
+	require.Equal(uint64(123), m.l1OriginLastSubmittedChannel.Number)
 	require.Equal(common.Hash{}, m.tip)
 	require.Nil(m.currentChannel)
 	require.Empty(m.channelQueue)
@@ -475,7 +475,7 @@ func TestChannelManager_ChannelCreation(t *testing.T) {
 	t.Run(test.name, func(t *testing.T) {
 		m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
 
-		m.l1OriginLastClosedChannel = test.safeL1Block
+		m.l1OriginLastSubmittedChannel = test.safeL1Block
 		require.Nil(t, m.currentChannel)
 
 		require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))