Retry.WithBackoff for all agents. (#1377)

* executor retry on rpc calls

* add default retry seconds

* [goreleaser]

* lint

* guard and notary retry

* clean

* get rid of session.vim

* merge master and undo readme change

---------

Co-authored-by: Trajan0x <[email protected]>
CryptoMaxPlanck and trajan0x authored Oct 11, 2023
1 parent 3087759 commit 2ee5234
Showing 6 changed files with 246 additions and 138 deletions.
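
The hunks below all apply the same refactor: hand-rolled `jpillora/backoff` retry loops are replaced with the shared `core/retry` helper. Each RPC call is wrapped in a `func(ctx context.Context) error` closure that writes its result to a variable in the enclosing scope, and the closure is handed to `retry.WithBackoff` along with a `retry.WithMaxAttemptTime` bound (defaulting to 30 seconds for the executor and 60 for the guard). Here is a minimal sketch of that pattern, assuming only the `core/retry` API visible in the diff; `fetchBlockTime` is a hypothetical stand-in for any flaky RPC call:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/synapsecns/sanguine/core/retry"
)

// fetchBlockTime is a hypothetical placeholder for a flaky RPC call;
// it is not part of this commit.
func fetchBlockTime(ctx context.Context) (uint64, error) {
	return 0, errors.New("transient RPC error")
}

func main() {
	ctx := context.Background()

	// Bound the total time spent retrying, mirroring the MaxRetrySeconds
	// default this commit adds for the executor.
	retryConfig := []retry.WithBackoffConfigurator{
		retry.WithMaxAttemptTime(30 * time.Second),
	}

	// The closure only returns an error; results are captured in the
	// enclosing scope, exactly as the hunks below do.
	var blockTime uint64
	contractCall := func(ctx context.Context) error {
		var err error
		blockTime, err = fetchBlockTime(ctx)
		if err != nil {
			return fmt.Errorf("could not fetch block time: %w", err)
		}
		return nil
	}

	if err := retry.WithBackoff(ctx, contractCall, retryConfig...); err != nil {
		fmt.Println("giving up:", err)
		return
	}
	fmt.Println("block time:", blockTime)
}
```
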
143 changes: 76 additions & 67 deletions agents/agents/executor/executor.go
@@ -11,7 +11,6 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/jpillora/backoff"
"github.com/synapsecns/sanguine/agents/agents/executor/db"
execTypes "github.com/synapsecns/sanguine/agents/agents/executor/types"
"github.com/synapsecns/sanguine/agents/config/executor"
@@ -24,6 +23,7 @@ import (
"github.com/synapsecns/sanguine/agents/types"
"github.com/synapsecns/sanguine/core/merkle"
"github.com/synapsecns/sanguine/core/metrics"
"github.com/synapsecns/sanguine/core/retry"
ethergoChain "github.com/synapsecns/sanguine/ethergo/chain"
agentsConfig "github.com/synapsecns/sanguine/ethergo/signer/config"
"github.com/synapsecns/sanguine/ethergo/signer/signer"
@@ -87,6 +87,8 @@ type Executor struct {
handler metrics.Handler
// txSubmitter is the transaction submitter.
txSubmitter submitter.TransactionSubmitter
// retryConfig is the retry configuration for RPC calls.
retryConfig []retry.WithBackoffConfigurator
// NowFunc returns the current time.
NowFunc func() time.Time
}
@@ -99,7 +101,6 @@ type logOrderInfo struct {

const (
logChanSize = 1000
rpcRetry = 7
scribeConnectTimeout = 30 * time.Second
)

@@ -148,6 +149,14 @@ func NewExecutor(ctx context.Context, config executor.Config, executorDB db.Exec

txSubmitter := submitter.NewTransactionSubmitter(handler, executorSigner, omniRPCClient, executorDB.SubmitterDB(), &config.SubmitterConfig)

if config.MaxRetrySeconds == 0 {
config.MaxRetrySeconds = 30
}

retryConfig := []retry.WithBackoffConfigurator{
retry.WithMaxAttemptTime(time.Second * time.Duration(config.MaxRetrySeconds)),
}

if config.ExecuteInterval == 0 {
config.ExecuteInterval = 2
}
@@ -239,6 +248,7 @@ func NewExecutor(ctx context.Context, config executor.Config, executorDB db.Exec
chainExecutors: chainExecutors,
handler: handler,
txSubmitter: txSubmitter,
retryConfig: retryConfig,
NowFunc: time.Now,
}, nil
}
@@ -340,8 +350,16 @@ func (e Executor) Execute(parentCtx context.Context, message types.Message) (_ b
return false, nil
}

proof, err := e.chainExecutors[message.OriginDomain()].merkleTree.MerkleProof(*nonce-1, (*state).Nonce())
var proof [][]byte
contractCall := func(ctx context.Context) error {
proof, err = e.chainExecutors[message.OriginDomain()].merkleTree.MerkleProof(*nonce-1, (*state).Nonce())
if err != nil {
return fmt.Errorf("could not get merkle proof: %w", err)
}

return nil
}
err = retry.WithBackoff(ctx, contractCall, e.retryConfig...)
if err != nil {
return false, fmt.Errorf("could not get merkle proof: %w", err)
}
@@ -425,12 +443,31 @@ func (e Executor) Execute(parentCtx context.Context, message types.Message) (_ b

// verifyMessageMerkleProof verifies a message against the merkle tree at the state of the given nonce.
func (e Executor) verifyMessageMerkleProof(message types.Message) (bool, error) {
root, err := e.chainExecutors[message.OriginDomain()].merkleTree.Root(message.Nonce())
var root []byte
contractCall := func(ctx context.Context) error {
var err error
root, err = e.chainExecutors[message.OriginDomain()].merkleTree.Root(message.Nonce())
if err != nil {
return fmt.Errorf("could not get root: %w", err)
}

return nil
}
err := retry.WithBackoff(context.Background(), contractCall, e.retryConfig...)
if err != nil {
return false, fmt.Errorf("could not get root: %w", err)
}

proof, err := e.chainExecutors[message.OriginDomain()].merkleTree.MerkleProof(message.Nonce()-1, message.Nonce())
var proof [][]byte
contractCall = func(ctx context.Context) error {
proof, err = e.chainExecutors[message.OriginDomain()].merkleTree.MerkleProof(message.Nonce()-1, message.Nonce())
if err != nil {
return fmt.Errorf("could not get merkle proof: %w", err)
}

return nil
}
err = retry.WithBackoff(context.Background(), contractCall, e.retryConfig...)
if err != nil {
return false, fmt.Errorf("could not get merkle proof: %w", err)
}
@@ -523,38 +560,25 @@ func (e Executor) verifyMessageOptimisticPeriod(parentCtx context.Context, messa
return nil, nil
}

b := &backoff.Backoff{
Factor: 2,
Jitter: true,
Min: 30 * time.Millisecond,
Max: 3 * time.Second,
}

timeout := time.Duration(0)

var currentTime uint64
chainCall := func(ctx context.Context) error {
var err error
latestHeader, err := e.chainExecutors[chainID].rpcClient.HeaderByNumber(ctx, nil)
if err != nil {
return fmt.Errorf("could not get latest header: %w", err)
}

retryLoop:
for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("context canceled: %w", ctx.Err())
case <-time.After(timeout):
if b.Attempt() >= rpcRetry {
return nil, fmt.Errorf("could not get latest header: %w", err)
}

latestHeader, err := e.chainExecutors[destinationDomain].rpcClient.HeaderByNumber(ctx, nil)
if err != nil {
timeout = b.Duration()

continue
}
if latestHeader == nil {
return fmt.Errorf("latest header is nil")
}

currentTime = latestHeader.Time
currentTime = latestHeader.Time

break retryLoop
}
return nil
}
err = retry.WithBackoff(ctx, chainCall, e.retryConfig...)
if err != nil {
return nil, fmt.Errorf("could not get latest header: %w", err)
}

if *messageMinimumTime > currentTime {
@@ -616,43 +640,28 @@ func (e Executor) checkIfExecuted(parentCtx context.Context, message types.Messa
metrics.EndSpanWithErr(span, err)
}()

b := &backoff.Backoff{
Factor: 2,
Jitter: true,
Min: 30 * time.Millisecond,
Max: 3 * time.Second,
}

timeout := time.Duration(0)

for {
select {
case <-ctx.Done():
return false, fmt.Errorf("context canceled: %w", ctx.Err())
case <-time.After(timeout):
if b.Attempt() >= rpcRetry {
return false, fmt.Errorf("could not get executed status: %w", ctx.Err())
}

executed, err := e.chainExecutors[message.DestinationDomain()].boundDestination.MessageStatus(ctx, message)
if err != nil {
timeout = b.Duration()
span.AddEvent("could not get executed status",
trace.WithAttributes(attribute.String("error", err.Error())),
trace.WithAttributes(attribute.String("timeout", timeout.String())),
)
continue
}
var executed uint8
contractCall := func(ctx context.Context) error {
var err error
executed, err = e.chainExecutors[message.DestinationDomain()].boundDestination.MessageStatus(ctx, message)
if err != nil {
return fmt.Errorf("could not get executed status: %w", err)
}

if execTypes.MessageStatusType(executed) == execTypes.Success {
span.AddEvent("message executed")
return true, nil
}
return nil
}
err = retry.WithBackoff(ctx, contractCall, e.retryConfig...)
if err != nil {
return false, fmt.Errorf("could not get executed status: %w", err)
}

span.AddEvent("message not executed")
return false, nil
}
if execTypes.MessageStatusType(executed) == execTypes.Success {
span.AddEvent("message executed")
return true, nil
}

span.AddEvent("message not executed")
return false, nil
}

// streamLogs uses gRPC to stream logs into a channel.
63 changes: 30 additions & 33 deletions agents/agents/executor/executor_utils.go
@@ -4,16 +4,15 @@ import (
"context"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/jpillora/backoff"
"github.com/synapsecns/sanguine/agents/contracts/inbox"
"github.com/synapsecns/sanguine/agents/contracts/lightinbox"
"github.com/synapsecns/sanguine/agents/contracts/origin"
"github.com/synapsecns/sanguine/agents/contracts/summit"
"github.com/synapsecns/sanguine/agents/types"
"github.com/synapsecns/sanguine/core/retry"
)

// logToMessage converts the log to a leaf data.
@@ -154,19 +153,30 @@ func (e Executor) processMessage(ctx context.Context, message types.Message, log

// processSnapshot processes and stores a snapshot.
func (e Executor) processSnapshot(ctx context.Context, snapshot types.Snapshot, logBlockNumber uint64) error {
for _, state := range snapshot.States() {
for _, s := range snapshot.States() {
state := s
statePayload, err := state.Encode()
if err != nil {
return fmt.Errorf("could not encode state: %w", err)
}
// Verify that the state is valid w.r.t. Origin.
valid, err := e.chainExecutors[state.Origin()].boundOrigin.IsValidState(
ctx,
statePayload,
)
var valid bool
contractCall := func(ctx context.Context) error {
valid, err = e.chainExecutors[state.Origin()].boundOrigin.IsValidState(
ctx,
statePayload,
)
if err != nil {
return fmt.Errorf("could not check validity of state: %w", err)
}

return nil
}
err = retry.WithBackoff(ctx, contractCall, e.retryConfig...)
if err != nil {
return fmt.Errorf("could not check validity of state: %w", err)
}

if !valid {
stateRoot := state.Root()
logger.Infof("snapshot has invalid state. Origin: %d. SnapshotRoot: %s", state.Origin(), common.BytesToHash(stateRoot[:]).String())
@@ -199,36 +209,23 @@ func (e Executor) processAttestation(ctx context.Context, attestation types.Atte
}

// If the attestation is on a remote chain, we need to fetch the timestamp via an RPC call.
b := &backoff.Backoff{
Factor: 2,
Jitter: true,
Min: 30 * time.Millisecond,
Max: 3 * time.Second,
}

timeout := time.Duration(0)

var logHeader *ethTypes.Header
var err error
contractCall := func(ctx context.Context) error {
logHeader, err = e.chainExecutors[chainID].rpcClient.HeaderByNumber(ctx, big.NewInt(int64(logBlockNumber)))
if err != nil {
return fmt.Errorf("could not get log header: %w", err)
}

retryLoop:
for {
select {
case <-ctx.Done():
return fmt.Errorf("context canceled: %w", ctx.Err())
case <-time.After(timeout):
if b.Attempt() >= rpcRetry {
return fmt.Errorf("could not get log header: %w", err)
}
logHeader, err = e.chainExecutors[chainID].rpcClient.HeaderByNumber(ctx, big.NewInt(int64(logBlockNumber)))
if err != nil {
timeout = b.Duration()

continue
}
return nil
}
err = retry.WithBackoff(ctx, contractCall, e.retryConfig...)
if err != nil {
return fmt.Errorf("could not get log header: %w", err)
}

break retryLoop
}
if logHeader == nil {
return fmt.Errorf("could not get log header")
}

err = e.executorDB.StoreAttestation(ctx, attestation, chainID, logBlockNumber, logHeader.Time)
31 changes: 21 additions & 10 deletions agents/agents/guard/guard.go
@@ -178,6 +178,11 @@ func NewGuard(ctx context.Context, cfg config.AgentConfig, omniRPCClient omnirpc
guard.originLatestStates = make(map[uint32]types.State, len(guard.domains))
guard.handler = handler
guard.txSubmitter = submitter.NewTransactionSubmitter(handler, guard.unbondedSigner, omniRPCClient, guardDB.SubmitterDB(), &cfg.SubmitterConfig)

if cfg.MaxRetrySeconds == 0 {
cfg.MaxRetrySeconds = 60
}

guard.retryConfig = []retry.WithBackoffConfigurator{
retry.WithMaxAttemptTime(time.Second * time.Duration(cfg.MaxRetrySeconds)),
}
@@ -276,16 +281,19 @@ func (g Guard) loadSummitLatestStates(parentCtx context.Context) {
))

originID := domain.Config().DomainID
latestState, err := g.domains[g.summitDomainID].Summit().GetLatestAgentState(ctx, originID, g.bondedSigner)
if err != nil {
latestState = nil
logger.Errorf("Failed calling GetLatestAgentState for originID %d on the Summit contract: err = %v", originID, err)
span.AddEvent("Failed calling GetLatestAgentState for originID on the Summit contract", trace.WithAttributes(
attribute.Int("originID", int(originID)),
attribute.String("err", err.Error()),
))

var latestState types.State
var err error
contractCall := func(ctx context.Context) error {
latestState, err = g.domains[g.summitDomainID].Summit().GetLatestAgentState(ctx, originID, g.bondedSigner)
if err != nil {
return fmt.Errorf("failed calling GetLatestAgentState for originID %d on the Summit contract: err = %w", originID, err)
}

return nil
}
if latestState != nil && latestState.Nonce() > uint32(0) {
err = retry.WithBackoff(ctx, contractCall, g.retryConfig...)
if err == nil && latestState.Nonce() > uint32(0) {
g.summitLatestStates[originID] = latestState
}

@@ -295,12 +303,15 @@

//nolint:cyclop
func (g Guard) loadOriginLatestStates(parentCtx context.Context) {
for _, domain := range g.domains {
for _, d := range g.domains {
domain := d
ctx, span := g.handler.Tracer().Start(parentCtx, "loadOriginLatestStates", trace.WithAttributes(
attribute.Int("domain", int(domain.Config().DomainID)),
))

originID := domain.Config().DomainID

// TODO: Wrap this with a retry if `Start` behavior changes.
latestState, err := domain.Origin().SuggestLatestState(ctx)
if err != nil {
latestState = nil
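
One incidental detail in the new loops above: the `state := s` (executor_utils.go) and `domain := d` (guard.go) copies are the standard pre-Go-1.22 idiom for giving each iteration its own variable, since the retry closures would otherwise capture the single shared range variable. A small, self-contained illustration (hypothetical, not from this commit):

```go
package main

import "fmt"

func main() {
	var printers []func()
	for _, s := range []string{"a", "b", "c"} {
		s := s // shadow the range variable so each closure gets its own copy
		printers = append(printers, func() { fmt.Println(s) })
	}
	for _, p := range printers {
		p() // prints a, b, c; without the shadow, pre-Go-1.22 semantics print c, c, c
	}
}
```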