From 512d9fa30ec7f03d222324107499f9cdd403f283 Mon Sep 17 00:00:00 2001 From: Max Planck Date: Thu, 14 Sep 2023 19:28:59 -0400 Subject: [PATCH] executor retry on rpc calls --- agents/agents/executor/executor.go | 138 ++++++++++++----------- agents/agents/executor/executor_utils.go | 60 +++++----- agents/agents/guard/fraud.go | 4 + agents/config/executor/config.go | 7 +- 4 files changed, 109 insertions(+), 100 deletions(-) diff --git a/agents/agents/executor/executor.go b/agents/agents/executor/executor.go index bbd8855d51..ad776aea4c 100644 --- a/agents/agents/executor/executor.go +++ b/agents/agents/executor/executor.go @@ -11,7 +11,6 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" ethTypes "github.com/ethereum/go-ethereum/core/types" - "github.com/jpillora/backoff" "github.com/synapsecns/sanguine/agents/agents/executor/db" execTypes "github.com/synapsecns/sanguine/agents/agents/executor/types" "github.com/synapsecns/sanguine/agents/config/executor" @@ -24,6 +23,7 @@ import ( "github.com/synapsecns/sanguine/agents/types" "github.com/synapsecns/sanguine/core/merkle" "github.com/synapsecns/sanguine/core/metrics" + "github.com/synapsecns/sanguine/core/retry" ethergoChain "github.com/synapsecns/sanguine/ethergo/chain" agentsConfig "github.com/synapsecns/sanguine/ethergo/signer/config" "github.com/synapsecns/sanguine/ethergo/signer/signer" @@ -87,6 +87,8 @@ type Executor struct { handler metrics.Handler // txSubmitter is the transaction submitter. txSubmitter submitter.TransactionSubmitter + // retryConfig is the retry configuration for RPC calls. + retryConfig []retry.WithBackoffConfigurator // NowFunc returns the current time. NowFunc func() time.Time } @@ -148,6 +150,10 @@ func NewExecutor(ctx context.Context, config executor.Config, executorDB db.Exec txSubmitter := submitter.NewTransactionSubmitter(handler, executorSigner, omniRPCClient, executorDB.SubmitterDB(), &config.SubmitterConfig) + retryConfig := []retry.WithBackoffConfigurator{ + retry.WithMaxAttemptTime(time.Second * time.Duration(config.MaxRetrySeconds)), + } + if config.ExecuteInterval == 0 { config.ExecuteInterval = 2 } @@ -239,6 +245,7 @@ func NewExecutor(ctx context.Context, config executor.Config, executorDB db.Exec chainExecutors: chainExecutors, handler: handler, txSubmitter: txSubmitter, + retryConfig: retryConfig, NowFunc: time.Now, }, nil } @@ -340,8 +347,16 @@ func (e Executor) Execute(parentCtx context.Context, message types.Message) (_ b return false, nil } - proof, err := e.chainExecutors[message.OriginDomain()].merkleTree.MerkleProof(*nonce-1, (*state).Nonce()) + var proof [][]byte + contractCall := func(ctx context.Context) error { + proof, err = e.chainExecutors[message.OriginDomain()].merkleTree.MerkleProof(*nonce-1, (*state).Nonce()) + if err != nil { + return fmt.Errorf("could not get merkle proof: %w", err) + } + return nil + } + err = retry.WithBackoff(ctx, contractCall, e.retryConfig...) if err != nil { return false, fmt.Errorf("could not get merkle proof: %w", err) } @@ -425,12 +440,31 @@ func (e Executor) Execute(parentCtx context.Context, message types.Message) (_ b // verifyMessageMerkleProof verifies a message against the merkle tree at the state of the given nonce. func (e Executor) verifyMessageMerkleProof(message types.Message) (bool, error) { - root, err := e.chainExecutors[message.OriginDomain()].merkleTree.Root(message.Nonce()) + var root []byte + contractCall := func(ctx context.Context) error { + var err error + root, err = e.chainExecutors[message.OriginDomain()].merkleTree.Root(message.Nonce()) + if err != nil { + return fmt.Errorf("could not get root: %w", err) + } + + return nil + } + err := retry.WithBackoff(context.Background(), contractCall, e.retryConfig...) if err != nil { return false, fmt.Errorf("could not get root: %w", err) } - proof, err := e.chainExecutors[message.OriginDomain()].merkleTree.MerkleProof(message.Nonce()-1, message.Nonce()) + var proof [][]byte + contractCall = func(ctx context.Context) error { + proof, err = e.chainExecutors[message.OriginDomain()].merkleTree.MerkleProof(message.Nonce()-1, message.Nonce()) + if err != nil { + return fmt.Errorf("could not get merkle proof: %w", err) + } + + return nil + } + err = retry.WithBackoff(context.Background(), contractCall, e.retryConfig...) if err != nil { return false, fmt.Errorf("could not get merkle proof: %w", err) } @@ -523,38 +557,25 @@ func (e Executor) verifyMessageOptimisticPeriod(parentCtx context.Context, messa return nil, nil } - b := &backoff.Backoff{ - Factor: 2, - Jitter: true, - Min: 30 * time.Millisecond, - Max: 3 * time.Second, - } - - timeout := time.Duration(0) - var currentTime uint64 + chainCall := func(ctx context.Context) error { + var err error + latestHeader, err := e.chainExecutors[chainID].rpcClient.HeaderByNumber(ctx, nil) + if err != nil { + return fmt.Errorf("could not get latest header: %w", err) + } -retryLoop: - for { - select { - case <-ctx.Done(): - return nil, fmt.Errorf("context canceled: %w", ctx.Err()) - case <-time.After(timeout): - if b.Attempt() >= rpcRetry { - return nil, fmt.Errorf("could not get latest header: %w", err) - } - - latestHeader, err := e.chainExecutors[destinationDomain].rpcClient.HeaderByNumber(ctx, nil) - if err != nil { - timeout = b.Duration() - - continue - } + if latestHeader == nil { + return fmt.Errorf("latest header is nil") + } - currentTime = latestHeader.Time + currentTime = latestHeader.Time - break retryLoop - } + return nil + } + err = retry.WithBackoff(ctx, chainCall, e.retryConfig...) + if err != nil { + return nil, fmt.Errorf("could not get latest header: %w", err) } if *messageMinimumTime > currentTime { @@ -616,43 +637,28 @@ func (e Executor) checkIfExecuted(parentCtx context.Context, message types.Messa metrics.EndSpanWithErr(span, err) }() - b := &backoff.Backoff{ - Factor: 2, - Jitter: true, - Min: 30 * time.Millisecond, - Max: 3 * time.Second, - } - - timeout := time.Duration(0) - - for { - select { - case <-ctx.Done(): - return false, fmt.Errorf("context canceled: %w", ctx.Err()) - case <-time.After(timeout): - if b.Attempt() >= rpcRetry { - return false, fmt.Errorf("could not get executed status: %w", ctx.Err()) - } - - executed, err := e.chainExecutors[message.DestinationDomain()].boundDestination.MessageStatus(ctx, message) - if err != nil { - timeout = b.Duration() - span.AddEvent("could not get executed status", - trace.WithAttributes(attribute.String("error", err.Error())), - trace.WithAttributes(attribute.String("timeout", timeout.String())), - ) - continue - } + var executed uint8 + contractCall := func(ctx context.Context) error { + var err error + executed, err = e.chainExecutors[message.DestinationDomain()].boundDestination.MessageStatus(ctx, message) + if err != nil { + return fmt.Errorf("could not get executed status: %w", err) + } - if execTypes.MessageStatusType(executed) == execTypes.Success { - span.AddEvent("message executed") - return true, nil - } + return nil + } + err = retry.WithBackoff(ctx, contractCall, e.retryConfig...) + if err != nil { + return false, fmt.Errorf("could not get executed status: %w", err) + } - span.AddEvent("message not executed") - return false, nil - } + if execTypes.MessageStatusType(executed) == execTypes.Success { + span.AddEvent("message executed") + return true, nil } + + span.AddEvent("message not executed") + return false, nil } // streamLogs uses gRPC to stream logs into a channel. diff --git a/agents/agents/executor/executor_utils.go b/agents/agents/executor/executor_utils.go index 585c5a02a9..029e43d88f 100644 --- a/agents/agents/executor/executor_utils.go +++ b/agents/agents/executor/executor_utils.go @@ -4,16 +4,15 @@ import ( "context" "fmt" "math/big" - "time" "github.com/ethereum/go-ethereum/common" ethTypes "github.com/ethereum/go-ethereum/core/types" - "github.com/jpillora/backoff" "github.com/synapsecns/sanguine/agents/contracts/inbox" "github.com/synapsecns/sanguine/agents/contracts/lightinbox" "github.com/synapsecns/sanguine/agents/contracts/origin" "github.com/synapsecns/sanguine/agents/contracts/summit" "github.com/synapsecns/sanguine/agents/types" + "github.com/synapsecns/sanguine/core/retry" ) // logToMessage converts the log to a leaf data. @@ -160,13 +159,23 @@ func (e Executor) processSnapshot(ctx context.Context, snapshot types.Snapshot, return fmt.Errorf("could not encode state: %w", err) } // Verify that the state is valid w.r.t. Origin. - valid, err := e.chainExecutors[state.Origin()].boundOrigin.IsValidState( - ctx, - statePayload, - ) + var valid bool + contractCall := func(ctx context.Context) error { + valid, err = e.chainExecutors[state.Origin()].boundOrigin.IsValidState( + ctx, + statePayload, + ) + if err != nil { + return fmt.Errorf("could not check validity of state: %w", err) + } + + return nil + } + err = retry.WithBackoff(ctx, contractCall, e.retryConfig...) if err != nil { return fmt.Errorf("could not check validity of state: %w", err) } + if !valid { stateRoot := state.Root() logger.Infof("snapshot has invalid state. Origin: %d. SnapshotRoot: %s", state.Origin(), common.BytesToHash(stateRoot[:]).String()) @@ -199,36 +208,23 @@ func (e Executor) processAttestation(ctx context.Context, attestation types.Atte } // If the attestation is on a remote chain, we need to fetch the timestamp via an RPC call. - b := &backoff.Backoff{ - Factor: 2, - Jitter: true, - Min: 30 * time.Millisecond, - Max: 3 * time.Second, - } - - timeout := time.Duration(0) - var logHeader *ethTypes.Header var err error + contractCall := func(ctx context.Context) error { + logHeader, err = e.chainExecutors[chainID].rpcClient.HeaderByNumber(ctx, big.NewInt(int64(logBlockNumber))) + if err != nil { + return fmt.Errorf("could not get log header: %w", err) + } -retryLoop: - for { - select { - case <-ctx.Done(): - return fmt.Errorf("context canceled: %w", ctx.Err()) - case <-time.After(timeout): - if b.Attempt() >= rpcRetry { - return fmt.Errorf("could not get log header: %w", err) - } - logHeader, err = e.chainExecutors[chainID].rpcClient.HeaderByNumber(ctx, big.NewInt(int64(logBlockNumber))) - if err != nil { - timeout = b.Duration() - - continue - } + return nil + } + err = retry.WithBackoff(ctx, contractCall, e.retryConfig...) + if err != nil { + return fmt.Errorf("could not get log header: %w", err) + } - break retryLoop - } + if logHeader == nil { + return fmt.Errorf("could not get log header") } err = e.executorDB.StoreAttestation(ctx, attestation, chainID, logBlockNumber, logHeader.Time) diff --git a/agents/agents/guard/fraud.go b/agents/agents/guard/fraud.go index 7dee240103..604ffa5bb3 100644 --- a/agents/agents/guard/fraud.go +++ b/agents/agents/guard/fraud.go @@ -535,6 +535,7 @@ func (g Guard) handleStatusUpdated(ctx context.Context, log ethTypes.Log, chainI if err != nil { return fmt.Errorf("could not parse status updated: %w", err) } + fmt.Printf("handleStatusUpdated: %v on %d\n", types.AgentFlagType(statusUpdated.Flag).String(), chainID) //nolint:exhaustive switch types.AgentFlagType(statusUpdated.Flag) { @@ -708,6 +709,7 @@ func (g Guard) updateAgentStatus(ctx context.Context, chainID uint32) error { if len(eligibleAgentTrees) == 0 { return nil } + fmt.Printf("got eligible agent trees: %v\n", eligibleAgentTrees) var localRoot [32]byte contractCall := func(ctx context.Context) error { @@ -727,6 +729,7 @@ func (g Guard) updateAgentStatus(ctx context.Context, chainID uint32) error { if err != nil { return fmt.Errorf("could not get block number for local root: %w", err) } + fmt.Printf("got localRootBlockNumber %d for agent root %s on chain %d\n", localRootBlockNumber, common.BytesToHash(localRoot[:]).String(), chainID) // Filter the eligible agent roots by the given block number and call updateAgentStatus(). for _, t := range eligibleAgentTrees { @@ -737,6 +740,7 @@ func (g Guard) updateAgentStatus(ctx context.Context, chainID uint32) error { return fmt.Errorf("could not get block number for local root: %w", err) } //nolint:nestif + fmt.Printf("comparing localRootBlockNumber %d with treeBlockNumber %d\n", localRootBlockNumber, treeBlockNumber) if localRootBlockNumber >= treeBlockNumber { logger.Infof("Relaying agent status for agent %s on chain %d", tree.AgentAddress.String(), chainID) // Fetch the agent status to be relayed from Summit. diff --git a/agents/config/executor/config.go b/agents/config/executor/config.go index 5d0b178063..3ae01e82a2 100644 --- a/agents/config/executor/config.go +++ b/agents/config/executor/config.go @@ -3,14 +3,15 @@ package executor import ( "context" "fmt" + "os" + "path/filepath" + "github.com/davecgh/go-spew/spew" "github.com/jftuga/ellipsis" "github.com/synapsecns/sanguine/agents/config" signerConfig "github.com/synapsecns/sanguine/ethergo/signer/config" submitterConfig "github.com/synapsecns/sanguine/ethergo/submitter/config" "gopkg.in/yaml.v2" - "os" - "path/filepath" ) // Config is used to configure an Executor agent. @@ -44,6 +45,8 @@ type Config struct { DBPrefix string `yaml:"db_prefix"` // SubmitterConfig is the config for the submitter. SubmitterConfig submitterConfig.Config `yaml:"submitter_config"` + // MaxRetrySeconds is the maximum number of seconds to retry an RPC call (not a transaction). + MaxRetrySeconds uint32 `yaml:"max_retry_seconds"` } // IsValid makes sure the config is valid. This is done by calling IsValid() on each