From 2273d105b5dfa6479dc2aa74c16fd0365d06e31a Mon Sep 17 00:00:00 2001 From: David Date: Fri, 21 Jul 2023 12:20:31 +0800 Subject: [PATCH] feat(proposer): handle transaction replacement underpriced error (#322) --- cmd/flags/proposer.go | 8 ++++ pkg/rpc/utils.go | 41 +++++++++++++++++ pkg/rpc/utils_test.go | 38 ++++++++++++++++ proposer/config.go | 66 +++++++++++++++------------ proposer/config_test.go | 3 ++ proposer/proposer.go | 96 ++++++++++++++++++++++++++++++++++----- proposer/proposer_test.go | 73 ++++++++++++++++++++++++----- testutils/suite.go | 4 ++ 8 files changed, 279 insertions(+), 50 deletions(-) diff --git a/cmd/flags/proposer.go b/cmd/flags/proposer.go index 9b2dac781..9a3c1414f 100644 --- a/cmd/flags/proposer.go +++ b/cmd/flags/proposer.go @@ -57,6 +57,13 @@ var ( } ProposeBlockTxGasLimit = &cli.Uint64Flag{ Name: "proposeBlockTxGasLimit", + Usage: "Gas limit will be used for TaikoL1.proposeBlock transactions", + Category: proposerCategory, + } + ProposeBlockTxReplacementMultiplier = &cli.Uint64Flag{ + Name: "proposeBlockTxReplacementMultiplier", + Value: 2, + Usage: "Gas tip multiplier when replacing a TaikoL1.proposeBlock transaction with same nonce", Category: proposerCategory, } ) @@ -73,4 +80,5 @@ var ProposerFlags = MergeFlags(CommonFlags, []cli.Flag{ MinBlockGasLimit, MaxProposedTxListsPerEpoch, ProposeBlockTxGasLimit, + ProposeBlockTxReplacementMultiplier, }) diff --git a/pkg/rpc/utils.go b/pkg/rpc/utils.go index 3ba7f0191..575ae83ca 100644 --- a/pkg/rpc/utils.go +++ b/pkg/rpc/utils.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/big" + "strconv" "strings" "time" @@ -119,6 +120,46 @@ func NeedNewProof( return false, nil } +type AccountPoolContent map[string]map[string]*types.Transaction + +// ContentFrom fetches a given account's transactions list from a node's transactions pool. +func ContentFrom( + ctx context.Context, + rawRPC *rpc.Client, + address common.Address, +) (AccountPoolContent, error) { + var result AccountPoolContent + return result, rawRPC.CallContext( + ctx, + &result, + "txpool_contentFrom", + address, + ) +} + +// GetPendingTxByNonce tries to retrieve a pending transaction with a given nonce in a node's mempool. +func GetPendingTxByNonce( + ctx context.Context, + cli *Client, + address common.Address, + nonce uint64, +) (*types.Transaction, error) { + content, err := ContentFrom(ctx, cli.L1RawRPC, address) + if err != nil { + return nil, err + } + + for _, txMap := range content { + for txNonce, tx := range txMap { + if txNonce == strconv.Itoa(int(nonce)) { + return tx, nil + } + } + } + + return nil, nil +} + // SetHead makes a `debug_setHead` RPC call to set the chain's head, should only be used // for testing purpose. func SetHead(ctx context.Context, rpc *rpc.Client, headNum *big.Int) error { diff --git a/pkg/rpc/utils_test.go b/pkg/rpc/utils_test.go index 1fbc59fd9..4eb9911fc 100644 --- a/pkg/rpc/utils_test.go +++ b/pkg/rpc/utils_test.go @@ -2,11 +2,14 @@ package rpc import ( "context" + "os" + "strconv" "testing" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/stretchr/testify/require" ) @@ -31,3 +34,38 @@ func TestStringToBytes32(t *testing.T) { require.Equal(t, [32]byte{}, StringToBytes32("")) require.Equal(t, [32]byte{0x61, 0x62, 0x63}, StringToBytes32("abc")) } + +func TestL1ContentFrom(t *testing.T) { + client := newTestClient(t) + l2Head, err := client.L2.HeaderByNumber(context.Background(), nil) + require.Nil(t, err) + + baseFee, err := client.TaikoL2.GetBasefee(nil, 0, 60000000, uint32(l2Head.GasUsed)) + require.Nil(t, err) + + testAddrPrivKey, err := crypto.ToECDSA(common.Hex2Bytes(os.Getenv("L1_PROPOSER_PRIVATE_KEY"))) + require.Nil(t, err) + + testAddr := crypto.PubkeyToAddress(testAddrPrivKey.PublicKey) + + nonce, err := client.L2.PendingNonceAt(context.Background(), testAddr) + require.Nil(t, err) + + tx := types.NewTransaction( + nonce, + testAddr, + common.Big1, + 100000, + baseFee, + []byte{}, + ) + signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(client.L2ChainID), testAddrPrivKey) + require.Nil(t, err) + require.Nil(t, client.L2.SendTransaction(context.Background(), signedTx)) + + content, err := ContentFrom(context.Background(), client.L2RawRPC, testAddr) + require.Nil(t, err) + + require.NotZero(t, len(content["pending"])) + require.Equal(t, signedTx.Nonce(), content["pending"][strconv.Itoa(int(signedTx.Nonce()))].Nonce()) +} diff --git a/proposer/config.go b/proposer/config.go index 91502d2a3..9db37f7f9 100644 --- a/proposer/config.go +++ b/proposer/config.go @@ -14,20 +14,21 @@ import ( // Config contains all configurations to initialize a Taiko proposer. type Config struct { - L1Endpoint string - L2Endpoint string - TaikoL1Address common.Address - TaikoL2Address common.Address - L1ProposerPrivKey *ecdsa.PrivateKey - L2SuggestedFeeRecipient common.Address - ProposeInterval *time.Duration - CommitSlot uint64 - LocalAddresses []common.Address - ProposeEmptyBlocksInterval *time.Duration - MinBlockGasLimit uint64 - MaxProposedTxListsPerEpoch uint64 - ProposeBlockTxGasLimit *uint64 - BackOffRetryInterval time.Duration + L1Endpoint string + L2Endpoint string + TaikoL1Address common.Address + TaikoL2Address common.Address + L1ProposerPrivKey *ecdsa.PrivateKey + L2SuggestedFeeRecipient common.Address + ProposeInterval *time.Duration + CommitSlot uint64 + LocalAddresses []common.Address + ProposeEmptyBlocksInterval *time.Duration + MinBlockGasLimit uint64 + MaxProposedTxListsPerEpoch uint64 + ProposeBlockTxGasLimit *uint64 + BackOffRetryInterval time.Duration + ProposeBlockTxReplacementMultiplier uint64 } // NewConfigFromCliContext initializes a Config instance from @@ -80,20 +81,29 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) { proposeBlockTxGasLimit = &gasLimit } + proposeBlockTxReplacementMultiplier := c.Uint64(flags.ProposeBlockTxReplacementMultiplier.Name) + if proposeBlockTxReplacementMultiplier == 0 { + return nil, fmt.Errorf( + "invalid --proposeBlockTxReplacementMultiplier value: %d", + proposeBlockTxReplacementMultiplier, + ) + } + return &Config{ - L1Endpoint: c.String(flags.L1WSEndpoint.Name), - L2Endpoint: c.String(flags.L2HTTPEndpoint.Name), - TaikoL1Address: common.HexToAddress(c.String(flags.TaikoL1Address.Name)), - TaikoL2Address: common.HexToAddress(c.String(flags.TaikoL2Address.Name)), - L1ProposerPrivKey: l1ProposerPrivKey, - L2SuggestedFeeRecipient: common.HexToAddress(l2SuggestedFeeRecipient), - ProposeInterval: proposingInterval, - CommitSlot: c.Uint64(flags.CommitSlot.Name), - LocalAddresses: localAddresses, - ProposeEmptyBlocksInterval: proposeEmptyBlocksInterval, - MinBlockGasLimit: c.Uint64(flags.MinBlockGasLimit.Name), - MaxProposedTxListsPerEpoch: c.Uint64(flags.MaxProposedTxListsPerEpoch.Name), - ProposeBlockTxGasLimit: proposeBlockTxGasLimit, - BackOffRetryInterval: time.Duration(c.Uint64(flags.BackOffRetryInterval.Name)) * time.Second, + L1Endpoint: c.String(flags.L1WSEndpoint.Name), + L2Endpoint: c.String(flags.L2HTTPEndpoint.Name), + TaikoL1Address: common.HexToAddress(c.String(flags.TaikoL1Address.Name)), + TaikoL2Address: common.HexToAddress(c.String(flags.TaikoL2Address.Name)), + L1ProposerPrivKey: l1ProposerPrivKey, + L2SuggestedFeeRecipient: common.HexToAddress(l2SuggestedFeeRecipient), + ProposeInterval: proposingInterval, + CommitSlot: c.Uint64(flags.CommitSlot.Name), + LocalAddresses: localAddresses, + ProposeEmptyBlocksInterval: proposeEmptyBlocksInterval, + MinBlockGasLimit: c.Uint64(flags.MinBlockGasLimit.Name), + MaxProposedTxListsPerEpoch: c.Uint64(flags.MaxProposedTxListsPerEpoch.Name), + ProposeBlockTxGasLimit: proposeBlockTxGasLimit, + BackOffRetryInterval: time.Duration(c.Uint64(flags.BackOffRetryInterval.Name)) * time.Second, + ProposeBlockTxReplacementMultiplier: proposeBlockTxReplacementMultiplier, }, nil } diff --git a/proposer/config_test.go b/proposer/config_test.go index 6615b2e2b..0a2fd4aea 100644 --- a/proposer/config_test.go +++ b/proposer/config_test.go @@ -36,6 +36,7 @@ func (s *ProposerTestSuite) TestNewConfigFromCliContext() { &cli.StringFlag{Name: flags.ProposeInterval.Name}, &cli.Uint64Flag{Name: flags.CommitSlot.Name}, &cli.StringFlag{Name: flags.TxPoolLocals.Name}, + &cli.Uint64Flag{Name: flags.ProposeBlockTxReplacementMultiplier.Name}, } app.Action = func(ctx *cli.Context) error { c, err := NewConfigFromCliContext(ctx) @@ -50,6 +51,7 @@ func (s *ProposerTestSuite) TestNewConfigFromCliContext() { s.Equal(uint64(commitSlot), c.CommitSlot) s.Equal(1, len(c.LocalAddresses)) s.Equal(goldenTouchAddress, c.LocalAddresses[0]) + s.Equal(uint64(5), c.ProposeBlockTxReplacementMultiplier) s.Nil(new(Proposer).InitFromCli(context.Background(), ctx)) return err @@ -66,5 +68,6 @@ func (s *ProposerTestSuite) TestNewConfigFromCliContext() { "-" + flags.ProposeInterval.Name, proposeInterval, "-" + flags.CommitSlot.Name, strconv.Itoa(commitSlot), "-" + flags.TxPoolLocals.Name, goldenTouchAddress.Hex(), + "-" + flags.ProposeBlockTxReplacementMultiplier.Name, "5", })) } diff --git a/proposer/proposer.go b/proposer/proposer.go index f75bf9c42..2c228da3a 100644 --- a/proposer/proposer.go +++ b/proposer/proposer.go @@ -8,9 +8,11 @@ import ( "math" "math/big" "math/rand" + "strings" "sync" "time" + "github.com/cenkalti/backoff/v4" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -27,8 +29,9 @@ import ( ) var ( - errNoNewTxs = errors.New("no new transactions") - waitReceiptTimeout = 1 * time.Minute + errNoNewTxs = errors.New("no new transactions") + waitReceiptTimeout = 1 * time.Minute + maxSendProposeBlockTxRetry = 10 ) // Proposer keep proposing new transactions from L2 execution engine's tx pool at a fixed interval. @@ -50,6 +53,7 @@ type Proposer struct { minBlockGasLimit *uint64 maxProposedTxListsPerEpoch uint64 proposeBlockTxGasLimit *uint64 + txReplacementTipMultiplier uint64 // Protocol configurations protocolConfigs *bindings.TaikoDataConfig @@ -84,6 +88,7 @@ func InitFromConfig(ctx context.Context, p *Proposer, cfg *Config) (err error) { p.locals = cfg.LocalAddresses p.commitSlot = cfg.CommitSlot p.maxProposedTxListsPerEpoch = cfg.MaxProposedTxListsPerEpoch + p.txReplacementTipMultiplier = cfg.ProposeBlockTxReplacementMultiplier p.ctx = ctx // RPC clients @@ -270,14 +275,14 @@ func (p *Proposer) ProposeOp(ctx context.Context) error { return nil } -// ProposeTxList proposes the given transactions list to TaikoL1 smart contract. -func (p *Proposer) ProposeTxList( +// sendProposeBlockTx tries to send a TaikoL1.proposeBlock transaction. +func (p *Proposer) sendProposeBlockTx( ctx context.Context, meta *encoding.TaikoL1BlockMetadataInput, txListBytes []byte, - txNum uint, nonce *uint64, -) error { + isReplacement bool, +) (*types.Transaction, error) { if p.minBlockGasLimit != nil && meta.GasLimit < uint32(*p.minBlockGasLimit) { meta.GasLimit = uint32(*p.minBlockGasLimit) } @@ -285,12 +290,11 @@ func (p *Proposer) ProposeTxList( // Propose the transactions list inputs, err := encoding.EncodeProposeBlockInput(meta) if err != nil { - return err + return nil, err } - opts, err := getTxOpts(ctx, p.rpc.L1, p.l1ProposerPrivKey, p.rpc.L1ChainID) if err != nil { - return err + return nil, err } if nonce != nil { opts.Nonce = new(big.Int).SetUint64(*nonce) @@ -298,16 +302,86 @@ func (p *Proposer) ProposeTxList( if p.proposeBlockTxGasLimit != nil { opts.GasLimit = *p.proposeBlockTxGasLimit } + if isReplacement { + log.Info("Try replacing a transaction with same nonce", "sender", p.l1ProposerAddress, "nonce", nonce) + originalTx, err := rpc.GetPendingTxByNonce(ctx, p.rpc, p.l1ProposerAddress, *nonce) + if err != nil || originalTx == nil { + log.Warn( + "Original transaction not found", + "sender", p.l1ProposerAddress, + "nonce", nonce, + "error", err, + ) + + opts.GasTipCap = new(big.Int).Mul(opts.GasTipCap, new(big.Int).SetUint64(p.txReplacementTipMultiplier)) + } else { + log.Info( + "Original transaction to replace", + "sender", p.l1ProposerAddress, + "nonce", nonce, + "tx", originalTx, + ) + + opts.GasTipCap = new(big.Int).Mul( + originalTx.GasTipCap(), + new(big.Int).SetUint64(p.txReplacementTipMultiplier), + ) + } + } proposeTx, err := p.rpc.TaikoL1.ProposeBlock(opts, inputs, txListBytes) if err != nil { - return encoding.TryParsingCustomError(err) + return nil, encoding.TryParsingCustomError(err) + } + + return proposeTx, nil +} + +// ProposeTxList proposes the given transactions list to TaikoL1 smart contract. +func (p *Proposer) ProposeTxList( + ctx context.Context, + meta *encoding.TaikoL1BlockMetadataInput, + txListBytes []byte, + txNum uint, + nonce *uint64, +) error { + var ( + isReplacement bool + tx *types.Transaction + err error + ) + if err := backoff.Retry( + func() error { + if ctx.Err() != nil { + return nil + } + if tx, err = p.sendProposeBlockTx(ctx, meta, txListBytes, nonce, isReplacement); err != nil { + log.Warn("Failed to send propose block transaction, retrying", "error", err) + if strings.Contains(err.Error(), "replacement transaction underpriced") { + isReplacement = true + } else { + isReplacement = false + } + return err + } + + return nil + }, + backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(maxSendProposeBlockTxRetry)), + ); err != nil { + return err + } + if ctx.Err() != nil { + return ctx.Err() + } + if err != nil { + return err } ctxWithTimeout, cancel := context.WithTimeout(ctx, waitReceiptTimeout) defer cancel() - if _, err := rpc.WaitReceipt(ctxWithTimeout, p.rpc.L1, proposeTx); err != nil { + if _, err := rpc.WaitReceipt(ctxWithTimeout, p.rpc.L1, tx); err != nil { return err } diff --git a/proposer/proposer_test.go b/proposer/proposer_test.go index f0d6fa2b4..8fa7c1b19 100644 --- a/proposer/proposer_test.go +++ b/proposer/proposer_test.go @@ -10,8 +10,10 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/rlp" "github.com/stretchr/testify/suite" "github.com/taikoxyz/taiko-client/bindings" + "github.com/taikoxyz/taiko-client/bindings/encoding" "github.com/taikoxyz/taiko-client/testutils" ) @@ -32,14 +34,15 @@ func (s *ProposerTestSuite) SetupTest() { ctx, cancel := context.WithCancel(context.Background()) proposeInterval := 1024 * time.Hour // No need to periodically propose transactions list in unit tests s.Nil(InitFromConfig(ctx, p, (&Config{ - L1Endpoint: os.Getenv("L1_NODE_WS_ENDPOINT"), - L2Endpoint: os.Getenv("L2_EXECUTION_ENGINE_HTTP_ENDPOINT"), - TaikoL1Address: common.HexToAddress(os.Getenv("TAIKO_L1_ADDRESS")), - TaikoL2Address: common.HexToAddress(os.Getenv("TAIKO_L2_ADDRESS")), - L1ProposerPrivKey: l1ProposerPrivKey, - L2SuggestedFeeRecipient: common.HexToAddress(os.Getenv("L2_SUGGESTED_FEE_RECIPIENT")), - ProposeInterval: &proposeInterval, - MaxProposedTxListsPerEpoch: 1, + L1Endpoint: os.Getenv("L1_NODE_WS_ENDPOINT"), + L2Endpoint: os.Getenv("L2_EXECUTION_ENGINE_HTTP_ENDPOINT"), + TaikoL1Address: common.HexToAddress(os.Getenv("TAIKO_L1_ADDRESS")), + TaikoL2Address: common.HexToAddress(os.Getenv("TAIKO_L2_ADDRESS")), + L1ProposerPrivKey: l1ProposerPrivKey, + L2SuggestedFeeRecipient: common.HexToAddress(os.Getenv("L2_SUGGESTED_FEE_RECIPIENT")), + ProposeInterval: &proposeInterval, + MaxProposedTxListsPerEpoch: 1, + ProposeBlockTxReplacementMultiplier: 2, }))) s.p = p @@ -61,9 +64,6 @@ func (s *ProposerTestSuite) TestName() { } func (s *ProposerTestSuite) TestProposeOp() { - // Nothing to propose - s.EqualError(errNoNewTxs, s.p.ProposeOp(context.Background()).Error()) - // Propose txs in L2 execution engine's mempool sink := make(chan *bindings.TaikoL1ClientBlockProposed) @@ -130,6 +130,57 @@ func (s *ProposerTestSuite) TestCustomProposeOpHook() { s.True(flag) } +func (s *ProposerTestSuite) TestSendProposeBlockTx() { + opts, err := getTxOpts( + context.Background(), + s.p.rpc.L1, + s.p.l1ProposerPrivKey, + s.RpcClient.L1ChainID, + ) + s.Nil(err) + s.Greater(opts.GasTipCap.Uint64(), uint64(0)) + + nonce, err := s.RpcClient.L1.PendingNonceAt(context.Background(), s.p.l1ProposerAddress) + s.Nil(err) + + tx := types.NewTransaction( + nonce, + common.BytesToAddress([]byte{}), + common.Big1, + 100000, + opts.GasTipCap, + []byte{}, + ) + + s.SetL1Automine(false) + defer s.SetL1Automine(true) + + signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(s.RpcClient.L1ChainID), s.p.l1ProposerPrivKey) + s.Nil(err) + s.Nil(s.RpcClient.L1.SendTransaction(context.Background(), signedTx)) + + var emptyTxs []types.Transaction + encoded, err := rlp.EncodeToBytes(emptyTxs) + s.Nil(err) + + newTx, err := s.p.sendProposeBlockTx( + context.Background(), + &encoding.TaikoL1BlockMetadataInput{ + Beneficiary: s.p.L2SuggestedFeeRecipient(), + GasLimit: 21000, + TxListHash: crypto.Keccak256Hash(encoded), + TxListByteStart: common.Big0, + TxListByteEnd: new(big.Int).SetUint64(uint64(len(encoded))), + CacheTxListInfo: 0, + }, + encoded, + &nonce, + true, + ) + s.Nil(err) + s.Greater(newTx.GasTipCap().Uint64(), tx.GasTipCap().Uint64()) +} + func (s *ProposerTestSuite) TestUpdateProposingTicker() { oneHour := 1 * time.Hour s.p.proposingInterval = &oneHour diff --git a/testutils/suite.go b/testutils/suite.go index 4af00825e..e7828854b 100644 --- a/testutils/suite.go +++ b/testutils/suite.go @@ -118,3 +118,7 @@ func (s *ClientTestSuite) TearDownTest() { s.Nil(rpc.SetHead(context.Background(), s.RpcClient.L2RawRPC, common.Big0)) } + +func (s *ClientTestSuite) SetL1Automine(automine bool) { + s.Nil(s.RpcClient.L1RawRPC.CallContext(context.Background(), nil, "evm_setAutomine", automine)) +}