Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix trace batch remote requests in parallel limitation #2244

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 15 additions & 23 deletions jsonrpc/endpoints_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (d *DebugEndpoints) TraceBlockByNumber(number types.BlockNumber, cfg *trace
block, err := d.state.GetL2BlockByNumber(ctx, blockNumber, dbTx)
if errors.Is(err, state.ErrNotFound) {
return nil, types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("block #%d not found", blockNumber))
} else if err == state.ErrNotFound {
} else if err != nil {
return rpcErrorResponse(types.DefaultErrorCode, "failed to get block by number", err)
}

Expand All @@ -118,7 +118,7 @@ func (d *DebugEndpoints) TraceBlockByHash(hash types.ArgHash, cfg *traceConfig)
block, err := d.state.GetL2BlockByHash(ctx, hash.Hash(), dbTx)
if errors.Is(err, state.ErrNotFound) {
return nil, types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("block %s not found", hash.Hash().String()))
} else if err == state.ErrNotFound {
} else if err != nil {
return rpcErrorResponse(types.DefaultErrorCode, "failed to get block by hash", err)
}

Expand Down Expand Up @@ -182,9 +182,9 @@ func (d *DebugEndpoints) TraceBatchByNumber(httpRequest *http.Request, number ty
}

batch, err := d.state.GetBatchByNumber(ctx, batchNumber, dbTx)
if errors.Is(err, state.ErrNotFound) {
if errors.Is(err, state.ErrStateNotSynchronized) {
return nil, types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("batch #%d not found", batchNumber))
} else if err == state.ErrNotFound {
} else if err != nil {
return rpcErrorResponse(types.DefaultErrorCode, "failed to get batch by number", err)
}

Expand All @@ -202,7 +202,7 @@ func (d *DebugEndpoints) TraceBatchByNumber(httpRequest *http.Request, number ty
receipts = append(receipts, *receipt)
}

requests := make(chan (ethTypes.Receipt), bufferSize)
buffer := make(chan byte, bufferSize)

mu := sync.Mutex{}
wg := sync.WaitGroup{}
Expand All @@ -211,20 +211,25 @@ func (d *DebugEndpoints) TraceBatchByNumber(httpRequest *http.Request, number ty

// gets the trace from the jRPC and adds it to the responses
loadTraceByTxHash := func(receipt ethTypes.Receipt) {
defer func() {
<-buffer // make buffer slot free
wg.Done()
}()
buffer <- 1 // use buffer free slot or wait for a free slot

response := traceResponse{
blockNumber: receipt.BlockNumber.Uint64(),
txIndex: uint64(receipt.TransactionIndex),
txHash: receipt.TxHash,
}

defer wg.Done()
res, err := client.JSONRPCCall(rpcURL, "debug_traceTransaction", receipt.TxHash.String(), cfg)
if err != nil {
err := fmt.Errorf("failed to get tx trace from remote jRPC server %v, err: %w", rpcURL, err)
err := fmt.Errorf("failed to get tx trace from remote jRPC server %v for tx %v, err: %w", rpcURL, receipt.TxHash.String(), err)
log.Errorf(err.Error())
response.err = err
} else if res.Error != nil {
err := fmt.Errorf("tx trace error returned from remote jRPC server %v, %v %v", rpcURL, res.Error.Code, res.Error.Message)
err := fmt.Errorf("tx trace error returned from remote jRPC server %v for tx %v, err: %v - %v", rpcURL, receipt.TxHash.String(), res.Error.Code, res.Error.Message)
log.Errorf(err.Error())
response.err = err
} else {
Expand All @@ -237,29 +242,16 @@ func (d *DebugEndpoints) TraceBatchByNumber(httpRequest *http.Request, number ty
responses = append(responses, response)
}

// goes through the buffer and loads the trace
// by all the transactions added in the buffer
// then add the results to the responses map
go func() {
index := uint(0)
for req := range requests {
go loadTraceByTxHash(req)
index++
}
}()

// add receipts to the buffer
// load traces for each transaction
for _, receipt := range receipts {
requests <- receipt
go loadTraceByTxHash(receipt)
}

// wait the traces to be loaded
if waitTimeout(&wg, d.cfg.ReadTimeout.Duration) {
return rpcErrorResponse(types.DefaultErrorCode, fmt.Sprintf("failed to get traces for batch %v: timeout reached", batchNumber), nil)
}

close(requests)

// since the txs are attached to a L2 Block and the L2 Block is
// the struct attached to the Batch, in order to always respond
// the traces in the same order, we need to order the transactions
Expand Down