From 7609bb48690752988c35a155c5e1dfe448c64d41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= Date: Tue, 16 May 2023 18:41:54 +0200 Subject: [PATCH 01/11] refactor state errors --- pool/pool.go | 16 ++--- sequencer/finalizer.go | 9 ++- state/batch.go | 2 +- state/converters.go | 50 +++++++------ state/pgstatestorage.go | 2 +- state/state_test.go | 156 ---------------------------------------- state/transaction.go | 37 ---------- state/types.go | 20 +++--- 8 files changed, 53 insertions(+), 239 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index cb8aef3a6b..0a3d77e3ed 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -186,16 +186,12 @@ func (p *Pool) PreExecuteTx(ctx context.Context, tx types.Transaction) (preExecu return response, err } - response.usedZkCounters = processBatchResponse.UsedZkCounters - - if !processBatchResponse.RomOOC { - if processBatchResponse.Responses != nil && len(processBatchResponse.Responses) > 0 { - r := processBatchResponse.Responses[0] - response.isOOC = executor.IsROMOutOfGasError(executor.RomErrorCode(r.RomError)) - response.isReverted = errors.Is(r.RomError, runtime.ErrExecutionReverted) - } - } else { - response.isOOG = processBatchResponse.RomOOC + if processBatchResponse.Responses != nil && len(processBatchResponse.Responses) > 0 { + errorToCheck := processBatchResponse.Responses[0].RomError + response.isReverted = errors.Is(errorToCheck, runtime.ErrExecutionReverted) + response.isOOC = executor.IsROMOutOfCountersError(executor.RomErrorCode(errorToCheck)) + response.isOOG = errors.Is(errorToCheck, runtime.ErrOutOfGas) + response.usedZkCounters = processBatchResponse.UsedZkCounters } return response, nil diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index 77833cb667..f2028a8e8b 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -294,7 +294,7 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) { // Reprocess full batch as sanity check processBatchResponse, err := f.reprocessFullBatch(ctx, f.batch.batchNumber, f.batch.stateRoot) - if err != nil || processBatchResponse.RomOOC { + if err != nil || processBatchResponse.IsRomOOCError { log.Info("halting the finalizer because of a reprocessing error") if err != nil { f.halt(ctx, fmt.Errorf("failed to reprocess batch, err: %v", err)) @@ -398,9 +398,8 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) error // handleTxProcessResp handles the response of transaction processing. func (f *finalizer) handleTxProcessResp(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, oldStateRoot common.Hash) error { // Handle Transaction Error - errorCode := executor.RomErrorCode(result.Responses[0].RomError) - if result.RomOOC || executor.IsIntrinsicError(errorCode) { + if result.IsRomOOCError || executor.IsIntrinsicError(errorCode) { // If intrinsic error or OOC error, we skip adding the transaction to the batch f.handleTransactionError(ctx, result, tx) return result.Responses[0].RomError @@ -669,7 +668,7 @@ func (f *finalizer) processForcedBatch(ctx context.Context, lastBatchNumberInSta f.nextGERMux.Lock() f.lastGERHash = forcedBatch.GlobalExitRoot f.nextGERMux.Unlock() - if len(response.Responses) > 0 && !response.RomOOC { + if len(response.Responses) > 0 && !response.IsRomOOCError { f.handleForcedTxsProcessResp(request, response, stateRoot) } @@ -783,7 +782,7 @@ func (f *finalizer) reprocessFullBatch(ctx context.Context, batchNum uint64, exp return nil, err } - if result.RomOOC { + if result.IsRomOOCError { log.Errorf("failed to process batch %v because OutOfCounters", batch.BatchNumber) payload, err := json.Marshal(processRequest) if err != nil { diff --git a/state/batch.go b/state/batch.go index 94dd9e5d35..848cfe1767 100644 --- a/state/batch.go +++ b/state/batch.go @@ -425,7 +425,7 @@ func (s *State) ProcessAndStoreClosedBatch(ctx context.Context, processingCtx Pr // Filter unprocessed txs and decode txs to store metadata // note that if the batch is not well encoded it will result in an empty batch (with no txs) for i := 0; i < len(processed.Responses); i++ { - if !isProcessed(processed.Responses[i].Error) { + if !executor.IsIntrinsicError(processed.Responses[i].Error) { if executor.IsROMOutOfCountersError(processed.Responses[i].Error) { processed.Responses = []*pb.ProcessTransactionResponse{} break diff --git a/state/converters.go b/state/converters.go index a01611845e..5ccbb96b30 100644 --- a/state/converters.go +++ b/state/converters.go @@ -48,30 +48,40 @@ func (s *State) convertToProcessBatchResponse(txs []types.Transaction, response return nil, err } - romOOC := response.Error != executor.EXECUTOR_ERROR_NO_ERROR - if !romOOC && len(response.Responses) > 0 { - // Check out of counters - errorToCheck := response.Responses[len(response.Responses)-1].Error - romOOC = executor.IsROMOutOfCountersError(errorToCheck) + isExecutorLevelError := (response.Error != executor.EXECUTOR_ERROR_NO_ERROR) + isRomLevelError := false + isRomOOCError := false + + if response.Responses != nil { + for _, resp := range response.Responses { + if resp.Error != pb.RomError_ROM_ERROR_NO_ERROR { + isRomLevelError = true + break + } + } + + if len(response.Responses) > 0 { + // Check out of counters + errorToCheck := response.Responses[len(response.Responses)-1].Error + isRomOOCError = executor.IsROMOutOfCountersError(errorToCheck) + } } return &ProcessBatchResponse{ - NewStateRoot: common.BytesToHash(response.NewStateRoot), - NewAccInputHash: common.BytesToHash(response.NewAccInputHash), - NewLocalExitRoot: common.BytesToHash(response.NewLocalExitRoot), - NewBatchNumber: response.NewBatchNum, - UsedZkCounters: convertToCounters(response), - Responses: responses, - ExecutorError: executor.ExecutorErr(response.Error), - RomOOC: romOOC, - ReadWriteAddresses: readWriteAddresses, + NewStateRoot: common.BytesToHash(response.NewStateRoot), + NewAccInputHash: common.BytesToHash(response.NewAccInputHash), + NewLocalExitRoot: common.BytesToHash(response.NewLocalExitRoot), + NewBatchNumber: response.NewBatchNum, + UsedZkCounters: convertToCounters(response), + Responses: responses, + ExecutorError: executor.ExecutorErr(response.Error), + IsExecutorLevelError: isExecutorLevelError, + IsRomLevelError: isRomLevelError, + IsRomOOCError: isRomOOCError, + ReadWriteAddresses: readWriteAddresses, }, nil } -func isProcessed(err pb.RomError) bool { - return !executor.IsIntrinsicError(err) && !executor.IsROMOutOfCountersError(err) -} - func convertToReadWriteAddresses(addresses map[string]*pb.InfoReadWrite) (map[common.Address]*InfoReadWrite, error) { results := make(map[common.Address]*InfoReadWrite, len(addresses)) @@ -125,7 +135,7 @@ func (s *State) convertToProcessTransactionResponse(txs []types.Transaction, res result.CreateAddress = common.HexToAddress(response.CreateAddress) result.StateRoot = common.BytesToHash(response.StateRoot) result.Logs = convertToLog(response.Logs) - result.IsProcessed = isProcessed(response.Error) + result.IsProcessed = !executor.IsIntrinsicError(response.Error) && !executor.IsROMOutOfCountersError(response.Error) result.ExecutionTrace = *trace result.CallTrace = convertToExecutorTrace(response.CallTrace) result.Tx = txs[i] @@ -158,7 +168,7 @@ func (s *State) convertToProcessTransactionResponse(txs []types.Transaction, res log.Debugf("ProcessTransactionResponse[GasUsed]: %v", result.GasUsed) log.Debugf("ProcessTransactionResponse[GasLeft]: %v", result.GasLeft) log.Debugf("ProcessTransactionResponse[GasRefunded]: %v", result.GasRefunded) - log.Debugf("ProcessTransactionResponse[IsProcessed]: %v", result.IsProcessed) + // log.Debugf("ProcessTransactionResponse[IsProcessed]: %v", result.IsProcessed) } return results, nil diff --git a/state/pgstatestorage.go b/state/pgstatestorage.go index 9435d1a428..9389df410a 100644 --- a/state/pgstatestorage.go +++ b/state/pgstatestorage.go @@ -334,7 +334,7 @@ func (p *PostgresStorage) GetForcedBatch(ctx context.Context, forcedBatchNumber // GetForcedBatchesSince gets L1 forced batches since forcedBatchNumber func (p *PostgresStorage) GetForcedBatchesSince(ctx context.Context, forcedBatchNumber, maxBlockNumber uint64, dbTx pgx.Tx) ([]*ForcedBatch, error) { - const getForcedBatchesSQL = "SELECT forced_batch_num, global_exit_root, timestamp, raw_txs_data, coinbase, block_num FROM state.forced_batch WHERE forced_batch_num > $1 AND block_num <= $2" + const getForcedBatchesSQL = "SELECT forced_batch_num, global_exit_root, timestamp, raw_txs_data, coinbase, block_num FROM state.forced_batch WHERE forced_batch_num > $1 AND block_num <= $2 ORDER BY forced_batch_num ASC" q := p.getExecQuerier(dbTx) rows, err := q.Query(ctx, getForcedBatchesSQL, forcedBatchNumber, maxBlockNumber) if errors.Is(err, pgx.ErrNoRows) { diff --git a/state/state_test.go b/state/state_test.go index 6144a1631c..a31c0fee30 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -650,162 +650,6 @@ func TestGetTxsHashesByBatchNumber(t *testing.T) { require.NoError(t, dbTx.Commit(ctx)) } -func TestDetermineProcessedTransactions(t *testing.T) { - tcs := []struct { - description string - input []*state.ProcessTransactionResponse - expectedProcessedOutput []*state.ProcessTransactionResponse - expectedUnprocessedOutput map[string]*state.ProcessTransactionResponse - }{ - { - description: "empty input returns empty", - input: []*state.ProcessTransactionResponse{}, - expectedProcessedOutput: []*state.ProcessTransactionResponse{}, - expectedUnprocessedOutput: map[string]*state.ProcessTransactionResponse{}, - }, - { - description: "single processed transaction returns itself", - input: []*state.ProcessTransactionResponse{ - {IsProcessed: true}, - }, - expectedProcessedOutput: []*state.ProcessTransactionResponse{ - {IsProcessed: true}, - }, - expectedUnprocessedOutput: map[string]*state.ProcessTransactionResponse{}, - }, - { - description: "single unprocessed transaction returns empty", - input: []*state.ProcessTransactionResponse{ - { - TxHash: common.HexToHash("a"), - IsProcessed: false, - }, - }, - expectedProcessedOutput: []*state.ProcessTransactionResponse{}, - expectedUnprocessedOutput: map[string]*state.ProcessTransactionResponse{ - "0x000000000000000000000000000000000000000000000000000000000000000a": { - TxHash: common.HexToHash("a"), - IsProcessed: false, - }, - }, - }, - { - description: "multiple processed transactions", - input: []*state.ProcessTransactionResponse{ - { - TxHash: common.HexToHash("a"), - IsProcessed: true, - }, - { - TxHash: common.HexToHash("b"), - IsProcessed: true, - }, - { - TxHash: common.HexToHash("c"), - IsProcessed: true, - }, - }, - expectedProcessedOutput: []*state.ProcessTransactionResponse{ - { - TxHash: common.HexToHash("a"), - IsProcessed: true, - }, - { - TxHash: common.HexToHash("b"), - IsProcessed: true, - }, - { - TxHash: common.HexToHash("c"), - IsProcessed: true, - }, - }, - expectedUnprocessedOutput: map[string]*state.ProcessTransactionResponse{}, - }, - { - description: "multiple unprocessed transactions", - input: []*state.ProcessTransactionResponse{ - { - TxHash: common.HexToHash("a"), - IsProcessed: false, - }, - { - TxHash: common.HexToHash("b"), - IsProcessed: false, - }, - { - TxHash: common.HexToHash("c"), - IsProcessed: false, - }, - }, - expectedProcessedOutput: []*state.ProcessTransactionResponse{}, - expectedUnprocessedOutput: map[string]*state.ProcessTransactionResponse{ - "0x000000000000000000000000000000000000000000000000000000000000000a": { - TxHash: common.HexToHash("a"), - IsProcessed: false, - }, - "0x000000000000000000000000000000000000000000000000000000000000000b": { - TxHash: common.HexToHash("b"), - IsProcessed: false, - }, - "0x000000000000000000000000000000000000000000000000000000000000000c": { - TxHash: common.HexToHash("c"), - IsProcessed: false, - }, - }, - }, - { - description: "mixed processed and unprocessed transactions", - input: []*state.ProcessTransactionResponse{ - { - TxHash: common.HexToHash("a"), - IsProcessed: true, - }, - { - TxHash: common.HexToHash("b"), - IsProcessed: false, - }, - { - TxHash: common.HexToHash("c"), - IsProcessed: true, - }, - { - TxHash: common.HexToHash("d"), - IsProcessed: false, - }, - }, - expectedProcessedOutput: []*state.ProcessTransactionResponse{ - { - TxHash: common.HexToHash("a"), - IsProcessed: true, - }, - { - TxHash: common.HexToHash("c"), - IsProcessed: true, - }, - }, - expectedUnprocessedOutput: map[string]*state.ProcessTransactionResponse{ - "0x000000000000000000000000000000000000000000000000000000000000000b": { - TxHash: common.HexToHash("b"), - IsProcessed: false, - }, - "0x000000000000000000000000000000000000000000000000000000000000000d": { - TxHash: common.HexToHash("d"), - IsProcessed: false, - }, - }, - }, - } - - for _, tc := range tcs { - tc := tc - t.Run(tc.description, func(t *testing.T) { - actualProcessedTx, _, actualUnprocessedTxs, _ := state.DetermineProcessedTransactions(tc.input) - require.Equal(t, tc.expectedProcessedOutput, actualProcessedTx) - require.Equal(t, tc.expectedUnprocessedOutput, actualUnprocessedTxs) - }) - } -} - func TestGenesis(t *testing.T) { block := state.Block{ BlockNumber: 1, diff --git a/state/transaction.go b/state/transaction.go index 56f1c067e6..65fbde338e 100644 --- a/state/transaction.go +++ b/state/transaction.go @@ -738,49 +738,12 @@ func (s *State) isContractCreation(tx *types.Transaction) bool { return tx.To() == nil && len(tx.Data()) > 0 } -// DetermineProcessedTransactions splits the given tx process responses -// returning a slice with only processed and a map unprocessed txs -// respectively. -func DetermineProcessedTransactions(responses []*ProcessTransactionResponse) ( - []*ProcessTransactionResponse, []string, map[string]*ProcessTransactionResponse, []string) { - processedTxResponses := []*ProcessTransactionResponse{} - processedTxsHashes := []string{} - unprocessedTxResponses := map[string]*ProcessTransactionResponse{} - unprocessedTxsHashes := []string{} - for _, response := range responses { - if response.IsProcessed { - processedTxResponses = append(processedTxResponses, response) - processedTxsHashes = append(processedTxsHashes, response.TxHash.String()) - } else { - log.Infof("Tx %s has not been processed", response.TxHash) - unprocessedTxResponses[response.TxHash.String()] = response - unprocessedTxsHashes = append(unprocessedTxsHashes, response.TxHash.String()) - } - } - return processedTxResponses, processedTxsHashes, unprocessedTxResponses, unprocessedTxsHashes -} - // StoreTransaction is used by the sequencer to add process a transaction func (s *State) StoreTransaction(ctx context.Context, batchNumber uint64, processedTx *ProcessTransactionResponse, coinbase common.Address, timestamp uint64, dbTx pgx.Tx) error { if dbTx == nil { return ErrDBTxNil } - // Check if last batch is closed. Note that it's assumed that only the latest batch can be open - /* - isBatchClosed, err := s.PostgresStorage.IsBatchClosed(ctx, batchNumber, dbTx) - if err != nil { - return err - } - if isBatchClosed { - return ErrBatchAlreadyClosed - } - - processingContext, err := s.GetProcessingContext(ctx, batchNumber, dbTx) - if err != nil { - return err - } - */ // if the transaction has an intrinsic invalid tx error it means // the transaction has not changed the state, so we don't store it if executor.IsIntrinsicError(executor.RomErrorCode(processedTx.RomError)) { diff --git a/state/types.go b/state/types.go index eccf1ed0bf..008898086c 100644 --- a/state/types.go +++ b/state/types.go @@ -27,15 +27,17 @@ type ProcessRequest struct { // ProcessBatchResponse represents the response of a batch process. type ProcessBatchResponse struct { - NewStateRoot common.Hash - NewAccInputHash common.Hash - NewLocalExitRoot common.Hash - NewBatchNumber uint64 - UsedZkCounters ZKCounters - Responses []*ProcessTransactionResponse - ExecutorError error - RomOOC bool - ReadWriteAddresses map[common.Address]*InfoReadWrite + NewStateRoot common.Hash + NewAccInputHash common.Hash + NewLocalExitRoot common.Hash + NewBatchNumber uint64 + UsedZkCounters ZKCounters + Responses []*ProcessTransactionResponse + ExecutorError error + IsRomLevelError bool + IsExecutorLevelError bool + IsRomOOCError bool + ReadWriteAddresses map[common.Address]*InfoReadWrite } // ProcessTransactionResponse represents the response of a tx process. From d1e408978562054f447bb0bcdaf4206d2e0f3edd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= Date: Tue, 16 May 2023 18:45:08 +0200 Subject: [PATCH 02/11] refactor state errors --- state/converters.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/state/converters.go b/state/converters.go index 5ccbb96b30..3ad21450b2 100644 --- a/state/converters.go +++ b/state/converters.go @@ -168,7 +168,7 @@ func (s *State) convertToProcessTransactionResponse(txs []types.Transaction, res log.Debugf("ProcessTransactionResponse[GasUsed]: %v", result.GasUsed) log.Debugf("ProcessTransactionResponse[GasLeft]: %v", result.GasLeft) log.Debugf("ProcessTransactionResponse[GasRefunded]: %v", result.GasRefunded) - // log.Debugf("ProcessTransactionResponse[IsProcessed]: %v", result.IsProcessed) + log.Debugf("ProcessTransactionResponse[IsProcessed]: %v", result.IsProcessed) } return results, nil From 7b49a16a9c99708b8ba8859a22cec44e22292c35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= Date: Tue, 16 May 2023 19:00:30 +0200 Subject: [PATCH 03/11] refactor state errors --- sequencer/dbmanager.go | 2 +- state/batch.go | 2 +- state/converters.go | 8 ++++++-- state/types.go | 4 ++-- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sequencer/dbmanager.go b/sequencer/dbmanager.go index ec04964d04..8f3acbf7d9 100644 --- a/sequencer/dbmanager.go +++ b/sequencer/dbmanager.go @@ -490,7 +490,7 @@ func (d *dbManager) ProcessForcedBatch(forcedBatchNum uint64, request state.Proc // Close Batch txsBytes := uint64(0) for _, resp := range processBatchResponse.Responses { - if !resp.IsProcessed { + if !resp.ChangesStateRoot { continue } txsBytes += resp.Tx.Size() diff --git a/state/batch.go b/state/batch.go index 848cfe1767..852b097db4 100644 --- a/state/batch.go +++ b/state/batch.go @@ -425,7 +425,7 @@ func (s *State) ProcessAndStoreClosedBatch(ctx context.Context, processingCtx Pr // Filter unprocessed txs and decode txs to store metadata // note that if the batch is not well encoded it will result in an empty batch (with no txs) for i := 0; i < len(processed.Responses); i++ { - if !executor.IsIntrinsicError(processed.Responses[i].Error) { + if !txChangesStateRoot(processed.Responses[i].Error) { if executor.IsROMOutOfCountersError(processed.Responses[i].Error) { processed.Responses = []*pb.ProcessTransactionResponse{} break diff --git a/state/converters.go b/state/converters.go index 3ad21450b2..52301cd458 100644 --- a/state/converters.go +++ b/state/converters.go @@ -82,6 +82,10 @@ func (s *State) convertToProcessBatchResponse(txs []types.Transaction, response }, nil } +func txChangesStateRoot(err pb.RomError) bool { + return !executor.IsIntrinsicError(err) && !executor.IsROMOutOfCountersError(err) +} + func convertToReadWriteAddresses(addresses map[string]*pb.InfoReadWrite) (map[common.Address]*InfoReadWrite, error) { results := make(map[common.Address]*InfoReadWrite, len(addresses)) @@ -135,7 +139,7 @@ func (s *State) convertToProcessTransactionResponse(txs []types.Transaction, res result.CreateAddress = common.HexToAddress(response.CreateAddress) result.StateRoot = common.BytesToHash(response.StateRoot) result.Logs = convertToLog(response.Logs) - result.IsProcessed = !executor.IsIntrinsicError(response.Error) && !executor.IsROMOutOfCountersError(response.Error) + result.ChangesStateRoot = txChangesStateRoot(response.Error) result.ExecutionTrace = *trace result.CallTrace = convertToExecutorTrace(response.CallTrace) result.Tx = txs[i] @@ -168,7 +172,7 @@ func (s *State) convertToProcessTransactionResponse(txs []types.Transaction, res log.Debugf("ProcessTransactionResponse[GasUsed]: %v", result.GasUsed) log.Debugf("ProcessTransactionResponse[GasLeft]: %v", result.GasLeft) log.Debugf("ProcessTransactionResponse[GasRefunded]: %v", result.GasRefunded) - log.Debugf("ProcessTransactionResponse[IsProcessed]: %v", result.IsProcessed) + log.Debugf("ProcessTransactionResponse[ChangesStateRoot]: %v", result.ChangesStateRoot) } return results, nil diff --git a/state/types.go b/state/types.go index 008898086c..72e8c4c14e 100644 --- a/state/types.go +++ b/state/types.go @@ -63,8 +63,8 @@ type ProcessTransactionResponse struct { StateRoot common.Hash // Logs emitted by LOG opcode Logs []*types.Log - // IsProcessed indicates if this tx didn't fit into the batch - IsProcessed bool + // ChangesStateRoot indicates if this tx affects the state + ChangesStateRoot bool // Tx is the whole transaction object Tx types.Transaction // ExecutionTrace contains the traces produced in the execution From 4fa314639c69f9cbedccecae3f20af93050003a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= Date: Tue, 16 May 2023 19:04:00 +0200 Subject: [PATCH 04/11] refactor state errors --- sequencer/finalizer.go | 2 +- state/batch.go | 2 +- state/converters.go | 5 +++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index f2028a8e8b..299d0346bc 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -399,7 +399,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) error func (f *finalizer) handleTxProcessResp(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, oldStateRoot common.Hash) error { // Handle Transaction Error errorCode := executor.RomErrorCode(result.Responses[0].RomError) - if result.IsRomOOCError || executor.IsIntrinsicError(errorCode) { + if !state.TxChangesStateRoot(errorCode) { // If intrinsic error or OOC error, we skip adding the transaction to the batch f.handleTransactionError(ctx, result, tx) return result.Responses[0].RomError diff --git a/state/batch.go b/state/batch.go index 852b097db4..57e063dcbf 100644 --- a/state/batch.go +++ b/state/batch.go @@ -425,7 +425,7 @@ func (s *State) ProcessAndStoreClosedBatch(ctx context.Context, processingCtx Pr // Filter unprocessed txs and decode txs to store metadata // note that if the batch is not well encoded it will result in an empty batch (with no txs) for i := 0; i < len(processed.Responses); i++ { - if !txChangesStateRoot(processed.Responses[i].Error) { + if !TxChangesStateRoot(processed.Responses[i].Error) { if executor.IsROMOutOfCountersError(processed.Responses[i].Error) { processed.Responses = []*pb.ProcessTransactionResponse{} break diff --git a/state/converters.go b/state/converters.go index 52301cd458..304de8bfda 100644 --- a/state/converters.go +++ b/state/converters.go @@ -82,7 +82,8 @@ func (s *State) convertToProcessBatchResponse(txs []types.Transaction, response }, nil } -func txChangesStateRoot(err pb.RomError) bool { +// TxChangesStateRoot returns true if the transaction changes the state root +func TxChangesStateRoot(err pb.RomError) bool { return !executor.IsIntrinsicError(err) && !executor.IsROMOutOfCountersError(err) } @@ -139,7 +140,7 @@ func (s *State) convertToProcessTransactionResponse(txs []types.Transaction, res result.CreateAddress = common.HexToAddress(response.CreateAddress) result.StateRoot = common.BytesToHash(response.StateRoot) result.Logs = convertToLog(response.Logs) - result.ChangesStateRoot = txChangesStateRoot(response.Error) + result.ChangesStateRoot = TxChangesStateRoot(response.Error) result.ExecutionTrace = *trace result.CallTrace = convertToExecutorTrace(response.CallTrace) result.Tx = txs[i] From 704cb5882aa926f679d36ca9488f57f71edc49ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= Date: Wed, 17 May 2023 09:14:25 +0200 Subject: [PATCH 05/11] refactor state errors --- sequencer/finalizer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index 299d0346bc..55a7247d42 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -480,7 +480,7 @@ func (f *finalizer) handleTransactionError(ctx context.Context, result *state.Pr log.Infof("handleTransactionError: error in tx: %s, errorCode: %d", tx.Hash.String(), errorCode) wg := new(sync.WaitGroup) failedReason := executor.RomErr(errorCode).Error() - if executor.IsROMOutOfCountersError(errorCode) { + if result.IsRomOOCError { log.Errorf("ROM out of counters error, marking tx with Hash: %s as INVALID, errorCode: %s", tx.Hash.String(), errorCode.String()) start := time.Now() f.worker.DeleteTx(tx.Hash, tx.From) From cdec53505f5dc63deb62e7048c67037f8db7400f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= Date: Wed, 17 May 2023 09:47:11 +0200 Subject: [PATCH 06/11] refactor --- sequencer/dbmanager.go | 17 ++---------- sequencer/finalizer.go | 50 ++++++++++++++++++++---------------- sequencer/finalizer_test.go | 2 +- sequencer/interfaces.go | 2 +- sequencer/mock_db_manager.go | 18 ++++++------- 5 files changed, 41 insertions(+), 48 deletions(-) diff --git a/sequencer/dbmanager.go b/sequencer/dbmanager.go index 8f3acbf7d9..61dead2ed3 100644 --- a/sequencer/dbmanager.go +++ b/sequencer/dbmanager.go @@ -441,14 +441,14 @@ func (d *dbManager) CloseBatch(ctx context.Context, params ClosingBatchParameter } // ProcessForcedBatch process a forced batch -func (d *dbManager) ProcessForcedBatch(forcedBatchNum uint64, request state.ProcessRequest) (*state.ProcessBatchResponse, error) { +func (d *dbManager) ProcessForcedBatch(forcedBatch state.ForcedBatch, request state.ProcessRequest) (*state.ProcessBatchResponse, error) { // Open Batch processingCtx := state.ProcessingContext{ BatchNumber: request.BatchNumber, Coinbase: request.Coinbase, Timestamp: request.Timestamp, GlobalExitRoot: request.GlobalExitRoot, - ForcedBatchNum: &forcedBatchNum, + ForcedBatchNum: &forcedBatch.ForcedBatchNumber, } dbTx, err := d.state.BeginStateTransaction(d.ctx) if err != nil { @@ -468,19 +468,6 @@ func (d *dbManager) ProcessForcedBatch(forcedBatchNum uint64, request state.Proc return nil, err } - // Fetch Forced Batch - forcedBatch, err := d.state.GetForcedBatch(d.ctx, forcedBatchNum, dbTx) - if err != nil { - if rollbackErr := dbTx.Rollback(d.ctx); rollbackErr != nil { - log.Errorf( - "failed to rollback dbTx when getting forced batch err: %v. Rollback err: %v", - rollbackErr, err, - ) - } - log.Errorf("failed to get a forced batch, err: %v", err) - return nil, err - } - // Process Batch processBatchResponse, err := d.state.ProcessSequencerBatch(d.ctx, request.BatchNumber, forcedBatch.RawTxsData, request.Caller, dbTx) if err != nil { diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index 55a7247d42..46f32a5a49 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -354,14 +354,12 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) error metrics.ProcessingTime(time.Since(start)) }() - var ger common.Hash if f.batch.isEmpty() { - ger = f.batch.globalExitRoot + f.processRequest.GlobalExitRoot = f.batch.globalExitRoot } else { - ger = state.ZeroHash + f.processRequest.GlobalExitRoot = state.ZeroHash } - f.processRequest.GlobalExitRoot = ger if tx != nil { f.processRequest.Transactions = tx.RawTx } else { @@ -372,36 +370,36 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) error hash = tx.HashStr } log.Infof("processTransaction: single tx. Batch.BatchNumber: %d, BatchNumber: %d, OldStateRoot: %s, txHash: %s, GER: %s", f.batch.batchNumber, f.processRequest.BatchNumber, f.processRequest.OldStateRoot, hash, f.processRequest.GlobalExitRoot.String()) - result, err := f.executor.ProcessBatch(ctx, f.processRequest, true) + processBatchResponse, err := f.executor.ProcessBatch(ctx, f.processRequest, true) if err != nil { log.Errorf("failed to process transaction: %s", err) return err } oldStateRoot := f.batch.stateRoot - if len(result.Responses) > 0 && tx != nil { - err = f.handleTxProcessResp(ctx, tx, result, oldStateRoot) + if len(processBatchResponse.Responses) > 0 && tx != nil { + err = f.handleProcessTransactionResponse(ctx, tx, processBatchResponse, oldStateRoot) if err != nil { return err } } // Update in-memory batch and processRequest - f.processRequest.OldStateRoot = result.NewStateRoot - f.batch.stateRoot = result.NewStateRoot - f.batch.localExitRoot = result.NewLocalExitRoot - log.Infof("processTransaction: data loaded in memory. batch.batchNumber: %d, batchNumber: %d, result.NewStateRoot: %s, result.NewLocalExitRoot: %s, oldStateRoot: %s", f.batch.batchNumber, f.processRequest.BatchNumber, result.NewStateRoot.String(), result.NewLocalExitRoot.String(), oldStateRoot.String()) + f.processRequest.OldStateRoot = processBatchResponse.NewStateRoot + f.batch.stateRoot = processBatchResponse.NewStateRoot + f.batch.localExitRoot = processBatchResponse.NewLocalExitRoot + log.Infof("processTransaction: data loaded in memory. batch.batchNumber: %d, batchNumber: %d, result.NewStateRoot: %s, result.NewLocalExitRoot: %s, oldStateRoot: %s", f.batch.batchNumber, f.processRequest.BatchNumber, processBatchResponse.NewStateRoot.String(), processBatchResponse.NewLocalExitRoot.String(), oldStateRoot.String()) return nil } -// handleTxProcessResp handles the response of transaction processing. -func (f *finalizer) handleTxProcessResp(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, oldStateRoot common.Hash) error { +// handleProcessTransactionResponse handles the response of transaction processing. +func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, oldStateRoot common.Hash) error { // Handle Transaction Error errorCode := executor.RomErrorCode(result.Responses[0].RomError) if !state.TxChangesStateRoot(errorCode) { // If intrinsic error or OOC error, we skip adding the transaction to the batch - f.handleTransactionError(ctx, result, tx) + f.handleProcessTransactionError(ctx, result, tx) return result.Responses[0].RomError } @@ -472,8 +470,8 @@ func (f *finalizer) updateWorkerAfterTxStored(ctx context.Context, tx *TxTracker metrics.WorkerProcessingTime(time.Since(start)) } -// handleTransactionError handles the error of a transaction -func (f *finalizer) handleTransactionError(ctx context.Context, result *state.ProcessBatchResponse, tx *TxTracker) *sync.WaitGroup { +// handleProcessTransactionError handles the error of a transaction +func (f *finalizer) handleProcessTransactionError(ctx context.Context, result *state.ProcessBatchResponse, tx *TxTracker) *sync.WaitGroup { txResponse := result.Responses[0] errorCode := executor.RomErrorCode(txResponse.RomError) addressInfo := result.ReadWriteAddresses[tx.From] @@ -512,6 +510,7 @@ func (f *finalizer) handleTransactionError(ctx context.Context, result *state.Pr wg.Add(1) txToDelete := txToDelete go func() { + defer wg.Done() err := f.dbManager.UpdateTxStatus(ctx, txToDelete.Hash, pool.TxStatusFailed, false, &failedReason) metrics.TxProcessed(metrics.TxProcessedLabelFailed, 1) if err != nil { @@ -527,6 +526,7 @@ func (f *finalizer) handleTransactionError(ctx context.Context, result *state.Pr wg.Add(1) go func() { + defer wg.Done() // Update the status of the transaction to failed err := f.dbManager.UpdateTxStatus(ctx, tx.Hash, pool.TxStatusFailed, false, &failedReason) if err != nil { @@ -617,11 +617,7 @@ func (f *finalizer) processForcedBatches(ctx context.Context, lastBatchNumberInS defer f.nextForcedBatchesMux.Unlock() f.nextForcedBatchDeadline = 0 - dbTx, err := f.dbManager.BeginStateTransaction(ctx) - if err != nil { - return 0, common.Hash{}, fmt.Errorf("failed to begin state transaction, err: %w", err) - } - lastTrustedForcedBatchNumber, err := f.dbManager.GetLastTrustedForcedBatchNumber(ctx, dbTx) + lastTrustedForcedBatchNumber, err := f.dbManager.GetLastTrustedForcedBatchNumber(ctx, nil) if err != nil { return 0, common.Hash{}, fmt.Errorf("failed to get last trusted forced batch number, err: %w", err) } @@ -656,7 +652,7 @@ func (f *finalizer) processForcedBatch(ctx context.Context, lastBatchNumberInSta Timestamp: now(), Caller: stateMetrics.SequencerCallerLabel, } - response, err := f.dbManager.ProcessForcedBatch(forcedBatch.ForcedBatchNumber, request) + response, err := f.dbManager.ProcessForcedBatch(forcedBatch, request) if err != nil { // If there is EXECUTOR (Batch level) error, halt the finalizer. f.halt(ctx, fmt.Errorf("failed to process forced batch, Executor err: %w", err)) @@ -885,25 +881,35 @@ func (f *finalizer) isBatchAlmostFull() bool { resources := f.batch.remainingResources zkCounters := resources.ZKCounters result := false + resourceDesc := "" if resources.Bytes <= f.getConstraintThresholdUint64(f.batchConstraints.MaxBatchBytesSize) { + resourceDesc = "MaxBatchBytesSize" result = true } else if zkCounters.UsedSteps <= f.getConstraintThresholdUint32(f.batchConstraints.MaxSteps) { + resourceDesc = "MaxSteps" result = true } else if zkCounters.UsedPoseidonPaddings <= f.getConstraintThresholdUint32(f.batchConstraints.MaxPoseidonPaddings) { + resourceDesc = "MaxPoseidonPaddings" result = true } else if zkCounters.UsedBinaries <= f.getConstraintThresholdUint32(f.batchConstraints.MaxBinaries) { + resourceDesc = "MaxBinaries" result = true } else if zkCounters.UsedKeccakHashes <= f.getConstraintThresholdUint32(f.batchConstraints.MaxKeccakHashes) { + resourceDesc = "MaxKeccakHashes" result = true } else if zkCounters.UsedArithmetics <= f.getConstraintThresholdUint32(f.batchConstraints.MaxArithmetics) { + resourceDesc = "MaxArithmetics" result = true } else if zkCounters.UsedMemAligns <= f.getConstraintThresholdUint32(f.batchConstraints.MaxMemAligns) { + resourceDesc = "MaxMemAligns" result = true } else if zkCounters.CumulativeGasUsed <= f.getConstraintThresholdUint64(f.batchConstraints.MaxCumulativeGasUsed) { + resourceDesc = "MaxCumulativeGasUsed" result = true } if result { + log.Infof("Closing batch: %d, because it reached %s threshold limit", f.batch.batchNumber, resourceDesc) f.batch.closingReason = state.BatchAlmostFullClosingReason } diff --git a/sequencer/finalizer_test.go b/sequencer/finalizer_test.go index 72b60d56c7..8e32e78c67 100644 --- a/sequencer/finalizer_test.go +++ b/sequencer/finalizer_test.go @@ -1123,7 +1123,7 @@ func TestFinalizer_handleTransactionError(t *testing.T) { } // act - wg := f.handleTransactionError(ctx, result, tx) + wg := f.handleProcessTransactionError(ctx, result, tx) if wg != nil { wg.Wait() } diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go index c34ba08870..3a22795000 100644 --- a/sequencer/interfaces.go +++ b/sequencer/interfaces.go @@ -107,7 +107,7 @@ type dbManagerInterface interface { GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, error) IsBatchClosed(ctx context.Context, batchNum uint64) (bool, error) GetLatestGer(ctx context.Context, maxBlockNumber uint64) (state.GlobalExitRoot, time.Time, error) - ProcessForcedBatch(forcedBatchNum uint64, request state.ProcessRequest) (*state.ProcessBatchResponse, error) + ProcessForcedBatch(forcedBatch state.ForcedBatch, request state.ProcessRequest) (*state.ProcessBatchResponse, error) GetForcedBatchesSince(ctx context.Context, forcedBatchNumber, maxBlockNumber uint64, dbTx pgx.Tx) ([]*state.ForcedBatch, error) GetLastL2BlockHeader(ctx context.Context, dbTx pgx.Tx) (*types.Header, error) GetLastBlock(ctx context.Context, dbTx pgx.Tx) (*state.Block, error) diff --git a/sequencer/mock_db_manager.go b/sequencer/mock_db_manager.go index 636e131798..d31d4edbbc 100644 --- a/sequencer/mock_db_manager.go +++ b/sequencer/mock_db_manager.go @@ -533,25 +533,25 @@ func (_m *DbManagerMock) OpenBatch(ctx context.Context, processingContext state. return r0 } -// ProcessForcedBatch provides a mock function with given fields: forcedBatchNum, request -func (_m *DbManagerMock) ProcessForcedBatch(forcedBatchNum uint64, request state.ProcessRequest) (*state.ProcessBatchResponse, error) { - ret := _m.Called(forcedBatchNum, request) +// ProcessForcedBatch provides a mock function with given fields: forcedBatch, request +func (_m *DbManagerMock) ProcessForcedBatch(forcedBatch state.ForcedBatch, request state.ProcessRequest) (*state.ProcessBatchResponse, error) { + ret := _m.Called(forcedBatch, request) var r0 *state.ProcessBatchResponse var r1 error - if rf, ok := ret.Get(0).(func(uint64, state.ProcessRequest) (*state.ProcessBatchResponse, error)); ok { - return rf(forcedBatchNum, request) + if rf, ok := ret.Get(0).(func(state.ForcedBatch, state.ProcessRequest) (*state.ProcessBatchResponse, error)); ok { + return rf(forcedBatch, request) } - if rf, ok := ret.Get(0).(func(uint64, state.ProcessRequest) *state.ProcessBatchResponse); ok { - r0 = rf(forcedBatchNum, request) + if rf, ok := ret.Get(0).(func(state.ForcedBatch, state.ProcessRequest) *state.ProcessBatchResponse); ok { + r0 = rf(forcedBatch, request) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*state.ProcessBatchResponse) } } - if rf, ok := ret.Get(1).(func(uint64, state.ProcessRequest) error); ok { - r1 = rf(forcedBatchNum, request) + if rf, ok := ret.Get(1).(func(state.ForcedBatch, state.ProcessRequest) error); ok { + r1 = rf(forcedBatch, request) } else { r1 = ret.Error(1) } From 837c01263ed8b201a8afe7f0aac7eab4f3138add Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= Date: Wed, 17 May 2023 10:03:58 +0200 Subject: [PATCH 07/11] fix --- sequencer/finalizer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index 46f32a5a49..ee0adc8a0c 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -478,7 +478,7 @@ func (f *finalizer) handleProcessTransactionError(ctx context.Context, result *s log.Infof("handleTransactionError: error in tx: %s, errorCode: %d", tx.Hash.String(), errorCode) wg := new(sync.WaitGroup) failedReason := executor.RomErr(errorCode).Error() - if result.IsRomOOCError { + if executor.IsROMOutOfCountersError(errorCode) { log.Errorf("ROM out of counters error, marking tx with Hash: %s as INVALID, errorCode: %s", tx.Hash.String(), errorCode.String()) start := time.Now() f.worker.DeleteTx(tx.Hash, tx.From) From fd660f4a506482f50c7012f386215be28e85313a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= Date: Wed, 17 May 2023 10:50:44 +0200 Subject: [PATCH 08/11] function rename --- sequencer/finalizer.go | 2 +- state/batch.go | 2 +- state/converters.go | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index ee0adc8a0c..21952e854a 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -397,7 +397,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) error func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, oldStateRoot common.Hash) error { // Handle Transaction Error errorCode := executor.RomErrorCode(result.Responses[0].RomError) - if !state.TxChangesStateRoot(errorCode) { + if !state.IsStateRootChanged(errorCode) { // If intrinsic error or OOC error, we skip adding the transaction to the batch f.handleProcessTransactionError(ctx, result, tx) return result.Responses[0].RomError diff --git a/state/batch.go b/state/batch.go index 57e063dcbf..6f302daa5f 100644 --- a/state/batch.go +++ b/state/batch.go @@ -425,7 +425,7 @@ func (s *State) ProcessAndStoreClosedBatch(ctx context.Context, processingCtx Pr // Filter unprocessed txs and decode txs to store metadata // note that if the batch is not well encoded it will result in an empty batch (with no txs) for i := 0; i < len(processed.Responses); i++ { - if !TxChangesStateRoot(processed.Responses[i].Error) { + if !IsStateRootChanged(processed.Responses[i].Error) { if executor.IsROMOutOfCountersError(processed.Responses[i].Error) { processed.Responses = []*pb.ProcessTransactionResponse{} break diff --git a/state/converters.go b/state/converters.go index 304de8bfda..07fa329b8e 100644 --- a/state/converters.go +++ b/state/converters.go @@ -82,8 +82,8 @@ func (s *State) convertToProcessBatchResponse(txs []types.Transaction, response }, nil } -// TxChangesStateRoot returns true if the transaction changes the state root -func TxChangesStateRoot(err pb.RomError) bool { +// IsStateRootChanged returns true if the transaction changes the state root +func IsStateRootChanged(err pb.RomError) bool { return !executor.IsIntrinsicError(err) && !executor.IsROMOutOfCountersError(err) } @@ -140,7 +140,7 @@ func (s *State) convertToProcessTransactionResponse(txs []types.Transaction, res result.CreateAddress = common.HexToAddress(response.CreateAddress) result.StateRoot = common.BytesToHash(response.StateRoot) result.Logs = convertToLog(response.Logs) - result.ChangesStateRoot = TxChangesStateRoot(response.Error) + result.ChangesStateRoot = IsStateRootChanged(response.Error) result.ExecutionTrace = *trace result.CallTrace = convertToExecutorTrace(response.CallTrace) result.Tx = txs[i] From 83eb21c18f355fc782e719d2a1bb2392dbb173e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= Date: Wed, 17 May 2023 13:04:20 +0200 Subject: [PATCH 09/11] fix wg --- sequencer/finalizer.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index 21952e854a..428ab23248 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -294,12 +294,14 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) { // Reprocess full batch as sanity check processBatchResponse, err := f.reprocessFullBatch(ctx, f.batch.batchNumber, f.batch.stateRoot) - if err != nil || processBatchResponse.IsRomOOCError { + if err != nil || processBatchResponse.IsRomOOCError || processBatchResponse.ExecutorError != nil { log.Info("halting the finalizer because of a reprocessing error") if err != nil { f.halt(ctx, fmt.Errorf("failed to reprocess batch, err: %v", err)) - } else { + } else if processBatchResponse.IsRomOOCError { f.halt(ctx, fmt.Errorf("out of counters during reprocessFullBath")) + } else { + f.halt(ctx, fmt.Errorf("executor error during reprocessFullBath: %v", processBatchResponse.ExecutorError)) } } @@ -486,7 +488,6 @@ func (f *finalizer) handleProcessTransactionError(ctx context.Context, result *s wg.Add(1) go func() { - defer wg.Done() err := f.dbManager.UpdateTxStatus(ctx, tx.Hash, pool.TxStatusInvalid, false, &failedReason) if err != nil { log.Errorf("failed to update status to failed in the pool for tx: %s, err: %s", tx.Hash.String(), err) @@ -510,7 +511,6 @@ func (f *finalizer) handleProcessTransactionError(ctx context.Context, result *s wg.Add(1) txToDelete := txToDelete go func() { - defer wg.Done() err := f.dbManager.UpdateTxStatus(ctx, txToDelete.Hash, pool.TxStatusFailed, false, &failedReason) metrics.TxProcessed(metrics.TxProcessedLabelFailed, 1) if err != nil { @@ -526,7 +526,6 @@ func (f *finalizer) handleProcessTransactionError(ctx context.Context, result *s wg.Add(1) go func() { - defer wg.Done() // Update the status of the transaction to failed err := f.dbManager.UpdateTxStatus(ctx, tx.Hash, pool.TxStatusFailed, false, &failedReason) if err != nil { From f7c7632633b46866e63c42e7fb42f6a7b3805b1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= Date: Wed, 17 May 2023 13:07:45 +0200 Subject: [PATCH 10/11] fix wg --- sequencer/finalizer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index 428ab23248..cd8b9867a1 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -488,6 +488,7 @@ func (f *finalizer) handleProcessTransactionError(ctx context.Context, result *s wg.Add(1) go func() { + defer wg.Done() err := f.dbManager.UpdateTxStatus(ctx, tx.Hash, pool.TxStatusInvalid, false, &failedReason) if err != nil { log.Errorf("failed to update status to failed in the pool for tx: %s, err: %s", tx.Hash.String(), err) From 044714daf904db26b8b51aa3217fe38d47910328 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= Date: Wed, 17 May 2023 13:40:22 +0200 Subject: [PATCH 11/11] fix --- sequencer/dbmanager.go | 17 ++++++++-- sequencer/finalizer.go | 4 ++- sequencer/interfaces.go | 2 +- sequencer/mock_db_manager.go | 18 +++++----- test/Makefile | 66 ++++++++++++++++++------------------ 5 files changed, 61 insertions(+), 46 deletions(-) diff --git a/sequencer/dbmanager.go b/sequencer/dbmanager.go index 61dead2ed3..86c92b3a87 100644 --- a/sequencer/dbmanager.go +++ b/sequencer/dbmanager.go @@ -441,14 +441,14 @@ func (d *dbManager) CloseBatch(ctx context.Context, params ClosingBatchParameter } // ProcessForcedBatch process a forced batch -func (d *dbManager) ProcessForcedBatch(forcedBatch state.ForcedBatch, request state.ProcessRequest) (*state.ProcessBatchResponse, error) { +func (d *dbManager) ProcessForcedBatch(ForcedBatchNumber uint64, request state.ProcessRequest) (*state.ProcessBatchResponse, error) { // Open Batch processingCtx := state.ProcessingContext{ BatchNumber: request.BatchNumber, Coinbase: request.Coinbase, Timestamp: request.Timestamp, GlobalExitRoot: request.GlobalExitRoot, - ForcedBatchNum: &forcedBatch.ForcedBatchNumber, + ForcedBatchNum: &ForcedBatchNumber, } dbTx, err := d.state.BeginStateTransaction(d.ctx) if err != nil { @@ -468,6 +468,19 @@ func (d *dbManager) ProcessForcedBatch(forcedBatch state.ForcedBatch, request st return nil, err } + // Fetch Forced Batch + forcedBatch, err := d.state.GetForcedBatch(d.ctx, ForcedBatchNumber, dbTx) + if err != nil { + if rollbackErr := dbTx.Rollback(d.ctx); rollbackErr != nil { + log.Errorf( + "failed to rollback dbTx when getting forced batch err: %v. Rollback err: %v", + rollbackErr, err, + ) + } + log.Errorf("failed to get a forced batch, err: %v", err) + return nil, err + } + // Process Batch processBatchResponse, err := d.state.ProcessSequencerBatch(d.ctx, request.BatchNumber, forcedBatch.RawTxsData, request.Caller, dbTx) if err != nil { diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index cd8b9867a1..0076ab6fd2 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -512,6 +512,7 @@ func (f *finalizer) handleProcessTransactionError(ctx context.Context, result *s wg.Add(1) txToDelete := txToDelete go func() { + defer wg.Done() err := f.dbManager.UpdateTxStatus(ctx, txToDelete.Hash, pool.TxStatusFailed, false, &failedReason) metrics.TxProcessed(metrics.TxProcessedLabelFailed, 1) if err != nil { @@ -527,6 +528,7 @@ func (f *finalizer) handleProcessTransactionError(ctx context.Context, result *s wg.Add(1) go func() { + defer wg.Done() // Update the status of the transaction to failed err := f.dbManager.UpdateTxStatus(ctx, tx.Hash, pool.TxStatusFailed, false, &failedReason) if err != nil { @@ -652,7 +654,7 @@ func (f *finalizer) processForcedBatch(ctx context.Context, lastBatchNumberInSta Timestamp: now(), Caller: stateMetrics.SequencerCallerLabel, } - response, err := f.dbManager.ProcessForcedBatch(forcedBatch, request) + response, err := f.dbManager.ProcessForcedBatch(forcedBatch.ForcedBatchNumber, request) if err != nil { // If there is EXECUTOR (Batch level) error, halt the finalizer. f.halt(ctx, fmt.Errorf("failed to process forced batch, Executor err: %w", err)) diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go index 3a22795000..c8122080a1 100644 --- a/sequencer/interfaces.go +++ b/sequencer/interfaces.go @@ -107,7 +107,7 @@ type dbManagerInterface interface { GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, error) IsBatchClosed(ctx context.Context, batchNum uint64) (bool, error) GetLatestGer(ctx context.Context, maxBlockNumber uint64) (state.GlobalExitRoot, time.Time, error) - ProcessForcedBatch(forcedBatch state.ForcedBatch, request state.ProcessRequest) (*state.ProcessBatchResponse, error) + ProcessForcedBatch(ForcedBatchNumber uint64, request state.ProcessRequest) (*state.ProcessBatchResponse, error) GetForcedBatchesSince(ctx context.Context, forcedBatchNumber, maxBlockNumber uint64, dbTx pgx.Tx) ([]*state.ForcedBatch, error) GetLastL2BlockHeader(ctx context.Context, dbTx pgx.Tx) (*types.Header, error) GetLastBlock(ctx context.Context, dbTx pgx.Tx) (*state.Block, error) diff --git a/sequencer/mock_db_manager.go b/sequencer/mock_db_manager.go index d31d4edbbc..7f30bc1a11 100644 --- a/sequencer/mock_db_manager.go +++ b/sequencer/mock_db_manager.go @@ -533,25 +533,25 @@ func (_m *DbManagerMock) OpenBatch(ctx context.Context, processingContext state. return r0 } -// ProcessForcedBatch provides a mock function with given fields: forcedBatch, request -func (_m *DbManagerMock) ProcessForcedBatch(forcedBatch state.ForcedBatch, request state.ProcessRequest) (*state.ProcessBatchResponse, error) { - ret := _m.Called(forcedBatch, request) +// ProcessForcedBatch provides a mock function with given fields: ForcedBatchNumber, request +func (_m *DbManagerMock) ProcessForcedBatch(ForcedBatchNumber uint64, request state.ProcessRequest) (*state.ProcessBatchResponse, error) { + ret := _m.Called(ForcedBatchNumber, request) var r0 *state.ProcessBatchResponse var r1 error - if rf, ok := ret.Get(0).(func(state.ForcedBatch, state.ProcessRequest) (*state.ProcessBatchResponse, error)); ok { - return rf(forcedBatch, request) + if rf, ok := ret.Get(0).(func(uint64, state.ProcessRequest) (*state.ProcessBatchResponse, error)); ok { + return rf(ForcedBatchNumber, request) } - if rf, ok := ret.Get(0).(func(state.ForcedBatch, state.ProcessRequest) *state.ProcessBatchResponse); ok { - r0 = rf(forcedBatch, request) + if rf, ok := ret.Get(0).(func(uint64, state.ProcessRequest) *state.ProcessBatchResponse); ok { + r0 = rf(ForcedBatchNumber, request) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*state.ProcessBatchResponse) } } - if rf, ok := ret.Get(1).(func(state.ForcedBatch, state.ProcessRequest) error); ok { - r1 = rf(forcedBatch, request) + if rf, ok := ret.Get(1).(func(uint64, state.ProcessRequest) error); ok { + r1 = rf(ForcedBatchNumber, request) } else { r1 = ret.Error(1) } diff --git a/test/Makefile b/test/Makefile index d0641ba09f..eb23d6a773 100644 --- a/test/Makefile +++ b/test/Makefile @@ -474,41 +474,41 @@ install-mockery: ## Installs mockery with the correct version to generate the mo .PHONY: generate-mocks generate-mocks: ## Generates mocks for the tests, using mockery tool - mockery --name=storageInterface --dir=../jsonrpc --output=../jsonrpc --outpkg=jsonrpc --inpackage --structname=storageMock --filename=mock_storage.go - mockery --name=PoolInterface --dir=../jsonrpc/types --output=../jsonrpc/mocks --outpkg=mocks --structname=PoolMock --filename=mock_pool.go - mockery --name=StateInterface --dir=../jsonrpc/types --output=../jsonrpc/mocks --outpkg=mocks --structname=StateMock --filename=mock_state.go - mockery --name=Tx --srcpkg=github.com/jackc/pgx/v4 --output=../jsonrpc/mocks --outpkg=mocks --structname=DBTxMock --filename=mock_dbtx.go - - mockery --name=workerInterface --dir=../sequencer --output=../sequencer --outpkg=sequencer --inpackage --structname=WorkerMock --filename=mock_worker.go - mockery --name=stateInterface --dir=../sequencer --output=../sequencer --outpkg=sequencer --inpackage --structname=StateMock --filename=mock_state.go - mockery --name=txPool --dir=../sequencer --output=../sequencer --outpkg=sequencer --inpackage --structname=PoolMock --filename=mock_pool.go - mockery --name=Tx --srcpkg=github.com/jackc/pgx/v4 --output=../sequencer --outpkg=sequencer --structname=DbTxMock --filename=mock_dbtx.go - mockery --name=dbManagerInterface --dir=../sequencer --output=../sequencer --outpkg=sequencer --inpackage --structname=DbManagerMock --filename=mock_db_manager.go - mockery --name=etherman --dir=../sequencer --output=../sequencer --outpkg=sequencer --inpackage --structname=EthermanMock --filename=mock_etherman.go - - mockery --name=ethermanInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=ethermanMock --filename=mock_etherman.go - mockery --name=stateInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=stateMock --filename=mock_state.go - mockery --name=ethTxManager --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=ethTxManagerMock --filename=mock_ethtxmanager.go - mockery --name=poolInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=poolMock --filename=mock_pool.go - mockery --name=zkEVMClientInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=zkEVMClientMock --filename=mock_zkevmclient.go - mockery --name=Tx --srcpkg=github.com/jackc/pgx/v4 --output=../synchronizer --outpkg=synchronizer --structname=dbTxMock --filename=mock_dbtx.go - - mockery --name=GasPricer --srcpkg=github.com/ethereum/go-ethereum --output=../etherman --outpkg=etherman --structname=etherscanMock --filename=mock_etherscan.go - mockery --name=GasPricer --srcpkg=github.com/ethereum/go-ethereum --output=../etherman --outpkg=etherman --structname=ethGasStationMock --filename=mock_ethgasstation.go - - mockery --name=ethermanInterface --dir=../ethtxmanager --output=../ethtxmanager --outpkg=ethtxmanager --structname=ethermanMock --filename=mock_etherman_test.go - mockery --name=stateInterface --dir=../ethtxmanager --output=../ethtxmanager --outpkg=ethtxmanager --structname=stateMock --filename=mock_state_test.go - - mockery --name=pool --dir=../gasprice --output=../gasprice --outpkg=gasprice --structname=poolMock --filename=mock_pool.go - mockery --name=ethermanInterface --dir=../gasprice --output=../gasprice --outpkg=gasprice --structname=ethermanMock --filename=mock_etherman.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=storageInterface --dir=../jsonrpc --output=../jsonrpc --outpkg=jsonrpc --inpackage --structname=storageMock --filename=mock_storage.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=PoolInterface --dir=../jsonrpc/types --output=../jsonrpc/mocks --outpkg=mocks --structname=PoolMock --filename=mock_pool.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=StateInterface --dir=../jsonrpc/types --output=../jsonrpc/mocks --outpkg=mocks --structname=StateMock --filename=mock_state.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=Tx --srcpkg=github.com/jackc/pgx/v4 --output=../jsonrpc/mocks --outpkg=mocks --structname=DBTxMock --filename=mock_dbtx.go + + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=workerInterface --dir=../sequencer --output=../sequencer --outpkg=sequencer --inpackage --structname=WorkerMock --filename=mock_worker.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=stateInterface --dir=../sequencer --output=../sequencer --outpkg=sequencer --inpackage --structname=StateMock --filename=mock_state.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=txPool --dir=../sequencer --output=../sequencer --outpkg=sequencer --inpackage --structname=PoolMock --filename=mock_pool.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=Tx --srcpkg=github.com/jackc/pgx/v4 --output=../sequencer --outpkg=sequencer --structname=DbTxMock --filename=mock_dbtx.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=dbManagerInterface --dir=../sequencer --output=../sequencer --outpkg=sequencer --inpackage --structname=DbManagerMock --filename=mock_db_manager.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=etherman --dir=../sequencer --output=../sequencer --outpkg=sequencer --inpackage --structname=EthermanMock --filename=mock_etherman.go + + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=ethermanInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=ethermanMock --filename=mock_etherman.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=stateInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=stateMock --filename=mock_state.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=ethTxManager --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=ethTxManagerMock --filename=mock_ethtxmanager.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=poolInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=poolMock --filename=mock_pool.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=zkEVMClientInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=zkEVMClientMock --filename=mock_zkevmclient.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=Tx --srcpkg=github.com/jackc/pgx/v4 --output=../synchronizer --outpkg=synchronizer --structname=dbTxMock --filename=mock_dbtx.go + + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=GasPricer --srcpkg=github.com/ethereum/go-ethereum --output=../etherman --outpkg=etherman --structname=etherscanMock --filename=mock_etherscan.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=GasPricer --srcpkg=github.com/ethereum/go-ethereum --output=../etherman --outpkg=etherman --structname=ethGasStationMock --filename=mock_ethgasstation.go + + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=ethermanInterface --dir=../ethtxmanager --output=../ethtxmanager --outpkg=ethtxmanager --structname=ethermanMock --filename=mock_etherman_test.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=stateInterface --dir=../ethtxmanager --output=../ethtxmanager --outpkg=ethtxmanager --structname=stateMock --filename=mock_state_test.go + + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=pool --dir=../gasprice --output=../gasprice --outpkg=gasprice --structname=poolMock --filename=mock_pool.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=ethermanInterface --dir=../gasprice --output=../gasprice --outpkg=gasprice --structname=ethermanMock --filename=mock_etherman.go ## mocks for the aggregator tests - mockery --name=stateInterface --dir=../aggregator --output=../aggregator/mocks --outpkg=mocks --structname=StateMock --filename=mock_state.go - mockery --name=proverInterface --dir=../aggregator --output=../aggregator/mocks --outpkg=mocks --structname=ProverMock --filename=mock_prover.go - mockery --name=etherman --dir=../aggregator --output=../aggregator/mocks --outpkg=mocks --structname=Etherman --filename=mock_etherman.go - mockery --name=ethTxManager --dir=../aggregator --output=../aggregator/mocks --outpkg=mocks --structname=EthTxManager --filename=mock_ethtxmanager.go - mockery --name=aggregatorTxProfitabilityChecker --dir=../aggregator --output=../aggregator/mocks --outpkg=mocks --structname=ProfitabilityCheckerMock --filename=mock_profitabilitychecker.go - mockery --name=Tx --srcpkg=github.com/jackc/pgx/v4 --output=../aggregator/mocks --outpkg=mocks --structname=DbTxMock --filename=mock_dbtx.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=stateInterface --dir=../aggregator --output=../aggregator/mocks --outpkg=mocks --structname=StateMock --filename=mock_state.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=proverInterface --dir=../aggregator --output=../aggregator/mocks --outpkg=mocks --structname=ProverMock --filename=mock_prover.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=etherman --dir=../aggregator --output=../aggregator/mocks --outpkg=mocks --structname=Etherman --filename=mock_etherman.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=ethTxManager --dir=../aggregator --output=../aggregator/mocks --outpkg=mocks --structname=EthTxManager --filename=mock_ethtxmanager.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=aggregatorTxProfitabilityChecker --dir=../aggregator --output=../aggregator/mocks --outpkg=mocks --structname=ProfitabilityCheckerMock --filename=mock_profitabilitychecker.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=Tx --srcpkg=github.com/jackc/pgx/v4 --output=../aggregator/mocks --outpkg=mocks --structname=DbTxMock --filename=mock_dbtx.go .PHONY: run-benchmarks run-benchmarks: run-db ## Runs benchmars