From 055870a317fa54037e6a3196a56a96aafccd6aab Mon Sep 17 00:00:00 2001 From: Freddy Caceres Date: Mon, 13 Sep 2021 22:59:06 +0200 Subject: [PATCH] json rpc concurrency --- ethereum/rpc/namespaces/debug/api.go | 95 ++++++++++++++------------ ethereum/rpc/namespaces/debug/types.go | 13 ++++ 2 files changed, 65 insertions(+), 43 deletions(-) create mode 100644 ethereum/rpc/namespaces/debug/types.go diff --git a/ethereum/rpc/namespaces/debug/api.go b/ethereum/rpc/namespaces/debug/api.go index 85b20f627e..06794ea6df 100644 --- a/ethereum/rpc/namespaces/debug/api.go +++ b/ethereum/rpc/namespaces/debug/api.go @@ -112,8 +112,8 @@ func (a *API) TraceTransaction(hash common.Hash, config *evmtypes.TraceConfig) ( return decodedResult, nil } -func (a *API) TraceBlockByNumber(height rpctypes.BlockNumber, config *evmtypes.TraceConfig) ([]interface{}, error) { - var res []interface{} +func (a *API) TraceBlockByNumber(height rpctypes.BlockNumber, config *evmtypes.TraceConfig) ([]*TxTraceResult, error) { + a.logger.Debug("debug_traceBlockByNumber", "height", height) if height == 0 { return nil, errors.New("genesis is not traceable") } @@ -127,59 +127,68 @@ func (a *API) TraceBlockByNumber(height rpctypes.BlockNumber, config *evmtypes.T if txsLength == 0 { // If there are no transactions return empty array - return res, nil + return []*TxTraceResult{}, nil } var ( - results = make([]*types.TxTraceResult, len(txsLength)) + results = make([]*TxTraceResult, txsLength) wg = new(sync.WaitGroup) - jobs = make(chan *types.TxTraceTask, len(req.Transactions)) + jobs = make(chan *TxTraceTask, txsLength) ) - transactionsTraceRequest := make([]*evmtypes.TraceBlockTransaction, txsLength) - - // Get all tx messages - for i, transaction := range resBlock.Block.Txs { - tx, err := a.clientCtx.TxConfig.TxDecoder()(transaction) - if err != nil { - a.logger.Debug("tx not found", "hash", transaction.Hash()) - return nil, err - } - - ethMessage, ok := tx.GetMsgs()[0].(*evmtypes.MsgEthereumTx) - if !ok { - // Just considers Ethereum transactions - continue - } - - transactionsTraceRequest[i] = &evmtypes.TraceBlockTransaction{ - Msg: ethMessage, - Index: uint32(i), - } + threads := runtime.NumCPU() + if threads > txsLength { + threads = txsLength } - - traceTxRequest := evmtypes.QueryTraceBlockRequest{ - TraceConfig: config, - Transactions: transactionsTraceRequest, + wg.Add(threads) + for th := 0; th < threads; th++ { + go func() { + defer wg.Done() + // Fetch and execute the next transaction trace tasks + for task := range jobs { + tx, err := a.clientCtx.TxConfig.TxDecoder()(resBlock.Block.Txs[task.Index]) + if err != nil { + a.logger.Debug("tx not found", "hash", resBlock.Block.Txs[task.Index].Hash()) + continue + } + + ethMessage, ok := tx.GetMsgs()[0].(*evmtypes.MsgEthereumTx) + if !ok { + // Just considers Ethereum transactions + continue + } + traceTxRequest := &evmtypes.QueryTraceTxRequest{ + Msg: ethMessage, + TxIndex: uint32(task.Index), + TraceConfig: config, + } + + res, err := a.queryClient.TraceTx(rpctypes.ContextWithHeight(int64(height)), traceTxRequest) + if err != nil { + results[task.Index] = &TxTraceResult{Error: err.Error()} + continue + } + // Response format is unknown due to custom tracer config param + // More information can be found here https://geth.ethereum.org/docs/dapp/tracing-filtered + var decodedResult interface{} + err = json.Unmarshal(res.Data, &decodedResult) + if err != nil { + results[task.Index] = &TxTraceResult{Error: err.Error()} + continue + } + results[task.Index] = &TxTraceResult{Result: decodedResult} + } + }() } - if config != nil { - traceTxRequest.TraceConfig = config + for i, _ := range resBlock.Block.Txs { + jobs <- &TxTraceTask{Index: i} } - traceResult, err := a.queryClient.TraceBlock(rpctypes.ContextWithHeight(int64(height)), &traceTxRequest) - if err != nil { - return nil, err - } - - // Response format is unknown due to custom tracer config param - // More information can be found here https://geth.ethereum.org/docs/dapp/tracing-filtered - err = json.Unmarshal(traceResult.Result, &res) - if err != nil { - return nil, err - } + close(jobs) + wg.Wait() - return res, nil + return results, nil } // BlockProfile turns on goroutine profiling for nsec seconds and writes profile data to diff --git a/ethereum/rpc/namespaces/debug/types.go b/ethereum/rpc/namespaces/debug/types.go new file mode 100644 index 0000000000..ca4be94cd5 --- /dev/null +++ b/ethereum/rpc/namespaces/debug/types.go @@ -0,0 +1,13 @@ +package debug + +// TxTraceTask represents a single transaction trace task when an entire block +// is being traced. +type TxTraceTask struct { + Index int // Transaction offset in the block +} + +// TxTraceResult is the result of a single transaction trace. +type TxTraceResult struct { + Result interface{} `json:"result,omitempty"` // Trace results produced by the tracer + Error string `json:"error,omitempty"` // Trace failure produced by the tracer +}