From 40325bc346aace3bb8c42ed8cb54bd00173f4734 Mon Sep 17 00:00:00 2001 From: maskpp Date: Wed, 6 Mar 2024 13:57:43 +0800 Subject: [PATCH] fix(pkg): fix a bug in transaction sender (#606) Co-authored-by: David --- driver/driver_test.go | 6 +-- internal/sender/common.go | 44 +++++++++------- internal/sender/sender.go | 95 +++++++++++++++++++++++++--------- internal/sender/sender_test.go | 42 ++++++++++++++- internal/testutils/helper.go | 8 +++ proposer/proposer.go | 62 +++++++++++----------- proposer/proposer_test.go | 2 +- 7 files changed, 178 insertions(+), 81 deletions(-) diff --git a/driver/driver_test.go b/driver/driver_test.go index c4fc572a1..282ca6422 100644 --- a/driver/driver_test.go +++ b/driver/driver_test.go @@ -164,7 +164,7 @@ func (s *DriverTestSuite) TestCheckL1ReorgToHigherFork() { s.Equal(l1Head3.Hash(), l1Head1.Hash()) // Because of evm_revert operation, the nonce of the proposer need to be adjusted. - sender.AdjustNonce(nil) + s.Nil(sender.SetNonce(nil, true)) // Propose ten blocks on another fork for i := 0; i < 10; i++ { s.ProposeInvalidTxListBytes(s.p) @@ -225,7 +225,7 @@ func (s *DriverTestSuite) TestCheckL1ReorgToLowerFork() { s.Equal(l1Head3.Number.Uint64(), l1Head1.Number.Uint64()) s.Equal(l1Head3.Hash(), l1Head1.Hash()) - sender.AdjustNonce(nil) + s.Nil(sender.SetNonce(nil, true)) // Propose one blocks on another fork s.ProposeInvalidTxListBytes(s.p) @@ -283,7 +283,7 @@ func (s *DriverTestSuite) TestCheckL1ReorgToSameHeightFork() { s.Equal(l1Head3.Number.Uint64(), l1Head1.Number.Uint64()) s.Equal(l1Head3.Hash(), l1Head1.Hash()) - sender.AdjustNonce(nil) + s.Nil(sender.SetNonce(nil, true)) // Propose two blocks on another fork s.ProposeInvalidTxListBytes(s.p) time.Sleep(3 * time.Second) diff --git a/internal/sender/common.go b/internal/sender/common.go index f563d5fed..29cb1af4b 100644 --- a/internal/sender/common.go +++ b/internal/sender/common.go @@ -48,27 +48,33 @@ func (s *Sender) adjustGas(txData types.TxData) { } } -// AdjustNonce adjusts the nonce of the given transaction with the current nonce of the sender. -func (s *Sender) AdjustNonce(txData types.TxData) { - nonce, err := s.client.NonceAt(s.ctx, s.Opts.From, nil) - if err != nil { - log.Warn("Failed to get the nonce", "from", s.Opts.From, "err", err) - return +// SetNonce adjusts the nonce of the given transaction with the current nonce of the sender. +func (s *Sender) SetNonce(txData types.TxData, adjust bool) (err error) { + var nonce uint64 + if adjust { + s.nonce, err = s.client.NonceAt(s.ctx, s.Opts.From, nil) + if err != nil { + log.Warn("Failed to get the nonce", "from", s.Opts.From, "err", err) + return err + } } - s.Opts.Nonce = new(big.Int).SetUint64(nonce) - - switch tx := txData.(type) { - case *types.DynamicFeeTx: - tx.Nonce = nonce - case *types.BlobTx: - tx.Nonce = nonce - case *types.LegacyTx: - tx.Nonce = nonce - case *types.AccessListTx: - tx.Nonce = nonce - default: - log.Debug("Unsupported transaction type when adjust nonce", "from", s.Opts.From) + nonce = s.nonce + + if !utils.IsNil(txData) { + switch tx := txData.(type) { + case *types.DynamicFeeTx: + tx.Nonce = nonce + case *types.BlobTx: + tx.Nonce = nonce + case *types.LegacyTx: + tx.Nonce = nonce + case *types.AccessListTx: + tx.Nonce = nonce + default: + return fmt.Errorf("unsupported transaction type: %v", txData) + } } + return } // updateGasTipGasFee updates the gas tip cap and gas fee cap of the sender with the given chain head info. diff --git a/internal/sender/sender.go b/internal/sender/sender.go index 59b64ec2e..dd46d7793 100644 --- a/internal/sender/sender.go +++ b/internal/sender/sender.go @@ -78,7 +78,8 @@ type Sender struct { head *types.Header client *rpc.EthClient - Opts *bind.TransactOpts + nonce uint64 + Opts *bind.TransactOpts unconfirmedTxs cmap.ConcurrentMap[string, *TxToConfirm] txToConfirmCh cmap.ConcurrentMap[string, chan *TxToConfirm] @@ -110,6 +111,12 @@ func NewSender(ctx context.Context, cfg *Config, client *rpc.EthClient, priv *ec } } + // Get the nonce + nonce, err := client.NonceAt(ctx, opts.From, nil) + if err != nil { + return nil, err + } + // Get the chain ID head, err := client.HeaderByNumber(ctx, nil) if err != nil { @@ -121,13 +128,12 @@ func NewSender(ctx context.Context, cfg *Config, client *rpc.EthClient, priv *ec Config: cfg, head: head, client: client, + nonce: nonce, Opts: opts, unconfirmedTxs: cmap.New[*TxToConfirm](), txToConfirmCh: cmap.New[chan *TxToConfirm](), stopCh: make(chan struct{}), } - // Initialize the nonce - sender.AdjustNonce(nil) // Initialize the gas fee related fields if err = sender.updateGasTipGasFee(head); err != nil { @@ -177,6 +183,10 @@ func (s *Sender) GetUnconfirmedTx(txID string) *types.Transaction { // SendRawTransaction sends a transaction to the given Ethereum node. func (s *Sender) SendRawTransaction(nonce uint64, target *common.Address, value *big.Int, data []byte) (string, error) { + if s.unconfirmedTxs.Count() >= unconfirmedTxsCap { + return "", fmt.Errorf("too many pending transactions") + } + gasLimit := s.GasLimit if gasLimit == 0 { var err error @@ -192,16 +202,36 @@ func (s *Sender) SendRawTransaction(nonce uint64, target *common.Address, value return "", err } } - return s.SendTransaction(types.NewTx(&types.DynamicFeeTx{ - ChainID: s.client.ChainID, - To: target, - Nonce: nonce, - GasFeeCap: s.Opts.GasFeeCap, - GasTipCap: s.Opts.GasTipCap, - Gas: gasLimit, - Value: value, - Data: data, - })) + + txID := uuid.New() + txToConfirm := &TxToConfirm{ + ID: txID, + originalTx: &types.DynamicFeeTx{ + ChainID: s.client.ChainID, + To: target, + Nonce: nonce, + GasFeeCap: s.Opts.GasFeeCap, + GasTipCap: s.Opts.GasTipCap, + Gas: gasLimit, + Value: value, + Data: data, + }, + } + + if err := s.send(txToConfirm, false); err != nil && !strings.Contains(err.Error(), "replacement transaction") { + log.Error("Failed to send transaction", + "tx_id", txID, + "nonce", txToConfirm.CurrentTx.Nonce(), + "err", err, + ) + return "", err + } + + // Add the transaction to the unconfirmed transactions + s.unconfirmedTxs.Set(txID, txToConfirm) + s.txToConfirmCh.Set(txID, make(chan *TxToConfirm, 1)) + + return txID, nil } // SendTransaction sends a transaction to the given Ethereum node. @@ -222,7 +252,7 @@ func (s *Sender) SendTransaction(tx *types.Transaction) (string, error) { CurrentTx: tx, } - if err := s.send(txToConfirm); err != nil && !strings.Contains(err.Error(), "replacement transaction") { + if err := s.send(txToConfirm, true); err != nil && !strings.Contains(err.Error(), "replacement transaction") { log.Error("Failed to send transaction", "tx_id", txID, "nonce", txToConfirm.CurrentTx.Nonce(), @@ -240,12 +270,19 @@ func (s *Sender) SendTransaction(tx *types.Transaction) (string, error) { } // send is the internal method to send the given transaction. -func (s *Sender) send(tx *TxToConfirm) error { +func (s *Sender) send(tx *TxToConfirm, resetNonce bool) error { s.mu.Lock() defer s.mu.Unlock() originalTx := tx.originalTx + if resetNonce { + // Set the nonce of the transaction. + if err := s.SetNonce(originalTx, false); err != nil { + return err + } + } + for i := 0; i < nonceIncorrectRetrys; i++ { // Retry when nonce is incorrect rawTx, err := s.Opts.Signer(s.Opts.From, types.NewTx(originalTx)) @@ -258,13 +295,21 @@ func (s *Sender) send(tx *TxToConfirm) error { // Check if the error is nonce too low if err != nil { if strings.Contains(err.Error(), "nonce too low") { - s.AdjustNonce(originalTx) - log.Warn("Nonce is incorrect, retry sending the transaction with new nonce", - "tx_id", tx.ID, - "nonce", tx.CurrentTx.Nonce(), - "hash", rawTx.Hash(), - "err", err, - ) + if err := s.SetNonce(originalTx, true); err != nil { + log.Error("Failed to set nonce when appear nonce too low", + "tx_id", tx.ID, + "nonce", tx.CurrentTx.Nonce(), + "hash", rawTx.Hash(), + "err", err, + ) + } else { + log.Warn("Nonce is incorrect, retry sending the transaction with new nonce", + "tx_id", tx.ID, + "nonce", tx.CurrentTx.Nonce(), + "hash", rawTx.Hash(), + "err", err, + ) + } continue } if strings.Contains(err.Error(), "replacement transaction underpriced") { @@ -287,7 +332,7 @@ func (s *Sender) send(tx *TxToConfirm) error { } break } - s.Opts.Nonce = new(big.Int).Add(s.Opts.Nonce, common.Big1) + s.nonce++ return nil } @@ -340,7 +385,7 @@ func (s *Sender) resendUnconfirmedTxs() { s.releaseUnconfirmedTx(id) continue } - if err := s.send(unconfirmedTx); err != nil { + if err := s.send(unconfirmedTx, true); err != nil { log.Warn( "Failed to resend the transaction", "tx_id", id, @@ -390,7 +435,7 @@ func (s *Sender) checkPendingTransactionsConfirmation() { } pendingTx.Receipt = receipt if receipt.Status != types.ReceiptStatusSuccessful { - pendingTx.Err = fmt.Errorf("transaction reverted, hash: %s", receipt.TxHash) + pendingTx.Err = fmt.Errorf("transaction status is failed, hash: %s", receipt.TxHash) s.releaseUnconfirmedTx(id) continue } diff --git a/internal/sender/sender_test.go b/internal/sender/sender_test.go index 80e8722ca..325ae43d7 100644 --- a/internal/sender/sender_test.go +++ b/internal/sender/sender_test.go @@ -23,14 +23,50 @@ type SenderTestSuite struct { sender *sender.Sender } -func (s *SenderTestSuite) TestNormalSender() { +func (s *SenderTestSuite) TestSendTransaction() { + var ( + opts = s.sender.Opts + client = s.RPCClient.L1 + eg errgroup.Group + ) + eg.SetLimit(runtime.NumCPU()) + for i := 0; i < 8; i++ { + i := i + eg.Go(func() error { + to := common.BigToAddress(big.NewInt(int64(i))) + tx := types.NewTx(&types.DynamicFeeTx{ + ChainID: client.ChainID, + To: &to, + GasFeeCap: opts.GasFeeCap, + GasTipCap: opts.GasTipCap, + Gas: 21000000, + Value: big.NewInt(1), + Data: nil, + }) + + _, err := s.sender.SendTransaction(tx) + return err + }) + } + s.Nil(eg.Wait()) + + for _, confirmCh := range s.sender.TxToConfirmChannels() { + confirm := <-confirmCh + s.Nil(confirm.Err) + } +} + +func (s *SenderTestSuite) TestSendRawTransaction() { + nonce, err := s.RPCClient.L1.NonceAt(context.Background(), s.sender.Opts.From, nil) + s.Nil(err) + var eg errgroup.Group eg.SetLimit(runtime.NumCPU()) for i := 0; i < 5; i++ { i := i eg.Go(func() error { addr := common.BigToAddress(big.NewInt(int64(i))) - _, err := s.sender.SendRawTransaction(s.sender.Opts.Nonce.Uint64(), &addr, big.NewInt(1), nil) + _, err := s.sender.SendRawTransaction(nonce+uint64(i), &addr, big.NewInt(1), nil) return err }) } @@ -121,6 +157,7 @@ func (s *SenderTestSuite) TestNonceTooLow() { func (s *SenderTestSuite) SetupTest() { s.ClientTestSuite.SetupTest() + s.SetL1Automine(true) ctx := context.Background() priv, err := crypto.ToECDSA(common.FromHex(os.Getenv("L1_PROPOSER_PRIVATE_KEY"))) @@ -137,6 +174,7 @@ func (s *SenderTestSuite) SetupTest() { } func (s *SenderTestSuite) TearDownTest() { + s.SetL1Automine(false) s.sender.Close() s.ClientTestSuite.TearDownTest() } diff --git a/internal/testutils/helper.go b/internal/testutils/helper.go index 63bd9657e..5c43f94f9 100644 --- a/internal/testutils/helper.go +++ b/internal/testutils/helper.go @@ -28,6 +28,10 @@ func (s *ClientTestSuite) ProposeInvalidTxListBytes(proposer Proposer) { invalidTxListBytes := RandomBytes(256) s.Nil(proposer.ProposeTxList(context.Background(), invalidTxListBytes, 1)) + for _, confirmCh := range proposer.GetSender().TxToConfirmChannels() { + confirm := <-confirmCh + s.Nil(confirm.Err) + } } func (s *ClientTestSuite) ProposeAndInsertEmptyBlocks( @@ -54,6 +58,10 @@ func (s *ClientTestSuite) ProposeAndInsertEmptyBlocks( s.Nil(err) s.Nil(proposer.ProposeTxList(context.Background(), encoded, 0)) + for _, confirmCh := range proposer.GetSender().TxToConfirmChannels() { + confirm := <-confirmCh + s.Nil(confirm.Err) + } s.ProposeInvalidTxListBytes(proposer) diff --git a/proposer/proposer.go b/proposer/proposer.go index b9a6f4e73..a50db7a57 100644 --- a/proposer/proposer.go +++ b/proposer/proposer.go @@ -17,15 +17,13 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" - "github.com/urfave/cli/v2" - "golang.org/x/sync/errgroup" - "github.com/taikoxyz/taiko-client/bindings" "github.com/taikoxyz/taiko-client/bindings/encoding" "github.com/taikoxyz/taiko-client/internal/metrics" "github.com/taikoxyz/taiko-client/internal/sender" "github.com/taikoxyz/taiko-client/pkg/rpc" selector "github.com/taikoxyz/taiko-client/proposer/prover_selector" + "github.com/urfave/cli/v2" ) var ( @@ -247,30 +245,29 @@ func (p *Proposer) ProposeOp(ctx context.Context) error { if len(txLists) == 0 { return errNoNewTxs } - g := new(errgroup.Group) - for i, txs := range txLists { - func(i int, txs types.Transactions) { - g.Go(func() error { - if i >= int(p.MaxProposedTxListsPerEpoch) { - return nil - } - txListBytes, err := rlp.EncodeToBytes(txs) - if err != nil { - return fmt.Errorf("failed to encode transactions: %w", err) - } + for i, txs := range txLists { + if i >= int(p.MaxProposedTxListsPerEpoch) { + return nil + } - if err := p.ProposeTxList(ctx, txListBytes, uint(txs.Len())); err != nil { - return fmt.Errorf("failed to propose transactions: %w", err) - } + txListBytes, err := rlp.EncodeToBytes(txs) + if err != nil { + return fmt.Errorf("failed to encode transactions: %w", err) + } - return nil - }) - }(i, txs) + if err := p.ProposeTxList(ctx, txListBytes, uint(txs.Len())); err != nil { + return fmt.Errorf("failed to propose transactions: %w", err) + } } - if err := g.Wait(); err != nil { - return fmt.Errorf("failed to propose transactions: %w", err) + // Wait for all transactions to be confirmed. + for _, confirmCh := range p.sender.TxToConfirmChannels() { + confirm := <-confirmCh + if confirm.Err != nil { + log.Error("ProposeTxList error", "tx_id", confirm.ID, "error", confirm.Err) + return confirm.Err + } } if p.AfterCommitHook != nil { @@ -444,7 +441,6 @@ func (p *Proposer) ProposeTxList( txListBytes []byte, txNum uint, ) error { - var txID string if err := backoff.Retry( func() error { if ctx.Err() != nil { @@ -470,7 +466,7 @@ func (p *Proposer) ProposeTxList( log.Warn("Failed to make taikoL1.proposeBlock transaction", "error", encoding.TryParsingCustomError(err)) return err } - txID, err = p.sender.SendTransaction(tx) + _, err = p.sender.SendTransaction(tx) if err != nil { log.Warn("Failed to send taikoL1.proposeBlock transaction", "error", encoding.TryParsingCustomError(err)) return err @@ -488,12 +484,6 @@ func (p *Proposer) ProposeTxList( return ctx.Err() } - // Waiting for the transaction to be confirmed. - confirm := <-p.sender.TxToConfirmChannel(txID) - if confirm.Err != nil { - return confirm.Err - } - log.Info("📝 Propose transactions succeeded", "txs", txNum) metrics.ProposerProposedTxListsCounter.Inc(1) @@ -508,7 +498,17 @@ func (p *Proposer) ProposeEmptyBlockOp(ctx context.Context) error { if err != nil { return err } - return p.ProposeTxList(ctx, emptyTxListBytes, 0) + if err = p.ProposeTxList(ctx, emptyTxListBytes, 0); err != nil { + return err + } + for _, confirmCh := range p.sender.TxToConfirmChannels() { + confirm := <-confirmCh + if confirm.Err != nil { + log.Error("ProposeEmptyBlockOp error", "td_id", confirm.ID, "error", confirm.Err) + return confirm.Err + } + } + return nil } // updateProposingTicker updates the internal proposing timer. diff --git a/proposer/proposer_test.go b/proposer/proposer_test.go index 8c465aa3a..230a3dff3 100644 --- a/proposer/proposer_test.go +++ b/proposer/proposer_test.go @@ -157,7 +157,7 @@ func (s *ProposerTestSuite) TestSendProposeBlockTx() { s.SetL1Automine(false) defer s.SetL1Automine(true) - sender.AdjustNonce(nil) + s.Nil(sender.SetNonce(nil, true)) fee := big.NewInt(10000) opts := sender.Opts