Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor state errors #2105

Merged
merged 11 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,12 @@ func (p *Pool) PreExecuteTx(ctx context.Context, tx types.Transaction) (preExecu
return response, err
}

response.usedZkCounters = processBatchResponse.UsedZkCounters

if !processBatchResponse.RomOOC {
if processBatchResponse.Responses != nil && len(processBatchResponse.Responses) > 0 {
r := processBatchResponse.Responses[0]
response.isOOC = executor.IsROMOutOfGasError(executor.RomErrorCode(r.RomError))
response.isReverted = errors.Is(r.RomError, runtime.ErrExecutionReverted)
}
} else {
response.isOOG = processBatchResponse.RomOOC
if processBatchResponse.Responses != nil && len(processBatchResponse.Responses) > 0 {
errorToCheck := processBatchResponse.Responses[0].RomError
response.isReverted = errors.Is(errorToCheck, runtime.ErrExecutionReverted)
response.isOOC = executor.IsROMOutOfCountersError(executor.RomErrorCode(errorToCheck))
response.isOOG = errors.Is(errorToCheck, runtime.ErrOutOfGas)
response.usedZkCounters = processBatchResponse.UsedZkCounters
}

return response, nil
Expand Down
19 changes: 3 additions & 16 deletions sequencer/dbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,14 +441,14 @@ func (d *dbManager) CloseBatch(ctx context.Context, params ClosingBatchParameter
}

// ProcessForcedBatch process a forced batch
func (d *dbManager) ProcessForcedBatch(forcedBatchNum uint64, request state.ProcessRequest) (*state.ProcessBatchResponse, error) {
func (d *dbManager) ProcessForcedBatch(forcedBatch state.ForcedBatch, request state.ProcessRequest) (*state.ProcessBatchResponse, error) {
// Open Batch
processingCtx := state.ProcessingContext{
BatchNumber: request.BatchNumber,
Coinbase: request.Coinbase,
Timestamp: request.Timestamp,
GlobalExitRoot: request.GlobalExitRoot,
ForcedBatchNum: &forcedBatchNum,
ForcedBatchNum: &forcedBatch.ForcedBatchNumber,
}
dbTx, err := d.state.BeginStateTransaction(d.ctx)
if err != nil {
Expand All @@ -468,19 +468,6 @@ func (d *dbManager) ProcessForcedBatch(forcedBatchNum uint64, request state.Proc
return nil, err
}

// Fetch Forced Batch
forcedBatch, err := d.state.GetForcedBatch(d.ctx, forcedBatchNum, dbTx)
if err != nil {
if rollbackErr := dbTx.Rollback(d.ctx); rollbackErr != nil {
log.Errorf(
"failed to rollback dbTx when getting forced batch err: %v. Rollback err: %v",
rollbackErr, err,
)
}
log.Errorf("failed to get a forced batch, err: %v", err)
return nil, err
}

// Process Batch
processBatchResponse, err := d.state.ProcessSequencerBatch(d.ctx, request.BatchNumber, forcedBatch.RawTxsData, request.Caller, dbTx)
if err != nil {
Expand All @@ -490,7 +477,7 @@ func (d *dbManager) ProcessForcedBatch(forcedBatchNum uint64, request state.Proc
// Close Batch
txsBytes := uint64(0)
for _, resp := range processBatchResponse.Responses {
if !resp.IsProcessed {
if !resp.ChangesStateRoot {
continue
}
txsBytes += resp.Tx.Size()
Expand Down
59 changes: 32 additions & 27 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {

// Reprocess full batch as sanity check
processBatchResponse, err := f.reprocessFullBatch(ctx, f.batch.batchNumber, f.batch.stateRoot)
if err != nil || processBatchResponse.RomOOC {
if err != nil || processBatchResponse.IsRomOOCError {
log.Info("halting the finalizer because of a reprocessing error")
if err != nil {
f.halt(ctx, fmt.Errorf("failed to reprocess batch, err: %v", err))
Expand Down Expand Up @@ -354,14 +354,12 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) error
metrics.ProcessingTime(time.Since(start))
}()

var ger common.Hash
if f.batch.isEmpty() {
ger = f.batch.globalExitRoot
f.processRequest.GlobalExitRoot = f.batch.globalExitRoot
} else {
ger = state.ZeroHash
f.processRequest.GlobalExitRoot = state.ZeroHash
}

f.processRequest.GlobalExitRoot = ger
if tx != nil {
f.processRequest.Transactions = tx.RawTx
} else {
Expand All @@ -372,37 +370,36 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) error
hash = tx.HashStr
}
log.Infof("processTransaction: single tx. Batch.BatchNumber: %d, BatchNumber: %d, OldStateRoot: %s, txHash: %s, GER: %s", f.batch.batchNumber, f.processRequest.BatchNumber, f.processRequest.OldStateRoot, hash, f.processRequest.GlobalExitRoot.String())
result, err := f.executor.ProcessBatch(ctx, f.processRequest, true)
processBatchResponse, err := f.executor.ProcessBatch(ctx, f.processRequest, true)
if err != nil {
log.Errorf("failed to process transaction: %s", err)
return err
}

oldStateRoot := f.batch.stateRoot
if len(result.Responses) > 0 && tx != nil {
err = f.handleTxProcessResp(ctx, tx, result, oldStateRoot)
if len(processBatchResponse.Responses) > 0 && tx != nil {
err = f.handleProcessTransactionResponse(ctx, tx, processBatchResponse, oldStateRoot)
if err != nil {
return err
}
}

// Update in-memory batch and processRequest
f.processRequest.OldStateRoot = result.NewStateRoot
f.batch.stateRoot = result.NewStateRoot
f.batch.localExitRoot = result.NewLocalExitRoot
log.Infof("processTransaction: data loaded in memory. batch.batchNumber: %d, batchNumber: %d, result.NewStateRoot: %s, result.NewLocalExitRoot: %s, oldStateRoot: %s", f.batch.batchNumber, f.processRequest.BatchNumber, result.NewStateRoot.String(), result.NewLocalExitRoot.String(), oldStateRoot.String())
f.processRequest.OldStateRoot = processBatchResponse.NewStateRoot
f.batch.stateRoot = processBatchResponse.NewStateRoot
f.batch.localExitRoot = processBatchResponse.NewLocalExitRoot
log.Infof("processTransaction: data loaded in memory. batch.batchNumber: %d, batchNumber: %d, result.NewStateRoot: %s, result.NewLocalExitRoot: %s, oldStateRoot: %s", f.batch.batchNumber, f.processRequest.BatchNumber, processBatchResponse.NewStateRoot.String(), processBatchResponse.NewLocalExitRoot.String(), oldStateRoot.String())

return nil
}

// handleTxProcessResp handles the response of transaction processing.
func (f *finalizer) handleTxProcessResp(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, oldStateRoot common.Hash) error {
// handleProcessTransactionResponse handles the response of transaction processing.
func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, oldStateRoot common.Hash) error {
// Handle Transaction Error

errorCode := executor.RomErrorCode(result.Responses[0].RomError)
if result.RomOOC || executor.IsIntrinsicError(errorCode) {
if !state.TxChangesStateRoot(errorCode) {
// If intrinsic error or OOC error, we skip adding the transaction to the batch
f.handleTransactionError(ctx, result, tx)
f.handleProcessTransactionError(ctx, result, tx)
return result.Responses[0].RomError
}

Expand Down Expand Up @@ -473,8 +470,8 @@ func (f *finalizer) updateWorkerAfterTxStored(ctx context.Context, tx *TxTracker
metrics.WorkerProcessingTime(time.Since(start))
}

// handleTransactionError handles the error of a transaction
func (f *finalizer) handleTransactionError(ctx context.Context, result *state.ProcessBatchResponse, tx *TxTracker) *sync.WaitGroup {
// handleProcessTransactionError handles the error of a transaction
func (f *finalizer) handleProcessTransactionError(ctx context.Context, result *state.ProcessBatchResponse, tx *TxTracker) *sync.WaitGroup {
txResponse := result.Responses[0]
errorCode := executor.RomErrorCode(txResponse.RomError)
addressInfo := result.ReadWriteAddresses[tx.From]
Expand Down Expand Up @@ -513,6 +510,7 @@ func (f *finalizer) handleTransactionError(ctx context.Context, result *state.Pr
wg.Add(1)
txToDelete := txToDelete
go func() {
defer wg.Done()
err := f.dbManager.UpdateTxStatus(ctx, txToDelete.Hash, pool.TxStatusFailed, false, &failedReason)
metrics.TxProcessed(metrics.TxProcessedLabelFailed, 1)
if err != nil {
Expand All @@ -528,6 +526,7 @@ func (f *finalizer) handleTransactionError(ctx context.Context, result *state.Pr

wg.Add(1)
go func() {
defer wg.Done()
// Update the status of the transaction to failed
err := f.dbManager.UpdateTxStatus(ctx, tx.Hash, pool.TxStatusFailed, false, &failedReason)
if err != nil {
Expand Down Expand Up @@ -618,11 +617,7 @@ func (f *finalizer) processForcedBatches(ctx context.Context, lastBatchNumberInS
defer f.nextForcedBatchesMux.Unlock()
f.nextForcedBatchDeadline = 0

dbTx, err := f.dbManager.BeginStateTransaction(ctx)
if err != nil {
return 0, common.Hash{}, fmt.Errorf("failed to begin state transaction, err: %w", err)
}
lastTrustedForcedBatchNumber, err := f.dbManager.GetLastTrustedForcedBatchNumber(ctx, dbTx)
lastTrustedForcedBatchNumber, err := f.dbManager.GetLastTrustedForcedBatchNumber(ctx, nil)
if err != nil {
return 0, common.Hash{}, fmt.Errorf("failed to get last trusted forced batch number, err: %w", err)
}
Expand Down Expand Up @@ -657,7 +652,7 @@ func (f *finalizer) processForcedBatch(ctx context.Context, lastBatchNumberInSta
Timestamp: now(),
Caller: stateMetrics.SequencerCallerLabel,
}
response, err := f.dbManager.ProcessForcedBatch(forcedBatch.ForcedBatchNumber, request)
response, err := f.dbManager.ProcessForcedBatch(forcedBatch, request)
if err != nil {
// If there is EXECUTOR (Batch level) error, halt the finalizer.
f.halt(ctx, fmt.Errorf("failed to process forced batch, Executor err: %w", err))
Expand All @@ -669,7 +664,7 @@ func (f *finalizer) processForcedBatch(ctx context.Context, lastBatchNumberInSta
f.nextGERMux.Lock()
f.lastGERHash = forcedBatch.GlobalExitRoot
f.nextGERMux.Unlock()
if len(response.Responses) > 0 && !response.RomOOC {
if len(response.Responses) > 0 && !response.IsRomOOCError {
f.handleForcedTxsProcessResp(request, response, stateRoot)
}

Expand Down Expand Up @@ -783,7 +778,7 @@ func (f *finalizer) reprocessFullBatch(ctx context.Context, batchNum uint64, exp
return nil, err
}

if result.RomOOC {
if result.IsRomOOCError {
log.Errorf("failed to process batch %v because OutOfCounters", batch.BatchNumber)
payload, err := json.Marshal(processRequest)
if err != nil {
Expand Down Expand Up @@ -886,25 +881,35 @@ func (f *finalizer) isBatchAlmostFull() bool {
resources := f.batch.remainingResources
zkCounters := resources.ZKCounters
result := false
resourceDesc := ""
if resources.Bytes <= f.getConstraintThresholdUint64(f.batchConstraints.MaxBatchBytesSize) {
resourceDesc = "MaxBatchBytesSize"
result = true
} else if zkCounters.UsedSteps <= f.getConstraintThresholdUint32(f.batchConstraints.MaxSteps) {
resourceDesc = "MaxSteps"
result = true
} else if zkCounters.UsedPoseidonPaddings <= f.getConstraintThresholdUint32(f.batchConstraints.MaxPoseidonPaddings) {
resourceDesc = "MaxPoseidonPaddings"
result = true
} else if zkCounters.UsedBinaries <= f.getConstraintThresholdUint32(f.batchConstraints.MaxBinaries) {
resourceDesc = "MaxBinaries"
result = true
} else if zkCounters.UsedKeccakHashes <= f.getConstraintThresholdUint32(f.batchConstraints.MaxKeccakHashes) {
resourceDesc = "MaxKeccakHashes"
result = true
} else if zkCounters.UsedArithmetics <= f.getConstraintThresholdUint32(f.batchConstraints.MaxArithmetics) {
resourceDesc = "MaxArithmetics"
result = true
} else if zkCounters.UsedMemAligns <= f.getConstraintThresholdUint32(f.batchConstraints.MaxMemAligns) {
resourceDesc = "MaxMemAligns"
result = true
} else if zkCounters.CumulativeGasUsed <= f.getConstraintThresholdUint64(f.batchConstraints.MaxCumulativeGasUsed) {
resourceDesc = "MaxCumulativeGasUsed"
result = true
}

if result {
log.Infof("Closing batch: %d, because it reached %s threshold limit", f.batch.batchNumber, resourceDesc)
f.batch.closingReason = state.BatchAlmostFullClosingReason
}

Expand Down
2 changes: 1 addition & 1 deletion sequencer/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ func TestFinalizer_handleTransactionError(t *testing.T) {
}

// act
wg := f.handleTransactionError(ctx, result, tx)
wg := f.handleProcessTransactionError(ctx, result, tx)
if wg != nil {
wg.Wait()
}
Expand Down
2 changes: 1 addition & 1 deletion sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type dbManagerInterface interface {
GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, error)
IsBatchClosed(ctx context.Context, batchNum uint64) (bool, error)
GetLatestGer(ctx context.Context, maxBlockNumber uint64) (state.GlobalExitRoot, time.Time, error)
ProcessForcedBatch(forcedBatchNum uint64, request state.ProcessRequest) (*state.ProcessBatchResponse, error)
ProcessForcedBatch(forcedBatch state.ForcedBatch, request state.ProcessRequest) (*state.ProcessBatchResponse, error)
GetForcedBatchesSince(ctx context.Context, forcedBatchNumber, maxBlockNumber uint64, dbTx pgx.Tx) ([]*state.ForcedBatch, error)
GetLastL2BlockHeader(ctx context.Context, dbTx pgx.Tx) (*types.Header, error)
GetLastBlock(ctx context.Context, dbTx pgx.Tx) (*state.Block, error)
Expand Down
18 changes: 9 additions & 9 deletions sequencer/mock_db_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion state/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (s *State) ProcessAndStoreClosedBatch(ctx context.Context, processingCtx Pr
// Filter unprocessed txs and decode txs to store metadata
// note that if the batch is not well encoded it will result in an empty batch (with no txs)
for i := 0; i < len(processed.Responses); i++ {
if !isProcessed(processed.Responses[i].Error) {
if !TxChangesStateRoot(processed.Responses[i].Error) {
if executor.IsROMOutOfCountersError(processed.Responses[i].Error) {
processed.Responses = []*pb.ProcessTransactionResponse{}
break
Expand Down
49 changes: 32 additions & 17 deletions state/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,42 @@ func (s *State) convertToProcessBatchResponse(txs []types.Transaction, response
return nil, err
}

romOOC := response.Error != executor.EXECUTOR_ERROR_NO_ERROR
if !romOOC && len(response.Responses) > 0 {
// Check out of counters
errorToCheck := response.Responses[len(response.Responses)-1].Error
romOOC = executor.IsROMOutOfCountersError(errorToCheck)
isExecutorLevelError := (response.Error != executor.EXECUTOR_ERROR_NO_ERROR)
isRomLevelError := false
isRomOOCError := false

if response.Responses != nil {
for _, resp := range response.Responses {
if resp.Error != pb.RomError_ROM_ERROR_NO_ERROR {
isRomLevelError = true
break
}
}

if len(response.Responses) > 0 {
// Check out of counters
errorToCheck := response.Responses[len(response.Responses)-1].Error
isRomOOCError = executor.IsROMOutOfCountersError(errorToCheck)
}
}

return &ProcessBatchResponse{
NewStateRoot: common.BytesToHash(response.NewStateRoot),
NewAccInputHash: common.BytesToHash(response.NewAccInputHash),
NewLocalExitRoot: common.BytesToHash(response.NewLocalExitRoot),
NewBatchNumber: response.NewBatchNum,
UsedZkCounters: convertToCounters(response),
Responses: responses,
ExecutorError: executor.ExecutorErr(response.Error),
RomOOC: romOOC,
ReadWriteAddresses: readWriteAddresses,
NewStateRoot: common.BytesToHash(response.NewStateRoot),
NewAccInputHash: common.BytesToHash(response.NewAccInputHash),
NewLocalExitRoot: common.BytesToHash(response.NewLocalExitRoot),
NewBatchNumber: response.NewBatchNum,
UsedZkCounters: convertToCounters(response),
Responses: responses,
ExecutorError: executor.ExecutorErr(response.Error),
IsExecutorLevelError: isExecutorLevelError,
IsRomLevelError: isRomLevelError,
IsRomOOCError: isRomOOCError,
ReadWriteAddresses: readWriteAddresses,
}, nil
}

func isProcessed(err pb.RomError) bool {
// TxChangesStateRoot returns true if the transaction changes the state root
func TxChangesStateRoot(err pb.RomError) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename this to IsStateRootChanged.

return !executor.IsIntrinsicError(err) && !executor.IsROMOutOfCountersError(err)
}

Expand Down Expand Up @@ -125,7 +140,7 @@ func (s *State) convertToProcessTransactionResponse(txs []types.Transaction, res
result.CreateAddress = common.HexToAddress(response.CreateAddress)
result.StateRoot = common.BytesToHash(response.StateRoot)
result.Logs = convertToLog(response.Logs)
result.IsProcessed = isProcessed(response.Error)
result.ChangesStateRoot = TxChangesStateRoot(response.Error)
result.ExecutionTrace = *trace
result.CallTrace = convertToExecutorTrace(response.CallTrace)
result.Tx = txs[i]
Expand Down Expand Up @@ -158,7 +173,7 @@ func (s *State) convertToProcessTransactionResponse(txs []types.Transaction, res
log.Debugf("ProcessTransactionResponse[GasUsed]: %v", result.GasUsed)
log.Debugf("ProcessTransactionResponse[GasLeft]: %v", result.GasLeft)
log.Debugf("ProcessTransactionResponse[GasRefunded]: %v", result.GasRefunded)
log.Debugf("ProcessTransactionResponse[IsProcessed]: %v", result.IsProcessed)
log.Debugf("ProcessTransactionResponse[ChangesStateRoot]: %v", result.ChangesStateRoot)
}

return results, nil
Expand Down
2 changes: 1 addition & 1 deletion state/pgstatestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (p *PostgresStorage) GetForcedBatch(ctx context.Context, forcedBatchNumber

// GetForcedBatchesSince gets L1 forced batches since forcedBatchNumber
func (p *PostgresStorage) GetForcedBatchesSince(ctx context.Context, forcedBatchNumber, maxBlockNumber uint64, dbTx pgx.Tx) ([]*ForcedBatch, error) {
const getForcedBatchesSQL = "SELECT forced_batch_num, global_exit_root, timestamp, raw_txs_data, coinbase, block_num FROM state.forced_batch WHERE forced_batch_num > $1 AND block_num <= $2"
const getForcedBatchesSQL = "SELECT forced_batch_num, global_exit_root, timestamp, raw_txs_data, coinbase, block_num FROM state.forced_batch WHERE forced_batch_num > $1 AND block_num <= $2 ORDER BY forced_batch_num ASC"
q := p.getExecQuerier(dbTx)
rows, err := q.Query(ctx, getForcedBatchesSQL, forcedBatchNumber, maxBlockNumber)
if errors.Is(err, pgx.ErrNoRows) {
Expand Down
Loading