From 6eb298f31723e838ac4261fbecbfcfce371d8606 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 27 Aug 2024 16:24:31 +0800 Subject: [PATCH] feat(taiko-client): introduce `TxMgrSelector` for proposer / prover (#17986) Co-authored-by: maskpp --- .../taiko-client/internal/utils/test_utils.go | 16 ----- .../internal/utils/txmgr_selector.go | 62 +++++++++++++++++++ .../internal/utils/txmgr_selector_test.go | 29 +++++++++ packages/taiko-client/proposer/proposer.go | 50 +++++---------- .../taiko-client/proposer/proposer_test.go | 3 +- .../proof_submitter/transaction/sender.go | 53 +++++++--------- 6 files changed, 130 insertions(+), 83 deletions(-) delete mode 100644 packages/taiko-client/internal/utils/test_utils.go create mode 100644 packages/taiko-client/internal/utils/txmgr_selector.go create mode 100644 packages/taiko-client/internal/utils/txmgr_selector_test.go diff --git a/packages/taiko-client/internal/utils/test_utils.go b/packages/taiko-client/internal/utils/test_utils.go deleted file mode 100644 index a1ca5db35fe..00000000000 --- a/packages/taiko-client/internal/utils/test_utils.go +++ /dev/null @@ -1,16 +0,0 @@ -package utils - -import ( - "context" - "testing" - - "github.com/ethereum/go-ethereum/rpc" - "github.com/stretchr/testify/assert" -) - -// MineL1Block mines a block on the L1 chain. -func MineL1Block(t *testing.T, l1Client *rpc.Client) { - var blockID string - assert.Nil(t, l1Client.CallContext(context.Background(), &blockID, "evm_mine")) - assert.NotEmpty(t, blockID) -} diff --git a/packages/taiko-client/internal/utils/txmgr_selector.go b/packages/taiko-client/internal/utils/txmgr_selector.go new file mode 100644 index 00000000000..1224bff4588 --- /dev/null +++ b/packages/taiko-client/internal/utils/txmgr_selector.go @@ -0,0 +1,62 @@ +package utils + +import ( + "time" + + "github.com/ethereum-optimism/optimism/op-service/txmgr" +) + +var ( + defaultPrivateTxMgrRetryInterval = 5 * time.Minute +) + +// TxMgrSelector is responsible for selecting the correct transaction manager, +// it will choose the transaction manager for a private mempool if it is available and works well, +// otherwise it will choose the normal transaction manager. +type TxMgrSelector struct { + txMgr *txmgr.SimpleTxManager + privateTxMgr *txmgr.SimpleTxManager + privateTxMgrFailedAt *time.Time + privateTxMgrRetryInterval time.Duration +} + +// NewTxMgrSelector creates a new TxMgrSelector instance. +func NewTxMgrSelector( + txMgr *txmgr.SimpleTxManager, + privateTxMgr *txmgr.SimpleTxManager, + privateTxMgrRetryInterval *time.Duration, +) *TxMgrSelector { + retryInterval := defaultPrivateTxMgrRetryInterval + if privateTxMgrRetryInterval != nil { + retryInterval = *privateTxMgrRetryInterval + } + + return &TxMgrSelector{ + txMgr: txMgr, + privateTxMgr: privateTxMgr, + privateTxMgrFailedAt: nil, + privateTxMgrRetryInterval: retryInterval, + } +} + +// Select selects a transaction manager based on the current state. +func (s *TxMgrSelector) Select() (*txmgr.SimpleTxManager, bool) { + // If there is no private transaction manager, return the normal transaction manager. + if s.privateTxMgr == nil { + return s.txMgr, false + } + + // If the private transaction manager has failed, check if it is time to retry. + if s.privateTxMgrFailedAt == nil || time.Now().After(s.privateTxMgrFailedAt.Add(s.privateTxMgrRetryInterval)) { + return s.privateTxMgr, true + } + + // Otherwise, return the normal transaction manager. + return s.txMgr, false +} + +// RecordPrivateTxMgrFailed records the time when the private transaction manager has failed. +func (s *TxMgrSelector) RecordPrivateTxMgrFailed() { + now := time.Now() + s.privateTxMgrFailedAt = &now +} diff --git a/packages/taiko-client/internal/utils/txmgr_selector_test.go b/packages/taiko-client/internal/utils/txmgr_selector_test.go new file mode 100644 index 00000000000..92ff69d1d07 --- /dev/null +++ b/packages/taiko-client/internal/utils/txmgr_selector_test.go @@ -0,0 +1,29 @@ +package utils + +import ( + "testing" + + "github.com/ethereum-optimism/optimism/op-service/txmgr" + "github.com/stretchr/testify/require" +) + +var ( + testTxMgr = &txmgr.SimpleTxManager{} + testSelector = NewTxMgrSelector(testTxMgr, nil, nil) +) + +func TestNewTxMgrSelector(t *testing.T) { + require.Equal(t, defaultPrivateTxMgrRetryInterval, testSelector.privateTxMgrRetryInterval) +} + +func TestSelect(t *testing.T) { + txMgr, isPrivate := testSelector.Select() + require.NotNil(t, txMgr) + require.False(t, isPrivate) +} + +func TestRecordPrivateTxMgrFailed(t *testing.T) { + require.Nil(t, testSelector.privateTxMgrFailedAt) + testSelector.RecordPrivateTxMgrFailed() + require.NotNil(t, testSelector.privateTxMgrFailedAt) +} diff --git a/packages/taiko-client/proposer/proposer.go b/packages/taiko-client/proposer/proposer.go index 502807ccc82..a7552e8f005 100644 --- a/packages/taiko-client/proposer/proposer.go +++ b/packages/taiko-client/proposer/proposer.go @@ -52,8 +52,7 @@ type Proposer struct { lastProposedAt time.Time totalEpochs uint64 - txmgr *txmgr.SimpleTxManager - privateTxmgr *txmgr.SimpleTxManager + txmgrSelector *utils.TxMgrSelector ctx context.Context wg sync.WaitGroup @@ -94,10 +93,8 @@ func (p *Proposer) InitFromConfig( return err } - if txMgr != nil { - p.txmgr = txMgr - } else { - if p.txmgr, err = txmgr.NewSimpleTxManager( + if txMgr == nil { + if txMgr, err = txmgr.NewSimpleTxManager( "proposer", log.Root(), &metrics.TxMgrMetrics, @@ -106,22 +103,8 @@ func (p *Proposer) InitFromConfig( return err } } - if privateTxMgr != nil { - p.privateTxmgr = privateTxMgr - } else { - if cfg.PrivateTxmgrConfigs != nil && len(cfg.PrivateTxmgrConfigs.L1RPCURL) > 0 { - if p.privateTxmgr, err = txmgr.NewSimpleTxManager( - "privateTxProposer", - log.Root(), - &metrics.TxMgrMetrics, - *cfg.PrivateTxmgrConfigs, - ); err != nil { - return err - } - } else { - p.privateTxmgr = nil - } - } + + p.txmgrSelector = utils.NewTxMgrSelector(txMgr, privateTxMgr, nil) chainConfig := config.NewChainConfig(p.protocolConfigs) p.chainConfig = chainConfig @@ -410,20 +393,19 @@ func (p *Proposer) updateProposingTicker() { p.proposingTimer = time.NewTimer(duration) } +// sendTx is the internal function to send a transaction with a selected tx manager. func (p *Proposer) sendTx(ctx context.Context, txCandidate *txmgr.TxCandidate) error { - if p.privateTxmgr != nil { - receipt, err := p.privateTxmgr.Send(ctx, *txCandidate) - if err != nil || receipt.Status != types.ReceiptStatusSuccessful { - log.Warn("Failed to send TaikoL1.proposeBlock transaction by private tx manager", - "error", encoding.TryParsingCustomError(err), - ) - } else { - return nil - } - } - receipt, err := p.txmgr.Send(ctx, *txCandidate) + txMgr, isPrivate := p.txmgrSelector.Select() + receipt, err := txMgr.Send(ctx, *txCandidate) if err != nil { - log.Warn("Failed to send TaikoL1.proposeBlock transaction", "error", encoding.TryParsingCustomError(err)) + log.Warn( + "Failed to send TaikoL1.proposeBlock / TaikoL1.proposeBlockV2 transaction by tx manager", + "isPrivateMempool", isPrivate, + "error", encoding.TryParsingCustomError(err), + ) + if isPrivate { + p.txmgrSelector.RecordPrivateTxMgrFailed() + } return err } diff --git a/packages/taiko-client/proposer/proposer_test.go b/packages/taiko-client/proposer/proposer_test.go index a013f475c61..5d3d5f1c62a 100644 --- a/packages/taiko-client/proposer/proposer_test.go +++ b/packages/taiko-client/proposer/proposer_test.go @@ -162,7 +162,8 @@ func (s *ProposerTestSuite) TestProposeTxLists() { } for _, txCandidate := range txCandidates { - receipt, err := p.txmgr.Send(ctx, txCandidate) + txMgr, _ := p.txmgrSelector.Select() + receipt, err := txMgr.Send(ctx, txCandidate) s.Nil(err) s.Nil(encoding.TryParsingCustomErrorFromReceipt(ctx, p.rpc.L1, p.proposerAddress, receipt)) } diff --git a/packages/taiko-client/prover/proof_submitter/transaction/sender.go b/packages/taiko-client/prover/proof_submitter/transaction/sender.go index beffb831f56..d75f72241eb 100644 --- a/packages/taiko-client/prover/proof_submitter/transaction/sender.go +++ b/packages/taiko-client/prover/proof_submitter/transaction/sender.go @@ -14,6 +14,7 @@ import ( "github.com/taikoxyz/taiko-mono/packages/taiko-client/bindings/encoding" "github.com/taikoxyz/taiko-mono/packages/taiko-client/internal/metrics" + "github.com/taikoxyz/taiko-mono/packages/taiko-client/internal/utils" "github.com/taikoxyz/taiko-mono/packages/taiko-client/pkg/rpc" producer "github.com/taikoxyz/taiko-mono/packages/taiko-client/prover/proof_producer" ) @@ -21,8 +22,7 @@ import ( // Sender is responsible for sending proof submission transactions with a backoff policy. type Sender struct { rpc *rpc.Client - txmgr *txmgr.SimpleTxManager - privateTxmgr *txmgr.SimpleTxManager + txmgrSelector *utils.TxMgrSelector proverSetAddress common.Address gasLimit uint64 } @@ -37,8 +37,7 @@ func NewSender( ) *Sender { return &Sender{ rpc: cli, - txmgr: txmgr, - privateTxmgr: privateTxmgr, + txmgrSelector: utils.NewTxMgrSelector(txmgr, privateTxmgr, nil), proverSetAddress: proverSetAddress, gasLimit: gasLimit, } @@ -78,38 +77,28 @@ func (s *Sender) Send( } // Send the transaction. - var ( - next = true - receipt *types.Receipt - ) - if s.privateTxmgr != nil { - receipt, err = s.privateTxmgr.Send(ctx, *txCandidate) - if err != nil || receipt.Status != types.ReceiptStatusSuccessful { - log.Warn("Failed to send transaction by private tx manager in sender", - "error", encoding.TryParsingCustomError(err), - ) - } else { - next = false + txMgr, isPrivate := s.txmgrSelector.Select() + receipt, err := txMgr.Send(ctx, *txCandidate) + if err != nil { + if isPrivate { + s.txmgrSelector.RecordPrivateTxMgrFailed() } + return encoding.TryParsingCustomError(err) } - if next { - receipt, err = s.txmgr.Send(ctx, *txCandidate) - if err != nil { - return encoding.TryParsingCustomError(err) - } - if receipt.Status != types.ReceiptStatusSuccessful { - log.Error( - "Failed to submit proof", - "blockID", proofWithHeader.BlockID, - "tier", proofWithHeader.Tier, - "txHash", receipt.TxHash, - "error", encoding.TryParsingCustomErrorFromReceipt(ctx, s.rpc.L1, s.txmgr.From(), receipt), - ) - metrics.ProverSubmissionRevertedCounter.Add(1) - return ErrUnretryableSubmission - } + if receipt.Status != types.ReceiptStatusSuccessful { + log.Error( + "Failed to submit proof", + "blockID", proofWithHeader.BlockID, + "tier", proofWithHeader.Tier, + "txHash", receipt.TxHash, + "isPrivateMempool", isPrivate, + "error", encoding.TryParsingCustomErrorFromReceipt(ctx, s.rpc.L1, txMgr.From(), receipt), + ) + metrics.ProverSubmissionRevertedCounter.Add(1) + return ErrUnretryableSubmission } + log.Info( "💰 Your block proof was accepted", "blockID", proofWithHeader.BlockID,