Skip to content

Commit

Permalink
[Aggregator] More fine-grained logs in case of prover errors (#1718)
Browse files Browse the repository at this point in the history
Closes #1717.

This PR adds logs for errors that occur in the methods responsible for interacting with the prover during proof generation.
These additional logs are necessary in order to print more information, in particular the `proofId` and `batches` (or `batch`) fields, which are not available in the outer scope (the `Channel` method) where the main error log is printed.
  • Loading branch information
kind84 authored Mar 8, 2023
1 parent b41129e commit 9858de7
Showing 1 changed file with 78 additions and 40 deletions.
118 changes: 78 additions & 40 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"sync"
"time"
"unicode"

"github.com/0xPolygonHermez/zkevm-node/aggregator/metrics"
"github.com/0xPolygonHermez/zkevm-node/aggregator/pb"
Expand Down Expand Up @@ -124,7 +125,7 @@ func (a *Aggregator) Start(ctx context.Context) error {
address := fmt.Sprintf("%s:%d", a.cfg.Host, a.cfg.Port)
lis, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("failed to listen: %v", err)
log.Fatalf("Failed to listen: %v", err)
}

a.srv = grpc.NewServer()
Expand Down Expand Up @@ -178,12 +179,13 @@ func (a *Aggregator) Channel(stream pb.AggregatorService_ChannelServer) error {
"proverId", prover.ID(),
"proverAddr", prover.Addr(),
)
log.Debug("Establishing stream connection with prover")
log.Info("Establishing stream connection with prover")

// Check if prover supports the required Fork ID
if !prover.SupportsForkID(a.cfg.ForkId) {
log.Warnf("Prover does not support required fork ID: %d.", a.cfg.ForkId)
return fmt.Errorf("prover does not support required fork ID: %d", a.cfg.ForkId)
err := errors.New("prover does not support required fork ID")
log.Warn(FirstToUpper(err.Error()))
return err
}

for {
Expand All @@ -198,7 +200,7 @@ func (a *Aggregator) Channel(stream pb.AggregatorService_ChannelServer) error {
default:
isIdle, err := prover.IsIdle()
if err != nil {
log.Errorf("failed to check if prover is idle: %v", err)
log.Errorf("Failed to check if prover is idle: %v", err)
time.Sleep(a.cfg.RetryTime.Duration)
continue
}
Expand All @@ -210,17 +212,17 @@ func (a *Aggregator) Channel(stream pb.AggregatorService_ChannelServer) error {

_, err = a.tryBuildFinalProof(ctx, prover, nil)
if err != nil {
log.Errorf("error checking proofs to verify: %v", err)
log.Errorf("Error checking proofs to verify: %v", err)
}

proofGenerated, err := a.tryAggregateProofs(ctx, prover)
if err != nil {
log.Errorf("error trying to aggregate proofs: %v", err)
log.Errorf("Error trying to aggregate proofs: %v", err)
}
if !proofGenerated {
proofGenerated, err = a.tryGenerateBatchProof(ctx, prover)
if err != nil {
log.Errorf("error trying to generate proof: %v", err)
log.Errorf("Error trying to generate proof: %v", err)
}
}
if !proofGenerated {
Expand Down Expand Up @@ -252,7 +254,7 @@ func (a *Aggregator) sendFinalProof() {

finalBatch, err := a.State.GetBatchByNumber(ctx, proof.BatchNumberFinal, nil)
if err != nil {
log.Errorf("failed to retrieve batch with number [%d]: %v", proof.BatchNumberFinal, err)
log.Errorf("Failed to retrieve batch with number [%d]: %v", proof.BatchNumberFinal, err)
a.endProofVerification()
continue
}
Expand All @@ -269,15 +271,15 @@ func (a *Aggregator) sendFinalProof() {
sender := common.HexToAddress(a.cfg.SenderAddress)
to, data, err := a.Ethman.BuildTrustedVerifyBatchesTxData(proof.BatchNumber-1, proof.BatchNumberFinal, &inputs)
if err != nil {
log.Errorf("error estimating batch verification to add to eth tx manager: %v", err)
log.Errorf("Error estimating batch verification to add to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
continue
}
monitoredTxID := buildMonitoredTxID(proof.BatchNumber, proof.BatchNumberFinal)
err = a.EthTxManager.Add(ctx, ethTxManagerOwner, monitoredTxID, sender, to, nil, data, nil)
if err != nil {
log := log.WithFields("tx", monitoredTxID)
log.Errorf("error to add batch verification tx to eth tx manager: %v", err)
log.Errorf("Error to add batch verification tx to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
continue
}
Expand All @@ -298,7 +300,7 @@ func (a *Aggregator) handleFailureToAddVerifyBatchToBeMonitored(ctx context.Cont
proof.GeneratingSince = nil
err := a.State.UpdateGeneratedProof(ctx, proof, nil)
if err != nil {
log.Errorf("failed updating proof state (false): %v", err)
log.Errorf("Failed updating proof state (false): %v", err)
}
a.endProofVerification()
}
Expand Down Expand Up @@ -404,7 +406,7 @@ func (a *Aggregator) tryBuildFinalProof(ctx context.Context, prover proverInterf
proof.GeneratingSince = nil
err2 := a.State.UpdateGeneratedProof(a.ctx, proof, nil)
if err2 != nil {
log.Errorf("failed to unlock proof: %v", err2)
log.Errorf("Failed to unlock proof: %v", err2)
}
}
}()
Expand All @@ -428,7 +430,9 @@ func (a *Aggregator) tryBuildFinalProof(ctx context.Context, prover proverInterf
// at this point we have an eligible proof, build the final one using it
finalProof, err := a.buildFinalProof(ctx, prover, proof)
if err != nil {
return false, fmt.Errorf("failed to build final proof, %w", err)
err = fmt.Errorf("failed to build final proof, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
}

msg := finalProofMsg{
Expand Down Expand Up @@ -519,7 +523,7 @@ func (a *Aggregator) unlockProofsToAggregate(ctx context.Context, proof1 *state.
if err != nil {
if err := dbTx.Rollback(ctx); err != nil {
err := fmt.Errorf("failed to rollback proof aggregation state: %w", err)
log.Error(err.Error())
log.Error(FirstToUpper(err.Error()))
return err
}
return fmt.Errorf("failed to release proof aggregation state: %w", err)
Expand Down Expand Up @@ -566,7 +570,7 @@ func (a *Aggregator) getAndLockProofsToAggregate(ctx context.Context, prover pro
if err != nil {
if err := dbTx.Rollback(ctx); err != nil {
err := fmt.Errorf("failed to rollback proof aggregation state %w", err)
log.Error(err.Error())
log.Error(FirstToUpper(err.Error()))
return nil, nil, err
}
return nil, nil, fmt.Errorf("failed to set proof aggregation state %w", err)
Expand Down Expand Up @@ -617,15 +621,19 @@ func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover proverInterf
}()

log.Infof("Aggregating proofs: %d-%d and %d-%d", proof1.BatchNumber, proof1.BatchNumberFinal, proof2.BatchNumber, proof2.BatchNumberFinal)
log = log.WithFields("batches", fmt.Sprintf("%d-%d", proof1.BatchNumber, proof2.BatchNumberFinal))

batches := fmt.Sprintf("%d-%d", proof1.BatchNumber, proof2.BatchNumberFinal)
log = log.WithFields("batches", batches)

inputProver := map[string]interface{}{
"recursive_proof_1": proof1.Proof,
"recursive_proof_2": proof2.Proof,
}
b, err := json.Marshal(inputProver)
if err != nil {
return false, fmt.Errorf("failed to serialize input prover, %w", err)
err = fmt.Errorf("failed to serialize input prover, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
}

proof := &state.Proof{
Expand All @@ -638,7 +646,9 @@ func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover proverInterf

aggrProofID, err = prover.AggregatedProof(proof1.Proof, proof2.Proof)
if err != nil {
return false, fmt.Errorf("failed to get aggregated proof id, %w", err)
err = fmt.Errorf("failed to get aggregated proof id, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
}

proof.ProofID = aggrProofID
Expand All @@ -648,7 +658,9 @@ func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover proverInterf

recursiveProof, err := prover.WaitRecursiveProof(ctx, *proof.ProofID)
if err != nil {
return false, fmt.Errorf("failed to get aggregated proof from prover, %w", err)
err = fmt.Errorf("failed to get aggregated proof from prover, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
}

log.Info("Aggregated proof generated")
Expand All @@ -659,17 +671,21 @@ func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover proverInterf
// newly generated recursive proof
dbTx, err := a.State.BeginStateTransaction(ctx)
if err != nil {
return false, fmt.Errorf("failed to begin transaction to update proof aggregation state: %w", err)
err = fmt.Errorf("failed to begin transaction to update proof aggregation state, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
}

err = a.State.DeleteGeneratedProofs(ctx, proof1.BatchNumber, proof2.BatchNumberFinal, dbTx)
if err != nil {
if err := dbTx.Rollback(ctx); err != nil {
err := fmt.Errorf("failed to rollback proof aggregation state: %w", err)
log.Error(err.Error())
err := fmt.Errorf("failed to rollback proof aggregation state, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
}
return false, fmt.Errorf("failed to delete previously aggregated proofs: %w", err)
err = fmt.Errorf("failed to delete previously aggregated proofs, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
}

now := time.Now().Round(time.Microsecond)
Expand All @@ -678,16 +694,20 @@ func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover proverInterf
err = a.State.AddGeneratedProof(ctx, proof, dbTx)
if err != nil {
if err := dbTx.Rollback(ctx); err != nil {
err := fmt.Errorf("failed to rollback proof aggregation state: %w", err)
log.Error(err.Error())
err := fmt.Errorf("failed to rollback proof aggregation state, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
}
return false, fmt.Errorf("failed to store the recursive proof: %w", err)
err = fmt.Errorf("failed to store the recursive proof, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
}

err = dbTx.Commit(ctx)
if err != nil {
return false, fmt.Errorf("failed to store the recursive proof: %w", err)
err = fmt.Errorf("failed to store the recursive proof, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
}

// NOTE(pg): the defer func is useless from now on, use a different variable
Expand All @@ -698,7 +718,7 @@ func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover proverInterf
finalProofBuilt, finalProofErr := a.tryBuildFinalProof(ctx, prover, proof)
if finalProofErr != nil {
// just log the error and continue to handle the aggregated proof
log.Errorf("failed trying to check if recursive proof can be verified: %v", finalProofErr)
log.Errorf("Failed trying to check if recursive proof can be verified: %v", finalProofErr)
}

// NOTE(pg): prover is done, use a.ctx from now on
Expand All @@ -709,7 +729,8 @@ func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover proverInterf
// final proof has not been generated, update the recursive proof
err := a.State.UpdateGeneratedProof(a.ctx, proof, nil)
if err != nil {
log.Errorf("Failed to store batch proof result, err %v", err)
err = fmt.Errorf("failed to store batch proof result, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
}
}
Expand Down Expand Up @@ -795,13 +816,13 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInt
return false, err0
}

log = log.WithFields("batch", batchToProve.BatchNumber)

var (
genProofID *string
err error
)

log = log.WithFields("batch", batchToProve.BatchNumber)

defer func() {
if err != nil {
err2 := a.State.DeleteGeneratedProofs(a.ctx, proof.BatchNumber, proof.BatchNumberFinal, nil)
Expand All @@ -817,12 +838,16 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInt
log.Infof("Sending zki + batch to the prover, batchNumber [%d]", batchToProve.BatchNumber)
inputProver, err := a.buildInputProver(ctx, batchToProve)
if err != nil {
return false, fmt.Errorf("failed to build input prover, %w", err)
err = fmt.Errorf("failed to build input prover, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
}

b, err := json.Marshal(inputProver)
if err != nil {
return false, fmt.Errorf("failed to serialize input prover, %w", err)
err = fmt.Errorf("failed to serialize input prover, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
}

proof.InputProver = string(b)
Expand All @@ -832,7 +857,9 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInt

genProofID, err = prover.BatchProof(inputProver)
if err != nil {
return false, fmt.Errorf("failed to get batch proof id %w", err)
err = fmt.Errorf("failed to get batch proof id, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
}

proof.ProofID = genProofID
Expand All @@ -842,7 +869,9 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInt

resGetProof, err := prover.WaitRecursiveProof(ctx, *proof.ProofID)
if err != nil {
return false, fmt.Errorf("failed to get proof from prover %w", err)
err = fmt.Errorf("failed to get proof from prover, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
}

log.Info("Batch proof generated")
Expand All @@ -855,7 +884,7 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInt
finalProofBuilt, finalProofErr := a.tryBuildFinalProof(ctx, prover, proof)
if finalProofErr != nil {
// just log the error and continue to handle the generated proof
log.Errorf("error trying to build final proof %v", finalProofErr)
log.Errorf("Error trying to build final proof: %v", finalProofErr)
}

// NOTE(pg): prover is done, use a.ctx from now on
Expand All @@ -866,7 +895,8 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInt
// final proof has not been generated, update the batch proof
err := a.State.UpdateGeneratedProof(a.ctx, proof, nil)
if err != nil {
log.Errorf("Failed to store batch proof result, err %v", err)
err = fmt.Errorf("failed to store batch proof result, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
}
}
Expand Down Expand Up @@ -1031,7 +1061,7 @@ func (a *Aggregator) handleMonitoredTxResult(result ethtxmanager.MonitoredTxResu
// proofs up to the last synced batch
err = a.State.CleanupGeneratedProofs(a.ctx, proofBatchNumberFinal, nil)
if err != nil {
log.Errorf("failed to store proof aggregation result: %v", err)
log.Errorf("Failed to store proof aggregation result: %v", err)
}
}

Expand All @@ -1047,7 +1077,7 @@ func (a *Aggregator) cleanupLockedProofs() {
case <-time.After(a.TimeCleanupLockedProofs.Duration):
n, err := a.State.CleanupLockedProofs(a.ctx, a.cfg.GeneratingProofCleanupThreshold, nil)
if err != nil {
log.Errorf("failed to cleanup locked proofs: %v", err)
log.Errorf("Failed to cleanup locked proofs: %v", err)
}
if n == 1 {
log.Warn("Found a stale proof and removed form cache")
Expand All @@ -1057,3 +1087,11 @@ func (a *Aggregator) cleanupLockedProofs() {
}
}
}

// FirstToUpper returns the string passed as argument with the first letter in
// uppercase.
//
// The first letter is the first Unicode code point of s, not its first byte,
// so multi-byte leading characters are upper-cased correctly. An empty string
// is returned unchanged instead of panicking on an out-of-range index.
func FirstToUpper(s string) string {
	// Guard the empty case: indexing runes[0] below would panic otherwise.
	if len(s) == 0 {
		return s
	}
	runes := []rune(s)
	runes[0] = unicode.ToUpper(runes[0])
	return string(runes)
}

0 comments on commit 9858de7

Please sign in to comment.