From 5e1dfee09eb217282368ea0cb148bf776d351122 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 8 Aug 2023 11:37:16 +0800 Subject: [PATCH 1/3] fix(pkg): set more RPC context timeout --- .../anchor_tx_constructor.go | 2 + pkg/rpc/methods.go | 94 +++++++++++-------- pkg/rpc/utils.go | 15 +++ 3 files changed, 74 insertions(+), 37 deletions(-) diff --git a/driver/anchor_tx_constructor/anchor_tx_constructor.go b/driver/anchor_tx_constructor/anchor_tx_constructor.go index 3522359a2..2c4eaeccc 100644 --- a/driver/anchor_tx_constructor/anchor_tx_constructor.go +++ b/driver/anchor_tx_constructor/anchor_tx_constructor.go @@ -102,6 +102,8 @@ func (c *AnchorTxConstructor) transactOpts( return nil, err } + log.Info("Golden touch account nonce", "nonce", nonce) + return &bind.TransactOpts{ From: c.goldenTouchAddress, Signer: func(address common.Address, tx *types.Transaction) (*types.Transaction, error) { diff --git a/pkg/rpc/methods.go b/pkg/rpc/methods.go index 313b7dbc4..0ef0bbdcc 100644 --- a/pkg/rpc/methods.go +++ b/pkg/rpc/methods.go @@ -33,14 +33,17 @@ var ( // ensureGenesisMatched fetches the L2 genesis block from TaikoL1 contract, // and checks whether the fetched genesis is same to the node local genesis. func (c *Client) ensureGenesisMatched(ctx context.Context) error { - stateVars, err := c.GetProtocolStateVariables(&bind.CallOpts{Context: ctx}) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + + stateVars, err := c.GetProtocolStateVariables(&bind.CallOpts{Context: ctxWithTimeout}) if err != nil { return err } // Fetch the genesis `BlockVerified` event. iter, err := c.TaikoL1.FilterBlockVerified( - &bind.FilterOpts{Start: stateVars.GenesisHeight, End: &stateVars.GenesisHeight, Context: ctx}, + &bind.FilterOpts{Start: stateVars.GenesisHeight, End: &stateVars.GenesisHeight, Context: ctxWithTimeout}, []*big.Int{common.Big0}, ) if err != nil { @@ -48,7 +51,7 @@ func (c *Client) ensureGenesisMatched(ctx context.Context) error { } // Fetch the node's genesis block. - nodeGenesis, err := c.L2.HeaderByNumber(ctx, common.Big0) + nodeGenesis, err := c.L2.HeaderByNumber(ctxWithTimeout, common.Big0) if err != nil { return err } @@ -104,27 +107,29 @@ func (c *Client) WaitTillL2ExecutionEngineSynced(ctx context.Context) error { // LatestL2KnownL1Header fetches the L2 execution engine's latest known L1 header. func (c *Client) LatestL2KnownL1Header(ctx context.Context) (*types.Header, error) { - headL1Origin, err := c.L2.HeadL1Origin(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + headL1Origin, err := c.L2.HeadL1Origin(ctxWithTimeout) if err != nil { switch err.Error() { case ethereum.NotFound.Error(): - return c.GetGenesisL1Header(ctx) + return c.GetGenesisL1Header(ctxWithTimeout) default: return nil, err } } if headL1Origin == nil { - return c.GetGenesisL1Header(ctx) + return c.GetGenesisL1Header(ctxWithTimeout) } - header, err := c.L1.HeaderByHash(ctx, headL1Origin.L1BlockHash) + header, err := c.L1.HeaderByHash(ctxWithTimeout, headL1Origin.L1BlockHash) if err != nil { switch err.Error() { case ethereum.NotFound.Error(): log.Warn("Latest L2 known L1 header not found, use genesis instead", "hash", headL1Origin.L1BlockHash) - return c.GetGenesisL1Header(ctx) + return c.GetGenesisL1Header(ctxWithTimeout) default: return nil, err } @@ -137,33 +142,39 @@ func (c *Client) LatestL2KnownL1Header(ctx context.Context) (*types.Header, erro // GetGenesisL1Header fetches the L1 header that including L2 genesis block. func (c *Client) GetGenesisL1Header(ctx context.Context) (*types.Header, error) { - stateVars, err := c.GetProtocolStateVariables(&bind.CallOpts{Context: ctx}) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + + stateVars, err := c.GetProtocolStateVariables(&bind.CallOpts{Context: ctxWithTimeout}) if err != nil { return nil, err } - return c.L1.HeaderByNumber(ctx, new(big.Int).SetUint64(stateVars.GenesisHeight)) + return c.L1.HeaderByNumber(ctxWithTimeout, new(big.Int).SetUint64(stateVars.GenesisHeight)) } // L2ParentByBlockId fetches the block header from L2 execution engine with the largest block id that // smaller than the given `blockId`. func (c *Client) L2ParentByBlockId(ctx context.Context, blockID *big.Int) (*types.Header, error) { + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + parentBlockId := new(big.Int).Sub(blockID, common.Big1) log.Debug("Get parent block by block ID", "parentBlockId", parentBlockId) if parentBlockId.Cmp(common.Big0) == 0 { - return c.L2.HeaderByNumber(ctx, common.Big0) + return c.L2.HeaderByNumber(ctxWithTimeout, common.Big0) } - l1Origin, err := c.L2.L1OriginByID(ctx, parentBlockId) + l1Origin, err := c.L2.L1OriginByID(ctxWithTimeout, parentBlockId) if err != nil { return nil, err } log.Debug("Parent block L1 origin", "l1Origin", l1Origin, "parentBlockId", parentBlockId) - return c.L2.HeaderByHash(ctx, l1Origin.L2BlockHash) + return c.L2.HeaderByHash(ctxWithTimeout, l1Origin.L2BlockHash) } // WaitL1Origin keeps waiting until the L1Origin with given block ID appears on the L2 execution engine. @@ -219,6 +230,9 @@ func (c *Client) GetPoolContent( locals []common.Address, maxTransactionsLists uint64, ) ([]types.Transactions, error) { + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + var localsArg []string for _, local := range locals { localsArg = append(localsArg, local.Hex()) @@ -226,7 +240,7 @@ func (c *Client) GetPoolContent( var result []types.Transactions err := c.L2RawRPC.CallContext( - ctx, + ctxWithTimeout, &result, "taiko_txPoolContent", beneficiary, @@ -247,8 +261,11 @@ func (c *Client) L2AccountNonce( account common.Address, height *big.Int, ) (uint64, error) { + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + var result hexutil.Uint64 - err := c.L2RawRPC.CallContext(ctx, &result, "eth_getTransactionCount", account, hexutil.EncodeBig(height)) + err := c.L2RawRPC.CallContext(ctxWithTimeout, &result, "eth_getTransactionCount", account, hexutil.EncodeBig(height)) return uint64(result), err } @@ -262,18 +279,19 @@ type L2SyncProgress struct { // L2ExecutionEngineSyncProgress fetches the sync progress of the given L2 execution engine. func (c *Client) L2ExecutionEngineSyncProgress(ctx context.Context) (*L2SyncProgress, error) { + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + var ( progress = new(L2SyncProgress) err error ) - - g, ctx := errgroup.WithContext(ctx) + g, ctx := errgroup.WithContext(ctxWithTimeout) g.Go(func() error { progress.SyncProgress, err = c.L2.SyncProgress(ctx) return err }) - g.Go(func() error { stateVars, err := c.GetProtocolStateVariables(&bind.CallOpts{Context: ctx}) if err != nil { @@ -282,7 +300,6 @@ func (c *Client) L2ExecutionEngineSyncProgress(ctx context.Context) (*L2SyncProg progress.HighestBlockID = new(big.Int).SetUint64(stateVars.NumBlocks - 1) return nil }) - g.Go(func() error { headL1Origin, err := c.L2.HeadL1Origin(ctx) if err != nil { @@ -335,14 +352,8 @@ func (c *Client) GetStorageRoot( contract common.Address, height *big.Int, ) (common.Hash, error) { - var ( - ctxWithTimeout = ctx - cancel context.CancelFunc - ) - if _, ok := ctx.Deadline(); !ok { - ctxWithTimeout, cancel = context.WithTimeout(ctx, defaultTimeout) - defer cancel() - } + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() proof, err := gethclient.GetProof( ctxWithTimeout, @@ -366,14 +377,17 @@ func (c *Client) CheckL1ReorgFromL2EE(ctx context.Context, blockID *big.Int) (bo blockIDToReset *big.Int ) for { + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + if blockID.Cmp(common.Big0) == 0 { - stateVars, err := c.TaikoL1.GetStateVariables(&bind.CallOpts{Context: ctx}) + stateVars, err := c.TaikoL1.GetStateVariables(&bind.CallOpts{Context: ctxWithTimeout}) if err != nil { return false, nil, nil, err } if l1CurrentToReset, err = c.L1.HeaderByNumber( - ctx, + ctxWithTimeout, new(big.Int).SetUint64(stateVars.GenesisHeight), ); err != nil { return false, nil, nil, err @@ -383,14 +397,14 @@ func (c *Client) CheckL1ReorgFromL2EE(ctx context.Context, blockID *big.Int) (bo break } - l1Origin, err := c.L2.L1OriginByID(ctx, blockID) + l1Origin, err := c.L2.L1OriginByID(ctxWithTimeout, blockID) if err != nil { if err.Error() == ethereum.NotFound.Error() { log.Info("L1Origin not found", "blockID", blockID) // If the L2 EE is just synced through P2P, there is a chance that the EE do not have // the chain head L1Origin information recorded. - justSyncedByP2P, err := c.IsJustSyncedByP2P(ctx) + justSyncedByP2P, err := c.IsJustSyncedByP2P(ctxWithTimeout) if err != nil { return false, nil, @@ -416,7 +430,7 @@ func (c *Client) CheckL1ReorgFromL2EE(ctx context.Context, blockID *big.Int) (bo return false, nil, nil, err } - l1Header, err := c.L1.HeaderByNumber(ctx, l1Origin.L1BlockHeight) + l1Header, err := c.L1.HeaderByNumber(ctxWithTimeout, l1Origin.L1BlockHeight) if err != nil { if err.Error() == ethereum.NotFound.Error() { continue @@ -465,8 +479,11 @@ func (c *Client) CheckL1ReorgFromL1Cursor( l1CurrentToReset *types.Header ) for { + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + if l1Current.Number.Uint64() <= genesisHeightL1 { - newL1Current, err := c.L1.HeaderByNumber(ctx, new(big.Int).SetUint64(genesisHeightL1)) + newL1Current, err := c.L1.HeaderByNumber(ctxWithTimeout, new(big.Int).SetUint64(genesisHeightL1)) if err != nil { return false, nil, nil, err } @@ -475,7 +492,7 @@ func (c *Client) CheckL1ReorgFromL1Cursor( break } - l1Header, err := c.L1.BlockByNumber(ctx, l1Current.Number) + l1Header, err := c.L1.BlockByNumber(ctxWithTimeout, l1Current.Number) if err != nil { if err.Error() == ethereum.NotFound.Error() { continue @@ -492,7 +509,7 @@ func (c *Client) CheckL1ReorgFromL1Cursor( "l1HashNew", l1Header.Hash(), ) reorged = true - if l1Current, err = c.L1.HeaderByHash(ctx, l1Current.ParentHash); err != nil { + if l1Current, err = c.L1.HeaderByHash(ctxWithTimeout, l1Current.ParentHash); err != nil { return false, nil, nil, err } continue @@ -515,12 +532,15 @@ func (c *Client) CheckL1ReorgFromL1Cursor( // IsJustSyncedByP2P checks whether the given L2 execution engine has just finished a P2P // sync. func (c *Client) IsJustSyncedByP2P(ctx context.Context) (bool, error) { - l2Head, err := c.L2.HeaderByNumber(ctx, nil) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + + l2Head, err := c.L2.HeaderByNumber(ctxWithTimeout, nil) if err != nil { return false, err } - if _, err = c.L2.L1OriginByID(ctx, l2Head.Number); err != nil { + if _, err = c.L2.L1OriginByID(ctxWithTimeout, l2Head.Number); err != nil { if err.Error() == ethereum.NotFound.Error() { return true, nil } diff --git a/pkg/rpc/utils.go b/pkg/rpc/utils.go index 06e03c9ba..66d705acd 100644 --- a/pkg/rpc/utils.go +++ b/pkg/rpc/utils.go @@ -190,3 +190,18 @@ func IsArchiveNode(ctx context.Context, client *EthClient, l2GenesisHeight uint6 return true, nil } + +// ctxWithTimeoutOrDefault sets a context timeout if the deadline has not passed or is not set, +// and otherwise returns the context as passed in. cancel func is always set to an empty function +// so is safe to defer the cancel. +func ctxWithTimeoutOrDefault(ctx context.Context, defaultTimeout time.Duration) (context.Context, context.CancelFunc) { + var ( + ctxWithTimeout = ctx + cancel context.CancelFunc = func() {} + ) + if _, ok := ctx.Deadline(); !ok { + ctxWithTimeout, cancel = context.WithTimeout(ctx, defaultTimeout) + } + + return ctxWithTimeout, cancel +} From a93b09a4491e9309e93f6f6474582204bdd1cd6f Mon Sep 17 00:00:00 2001 From: David Date: Tue, 8 Aug 2023 11:49:27 +0800 Subject: [PATCH 2/3] feat: update ethclient --- pkg/rpc/ethclient.go | 75 ++++++++++++++++++-------------------------- 1 file changed, 30 insertions(+), 45 deletions(-) diff --git a/pkg/rpc/ethclient.go b/pkg/rpc/ethclient.go index fb6073be7..f5378d879 100644 --- a/pkg/rpc/ethclient.go +++ b/pkg/rpc/ethclient.go @@ -42,24 +42,9 @@ func NewEthClientWithDefaultTimeout( return &EthClient{Client: ethclient, timeout: defaultTimeout} } -// ctxWithTimeoutOrDefault sets a context timeout if the deadline has not passed or is not set, -// and otherwise returns the context as passed in. cancel func is always set to an empty function -// so is safe to defer the cancel. -func (c *EthClient) ctxWithTimeoutOrDefault(ctx context.Context) (context.Context, context.CancelFunc) { - var ( - ctxWithTimeout = ctx - cancel context.CancelFunc = func() {} - ) - if _, ok := ctx.Deadline(); !ok { - ctxWithTimeout, cancel = context.WithTimeout(ctx, c.timeout) - } - - return ctxWithTimeout, cancel -} - // ChainID retrieves the current chain ID for transaction replay protection. func (c *EthClient) ChainID(ctx context.Context) (*big.Int, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.ChainID(ctxWithTimeout) @@ -70,7 +55,7 @@ func (c *EthClient) ChainID(ctx context.Context) (*big.Int, error) { // Note that loading full blocks requires two requests. Use HeaderByHash // if you don't need all transactions or uncle headers. func (c *EthClient) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.BlockByHash(ctxWithTimeout, hash) @@ -82,7 +67,7 @@ func (c *EthClient) BlockByHash(ctx context.Context, hash common.Hash) (*types.B // Note that loading full blocks requires two requests. Use HeaderByNumber // if you don't need all transactions or uncle headers. func (c *EthClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.BlockByNumber(ctxWithTimeout, number) @@ -90,7 +75,7 @@ func (c *EthClient) BlockByNumber(ctx context.Context, number *big.Int) (*types. // BlockNumber returns the most recent block number func (c *EthClient) BlockNumber(ctx context.Context) (uint64, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.BlockNumber(ctxWithTimeout) @@ -98,7 +83,7 @@ func (c *EthClient) BlockNumber(ctx context.Context) (uint64, error) { // PeerCount returns the number of p2p peers as reported by the net_peerCount method. func (c *EthClient) PeerCount(ctx context.Context) (uint64, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.PeerCount(ctxWithTimeout) @@ -106,7 +91,7 @@ func (c *EthClient) PeerCount(ctx context.Context) (uint64, error) { // HeaderByHash returns the block header with the given hash. func (c *EthClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.HeaderByHash(ctxWithTimeout, hash) @@ -115,7 +100,7 @@ func (c *EthClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types. // HeaderByNumber returns a block header from the current canonical chain. If number is // nil, the latest known header is returned. func (c *EthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.HeaderByNumber(ctxWithTimeout, number) @@ -126,7 +111,7 @@ func (c *EthClient) TransactionByHash( ctx context.Context, hash common.Hash, ) (tx *types.Transaction, isPending bool, err error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.TransactionByHash(ctxWithTimeout, hash) @@ -144,7 +129,7 @@ func (c *EthClient) TransactionSender( block common.Hash, index uint, ) (common.Address, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.TransactionSender(ctxWithTimeout, tx, block, index) @@ -152,7 +137,7 @@ func (c *EthClient) TransactionSender( // TransactionCount returns the total number of transactions in the given block. func (c *EthClient) TransactionCount(ctx context.Context, blockHash common.Hash) (uint, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.TransactionCount(ctxWithTimeout, blockHash) @@ -164,7 +149,7 @@ func (c *EthClient) TransactionInBlock( blockHash common.Hash, index uint, ) (*types.Transaction, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.TransactionInBlock(ctxWithTimeout, blockHash, index) @@ -173,7 +158,7 @@ func (c *EthClient) TransactionInBlock( // SyncProgress retrieves the current progress of the sync algorithm. If there's // no sync currently running, it returns nil. func (c *EthClient) SyncProgress(ctx context.Context) (*ethereum.SyncProgress, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.SyncProgress(ctxWithTimeout) @@ -181,7 +166,7 @@ func (c *EthClient) SyncProgress(ctx context.Context) (*ethereum.SyncProgress, e // NetworkID returns the network ID for this client. func (c *EthClient) NetworkID(ctx context.Context) (*big.Int, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.NetworkID(ctxWithTimeout) @@ -194,7 +179,7 @@ func (c *EthClient) BalanceAt( account common.Address, blockNumber *big.Int, ) (*big.Int, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.BalanceAt(ctxWithTimeout, account, blockNumber) @@ -208,7 +193,7 @@ func (c *EthClient) StorageAt( key common.Hash, blockNumber *big.Int, ) ([]byte, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.StorageAt(ctxWithTimeout, account, key, blockNumber) @@ -221,7 +206,7 @@ func (c *EthClient) CodeAt( account common.Address, blockNumber *big.Int, ) ([]byte, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.CodeAt(ctxWithTimeout, account, blockNumber) @@ -234,7 +219,7 @@ func (c *EthClient) NonceAt( account common.Address, blockNumber *big.Int, ) (uint64, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.NonceAt(ctxWithTimeout, account, blockNumber) @@ -242,7 +227,7 @@ func (c *EthClient) NonceAt( // PendingBalanceAt returns the wei balance of the given account in the pending state. func (c *EthClient) PendingBalanceAt(ctx context.Context, account common.Address) (*big.Int, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.PendingBalanceAt(ctxWithTimeout, account) @@ -254,7 +239,7 @@ func (c *EthClient) PendingStorageAt( account common.Address, key common.Hash, ) ([]byte, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.PendingStorageAt(ctxWithTimeout, account, key) @@ -262,7 +247,7 @@ func (c *EthClient) PendingStorageAt( // PendingCodeAt returns the contract code of the given account in the pending state. func (c *EthClient) PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.PendingCodeAt(ctxWithTimeout, account) @@ -271,7 +256,7 @@ func (c *EthClient) PendingCodeAt(ctx context.Context, account common.Address) ( // PendingNonceAt returns the account nonce of the given account in the pending state. // This is the nonce that should be used for the next transaction. func (c *EthClient) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.PendingNonceAt(ctxWithTimeout, account) @@ -279,7 +264,7 @@ func (c *EthClient) PendingNonceAt(ctx context.Context, account common.Address) // PendingTransactionCount returns the total number of transactions in the pending state. func (c *EthClient) PendingTransactionCount(ctx context.Context) (uint, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.PendingTransactionCount(ctxWithTimeout) @@ -296,7 +281,7 @@ func (c *EthClient) CallContract( msg ethereum.CallMsg, blockNumber *big.Int, ) ([]byte, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.CallContract(ctxWithTimeout, msg, blockNumber) @@ -309,7 +294,7 @@ func (c *EthClient) CallContractAtHash( msg ethereum.CallMsg, blockHash common.Hash, ) ([]byte, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.CallContractAtHash(ctxWithTimeout, msg, blockHash) @@ -318,7 +303,7 @@ func (c *EthClient) CallContractAtHash( // PendingCallContract executes a message call transaction using the EVM. // The state seen by the contract call is the pending state. func (c *EthClient) PendingCallContract(ctx context.Context, msg ethereum.CallMsg) ([]byte, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.PendingCallContract(ctxWithTimeout, msg) @@ -327,7 +312,7 @@ func (c *EthClient) PendingCallContract(ctx context.Context, msg ethereum.CallMs // SuggestGasPrice retrieves the currently suggested gas price to allow a timely // execution of a transaction. func (c *EthClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.SuggestGasPrice(ctxWithTimeout) @@ -336,7 +321,7 @@ func (c *EthClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) { // SuggestGasTipCap retrieves the currently suggested gas tip cap after 1559 to // allow a timely execution of a transaction. func (c *EthClient) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.SuggestGasTipCap(ctxWithTimeout) @@ -349,7 +334,7 @@ func (c *EthClient) FeeHistory( lastBlock *big.Int, rewardPercentiles []float64, ) (*ethereum.FeeHistory, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.FeeHistory(ctxWithTimeout, blockCount, lastBlock, rewardPercentiles) @@ -360,7 +345,7 @@ func (c *EthClient) FeeHistory( // the true gas limit requirement as other transactions may be added or removed by miners, // but it should provide a basis for setting a reasonable default. func (c *EthClient) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.EstimateGas(ctxWithTimeout, msg) @@ -371,7 +356,7 @@ func (c *EthClient) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint // If the transaction was a contract creation use the TransactionReceipt method to get the // contract address after the transaction has been mined. func (c *EthClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { - ctxWithTimeout, cancel := c.ctxWithTimeoutOrDefault(ctx) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, c.timeout) defer cancel() return c.Client.SendTransaction(ctxWithTimeout, tx) From 01050858768d4865e7d29a4a5fde667ee4741f79 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 8 Aug 2023 12:08:30 +0800 Subject: [PATCH 3/3] feat: update pkg --- pkg/rpc/client.go | 29 +++++++++++++++------------ pkg/rpc/dial.go | 17 ++++++++++++---- pkg/rpc/utils.go | 50 +++++++++++++++++++++++++++++++---------------- 3 files changed, 62 insertions(+), 34 deletions(-) diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index e477bab33..2dfad43b6 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -58,19 +58,23 @@ type ClientConfig struct { // NewClient initializes all RPC clients used by Taiko client softwares. func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) { - l1EthClient, err := DialClientWithBackoff(ctx, cfg.L1Endpoint, cfg.RetryInterval) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + + l1EthClient, err := DialClientWithBackoff(ctxWithTimeout, cfg.L1Endpoint, cfg.RetryInterval) if err != nil { return nil, err } - l2EthClient, err := DialClientWithBackoff(ctx, cfg.L2Endpoint, cfg.RetryInterval) + l2EthClient, err := DialClientWithBackoff(ctxWithTimeout, cfg.L2Endpoint, cfg.RetryInterval) if err != nil { return nil, err } - var l1RPC *EthClient - var l2RPC *EthClient - + var ( + l1RPC *EthClient + l2RPC *EthClient + ) if cfg.Timeout != nil { l1RPC = NewEthClientWithTimeout(l1EthClient, *cfg.Timeout) l2RPC = NewEthClientWithTimeout(l2EthClient, *cfg.Timeout) @@ -97,12 +101,12 @@ func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) { return nil, err } - stateVars, err := taikoL1.GetStateVariables(&bind.CallOpts{Context: ctx}) + stateVars, err := taikoL1.GetStateVariables(&bind.CallOpts{Context: ctxWithTimeout}) if err != nil { return nil, err } - isArchive, err := IsArchiveNode(ctx, l1RPC, stateVars.GenesisHeight) + isArchive, err := IsArchiveNode(ctxWithTimeout, l1RPC, stateVars.GenesisHeight) if err != nil { return nil, err } @@ -121,12 +125,12 @@ func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) { return nil, err } - l1ChainID, err := l1RPC.ChainID(ctx) + l1ChainID, err := l1RPC.ChainID(ctxWithTimeout) if err != nil { return nil, err } - l2ChainID, err := l2RPC.ChainID(ctx) + l2ChainID, err := l2RPC.ChainID(ctxWithTimeout) if err != nil { return nil, err } @@ -136,7 +140,7 @@ func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) { var l2AuthRPC *EngineClient if len(cfg.L2EngineEndpoint) != 0 && len(cfg.JwtSecret) != 0 { if l2AuthRPC, err = DialEngineClientWithBackoff( - ctx, + ctxWithTimeout, cfg.L2EngineEndpoint, cfg.JwtSecret, cfg.RetryInterval, @@ -147,8 +151,7 @@ func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) { var l2CheckPoint *EthClient if len(cfg.L2CheckPoint) != 0 { - l2CheckPointEthClient, err := DialClientWithBackoff(ctx, cfg.L2CheckPoint, cfg.RetryInterval) - + l2CheckPointEthClient, err := DialClientWithBackoff(ctxWithTimeout, cfg.L2CheckPoint, cfg.RetryInterval) if err != nil { return nil, err } @@ -176,7 +179,7 @@ func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) { L2ChainID: l2ChainID, } - if err := client.ensureGenesisMatched(ctx); err != nil { + if err := client.ensureGenesisMatched(ctxWithTimeout); err != nil { return nil, err } diff --git a/pkg/rpc/dial.go b/pkg/rpc/dial.go index 6d3fee5fa..66cafabd3 100644 --- a/pkg/rpc/dial.go +++ b/pkg/rpc/dial.go @@ -20,7 +20,10 @@ func DialClientWithBackoff(ctx context.Context, url string, retryInterval time.D var client *ethclient.Client if err := backoff.Retry( func() (err error) { - client, err = ethclient.DialContext(ctx, url) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + + client, err = ethclient.DialContext(ctxWithTimeout, url) if err != nil { log.Error("Dial ethclient error", "url", url, "error", err) } @@ -45,7 +48,10 @@ func DialEngineClientWithBackoff( var engineClient *EngineClient if err := backoff.Retry( func() (err error) { - client, err := DialEngineClient(ctx, url, jwtSecret) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + + client, err := DialEngineClient(ctxWithTimeout, url, jwtSecret) if err != nil { log.Error("Dial engine client error", "url", url, "error", err) return err @@ -65,6 +71,9 @@ func DialEngineClientWithBackoff( // DialEngineClient initializes an RPC connection with authentication headers. // Taken from https://github.com/prysmaticlabs/prysm/blob/v2.1.4/beacon-chain/execution/rpc_connection.go#L151 func DialEngineClient(ctx context.Context, endpointUrl string, jwtSecret string) (*rpc.Client, error) { + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + endpoint := network.Endpoint{ Url: endpointUrl, Auth: network.AuthorizationData{ @@ -81,12 +90,12 @@ func DialEngineClient(ctx context.Context, endpointUrl string, jwtSecret string) } switch u.Scheme { case "http", "https": - client, err = rpc.DialOptions(ctx, endpoint.Url, rpc.WithHTTPClient(endpoint.HttpClient())) + client, err = rpc.DialOptions(ctxWithTimeout, endpoint.Url, rpc.WithHTTPClient(endpoint.HttpClient())) if err != nil { return nil, err } case "": - client, err = rpc.DialIPC(ctx, endpoint.Url) + client, err = rpc.DialIPC(ctxWithTimeout, endpoint.Url) if err != nil { return nil, err } diff --git a/pkg/rpc/utils.go b/pkg/rpc/utils.go index 66d705acd..c1ba7ab4f 100644 --- a/pkg/rpc/utils.go +++ b/pkg/rpc/utils.go @@ -44,16 +44,12 @@ func WaitReceipt( tx *types.Transaction, ) (*types.Receipt, error) { ticker := time.NewTicker(waitReceiptPollingInterval) - defer ticker.Stop() + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultWaitReceiptTimeout) - var ( - ctxWithTimeout = ctx - cancel context.CancelFunc - ) - if _, ok := ctx.Deadline(); !ok { - ctxWithTimeout, cancel = context.WithTimeout(ctx, defaultWaitReceiptTimeout) - defer cancel() - } + defer func() { + cancel() + ticker.Stop() + }() for { select { @@ -81,26 +77,34 @@ func NeedNewProof( id *big.Int, proverAddress common.Address, ) (bool, error) { + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + var parent *types.Header if id.Cmp(common.Big1) == 0 { - header, err := cli.L2.HeaderByNumber(ctx, common.Big0) + header, err := cli.L2.HeaderByNumber(ctxWithTimeout, common.Big0) if err != nil { return false, err } parent = header } else { - parentL1Origin, err := cli.WaitL1Origin(ctx, new(big.Int).Sub(id, common.Big1)) + parentL1Origin, err := cli.WaitL1Origin(ctxWithTimeout, new(big.Int).Sub(id, common.Big1)) if err != nil { return false, err } - if parent, err = cli.L2.HeaderByHash(ctx, parentL1Origin.L2BlockHash); err != nil { + if parent, err = cli.L2.HeaderByHash(ctxWithTimeout, parentL1Origin.L2BlockHash); err != nil { return false, err } } - fc, err := cli.TaikoL1.GetForkChoice(&bind.CallOpts{Context: ctx}, id, parent.Hash(), uint32(parent.GasUsed)) + fc, err := cli.TaikoL1.GetForkChoice( + &bind.CallOpts{Context: ctxWithTimeout}, + id, + parent.Hash(), + uint32(parent.GasUsed), + ) if err != nil { if !strings.Contains(encoding.TryParsingCustomError(err).Error(), "L1_FORK_CHOICE_NOT_FOUND") { return false, encoding.TryParsingCustomError(err) @@ -132,9 +136,12 @@ func ContentFrom( rawRPC *rpc.Client, address common.Address, ) (AccountPoolContent, error) { + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + var result AccountPoolContent return result, rawRPC.CallContext( - ctx, + ctxWithTimeout, &result, "txpool_contentFrom", address, @@ -148,7 +155,10 @@ func GetPendingTxByNonce( address common.Address, nonce uint64, ) (*types.Transaction, error) { - content, err := ContentFrom(ctx, cli.L1RawRPC, address) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + + content, err := ContentFrom(ctxWithTimeout, cli.L1RawRPC, address) if err != nil { return nil, err } @@ -167,7 +177,10 @@ func GetPendingTxByNonce( // SetHead makes a `debug_setHead` RPC call to set the chain's head, should only be used // for testing purpose. func SetHead(ctx context.Context, rpc *rpc.Client, headNum *big.Int) error { - return gethclient.New(rpc).SetHead(ctx, headNum) + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + + return gethclient.New(rpc).SetHead(ctxWithTimeout, headNum) } // StringToBytes32 converts the given string to [32]byte. @@ -180,7 +193,10 @@ func StringToBytes32(str string) [32]byte { // IsArchiveNode checks if the given node is an archive node. func IsArchiveNode(ctx context.Context, client *EthClient, l2GenesisHeight uint64) (bool, error) { - if _, err := client.BalanceAt(ctx, zeroAddress, new(big.Int).SetUint64(l2GenesisHeight)); err != nil { + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + + if _, err := client.BalanceAt(ctxWithTimeout, zeroAddress, new(big.Int).SetUint64(l2GenesisHeight)); err != nil { if strings.Contains(err.Error(), "missing trie node") { return false, nil }