
executor retry on rpc calls
CryptoMaxPlanck committed Sep 14, 2023
1 parent c9df345 commit 512d9fa
Showing 4 changed files with 109 additions and 100 deletions.
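At a high level, the commit swaps the executor's hand-rolled jpillora/backoff loops around RPC calls for the shared core/retry helper, configured once per Executor through a new MaxRetrySeconds setting. The sketch below shows the wrapper pattern the diff adopts; it is a minimal, hedged example that assumes only the retry.WithBackoff, retry.WithBackoffConfigurator, and retry.WithMaxAttemptTime API visible in the hunks below, and fetchSomething is a hypothetical stand-in for the header, merkle-proof, and message-status calls.

// Minimal sketch of the retry wrapper pattern adopted in this commit.
// Assumes the core/retry API used in the diff (retry.WithBackoff,
// retry.WithBackoffConfigurator, retry.WithMaxAttemptTime); fetchSomething
// is a hypothetical stand-in for an RPC-backed call.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/synapsecns/sanguine/core/retry"
)

// fetchSomething stands in for calls like HeaderByNumber or MessageStatus.
func fetchSomething(ctx context.Context) (uint64, error) {
	if ctx.Err() != nil {
		return 0, errors.New("context canceled")
	}
	return 42, nil
}

func main() {
	ctx := context.Background()

	// Built once, as NewExecutor does, and reused for every wrapped call.
	retryConfig := []retry.WithBackoffConfigurator{
		retry.WithMaxAttemptTime(30 * time.Second),
	}

	// The closure writes its result to the enclosing scope and only
	// returns an error, which is what retry.WithBackoff acts on.
	var result uint64
	contractCall := func(ctx context.Context) error {
		var err error
		result, err = fetchSomething(ctx)
		if err != nil {
			return fmt.Errorf("could not fetch: %w", err)
		}
		return nil
	}

	if err := retry.WithBackoff(ctx, contractCall, retryConfig...); err != nil {
		fmt.Println("call failed after retries:", err)
		return
	}
	fmt.Println("result:", result)
}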
138 changes: 72 additions & 66 deletions agents/agents/executor/executor.go
@@ -11,7 +11,6 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/jpillora/backoff"
"github.com/synapsecns/sanguine/agents/agents/executor/db"
execTypes "github.com/synapsecns/sanguine/agents/agents/executor/types"
"github.com/synapsecns/sanguine/agents/config/executor"
@@ -24,6 +23,7 @@ import (
"github.com/synapsecns/sanguine/agents/types"
"github.com/synapsecns/sanguine/core/merkle"
"github.com/synapsecns/sanguine/core/metrics"
"github.com/synapsecns/sanguine/core/retry"
ethergoChain "github.com/synapsecns/sanguine/ethergo/chain"
agentsConfig "github.com/synapsecns/sanguine/ethergo/signer/config"
"github.com/synapsecns/sanguine/ethergo/signer/signer"
@@ -87,6 +87,8 @@ type Executor struct {
handler metrics.Handler
// txSubmitter is the transaction submitter.
txSubmitter submitter.TransactionSubmitter
// retryConfig is the retry configuration for RPC calls.
retryConfig []retry.WithBackoffConfigurator
// NowFunc returns the current time.
NowFunc func() time.Time
}
@@ -148,6 +150,10 @@ func NewExecutor(ctx context.Context, config executor.Config, executorDB db.Exec

txSubmitter := submitter.NewTransactionSubmitter(handler, executorSigner, omniRPCClient, executorDB.SubmitterDB(), &config.SubmitterConfig)

retryConfig := []retry.WithBackoffConfigurator{
retry.WithMaxAttemptTime(time.Second * time.Duration(config.MaxRetrySeconds)),
}

if config.ExecuteInterval == 0 {
config.ExecuteInterval = 2
}
@@ -239,6 +245,7 @@ func NewExecutor(ctx context.Context, config executor.Config, executorDB db.Exec
chainExecutors: chainExecutors,
handler: handler,
txSubmitter: txSubmitter,
retryConfig: retryConfig,
NowFunc: time.Now,
}, nil
}
@@ -340,8 +347,16 @@ func (e Executor) Execute(parentCtx context.Context, message types.Message) (_ b
return false, nil
}

proof, err := e.chainExecutors[message.OriginDomain()].merkleTree.MerkleProof(*nonce-1, (*state).Nonce())
var proof [][]byte
contractCall := func(ctx context.Context) error {
proof, err = e.chainExecutors[message.OriginDomain()].merkleTree.MerkleProof(*nonce-1, (*state).Nonce())
if err != nil {
return fmt.Errorf("could not get merkle proof: %w", err)
}

return nil
}
err = retry.WithBackoff(ctx, contractCall, e.retryConfig...)
if err != nil {
return false, fmt.Errorf("could not get merkle proof: %w", err)
}
@@ -425,12 +440,31 @@ func (e Executor) Execute(parentCtx context.Context, message types.Message) (_ b

// verifyMessageMerkleProof verifies a message against the merkle tree at the state of the given nonce.
func (e Executor) verifyMessageMerkleProof(message types.Message) (bool, error) {
root, err := e.chainExecutors[message.OriginDomain()].merkleTree.Root(message.Nonce())
var root []byte
contractCall := func(ctx context.Context) error {
var err error
root, err = e.chainExecutors[message.OriginDomain()].merkleTree.Root(message.Nonce())
if err != nil {
return fmt.Errorf("could not get root: %w", err)
}

return nil
}
err := retry.WithBackoff(context.Background(), contractCall, e.retryConfig...)
if err != nil {
return false, fmt.Errorf("could not get root: %w", err)
}

proof, err := e.chainExecutors[message.OriginDomain()].merkleTree.MerkleProof(message.Nonce()-1, message.Nonce())
var proof [][]byte
contractCall = func(ctx context.Context) error {
proof, err = e.chainExecutors[message.OriginDomain()].merkleTree.MerkleProof(message.Nonce()-1, message.Nonce())
if err != nil {
return fmt.Errorf("could not get merkle proof: %w", err)
}

return nil
}
err = retry.WithBackoff(context.Background(), contractCall, e.retryConfig...)
if err != nil {
return false, fmt.Errorf("could not get merkle proof: %w", err)
}
@@ -523,38 +557,25 @@ func (e Executor) verifyMessageOptimisticPeriod(parentCtx context.Context, messa
return nil, nil
}

b := &backoff.Backoff{
Factor: 2,
Jitter: true,
Min: 30 * time.Millisecond,
Max: 3 * time.Second,
}

timeout := time.Duration(0)

var currentTime uint64
chainCall := func(ctx context.Context) error {
var err error
latestHeader, err := e.chainExecutors[chainID].rpcClient.HeaderByNumber(ctx, nil)
if err != nil {
return fmt.Errorf("could not get latest header: %w", err)
}

retryLoop:
for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("context canceled: %w", ctx.Err())
case <-time.After(timeout):
if b.Attempt() >= rpcRetry {
return nil, fmt.Errorf("could not get latest header: %w", err)
}

latestHeader, err := e.chainExecutors[destinationDomain].rpcClient.HeaderByNumber(ctx, nil)
if err != nil {
timeout = b.Duration()

continue
}
if latestHeader == nil {
return fmt.Errorf("latest header is nil")
}

currentTime = latestHeader.Time
currentTime = latestHeader.Time

break retryLoop
}
return nil
}
err = retry.WithBackoff(ctx, chainCall, e.retryConfig...)
if err != nil {
return nil, fmt.Errorf("could not get latest header: %w", err)
}

if *messageMinimumTime > currentTime {
@@ -616,43 +637,28 @@ func (e Executor) checkIfExecuted(parentCtx context.Context, message types.Messa
metrics.EndSpanWithErr(span, err)
}()

b := &backoff.Backoff{
Factor: 2,
Jitter: true,
Min: 30 * time.Millisecond,
Max: 3 * time.Second,
}

timeout := time.Duration(0)

for {
select {
case <-ctx.Done():
return false, fmt.Errorf("context canceled: %w", ctx.Err())
case <-time.After(timeout):
if b.Attempt() >= rpcRetry {
return false, fmt.Errorf("could not get executed status: %w", ctx.Err())
}

executed, err := e.chainExecutors[message.DestinationDomain()].boundDestination.MessageStatus(ctx, message)
if err != nil {
timeout = b.Duration()
span.AddEvent("could not get executed status",
trace.WithAttributes(attribute.String("error", err.Error())),
trace.WithAttributes(attribute.String("timeout", timeout.String())),
)
continue
}
var executed uint8
contractCall := func(ctx context.Context) error {
var err error
executed, err = e.chainExecutors[message.DestinationDomain()].boundDestination.MessageStatus(ctx, message)
if err != nil {
return fmt.Errorf("could not get executed status: %w", err)
}

if execTypes.MessageStatusType(executed) == execTypes.Success {
span.AddEvent("message executed")
return true, nil
}
return nil
}
err = retry.WithBackoff(ctx, contractCall, e.retryConfig...)
if err != nil {
return false, fmt.Errorf("could not get executed status: %w", err)
}

span.AddEvent("message not executed")
return false, nil
}
if execTypes.MessageStatusType(executed) == execTypes.Success {
span.AddEvent("message executed")
return true, nil
}

span.AddEvent("message not executed")
return false, nil
}

// streamLogs uses gRPC to stream logs into a channel.
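For contrast, the removed lines in verifyMessageOptimisticPeriod and checkIfExecuted above followed the manual pattern sketched here: an explicit select loop over a jittered exponential backoff, with an attempt cap (rpcRetry in the original). This is a rough, hedged reconstruction rather than the exact deleted code; rpcRetry's value and callOnce are placeholders.

// Rough sketch of the manual retry loop this commit removes, built on the
// jpillora/backoff fields visible in the deleted lines (Factor, Jitter,
// Min, Max, Attempt, Duration). rpcRetry and callOnce are placeholders.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/jpillora/backoff"
)

const rpcRetry = 7 // attempt cap; the original constant's value is not shown in the diff

func callOnce(ctx context.Context) error {
	// Stand-in for a single RPC attempt such as HeaderByNumber.
	return errors.New("transient RPC error")
}

func retryManually(ctx context.Context) error {
	b := &backoff.Backoff{
		Factor: 2,
		Jitter: true,
		Min:    30 * time.Millisecond,
		Max:    3 * time.Second,
	}

	timeout := time.Duration(0)

	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("context canceled: %w", ctx.Err())
		case <-time.After(timeout):
			if b.Attempt() >= rpcRetry {
				return errors.New("retries exhausted")
			}
			if err := callOnce(ctx); err != nil {
				timeout = b.Duration() // grow the delay before the next attempt
				continue
			}
			return nil
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	fmt.Println(retryManually(ctx))
}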
60 changes: 28 additions & 32 deletions agents/agents/executor/executor_utils.go
@@ -4,16 +4,15 @@ import (
"context"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/jpillora/backoff"
"github.com/synapsecns/sanguine/agents/contracts/inbox"
"github.com/synapsecns/sanguine/agents/contracts/lightinbox"
"github.com/synapsecns/sanguine/agents/contracts/origin"
"github.com/synapsecns/sanguine/agents/contracts/summit"
"github.com/synapsecns/sanguine/agents/types"
"github.com/synapsecns/sanguine/core/retry"
)

// logToMessage converts the log to a leaf data.
@@ -160,13 +159,23 @@ func (e Executor) processSnapshot(ctx context.Context, snapshot types.Snapshot,
return fmt.Errorf("could not encode state: %w", err)
}
// Verify that the state is valid w.r.t. Origin.
valid, err := e.chainExecutors[state.Origin()].boundOrigin.IsValidState(
ctx,
statePayload,
)
var valid bool
contractCall := func(ctx context.Context) error {
valid, err = e.chainExecutors[state.Origin()].boundOrigin.IsValidState(
ctx,
statePayload,
)
Check failure on line 164 in agents/agents/executor/executor_utils.go (GitHub Actions / Lint (agents)): Using the variable on range scope `state` in function literal (scopelint)
if err != nil {
return fmt.Errorf("could not check validity of state: %w", err)
}

return nil
}
err = retry.WithBackoff(ctx, contractCall, e.retryConfig...)
if err != nil {
return fmt.Errorf("could not check validity of state: %w", err)
}

if !valid {
stateRoot := state.Root()
logger.Infof("snapshot has invalid state. Origin: %d. SnapshotRoot: %s", state.Origin(), common.BytesToHash(stateRoot[:]).String())
@@ -199,36 +208,23 @@ func (e Executor) processAttestation(ctx context.Context, attestation types.Atte
}

// If the attestation is on a remote chain, we need to fetch the timestamp via an RPC call.
b := &backoff.Backoff{
Factor: 2,
Jitter: true,
Min: 30 * time.Millisecond,
Max: 3 * time.Second,
}

timeout := time.Duration(0)

var logHeader *ethTypes.Header
var err error
contractCall := func(ctx context.Context) error {
logHeader, err = e.chainExecutors[chainID].rpcClient.HeaderByNumber(ctx, big.NewInt(int64(logBlockNumber)))
if err != nil {
return fmt.Errorf("could not get log header: %w", err)
}

retryLoop:
for {
select {
case <-ctx.Done():
return fmt.Errorf("context canceled: %w", ctx.Err())
case <-time.After(timeout):
if b.Attempt() >= rpcRetry {
return fmt.Errorf("could not get log header: %w", err)
}
logHeader, err = e.chainExecutors[chainID].rpcClient.HeaderByNumber(ctx, big.NewInt(int64(logBlockNumber)))
if err != nil {
timeout = b.Duration()

continue
}
return nil
}
err = retry.WithBackoff(ctx, contractCall, e.retryConfig...)
if err != nil {
return fmt.Errorf("could not get log header: %w", err)
}

break retryLoop
}
if logHeader == nil {
return fmt.Errorf("could not get log header")
}

err = e.executorDB.StoreAttestation(ctx, attestation, chainID, logBlockNumber, logHeader.Time)
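One detail from the two files above: verifyMessageOptimisticPeriod returns an error from inside the closure when the fetched header is nil, so a nil-but-error-free response is retried, while processAttestation only checks logHeader for nil after retry.WithBackoff returns. A hedged sketch of the first variant follows, assuming the same core/retry API; getHeader is a hypothetical stand-in for rpcClient.HeaderByNumber.

// Sketch of putting the nil-result check inside the retried closure so the
// retry helper treats a nil (but error-free) response as a failure. Assumes
// the core/retry API from the diff; getHeader is a hypothetical stand-in.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	ethTypes "github.com/ethereum/go-ethereum/core/types"
	"github.com/synapsecns/sanguine/core/retry"
)

// getHeader stands in for rpcClient.HeaderByNumber.
func getHeader(ctx context.Context) (*ethTypes.Header, error) {
	return &ethTypes.Header{Time: uint64(time.Now().Unix())}, nil
}

func latestTimestamp(ctx context.Context, retryConfig []retry.WithBackoffConfigurator) (uint64, error) {
	var currentTime uint64
	chainCall := func(ctx context.Context) error {
		header, err := getHeader(ctx)
		if err != nil {
			return fmt.Errorf("could not get latest header: %w", err)
		}
		if header == nil {
			// Returning an error here makes the helper retry nil responses too.
			return errors.New("latest header is nil")
		}
		currentTime = header.Time
		return nil
	}

	if err := retry.WithBackoff(ctx, chainCall, retryConfig...); err != nil {
		return 0, fmt.Errorf("could not get latest header: %w", err)
	}
	return currentTime, nil
}

func main() {
	cfg := []retry.WithBackoffConfigurator{retry.WithMaxAttemptTime(10 * time.Second)}
	ts, err := latestTimestamp(context.Background(), cfg)
	fmt.Println(ts, err)
}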
4 changes: 4 additions & 0 deletions agents/agents/guard/fraud.go
@@ -535,6 +535,7 @@ func (g Guard) handleStatusUpdated(ctx context.Context, log ethTypes.Log, chainI
if err != nil {
return fmt.Errorf("could not parse status updated: %w", err)
}
fmt.Printf("handleStatusUpdated: %v on %d\n", types.AgentFlagType(statusUpdated.Flag).String(), chainID)

//nolint:exhaustive
switch types.AgentFlagType(statusUpdated.Flag) {
@@ -708,6 +709,7 @@ func (g Guard) updateAgentStatus(ctx context.Context, chainID uint32) error {
if len(eligibleAgentTrees) == 0 {
return nil
}
fmt.Printf("got eligible agent trees: %v\n", eligibleAgentTrees)

var localRoot [32]byte
contractCall := func(ctx context.Context) error {
@@ -727,6 +729,7 @@ func (g Guard) updateAgentStatus(ctx context.Context, chainID uint32) error {
if err != nil {
return fmt.Errorf("could not get block number for local root: %w", err)
}
fmt.Printf("got localRootBlockNumber %d for agent root %s on chain %d\n", localRootBlockNumber, common.BytesToHash(localRoot[:]).String(), chainID)

// Filter the eligible agent roots by the given block number and call updateAgentStatus().
for _, t := range eligibleAgentTrees {
Expand All @@ -737,6 +740,7 @@ func (g Guard) updateAgentStatus(ctx context.Context, chainID uint32) error {
return fmt.Errorf("could not get block number for local root: %w", err)
}
//nolint:nestif
fmt.Printf("comparing localRootBlockNumber %d with treeBlockNumber %d\n", localRootBlockNumber, treeBlockNumber)
if localRootBlockNumber >= treeBlockNumber {

Check failure on line 744 in agents/agents/guard/fraud.go (GitHub Actions / Lint (agents)): `if localRootBlockNumber >= treeBlockNumber` has complex nested blocks (complexity: 6) (nestif)
logger.Infof("Relaying agent status for agent %s on chain %d", tree.AgentAddress.String(), chainID)
// Fetch the agent status to be relayed from Summit.
7 changes: 5 additions & 2 deletions agents/config/executor/config.go
@@ -3,14 +3,15 @@ package executor
import (
"context"
"fmt"
"os"
"path/filepath"

"github.com/davecgh/go-spew/spew"
"github.com/jftuga/ellipsis"
"github.com/synapsecns/sanguine/agents/config"
signerConfig "github.com/synapsecns/sanguine/ethergo/signer/config"
submitterConfig "github.com/synapsecns/sanguine/ethergo/submitter/config"
"gopkg.in/yaml.v2"
"os"
"path/filepath"
)

// Config is used to configure an Executor agent.
@@ -44,6 +45,8 @@ type Config struct {
DBPrefix string `yaml:"db_prefix"`
// SubmitterConfig is the config for the submitter.
SubmitterConfig submitterConfig.Config `yaml:"submitter_config"`
// MaxRetrySeconds is the maximum number of seconds to retry an RPC call (not a transaction).
MaxRetrySeconds uint32 `yaml:"max_retry_seconds"`
}

// IsValid makes sure the config is valid. This is done by calling IsValid() on each
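The new max_retry_seconds key flows from the YAML config into the retry.WithMaxAttemptTime configurator built in NewExecutor. The sketch below models only that field; the yaml tag comes from the struct above, while the 30-second value and the trimmed-down struct are illustrative assumptions.

// Sketch of decoding the new max_retry_seconds setting and turning it into
// the retry cap NewExecutor builds. Only this one field is modeled; the
// value 30 is illustrative, not a documented default.
package main

import (
	"fmt"
	"time"

	"gopkg.in/yaml.v2"
)

type executorConfig struct {
	MaxRetrySeconds uint32 `yaml:"max_retry_seconds"`
}

func main() {
	raw := []byte("max_retry_seconds: 30\n")

	var cfg executorConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(fmt.Errorf("could not parse config: %w", err))
	}

	// Mirrors NewExecutor:
	// retry.WithMaxAttemptTime(time.Second * time.Duration(config.MaxRetrySeconds))
	maxAttemptTime := time.Second * time.Duration(cfg.MaxRetrySeconds)
	fmt.Println("max attempt time for RPC retries:", maxAttemptTime)
}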

