diff --git a/docs/bridge/docs/rfq/Relayer/Relayer.md b/docs/bridge/docs/rfq/Relayer/Relayer.md index 49555381c9..612dcb25ed 100644 --- a/docs/bridge/docs/rfq/Relayer/Relayer.md +++ b/docs/bridge/docs/rfq/Relayer/Relayer.md @@ -193,6 +193,8 @@ The relayer is configured with a yaml file. The following is an example configur - `rebalance_interval` - How often to rebalance, formatted as (s = seconds, m = minutes, h = hours) - `relayer_api_port` - the relayer api is used to control the relayer. This api should be secured/not public. - `base_chain_config`: Base chain config is the default config applied for each chain if the other chains do not override it. This is covered in the chains section. + - `enable_guard` - Run a guard on the same instance. + - `submit_single_quotes` - Wether to use the batch endpoint for posting quotes to the api. This can be useful for debugging. - `chains` - each chain has a different config that overrides base_chain_config. Here are the parameters for each chain - `rfq_address` - the address of the rfq contract on this chain. These addresses are available [here](../Contracts.md). diff --git a/ethergo/backends/anvil/anvil.go b/ethergo/backends/anvil/anvil.go index 9c95d15da9..2f631bedec 100644 --- a/ethergo/backends/anvil/anvil.go +++ b/ethergo/backends/anvil/anvil.go @@ -80,8 +80,10 @@ func NewAnvilBackend(ctx context.Context, t *testing.T, args *OptionBuilder) *Ba runOptions := &dockertest.RunOptions{ Repository: "ghcr.io/foundry-rs/foundry", - Tag: "nightly-deb3116955eea4333f9e4e4516104be4182e9ee2", - Cmd: []string{strings.Join(append([]string{"anvil"}, commandArgs...), " ")}, + Tag: "nightly-1bac1b3d79243cea755800bf396c30a3d74741bf", + Platform: "linux/amd64", + + Cmd: []string{strings.Join(append([]string{"anvil"}, commandArgs...), " ")}, Labels: map[string]string{ "test-id": uuid.New().String(), }, diff --git a/ethergo/submitter/submitter.go b/ethergo/submitter/submitter.go index 0823d690cc..2d0a781688 100644 --- a/ethergo/submitter/submitter.go +++ b/ethergo/submitter/submitter.go @@ -48,6 +48,8 @@ type TransactionSubmitter interface { GetSubmissionStatus(ctx context.Context, chainID *big.Int, nonce uint64) (status SubmissionStatus, err error) // Address returns the address of the signer. Address() common.Address + // Started returns whether the submitter is running. + Started() bool } // txSubmitterImpl is the implementation of the transaction submitter. @@ -83,6 +85,10 @@ type txSubmitterImpl struct { // distinctChainIDs is the distinct chain ids for the transaction submitter. // note: this map should not be appended to! distinctChainIDs []*big.Int + // started indicates whether the submitter has started. + started bool + // startMux is the mutex for started. + startMux sync.RWMutex } // ClientFetcher is the interface for fetching a chain client. @@ -107,6 +113,13 @@ func NewTransactionSubmitter(metrics metrics.Handler, signer signer.Signer, fetc } } +// Started returns whether the submitter is running. +func (t *txSubmitterImpl) Started() bool { + t.startMux.RLock() + defer t.startMux.RUnlock() + return t.started +} + // GetRetryInterval returns the retry interval for the transaction submitter. func (t *txSubmitterImpl) GetRetryInterval() time.Duration { retryInterval := time.Second * 2 @@ -126,9 +139,29 @@ func (t *txSubmitterImpl) GetDistinctInterval() time.Duration { return retryInterval } +// attemptMarkStarted attempts to mark the submitter as started. +// if the submitter is already started, an error is returned. +func (t *txSubmitterImpl) attemptMarkStarted() error { + t.startMux.Lock() + defer t.startMux.Unlock() + if t.started { + return ErrSubmitterAlreadyStarted + } + t.started = true + return nil +} + +// ErrSubmitterAlreadyStarted is the error for when the submitter is already started. +var ErrSubmitterAlreadyStarted = errors.New("submitter already started") + // Start starts the transaction submitter. // nolint: cyclop func (t *txSubmitterImpl) Start(parentCtx context.Context) (err error) { + err = t.attemptMarkStarted() + if err != nil { + return err + } + t.otelRecorder, err = newOtelRecorder(t.metrics, t.signer) if err != nil { return fmt.Errorf("could not create otel recorder: %w", err) @@ -313,6 +346,9 @@ func (t *txSubmitterImpl) triggerProcessQueue(ctx context.Context) { } } +// ErrNotStarted is the error for when the submitter is not started. +var ErrNotStarted = errors.New("submitter is not started") + // nolint: cyclop func (t *txSubmitterImpl) SubmitTransaction(parentCtx context.Context, chainID *big.Int, call ContractCallType) (nonce uint64, err error) { ctx, span := t.metrics.Tracer().Start(parentCtx, "submitter.SubmitTransaction", trace.WithAttributes( @@ -324,6 +360,10 @@ func (t *txSubmitterImpl) SubmitTransaction(parentCtx context.Context, chainID * metrics.EndSpanWithErr(span, err) }() + if !t.Started() { + logger.Errorf("%v in a future version, this will hard error", ErrNotStarted.Error()) + } + // make sure we have a client for this chain. chainClient, err := t.fetcher.GetClient(ctx, chainID) if err != nil { diff --git a/services/rfq/contracts/fastbridge/events.go b/services/rfq/contracts/fastbridge/events.go index 9ebbc6c5d6..d1e9bef16e 100644 --- a/services/rfq/contracts/fastbridge/events.go +++ b/services/rfq/contracts/fastbridge/events.go @@ -17,6 +17,8 @@ var ( BridgeProofProvidedTopic common.Hash // BridgeDepositClaimedTopic is the topic emitted by a bridge relay. BridgeDepositClaimedTopic common.Hash + // BridgeProofDisputedTopic is the topic emitted by a bridge dispute. + BridgeProofDisputedTopic common.Hash ) // static checks to make sure topics actually exist. @@ -32,6 +34,7 @@ func init() { BridgeRelayedTopic = parsedABI.Events["BridgeRelayed"].ID BridgeProofProvidedTopic = parsedABI.Events["BridgeProofProvided"].ID BridgeDepositClaimedTopic = parsedABI.Events["BridgeDepositClaimed"].ID + BridgeProofDisputedTopic = parsedABI.Events["BridgeProofDisputed"].ID _, err = parsedABI.EventByID(BridgeRequestedTopic) if err != nil { @@ -47,6 +50,11 @@ func init() { if err != nil { panic(err) } + + _, err = parsedABI.EventByID(BridgeProofDisputedTopic) + if err != nil { + panic(err) + } } // topicMap maps events to topics. @@ -57,6 +65,7 @@ func topicMap() map[EventType]common.Hash { BridgeRelayedEvent: BridgeRelayedTopic, BridgeProofProvidedEvent: BridgeProofProvidedTopic, BridgeDepositClaimedEvent: BridgeDepositClaimedTopic, + BridgeDisputeEvent: BridgeProofDisputedTopic, } } diff --git a/services/rfq/contracts/fastbridge/eventtype_string.go b/services/rfq/contracts/fastbridge/eventtype_string.go index 35a39eddf4..10e419b234 100644 --- a/services/rfq/contracts/fastbridge/eventtype_string.go +++ b/services/rfq/contracts/fastbridge/eventtype_string.go @@ -12,11 +12,12 @@ func _() { _ = x[BridgeRelayedEvent-2] _ = x[BridgeProofProvidedEvent-3] _ = x[BridgeDepositClaimedEvent-4] + _ = x[BridgeDisputeEvent-5] } -const _EventType_name = "BridgeRequestedEventBridgeRelayedEventBridgeProofProvidedEventBridgeDepositClaimedEvent" +const _EventType_name = "BridgeRequestedEventBridgeRelayedEventBridgeProofProvidedEventBridgeDepositClaimedEventBridgeDisputeEvent" -var _EventType_index = [...]uint8{0, 20, 38, 62, 87} +var _EventType_index = [...]uint8{0, 20, 38, 62, 87, 105} func (i EventType) String() string { i -= 1 diff --git a/services/rfq/contracts/fastbridge/parser.go b/services/rfq/contracts/fastbridge/parser.go index 19bbe73024..29faa55352 100644 --- a/services/rfq/contracts/fastbridge/parser.go +++ b/services/rfq/contracts/fastbridge/parser.go @@ -20,6 +20,8 @@ const ( BridgeProofProvidedEvent // BridgeDepositClaimedEvent is the event type for the BridgeDepositClaimed event. BridgeDepositClaimedEvent + // BridgeDisputeEvent is the event type for the BridgeDispute event. + BridgeDisputeEvent ) // Parser parses events from the fastbridge contracat. @@ -82,6 +84,13 @@ func (p parserImpl) ParseEvent(log ethTypes.Log) (_ EventType, event interface{} return noOpEvent, nil, false } return eventType, claimed, true + case BridgeDisputeEvent: + disputed, err := p.filterer.ParseBridgeProofDisputed(log) + if err != nil { + return noOpEvent, nil, false + } + return eventType, disputed, true + } return eventType, nil, true diff --git a/services/rfq/e2e/rfq_test.go b/services/rfq/e2e/rfq_test.go index e8a76a29df..f75f3a9586 100644 --- a/services/rfq/e2e/rfq_test.go +++ b/services/rfq/e2e/rfq_test.go @@ -1,6 +1,7 @@ package e2e_test import ( + "fmt" "math/big" "testing" "time" @@ -19,6 +20,8 @@ import ( omnirpcClient "github.com/synapsecns/sanguine/services/omnirpc/client" "github.com/synapsecns/sanguine/services/rfq/api/client" "github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge" + "github.com/synapsecns/sanguine/services/rfq/guard/guarddb" + guardService "github.com/synapsecns/sanguine/services/rfq/guard/service" "github.com/synapsecns/sanguine/services/rfq/relayer/chain" "github.com/synapsecns/sanguine/services/rfq/relayer/reldb" "github.com/synapsecns/sanguine/services/rfq/relayer/service" @@ -36,9 +39,12 @@ type IntegrationSuite struct { omniClient omnirpcClient.RPCClient metrics metrics.Handler store reldb.Service + guardStore guarddb.Service apiServer string relayer *service.Relayer + guard *guardService.Guard relayerWallet wallet.Wallet + guardWallet wallet.Wallet userWallet wallet.Wallet } @@ -67,11 +73,6 @@ const ( func (i *IntegrationSuite) SetupTest() { i.TestSuite.SetupTest() - // TODO: no need for this when anvil CI issues are fixed - if core.GetEnvBool("CI", false) { - return - } - i.manager = testutil.NewDeployManager(i.T()) i.cctpDeployManager = cctpTest.NewDeployManager(i.T()) // TODO: consider jaeger @@ -82,6 +83,7 @@ func (i *IntegrationSuite) SetupTest() { // setup the api server i.setupQuoterAPI() i.setupRelayer() + i.setupGuard() } // getOtherBackend gets the backend that is not the current one. This is a helper @@ -96,9 +98,13 @@ func (i *IntegrationSuite) getOtherBackend(backend backends.SimulatedTestBackend } func (i *IntegrationSuite) TestUSDCtoUSDC() { - if core.GetEnvBool("CI", false) { - i.T().Skip("skipping until anvil issues are fixed in CI") - } + // start the relayer and guard + go func() { + _ = i.relayer.Start(i.GetTestContext()) + }() + go func() { + _ = i.guard.Start(i.GetTestContext()) + }() // load token contracts const startAmount = 1000 @@ -240,13 +246,26 @@ func (i *IntegrationSuite) TestUSDCtoUSDC() { i.NoError(err) return len(originPendingRebals) > 0 }) + + i.Eventually(func() bool { + // verify that the guard has marked the tx as validated + results, err := i.guardStore.GetPendingProvensByStatus(i.GetTestContext(), guarddb.Validated) + i.NoError(err) + return len(results) == 1 + }) } // nolint: cyclop func (i *IntegrationSuite) TestETHtoETH() { - if core.GetEnvBool("CI", false) { - i.T().Skip("skipping until anvil issues are fixed in CI") - } + + // start the relayer and guard + go func() { + _ = i.relayer.Start(i.GetTestContext()) + }() + go func() { + _ = i.guard.Start(i.GetTestContext()) + }() + // Send ETH to the relayer on destination const initialBalance = 10 i.destBackend.FundAccount(i.GetTestContext(), i.relayerWallet.Address(), *big.NewInt(initialBalance)) @@ -347,4 +366,108 @@ func (i *IntegrationSuite) TestETHtoETH() { } return false }) + + i.Eventually(func() bool { + // verify that the guard has marked the tx as validated + results, err := i.guardStore.GetPendingProvensByStatus(i.GetTestContext(), guarddb.Validated) + i.NoError(err) + return len(results) == 1 + }) +} + +func (i *IntegrationSuite) TestDispute() { + // start the guard + go func() { + _ = i.guard.Start(i.GetTestContext()) + }() + + // load token contracts + const startAmount = 1000 + const rfqAmount = 900 + opts := i.destBackend.GetTxContext(i.GetTestContext(), nil) + destUSDC, destUSDCHandle := i.cctpDeployManager.GetMockMintBurnTokenType(i.GetTestContext(), i.destBackend) + realStartAmount, err := testutil.AdjustAmount(i.GetTestContext(), big.NewInt(startAmount), destUSDC.ContractHandle()) + i.NoError(err) + realRFQAmount, err := testutil.AdjustAmount(i.GetTestContext(), big.NewInt(rfqAmount), destUSDC.ContractHandle()) + i.NoError(err) + + // add initial usdc to relayer on destination + tx, err := destUSDCHandle.MintPublic(opts.TransactOpts, i.relayerWallet.Address(), realStartAmount) + i.Nil(err) + i.destBackend.WaitForConfirmation(i.GetTestContext(), tx) + i.Approve(i.destBackend, destUSDC, i.relayerWallet) + + // add initial USDC to relayer on origin + optsOrigin := i.originBackend.GetTxContext(i.GetTestContext(), nil) + originUSDC, originUSDCHandle := i.cctpDeployManager.GetMockMintBurnTokenType(i.GetTestContext(), i.originBackend) + tx, err = originUSDCHandle.MintPublic(optsOrigin.TransactOpts, i.relayerWallet.Address(), realStartAmount) + i.Nil(err) + i.originBackend.WaitForConfirmation(i.GetTestContext(), tx) + i.Approve(i.originBackend, originUSDC, i.relayerWallet) + + // add initial USDC to user on origin + tx, err = originUSDCHandle.MintPublic(optsOrigin.TransactOpts, i.userWallet.Address(), realRFQAmount) + i.Nil(err) + i.originBackend.WaitForConfirmation(i.GetTestContext(), tx) + i.Approve(i.originBackend, originUSDC, i.userWallet) + + // now we can send the money + _, originFastBridge := i.manager.GetFastBridge(i.GetTestContext(), i.originBackend) + auth := i.originBackend.GetTxContext(i.GetTestContext(), i.userWallet.AddressPtr()) + // we want 499 usdc for 500 requested within a day + tx, err = originFastBridge.Bridge(auth.TransactOpts, fastbridge.IFastBridgeBridgeParams{ + DstChainId: uint32(i.destBackend.GetChainID()), + To: i.userWallet.Address(), + OriginToken: originUSDC.Address(), + SendChainGas: true, + DestToken: destUSDC.Address(), + OriginAmount: realRFQAmount, + DestAmount: new(big.Int).Sub(realRFQAmount, big.NewInt(10_000_000)), + Deadline: new(big.Int).SetInt64(time.Now().Add(time.Hour * 24).Unix()), + }) + i.NoError(err) + i.originBackend.WaitForConfirmation(i.GetTestContext(), tx) + + // fetch the txid and raw request + var txID [32]byte + var rawRequest []byte + parser, err := fastbridge.NewParser(originFastBridge.Address()) + i.NoError(err) + i.Eventually(func() bool { + receipt, err := i.originBackend.TransactionReceipt(i.GetTestContext(), tx.Hash()) + i.NoError(err) + for _, log := range receipt.Logs { + _, parsedEvent, ok := parser.ParseEvent(*log) + if !ok { + continue + } + event, ok := parsedEvent.(*fastbridge.FastBridgeBridgeRequested) + if ok { + rawRequest = event.Request + txID = event.TransactionId + return true + } + } + return false + }) + + // call prove() from the relayer wallet before relay actually occurred on dest + relayerAuth := i.originBackend.GetTxContext(i.GetTestContext(), i.relayerWallet.AddressPtr()) + fakeHash := common.HexToHash("0xdeadbeef") + tx, err = originFastBridge.Prove(relayerAuth.TransactOpts, rawRequest, fakeHash) + i.NoError(err) + i.originBackend.WaitForConfirmation(i.GetTestContext(), tx) + + // verify that the guard calls Dispute() + i.Eventually(func() bool { + results, err := i.guardStore.GetPendingProvensByStatus(i.GetTestContext(), guarddb.Disputed) + i.NoError(err) + if len(results) != 1 { + return false + } + fmt.Printf("GOT RESULTS: %v\n", results) + result, err := i.guardStore.GetPendingProvenByID(i.GetTestContext(), txID) + i.NoError(err) + return result.TxHash == fakeHash && result.Status == guarddb.Disputed && result.TransactionID == txID + }) } diff --git a/services/rfq/e2e/setup_test.go b/services/rfq/e2e/setup_test.go index 6df213a31e..460047ffc8 100644 --- a/services/rfq/e2e/setup_test.go +++ b/services/rfq/e2e/setup_test.go @@ -21,7 +21,6 @@ import ( "github.com/synapsecns/sanguine/ethergo/backends" "github.com/synapsecns/sanguine/ethergo/backends/anvil" "github.com/synapsecns/sanguine/ethergo/backends/base" - "github.com/synapsecns/sanguine/ethergo/backends/geth" "github.com/synapsecns/sanguine/ethergo/contracts" signerConfig "github.com/synapsecns/sanguine/ethergo/signer/config" "github.com/synapsecns/sanguine/ethergo/signer/wallet" @@ -32,6 +31,9 @@ import ( "github.com/synapsecns/sanguine/services/rfq/api/db/sql" "github.com/synapsecns/sanguine/services/rfq/api/rest" "github.com/synapsecns/sanguine/services/rfq/contracts/ierc20" + "github.com/synapsecns/sanguine/services/rfq/guard/guardconfig" + guardConnect "github.com/synapsecns/sanguine/services/rfq/guard/guarddb/connect" + guardService "github.com/synapsecns/sanguine/services/rfq/guard/service" "github.com/synapsecns/sanguine/services/rfq/relayer/chain" "github.com/synapsecns/sanguine/services/rfq/relayer/relconfig" "github.com/synapsecns/sanguine/services/rfq/relayer/reldb/connect" @@ -94,6 +96,9 @@ func (i *IntegrationSuite) setupBackends() { i.relayerWallet, err = wallet.FromRandom() i.NoError(err) + i.guardWallet, err = wallet.FromRandom() + i.NoError(err) + i.userWallet, err = wallet.FromRandom() i.NoError(err) @@ -108,7 +113,9 @@ func (i *IntegrationSuite) setupBackends() { }() go func() { defer wg.Done() - i.destBackend = geth.NewEmbeddedBackendForChainID(i.GetTestContext(), i.T(), big.NewInt(destBackendChainID)) + options := anvil.NewAnvilOptionBuilder() + options.SetChainID(destBackendChainID) + i.destBackend = anvil.NewAnvilBackend(i.GetTestContext(), i.T(), options) i.setupBE(i.destBackend) }() wg.Wait() @@ -132,10 +139,12 @@ func (i *IntegrationSuite) setupBE(backend backends.SimulatedTestBackend) { // store the keys backend.Store(base.WalletToKey(i.T(), i.relayerWallet)) + backend.Store(base.WalletToKey(i.T(), i.guardWallet)) backend.Store(base.WalletToKey(i.T(), i.userWallet)) // fund each of the wallets backend.FundAccount(i.GetTestContext(), i.relayerWallet.Address(), ethAmount) + backend.FundAccount(i.GetTestContext(), i.guardWallet.Address(), ethAmount) backend.FundAccount(i.GetTestContext(), i.userWallet.Address(), ethAmount) go func() { @@ -144,7 +153,7 @@ func (i *IntegrationSuite) setupBE(backend backends.SimulatedTestBackend) { // TODO: in the case of relayer this not finishing before the test starts can lead to race conditions since // nonce may be shared between submitter and relayer. Think about how to deal w/ this. - for _, user := range []wallet.Wallet{i.relayerWallet, i.userWallet} { + for _, user := range []wallet.Wallet{i.relayerWallet, i.guardWallet, i.userWallet} { go func(userWallet wallet.Wallet) { for _, token := range predeployTokens { i.Approve(backend, i.manager.Get(i.GetTestContext(), backend, token), userWallet) @@ -217,36 +226,14 @@ func (i *IntegrationSuite) Approve(backend backends.SimulatedTestBackend, token } } -func (i *IntegrationSuite) setupRelayer() { - // add myself as a filler - var wg sync.WaitGroup - wg.Add(2) - - for _, backend := range core.ToSlice(i.originBackend, i.destBackend) { - go func(backend backends.SimulatedTestBackend) { - defer wg.Done() - - metadata, rfqContract := i.manager.GetFastBridge(i.GetTestContext(), backend) - - txContext := backend.GetTxContext(i.GetTestContext(), metadata.OwnerPtr()) - relayerRole, err := rfqContract.RELAYERROLE(&bind.CallOpts{Context: i.GetTestContext()}) - i.NoError(err) - - tx, err := rfqContract.GrantRole(txContext.TransactOpts, relayerRole, i.relayerWallet.Address()) - i.NoError(err) - - backend.WaitForConfirmation(i.GetTestContext(), tx) - }(backend) - } - wg.Wait() - +func (i *IntegrationSuite) getRelayerConfig() relconfig.Config { // construct the config relayerAPIPort, err := freeport.GetFreePort() i.NoError(err) dsn := filet.TmpDir(i.T(), "") cctpContractOrigin, _ := i.cctpDeployManager.GetSynapseCCTP(i.GetTestContext(), i.originBackend) cctpContractDest, _ := i.cctpDeployManager.GetSynapseCCTP(i.GetTestContext(), i.destBackend) - cfg := relconfig.Config{ + return relconfig.Config{ // generated ex-post facto Chains: map[int]relconfig.ChainConfig{ originBackendChainID: { @@ -300,6 +287,32 @@ func (i *IntegrationSuite) setupRelayer() { }, RebalanceInterval: 0, } +} + +func (i *IntegrationSuite) setupRelayer() { + // add myself as a filler + var wg sync.WaitGroup + wg.Add(2) + + for _, backend := range core.ToSlice(i.originBackend, i.destBackend) { + go func(backend backends.SimulatedTestBackend) { + defer wg.Done() + + metadata, rfqContract := i.manager.GetFastBridge(i.GetTestContext(), backend) + + txContext := backend.GetTxContext(i.GetTestContext(), metadata.OwnerPtr()) + relayerRole, err := rfqContract.RELAYERROLE(&bind.CallOpts{Context: i.GetTestContext()}) + i.NoError(err) + + tx, err := rfqContract.GrantRole(txContext.TransactOpts, relayerRole, i.relayerWallet.Address()) + i.NoError(err) + + backend.WaitForConfirmation(i.GetTestContext(), tx) + }(backend) + } + wg.Wait() + + cfg := i.getRelayerConfig() // in the first backend, we want to deploy a bunch of different tokens // TODO: functionalize me. @@ -374,15 +387,52 @@ func (i *IntegrationSuite) setupRelayer() { fmt.Sprintf("%d-%s", originBackendChainID, chain.EthAddress), } - // TODO: good chance we wanna leave actually starting this up to the indiividual test. + var err error i.relayer, err = service.NewRelayer(i.GetTestContext(), i.metrics, cfg) i.NoError(err) - go func() { - err = i.relayer.Start(i.GetTestContext()) - }() dbType, err := dbcommon.DBTypeFromString(cfg.Database.Type) i.NoError(err) i.store, err = connect.Connect(i.GetTestContext(), dbType, cfg.Database.DSN, i.metrics) i.NoError(err) } + +func (i *IntegrationSuite) setupGuard() { + // add myself as a guard + var wg sync.WaitGroup + wg.Add(2) + + for _, backend := range core.ToSlice(i.originBackend, i.destBackend) { + go func(backend backends.SimulatedTestBackend) { + defer wg.Done() + + metadata, rfqContract := i.manager.GetFastBridge(i.GetTestContext(), backend) + + txContext := backend.GetTxContext(i.GetTestContext(), metadata.OwnerPtr()) + guardRole, err := rfqContract.GUARDROLE(&bind.CallOpts{Context: i.GetTestContext()}) + i.NoError(err) + + tx, err := rfqContract.GrantRole(txContext.TransactOpts, guardRole, i.guardWallet.Address()) + i.NoError(err) + + backend.WaitForConfirmation(i.GetTestContext(), tx) + }(backend) + } + wg.Wait() + + relayerCfg := i.getRelayerConfig() + guardCfg := guardconfig.NewGuardConfigFromRelayer(relayerCfg) + guardCfg.Signer = signerConfig.SignerConfig{ + Type: signerConfig.FileType.String(), + File: filet.TmpFile(i.T(), "", i.guardWallet.PrivateKeyHex()).Name(), + } + + var err error + i.guard, err = guardService.NewGuard(i.GetTestContext(), i.metrics, guardCfg, nil) + i.NoError(err) + + dbType, err := dbcommon.DBTypeFromString(guardCfg.Database.Type) + i.NoError(err) + i.guardStore, err = guardConnect.Connect(i.GetTestContext(), dbType, guardCfg.Database.DSN, i.metrics) + i.NoError(err) +} diff --git a/services/rfq/guard/cmd/cmd.go b/services/rfq/guard/cmd/cmd.go new file mode 100644 index 0000000000..5a171b500f --- /dev/null +++ b/services/rfq/guard/cmd/cmd.go @@ -0,0 +1,35 @@ +package cmd + +import ( + "fmt" + + "github.com/synapsecns/sanguine/core/commandline" + "github.com/synapsecns/sanguine/core/config" + "github.com/synapsecns/sanguine/core/metrics" + "github.com/urfave/cli/v2" +) + +// Start starts the command line tool. +func Start(args []string, buildInfo config.BuildInfo) { + app := cli.NewApp() + app.Name = buildInfo.Name() + app.Description = buildInfo.VersionString() + "Synapse RFQ Guard" + app.Usage = fmt.Sprintf("%s --help", buildInfo.Name()) + app.EnableBashCompletion = true + // TODO: should we really halt boot on because of metrics? + app.Before = func(c *cli.Context) error { + // nolint:wrapcheck + return metrics.Setup(c.Context, buildInfo) + } + + // commands + app.Commands = cli.Commands{runCommand} + shellCommand := commandline.GenerateShellCommand(app.Commands) + app.Commands = append(app.Commands, shellCommand) + app.Action = shellCommand.Action + + err := app.Run(args) + if err != nil { + panic(err) + } +} diff --git a/services/rfq/guard/cmd/commands.go b/services/rfq/guard/cmd/commands.go new file mode 100644 index 0000000000..48ff0a46cb --- /dev/null +++ b/services/rfq/guard/cmd/commands.go @@ -0,0 +1,46 @@ +// Package cmd provides the command line interface for the RFQ guard service +package cmd + +import ( + "fmt" + + "github.com/synapsecns/sanguine/core" + "github.com/synapsecns/sanguine/core/commandline" + "github.com/synapsecns/sanguine/core/metrics" + "github.com/synapsecns/sanguine/services/rfq/guard/guardconfig" + "github.com/synapsecns/sanguine/services/rfq/guard/service" + "github.com/urfave/cli/v2" +) + +var configFlag = &cli.StringFlag{ + Name: "config", + Usage: "path to the config file", + TakesFile: true, +} + +// runCommand runs the rfq guard. +var runCommand = &cli.Command{ + Name: "run", + Description: "run the guard", + Flags: []cli.Flag{configFlag, &commandline.LogLevel}, + Action: func(c *cli.Context) (err error) { + commandline.SetLogLevel(c) + cfg, err := guardconfig.LoadConfig(core.ExpandOrReturnPath(c.String(configFlag.Name))) + if err != nil { + return fmt.Errorf("could not read config file: %w", err) + } + + metricsProvider := metrics.Get() + + guard, err := service.NewGuard(c.Context, metricsProvider, cfg, nil) + if err != nil { + return fmt.Errorf("could not create guard: %w", err) + } + + err = guard.Start(c.Context) + if err != nil { + return fmt.Errorf("could not start guard: %w", err) + } + return nil + }, +} diff --git a/services/rfq/guard/guardconfig/config.go b/services/rfq/guard/guardconfig/config.go new file mode 100644 index 0000000000..eeed2f3a24 --- /dev/null +++ b/services/rfq/guard/guardconfig/config.go @@ -0,0 +1,123 @@ +// Package guardconfig contains the config yaml object for the relayer. +package guardconfig + +import ( + "fmt" + "os" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/jftuga/ellipsis" + "github.com/synapsecns/sanguine/ethergo/signer/config" + submitterConfig "github.com/synapsecns/sanguine/ethergo/submitter/config" + "github.com/synapsecns/sanguine/services/rfq/relayer/relconfig" + "gopkg.in/yaml.v2" + + "path/filepath" +) + +// Config represents the configuration for the relayer. +type Config struct { + // Chains is a map of chainID -> chain config. + Chains map[int]ChainConfig `yaml:"chains"` + // OmniRPCURL is the URL of the OmniRPC server. + OmniRPCURL string `yaml:"omnirpc_url"` + // Database is the database config. + Database DatabaseConfig `yaml:"database"` + // Signer is the signer config. + Signer config.SignerConfig `yaml:"signer"` + // SubmitterConfig is the submitter config. + SubmitterConfig submitterConfig.Config `yaml:"submitter_config"` + // DBSelectorInterval is the interval for the db selector. + DBSelectorInterval time.Duration `yaml:"db_selector_interval"` +} + +// ChainConfig represents the configuration for a chain. +type ChainConfig struct { + // Bridge is the rfq bridge contract address. + RFQAddress string `yaml:"rfq_address"` + // Confirmations is the number of required confirmations. + Confirmations uint64 `yaml:"confirmations"` +} + +// DatabaseConfig represents the configuration for the database. +type DatabaseConfig struct { + Type string `yaml:"type"` + DSN string `yaml:"dsn"` // Data Source Name +} + +// LoadConfig loads the config from the given path. +func LoadConfig(path string) (config Config, err error) { + input, err := os.ReadFile(filepath.Clean(path)) + if err != nil { + return Config{}, fmt.Errorf("failed to read file: %w", err) + } + err = yaml.Unmarshal(input, &config) + if err != nil { + return Config{}, fmt.Errorf("could not unmarshall config %s: %w", ellipsis.Shorten(string(input), 30), err) + } + err = config.Validate() + if err != nil { + return config, fmt.Errorf("error validating config: %w", err) + } + return config, nil +} + +// Validate validates the config. +func (c Config) Validate() (err error) { + for chainID := range c.Chains { + addr, err := c.GetRFQAddress(chainID) + if err != nil { + return fmt.Errorf("could not get rfq address: %w", err) + } + if !common.IsHexAddress(addr) { + return fmt.Errorf("invalid rfq address: %s", addr) + } + } + + return nil +} + +// GetChains returns the chains config. +func (c Config) GetChains() map[int]ChainConfig { + return c.Chains +} + +// GetRFQAddress returns the RFQ address for the given chain. +func (c Config) GetRFQAddress(chainID int) (string, error) { + chainCfg, ok := c.Chains[chainID] + if !ok { + return "", fmt.Errorf("chain config not found for chain %d", chainID) + } + return chainCfg.RFQAddress, nil +} + +const defaultDBSelectorIntervalSeconds = 1 + +// GetDBSelectorInterval returns the interval for the DB selector. +func (c Config) GetDBSelectorInterval() time.Duration { + interval := c.DBSelectorInterval + if interval <= 0 { + interval = time.Duration(defaultDBSelectorIntervalSeconds) * time.Second + } + return interval +} + +// NewGuardConfigFromRelayer creates a new guard config from a relayer config. +func NewGuardConfigFromRelayer(relayerCfg relconfig.Config) Config { + cfg := Config{ + Chains: make(map[int]ChainConfig), + OmniRPCURL: relayerCfg.OmniRPCURL, + Database: DatabaseConfig(relayerCfg.Database), + Signer: relayerCfg.Signer, + SubmitterConfig: relayerCfg.SubmitterConfig, + DBSelectorInterval: relayerCfg.DBSelectorInterval, + } + for chainID, chainCfg := range relayerCfg.GetChains() { + cfg.Chains[chainID] = ChainConfig{ + RFQAddress: chainCfg.RFQAddress, + Confirmations: chainCfg.Confirmations, + } + } + return cfg +} diff --git a/services/rfq/guard/guarddb/base/bridge.go b/services/rfq/guard/guarddb/base/bridge.go new file mode 100644 index 0000000000..965d49eced --- /dev/null +++ b/services/rfq/guard/guarddb/base/bridge.go @@ -0,0 +1,44 @@ +package base + +import ( + "context" + "errors" + "fmt" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/synapsecns/sanguine/services/rfq/guard/guarddb" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +// StoreBridgeRequest stores a quote request. +func (s Store) StoreBridgeRequest(ctx context.Context, request guarddb.BridgeRequest) error { + model := FromBridgeRequest(request) + dbTx := s.DB().WithContext(ctx).Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: transactionIDFieldName}}, + DoUpdates: clause.AssignmentColumns([]string{transactionIDFieldName}), + }).Create(&model) + if dbTx.Error != nil { + return fmt.Errorf("could not store request: %w", dbTx.Error) + } + return nil +} + +// GetBridgeRequestByID gets a quote request by id. Should return ErrNoBridgeRequestForID if not found. +func (s Store) GetBridgeRequestByID(ctx context.Context, id [32]byte) (*guarddb.BridgeRequest, error) { + var modelResult BridgeRequestModel + tx := s.DB().WithContext(ctx).Where(fmt.Sprintf("%s = ?", transactionIDFieldName), hexutil.Encode(id[:])).First(&modelResult) + if errors.Is(tx.Error, gorm.ErrRecordNotFound) { + return nil, guarddb.ErrNoBridgeRequestForID + } + + if tx.Error != nil { + return nil, fmt.Errorf("could not get request") + } + + qr, err := modelResult.ToBridgeRequest() + if err != nil { + return nil, err + } + return qr, nil +} diff --git a/services/rfq/guard/guarddb/base/doc.go b/services/rfq/guard/guarddb/base/doc.go new file mode 100644 index 0000000000..a693590881 --- /dev/null +++ b/services/rfq/guard/guarddb/base/doc.go @@ -0,0 +1,2 @@ +// Package base contains the base implementation for different sql driers. +package base diff --git a/services/rfq/guard/guarddb/base/model.go b/services/rfq/guard/guarddb/base/model.go new file mode 100644 index 0000000000..b8dd1795ef --- /dev/null +++ b/services/rfq/guard/guarddb/base/model.go @@ -0,0 +1,182 @@ +package base + +import ( + "errors" + "fmt" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/synapsecns/sanguine/core/dbcommon" + "github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge" + "github.com/synapsecns/sanguine/services/rfq/guard/guarddb" +) + +func init() { + namer := dbcommon.NewNamer(GetAllModels()) + statusFieldName = namer.GetConsistentName("Status") + transactionIDFieldName = namer.GetConsistentName("TransactionID") +} + +var ( + // statusFieldName is the status field name. + statusFieldName string + // transactionIDFieldName is the transactions id field name. + transactionIDFieldName string +) + +// PendingProvenModel is the primary event model. +type PendingProvenModel struct { + // CreatedAt is the creation time + CreatedAt time.Time + // UpdatedAt is the update time + UpdatedAt time.Time + // Origin is the origin chain id + Origin uint32 + // TransactionID is the transaction id of the event + TransactionID string `gorm:"column:transaction_id;primaryKey"` + // TxHash is the hash of the relay transaction on destination + TxHash string + // Status is the status of the event + Status guarddb.PendingProvenStatus +} + +// FromPendingProven converts a quote request to an object that can be stored in the db. +func FromPendingProven(proven guarddb.PendingProven) PendingProvenModel { + return PendingProvenModel{ + Origin: proven.Origin, + TransactionID: hexutil.Encode(proven.TransactionID[:]), + TxHash: proven.TxHash.Hex(), + Status: proven.Status, + } +} + +// ToPendingProven converts a db object to a pending proven. +func (p PendingProvenModel) ToPendingProven() (*guarddb.PendingProven, error) { + txID, err := hexutil.Decode(p.TransactionID) + if err != nil { + return nil, fmt.Errorf("could not get transaction id: %w", err) + } + + transactionID, err := sliceToArray(txID) + if err != nil { + return nil, fmt.Errorf("could not convert transaction id: %w", err) + } + + return &guarddb.PendingProven{ + Origin: p.Origin, + TransactionID: transactionID, + TxHash: common.HexToHash(p.TxHash), + Status: p.Status, + }, nil +} + +// BridgeRequestModel is the primary event model. +type BridgeRequestModel struct { + // CreatedAt is the creation time + CreatedAt time.Time + // UpdatedAt is the update time + UpdatedAt time.Time + // TransactionID is the transaction id of the event + TransactionID string `gorm:"column:transaction_id;primaryKey"` + // OriginChainID is the origin chain for the transactions + OriginChainID uint32 + // DestChainID is the destination chain for the tx + DestChainID uint32 + // OriginSender is the original sender + OriginSender string + // DestRecipient is the recipient of the destination tx + DestRecipient string + // OriginToken is the origin token address + OriginToken string + // DestToken is the destination token address + DestToken string + // OriginAmount is the origin amount stored for sorting. + // This is not the source of truth, but is approximate + OriginAmount string + // DestAmount is the destination amount stored for sorting. + DestAmount string + // Deadline is the deadline for the relay + Deadline time.Time `gorm:"index"` + // OriginNonce is the nonce on the origin chain in the app. + // this is not effected by the message.sender nonce. + OriginNonce int `gorm:"index"` + // RawRequest is the raw request, hex encoded. + RawRequest string + // SendChainGas is true if the chain should send gas + SendChainGas bool +} + +// FromBridgeRequest converts a bridge request object to db model. +func FromBridgeRequest(request guarddb.BridgeRequest) BridgeRequestModel { + return BridgeRequestModel{ + TransactionID: hexutil.Encode(request.TransactionID[:]), + OriginChainID: request.Transaction.OriginChainId, + DestChainID: request.Transaction.DestChainId, + OriginSender: request.Transaction.OriginSender.String(), + DestRecipient: request.Transaction.DestRecipient.String(), + OriginToken: request.Transaction.OriginToken.String(), + RawRequest: hexutil.Encode(request.RawRequest), + SendChainGas: request.Transaction.SendChainGas, + DestToken: request.Transaction.DestToken.String(), + OriginAmount: request.Transaction.OriginAmount.String(), + DestAmount: request.Transaction.DestAmount.String(), + Deadline: time.Unix(int64(request.Transaction.Deadline.Uint64()), 0), + OriginNonce: int(request.Transaction.Nonce.Uint64()), + } +} + +// ToBridgeRequest converts the bridge request db model to object. +func (b BridgeRequestModel) ToBridgeRequest() (*guarddb.BridgeRequest, error) { + txID, err := hexutil.Decode(b.TransactionID) + if err != nil { + return nil, fmt.Errorf("could not get transaction id: %w", err) + } + + req, err := hexutil.Decode(b.RawRequest) + if err != nil { + return nil, fmt.Errorf("could not get request: %w", err) + } + + transactionID, err := sliceToArray(txID) + if err != nil { + return nil, fmt.Errorf("could not convert transaction id: %w", err) + } + + originAmount, ok := new(big.Int).SetString(b.OriginAmount, 10) + if !ok { + return nil, errors.New("could not convert origin amount") + } + destAmount, ok := new(big.Int).SetString(b.DestAmount, 10) + if !ok { + return nil, errors.New("could not convert dest amount") + } + + return &guarddb.BridgeRequest{ + TransactionID: transactionID, + RawRequest: req, + Transaction: fastbridge.IFastBridgeBridgeTransaction{ + OriginChainId: b.OriginChainID, + DestChainId: b.DestChainID, + OriginSender: common.HexToAddress(b.OriginSender), + DestRecipient: common.HexToAddress(b.DestRecipient), + OriginToken: common.HexToAddress(b.OriginToken), + SendChainGas: b.SendChainGas, + DestToken: common.HexToAddress(b.DestToken), + OriginAmount: originAmount, + DestAmount: destAmount, + Deadline: big.NewInt(b.Deadline.Unix()), + Nonce: big.NewInt(int64(b.OriginNonce)), + }, + }, nil +} + +func sliceToArray(slice []byte) ([32]byte, error) { + var arr [32]byte + if len(slice) != 32 { + return arr, errors.New("slice is not 32 bytes long") + } + copy(arr[:], slice) + return arr, nil +} diff --git a/services/rfq/guard/guarddb/base/proven.go b/services/rfq/guard/guarddb/base/proven.go new file mode 100644 index 0000000000..32da3915f6 --- /dev/null +++ b/services/rfq/guard/guarddb/base/proven.go @@ -0,0 +1,80 @@ +package base + +import ( + "context" + "errors" + "fmt" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/synapsecns/sanguine/services/rfq/guard/guarddb" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +// StorePendingProven stores a quote request. +func (s Store) StorePendingProven(ctx context.Context, proven guarddb.PendingProven) error { + model := FromPendingProven(proven) + dbTx := s.DB().WithContext(ctx).Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: transactionIDFieldName}}, + DoUpdates: clause.AssignmentColumns([]string{transactionIDFieldName}), + }).Create(&model) + if dbTx.Error != nil { + return fmt.Errorf("could not store proven: %w", dbTx.Error) + } + return nil +} + +// UpdatePendingProvenStatus updates the status of a pending proven. +func (s Store) UpdatePendingProvenStatus(ctx context.Context, id [32]byte, status guarddb.PendingProvenStatus) error { + tx := s.DB().WithContext(ctx).Model(&PendingProvenModel{}). + Where(fmt.Sprintf("%s = ?", transactionIDFieldName), hexutil.Encode(id[:])). + Update(statusFieldName, status) + if tx.Error != nil { + return fmt.Errorf("could not update: %w", tx.Error) + } + return nil +} + +// GetPendingProvensByStatus gets pending provens by status. +func (s Store) GetPendingProvensByStatus(ctx context.Context, matchStatuses ...guarddb.PendingProvenStatus) (res []*guarddb.PendingProven, _ error) { + var provenResults []PendingProvenModel + + inArgs := make([]int, len(matchStatuses)) + for i := range matchStatuses { + inArgs[i] = int(matchStatuses[i].Int()) + } + + // TODO: consider pagination + tx := s.DB().WithContext(ctx).Model(&PendingProvenModel{}).Where(fmt.Sprintf("%s IN ?", statusFieldName), inArgs).Find(&provenResults) + if tx.Error != nil { + return []*guarddb.PendingProven{}, fmt.Errorf("could not get db results: %w", tx.Error) + } + + for _, result := range provenResults { + marshaled, err := result.ToPendingProven() + if err != nil { + return []*guarddb.PendingProven{}, fmt.Errorf("could not get provens") + } + res = append(res, marshaled) + } + return res, nil +} + +// GetPendingProvenByID gets a quote request by id. Should return ErrNoProvenForID if not found. +func (s Store) GetPendingProvenByID(ctx context.Context, id [32]byte) (*guarddb.PendingProven, error) { + var modelResult PendingProvenModel + tx := s.DB().WithContext(ctx).Where(fmt.Sprintf("%s = ?", transactionIDFieldName), hexutil.Encode(id[:])).First(&modelResult) + if errors.Is(tx.Error, gorm.ErrRecordNotFound) { + return nil, guarddb.ErrNoProvenForID + } + + if tx.Error != nil { + return nil, fmt.Errorf("could not get proven") + } + + qr, err := modelResult.ToPendingProven() + if err != nil { + return nil, err + } + return qr, nil +} diff --git a/services/rfq/guard/guarddb/base/store.go b/services/rfq/guard/guarddb/base/store.go new file mode 100644 index 0000000000..b4e08a9bae --- /dev/null +++ b/services/rfq/guard/guarddb/base/store.go @@ -0,0 +1,44 @@ +package base + +import ( + "github.com/synapsecns/sanguine/core/metrics" + listenerDB "github.com/synapsecns/sanguine/ethergo/listener/db" + submitterDB "github.com/synapsecns/sanguine/ethergo/submitter/db" + "github.com/synapsecns/sanguine/ethergo/submitter/db/txdb" + "github.com/synapsecns/sanguine/services/rfq/guard/guarddb" + "gorm.io/gorm" +) + +// Store implements the service. +type Store struct { + listenerDB.ChainListenerDB + db *gorm.DB + submitterStore submitterDB.Service +} + +// NewStore creates a new store. +func NewStore(db *gorm.DB, metrics metrics.Handler) *Store { + txDB := txdb.NewTXStore(db, metrics) + + return &Store{ChainListenerDB: listenerDB.NewChainListenerStore(db, metrics), db: db, submitterStore: txDB} +} + +// DB gets the database object for mutation outside of the lib. +func (s Store) DB() *gorm.DB { + return s.db +} + +// SubmitterDB gets the submitter database object for mutation outside of the lib. +func (s Store) SubmitterDB() submitterDB.Service { + return s.submitterStore +} + +// GetAllModels gets all models to migrate +// see: https://medium.com/@SaifAbid/slice-interfaces-8c78f8b6345d for an explanation of why we can't do this at initialization time +func GetAllModels() (allModels []interface{}) { + allModels = append(txdb.GetAllModels(), &PendingProvenModel{}, &BridgeRequestModel{}) + allModels = append(allModels, listenerDB.GetAllModels()...) + return allModels +} + +var _ guarddb.Service = &Store{} diff --git a/services/rfq/guard/guarddb/connect/sql.go b/services/rfq/guard/guarddb/connect/sql.go new file mode 100644 index 0000000000..1fe39f9024 --- /dev/null +++ b/services/rfq/guard/guarddb/connect/sql.go @@ -0,0 +1,39 @@ +// Package connect contains the database connection logic for the RFQ relayer. +// TODO: this is a dumb name for a package in a dumb place. Move it somewhere else. +package connect + +import ( + "context" + "errors" + "fmt" + + "github.com/synapsecns/sanguine/core/dbcommon" + "github.com/synapsecns/sanguine/core/metrics" + "github.com/synapsecns/sanguine/services/rfq/guard/guarddb" + "github.com/synapsecns/sanguine/services/rfq/guard/guarddb/mysql" + "github.com/synapsecns/sanguine/services/rfq/guard/guarddb/sqlite" +) + +// Connect connects to the database. +func Connect(ctx context.Context, dbType dbcommon.DBType, path string, metrics metrics.Handler) (guarddb.Service, error) { + switch dbType { + case dbcommon.Mysql: + store, err := mysql.NewMysqlStore(ctx, path, metrics) + if err != nil { + return nil, fmt.Errorf("could not create mysql store: %w", err) + } + + return store, nil + case dbcommon.Sqlite: + store, err := sqlite.NewSqliteStore(ctx, path, metrics) + if err != nil { + return nil, fmt.Errorf("could not create sqlite store: %w", err) + } + + return store, nil + case dbcommon.Clickhouse: + return nil, errors.New("driver not supported") + default: + return nil, fmt.Errorf("unsupported driver: %s", dbType) + } +} diff --git a/services/rfq/guard/guarddb/db.go b/services/rfq/guard/guarddb/db.go new file mode 100644 index 0000000000..236a5b7e2c --- /dev/null +++ b/services/rfq/guard/guarddb/db.go @@ -0,0 +1,116 @@ +package guarddb + +import ( + "context" + "database/sql/driver" + "errors" + "fmt" + + "github.com/synapsecns/sanguine/ethergo/listener/db" + "github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge" + + "github.com/ethereum/go-ethereum/common" + "github.com/synapsecns/sanguine/core/dbcommon" + submitterDB "github.com/synapsecns/sanguine/ethergo/submitter/db" +) + +var ( + // ErrNoProvenForID means the proven was not found. + ErrNoProvenForID = errors.New("no proven found for tx id") + // ErrNoBridgeRequestForID means the bridge request was not found. + ErrNoBridgeRequestForID = errors.New("no bridge request found for tx id") +) + +// Writer is the interface for writing to the database. +type Writer interface { + // StoreBridgeRequest stores a bridge request. + StoreBridgeRequest(ctx context.Context, request BridgeRequest) error + // StorePendingProven stores a pending proven. + StorePendingProven(ctx context.Context, proven PendingProven) error + // UpdatePendingProvenStatus updates the status of a pending proven. + UpdatePendingProvenStatus(ctx context.Context, id [32]byte, status PendingProvenStatus) error +} + +// Reader is the interface for reading from the database. +type Reader interface { + // GetPendingProvensByStatus gets pending provens by status. + GetPendingProvensByStatus(ctx context.Context, matchStatuses ...PendingProvenStatus) ([]*PendingProven, error) + // GetPendingProvenByID gets a pending proven by id. Should return ErrNoProvenForID if not found + GetPendingProvenByID(ctx context.Context, id [32]byte) (*PendingProven, error) + // GetBridgeRequestByID gets a bridge request by id. Should return ErrNoBridgeRequestForID if not found + GetBridgeRequestByID(ctx context.Context, id [32]byte) (*BridgeRequest, error) +} + +// Service is the interface for the database service. +type Service interface { + Reader + // SubmitterDB returns the submitter database service. + SubmitterDB() submitterDB.Service + Writer + db.ChainListenerDB +} + +// BridgeRequest is the bridge request object. +type BridgeRequest struct { + TransactionID [32]byte + Transaction fastbridge.IFastBridgeBridgeTransaction + RawRequest []byte +} + +// PendingProven is the pending proven object. +type PendingProven struct { + Origin uint32 + TransactionID [32]byte + TxHash common.Hash + Status PendingProvenStatus +} + +// PendingProvenStatus is the status of a quote request in the db. +// This is the primary mechanism for moving data through the app. +// +// TODO: consider making this an interface and exporting that. +// +// EXTREMELY IMPORTANT: DO NOT ADD NEW VALUES TO THIS ENUM UNLESS THEY ARE AT THE END. +// +//go:generate go run golang.org/x/tools/cmd/stringer -type=PendingProvenStatus +type PendingProvenStatus uint8 + +const ( + // ProveCalled means the prove() function has been called. + ProveCalled PendingProvenStatus = iota + 1 + // Validated means the prove() call has been properly validated on the dest chain. + Validated + // DisputePending means dispute() has been called in the event of an invalid prove(). + DisputePending + // Disputed means the dispute() call has been confirmed. + Disputed +) + +// Int returns the int value of the quote request status. +func (q PendingProvenStatus) Int() uint8 { + return uint8(q) +} + +// GormDataType implements the gorm common interface for enums. +func (q PendingProvenStatus) GormDataType() string { + return dbcommon.EnumDataType +} + +// Scan implements the gorm common interface for enums. +func (q *PendingProvenStatus) Scan(src any) error { + res, err := dbcommon.EnumScan(src) + if err != nil { + return fmt.Errorf("could not scan %w", err) + } + newStatus := PendingProvenStatus(res) + *q = newStatus + return nil +} + +// Value implements the gorm common interface for enums. +func (q PendingProvenStatus) Value() (driver.Value, error) { + // nolint: wrapcheck + return dbcommon.EnumValue(q) +} + +var _ dbcommon.Enum = (*PendingProvenStatus)(nil) diff --git a/services/rfq/guard/guarddb/doc.go b/services/rfq/guard/guarddb/doc.go new file mode 100644 index 0000000000..c81d904fed --- /dev/null +++ b/services/rfq/guard/guarddb/doc.go @@ -0,0 +1,3 @@ +// Package guarddb contains the database interface for the rfq guard. +// All data store types must confrm to this interface. +package guarddb diff --git a/services/rfq/guard/guarddb/mysql/mysql.go b/services/rfq/guard/guarddb/mysql/mysql.go new file mode 100644 index 0000000000..0048e56638 --- /dev/null +++ b/services/rfq/guard/guarddb/mysql/mysql.go @@ -0,0 +1,63 @@ +// Package mysql provides a common interface for starting mysql databases +package mysql + +import ( + "context" + "fmt" + "time" + + "github.com/ipfs/go-log" + "github.com/synapsecns/sanguine/core/dbcommon" + "github.com/synapsecns/sanguine/core/metrics" + "github.com/synapsecns/sanguine/services/rfq/guard/guarddb/base" + "gorm.io/driver/mysql" + "gorm.io/gorm" + "gorm.io/gorm/schema" +) + +var logger = log.Logger("mysql-logger") + +// Store is the mysql store. It extends the base store for mysql specific queries. +type Store struct { + *base.Store +} + +// MaxIdleConns is exported here for testing. Tests execute too slowly with a reconnect each time. +var MaxIdleConns = 0 + +// NamingStrategy is used to exported here for testing. +var NamingStrategy = schema.NamingStrategy{} + +// NewMysqlStore creates a new mysql store for a given data store. +func NewMysqlStore(ctx context.Context, dbURL string, handler metrics.Handler) (*Store, error) { + logger.Debug("create mysql store") + + gdb, err := gorm.Open(mysql.Open(dbURL), &gorm.Config{ + Logger: dbcommon.GetGormLogger(logger), + FullSaveAssociations: true, + NamingStrategy: NamingStrategy, + NowFunc: time.Now, + }) + + if err != nil { + return nil, fmt.Errorf("could not create mysql connection: %w", err) + } + + sqlDB, err := gdb.DB() + if err != nil { + return nil, fmt.Errorf("could not get sql db: %w", err) + } + + // fixes a timeout issue https://stackoverflow.com/a/42146536 + sqlDB.SetMaxIdleConns(MaxIdleConns) + sqlDB.SetConnMaxLifetime(time.Hour) + + handler.AddGormCallbacks(gdb) + + err = gdb.WithContext(ctx).AutoMigrate(base.GetAllModels()...) + if err != nil { + return nil, fmt.Errorf("could not migrate on mysql: %w", err) + } + + return &Store{base.NewStore(gdb, handler)}, nil +} diff --git a/services/rfq/guard/guarddb/pendingprovenstatus_string.go b/services/rfq/guard/guarddb/pendingprovenstatus_string.go new file mode 100644 index 0000000000..31aa25e8e9 --- /dev/null +++ b/services/rfq/guard/guarddb/pendingprovenstatus_string.go @@ -0,0 +1,27 @@ +// Code generated by "stringer -type=PendingProvenStatus"; DO NOT EDIT. + +package guarddb + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[ProveCalled-1] + _ = x[Validated-2] + _ = x[DisputePending-3] + _ = x[Disputed-4] +} + +const _PendingProvenStatus_name = "ProveCalledValidatedDisputePendingDisputed" + +var _PendingProvenStatus_index = [...]uint8{0, 11, 20, 34, 42} + +func (i PendingProvenStatus) String() string { + i -= 1 + if i >= PendingProvenStatus(len(_PendingProvenStatus_index)-1) { + return "PendingProvenStatus(" + strconv.FormatInt(int64(i+1), 10) + ")" + } + return _PendingProvenStatus_name[_PendingProvenStatus_index[i]:_PendingProvenStatus_index[i+1]] +} diff --git a/services/rfq/guard/guarddb/sqlite/sqlite.go b/services/rfq/guard/guarddb/sqlite/sqlite.go new file mode 100644 index 0000000000..0a1d4c939e --- /dev/null +++ b/services/rfq/guard/guarddb/sqlite/sqlite.go @@ -0,0 +1,62 @@ +// Package sqlite provides a common interface for starting sql-lite databases +package sqlite + +import ( + "context" + "fmt" + "os" + + "github.com/synapsecns/sanguine/services/rfq/guard/guarddb" + "github.com/synapsecns/sanguine/services/rfq/guard/guarddb/base" + + "github.com/ipfs/go-log" + common_base "github.com/synapsecns/sanguine/core/dbcommon" + "github.com/synapsecns/sanguine/core/metrics" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +// Store is the sqlite store. It extends the base store for sqlite specific queries. +type Store struct { + *base.Store +} + +var logger = log.Logger("rfq-sqlite") + +// NewSqliteStore creates a new sqlite data store. +func NewSqliteStore(parentCtx context.Context, dbPath string, handler metrics.Handler) (_ *Store, err error) { + logger.Debugf("creating sqlite store at %s", dbPath) + + ctx, span := handler.Tracer().Start(parentCtx, "start-sqlite") + defer func() { + metrics.EndSpanWithErr(span, err) + }() + + // create the directory to the store if it doesn't exist + err = os.MkdirAll(dbPath, os.ModePerm) + if err != nil { + return nil, fmt.Errorf("could not create sqlite store") + } + + logger.Warnf("rfq database is at %s/synapse.db", dbPath) + + gdb, err := gorm.Open(sqlite.Open(fmt.Sprintf("%s/%s", dbPath, "synapse.db")), &gorm.Config{ + DisableForeignKeyConstraintWhenMigrating: true, + Logger: common_base.GetGormLogger(logger), + FullSaveAssociations: true, + SkipDefaultTransaction: true, + }) + if err != nil { + return nil, fmt.Errorf("could not connect to db %s: %w", dbPath, err) + } + + handler.AddGormCallbacks(gdb) + + err = gdb.WithContext(ctx).AutoMigrate(base.GetAllModels()...) + if err != nil { + return nil, fmt.Errorf("could not migrate models: %w", err) + } + return &Store{base.NewStore(gdb, handler)}, nil +} + +var _ guarddb.Service = &Store{} diff --git a/services/rfq/guard/service/doc.go b/services/rfq/guard/service/doc.go new file mode 100644 index 0000000000..30310f7a59 --- /dev/null +++ b/services/rfq/guard/service/doc.go @@ -0,0 +1,2 @@ +// Package service contains the core of the guard. +package service diff --git a/services/rfq/guard/service/guard.go b/services/rfq/guard/service/guard.go new file mode 100644 index 0000000000..eaf2e71823 --- /dev/null +++ b/services/rfq/guard/service/guard.go @@ -0,0 +1,258 @@ +package service + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ipfs/go-log" + "github.com/synapsecns/sanguine/core/dbcommon" + "github.com/synapsecns/sanguine/core/metrics" + "github.com/synapsecns/sanguine/ethergo/listener" + signerConfig "github.com/synapsecns/sanguine/ethergo/signer/config" + "github.com/synapsecns/sanguine/ethergo/submitter" + omniClient "github.com/synapsecns/sanguine/services/omnirpc/client" + "github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge" + "github.com/synapsecns/sanguine/services/rfq/guard/guardconfig" + "github.com/synapsecns/sanguine/services/rfq/guard/guarddb" + "github.com/synapsecns/sanguine/services/rfq/guard/guarddb/connect" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" +) + +var logger = log.Logger("guard") + +// Guard monitors calls to prove() and verifies them. +type Guard struct { + cfg guardconfig.Config + metrics metrics.Handler + db guarddb.Service + client omniClient.RPCClient + chainListeners map[int]listener.ContractListener + contracts map[int]*fastbridge.FastBridgeRef + txSubmitter submitter.TransactionSubmitter +} + +// NewGuard creates a new Guard. +// +//nolint:cyclop +func NewGuard(ctx context.Context, metricHandler metrics.Handler, cfg guardconfig.Config, txSubmitter submitter.TransactionSubmitter) (*Guard, error) { + omniClient := omniClient.NewOmnirpcClient(cfg.OmniRPCURL, metricHandler, omniClient.WithCaptureReqRes()) + chainListeners := make(map[int]listener.ContractListener) + + dbType, err := dbcommon.DBTypeFromString(cfg.Database.Type) + if err != nil { + return nil, fmt.Errorf("could not get db type: %w", err) + } + store, err := connect.Connect(ctx, dbType, cfg.Database.DSN, metricHandler) + if err != nil { + return nil, fmt.Errorf("could not make db: %w", err) + } + + // setup chain listeners + contracts := make(map[int]*fastbridge.FastBridgeRef) + for chainID := range cfg.GetChains() { + rfqAddr, err := cfg.GetRFQAddress(chainID) + if err != nil { + return nil, fmt.Errorf("could not get rfq address: %w", err) + } + chainClient, err := omniClient.GetChainClient(ctx, chainID) + if err != nil { + return nil, fmt.Errorf("could not get chain client: %w", err) + } + + contract, err := fastbridge.NewFastBridgeRef(common.HexToAddress(rfqAddr), chainClient) + if err != nil { + return nil, fmt.Errorf("could not create fast bridge contract: %w", err) + } + startBlock, err := contract.DeployBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return nil, fmt.Errorf("could not get deploy block: %w", err) + } + chainListener, err := listener.NewChainListener(chainClient, store, common.HexToAddress(rfqAddr), uint64(startBlock.Int64()), metricHandler) + if err != nil { + return nil, fmt.Errorf("could not get chain listener: %w", err) + } + chainListeners[chainID] = chainListener + + // setup FastBridge contract on this chain + contracts[chainID], err = fastbridge.NewFastBridgeRef(common.HexToAddress(rfqAddr), chainClient) + if err != nil { + return nil, fmt.Errorf("could not create bridge contract: %w", err) + } + } + + // build submitter from config if one is not supplied + if txSubmitter == nil { + sg, err := signerConfig.SignerFromConfig(ctx, cfg.Signer) + if err != nil { + return nil, fmt.Errorf("could not get signer: %w", err) + } + txSubmitter = submitter.NewTransactionSubmitter(metricHandler, sg, omniClient, store.SubmitterDB(), &cfg.SubmitterConfig) + } + + return &Guard{ + cfg: cfg, + metrics: metricHandler, + db: store, + client: omniClient, + chainListeners: chainListeners, + contracts: contracts, + txSubmitter: txSubmitter, + }, nil +} + +// Start starts the guard. +func (g *Guard) Start(ctx context.Context) (err error) { + group, ctx := errgroup.WithContext(ctx) + group.Go(func() error { + err := g.startChainIndexers(ctx) + if err != nil { + return fmt.Errorf("could not start chain indexers: %w", err) + } + return nil + }) + group.Go(func() error { + err = g.runDBSelector(ctx) + if err != nil { + return fmt.Errorf("could not start db selector: %w", err) + } + return nil + }) + + group.Go(func() error { + if !g.txSubmitter.Started() { + err = g.txSubmitter.Start(ctx) + // defensive coding against potential race. + if err != nil && !errors.Is(err, submitter.ErrSubmitterAlreadyStarted) { + return fmt.Errorf("could not start tx submitter: %w", err) + } + } + return nil + }) + + err = group.Wait() + if err != nil { + return fmt.Errorf("could not wait for group: %w", err) + } + + return nil +} + +func (g *Guard) runDBSelector(ctx context.Context) (err error) { + interval := g.cfg.GetDBSelectorInterval() + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("could not run db selector: %w", ctx.Err()) + case <-time.After(interval): + err := g.processDB(ctx) + if err != nil { + return err + } + } + } +} + +func (g *Guard) startChainIndexers(ctx context.Context) (err error) { + group, ctx := errgroup.WithContext(ctx) + + for chainID := range g.cfg.GetChains() { + chainID := chainID // capture func literal + + group.Go(func() error { + err := g.runChainIndexer(ctx, chainID) + if err != nil { + return fmt.Errorf("could not runChainIndexer chain indexer for chain %d: %w", chainID, err) + } + return nil + }) + } + + err = group.Wait() + if err != nil { + return fmt.Errorf("could not run chain indexers") + } + + return nil +} + +//nolint:cyclop +func (g Guard) runChainIndexer(ctx context.Context, chainID int) (err error) { + chainListener := g.chainListeners[chainID] + + parser, err := fastbridge.NewParser(chainListener.Address()) + if err != nil { + return fmt.Errorf("could not parse: %w", err) + } + + err = chainListener.Listen(ctx, func(parentCtx context.Context, log types.Log) (err error) { + et, parsedEvent, ok := parser.ParseEvent(log) + // handle unknown event + if !ok { + if len(log.Topics) != 0 { + logger.Warnf("unknown event %s", log.Topics[0]) + } + return nil + } + + ctx, span := g.metrics.Tracer().Start(parentCtx, fmt.Sprintf("handleLog-%s", et), trace.WithAttributes( + attribute.String(metrics.TxHash, log.TxHash.String()), + attribute.Int(metrics.Origin, chainID), + attribute.String(metrics.Contract, log.Address.String()), + attribute.String("block_hash", log.BlockHash.String()), + attribute.Int64("block_number", int64(log.BlockNumber)), + )) + + defer func() { + metrics.EndSpanWithErr(span, err) + }() + + switch event := parsedEvent.(type) { + case *fastbridge.FastBridgeBridgeRequested: + err = g.handleBridgeRequestedLog(ctx, event, chainID) + if err != nil { + return fmt.Errorf("could not handle request: %w", err) + } + case *fastbridge.FastBridgeBridgeProofProvided: + err = g.handleProofProvidedLog(ctx, event, chainID) + if err != nil { + return fmt.Errorf("could not handle request: %w", err) + } + case *fastbridge.FastBridgeBridgeProofDisputed: + err = g.handleProofDisputedLog(ctx, event) + if err != nil { + return fmt.Errorf("could not handle request: %w", err) + } + } + + return nil + }) + + if err != nil { + return fmt.Errorf("listener failed: %w", err) + } + return nil +} + +func (g *Guard) processDB(ctx context.Context) (err error) { + provens, err := g.db.GetPendingProvensByStatus(ctx, guarddb.ProveCalled) + if err != nil { + return fmt.Errorf("could not get pending provens: %w", err) + } + + for _, proven := range provens { + err := g.handleProveCalled(ctx, proven) + if err != nil { + return fmt.Errorf("could not handle prove called: %w", err) + } + } + + return nil +} diff --git a/services/rfq/guard/service/handlers.go b/services/rfq/guard/service/handlers.go new file mode 100644 index 0000000000..cc98c7e4e7 --- /dev/null +++ b/services/rfq/guard/service/handlers.go @@ -0,0 +1,227 @@ +package service + +import ( + "context" + "errors" + "fmt" + "math/big" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/synapsecns/sanguine/core/metrics" + "github.com/synapsecns/sanguine/core/retry" + "github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge" + "github.com/synapsecns/sanguine/services/rfq/guard/guarddb" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +var maxRPCRetryTime = 15 * time.Second + +func (g *Guard) handleBridgeRequestedLog(parentCtx context.Context, req *fastbridge.FastBridgeBridgeRequested, chainID int) (err error) { + ctx, span := g.metrics.Tracer().Start(parentCtx, "handleBridgeRequestedLog-guard", trace.WithAttributes( + attribute.Int(metrics.Origin, chainID), + attribute.String("transaction_id", hexutil.Encode(req.TransactionId[:])), + )) + defer func() { + metrics.EndSpanWithErr(span, err) + }() + + originClient, err := g.client.GetChainClient(ctx, chainID) + if err != nil { + return fmt.Errorf("could not get correct omnirpc client: %w", err) + } + + fastBridge, err := fastbridge.NewFastBridgeRef(req.Raw.Address, originClient) + if err != nil { + return fmt.Errorf("could not get correct fast bridge: %w", err) + } + + var bridgeTx fastbridge.IFastBridgeBridgeTransaction + call := func(ctx context.Context) error { + bridgeTx, err = fastBridge.GetBridgeTransaction(&bind.CallOpts{Context: ctx}, req.Request) + if err != nil { + return fmt.Errorf("could not get bridge transaction: %w", err) + } + return nil + } + err = retry.WithBackoff(ctx, call, retry.WithMaxTotalTime(maxRPCRetryTime)) + if err != nil { + return fmt.Errorf("could not make call: %w", err) + } + + dbReq := guarddb.BridgeRequest{ + RawRequest: req.Request, + TransactionID: req.TransactionId, + Transaction: bridgeTx, + } + err = g.db.StoreBridgeRequest(ctx, dbReq) + if err != nil { + return fmt.Errorf("could not get db: %w", err) + } + return nil +} + +func (g *Guard) handleProofProvidedLog(parentCtx context.Context, event *fastbridge.FastBridgeBridgeProofProvided, chainID int) (err error) { + ctx, span := g.metrics.Tracer().Start(parentCtx, "handleProofProvidedLog-guard", trace.WithAttributes( + attribute.Int(metrics.Origin, chainID), + attribute.String("transaction_id", hexutil.Encode(event.TransactionId[:])), + attribute.String("tx_hash", hexutil.Encode(event.TransactionHash[:])), + )) + defer func() { + metrics.EndSpanWithErr(span, err) + }() + + proven := guarddb.PendingProven{ + Origin: uint32(chainID), + TransactionID: event.TransactionId, + TxHash: event.TransactionHash, + Status: guarddb.ProveCalled, + } + err = g.db.StorePendingProven(ctx, proven) + if err != nil { + return fmt.Errorf("could not store pending proven: %w", err) + } + + return nil +} + +func (g *Guard) handleProofDisputedLog(parentCtx context.Context, event *fastbridge.FastBridgeBridgeProofDisputed) (err error) { + ctx, span := g.metrics.Tracer().Start(parentCtx, "handleProofDisputedLog-guard", trace.WithAttributes( + attribute.String("transaction_id", hexutil.Encode(event.TransactionId[:])), + )) + defer func() { + metrics.EndSpanWithErr(span, err) + }() + + err = g.db.UpdatePendingProvenStatus(ctx, event.TransactionId, guarddb.Disputed) + if err != nil { + return fmt.Errorf("could not update pending proven status: %w", err) + } + + return nil +} + +func (g *Guard) handleProveCalled(parentCtx context.Context, proven *guarddb.PendingProven) (err error) { + ctx, span := g.metrics.Tracer().Start(parentCtx, "handleProveCalled", trace.WithAttributes( + attribute.String("transaction_id", hexutil.Encode(proven.TransactionID[:])), + )) + defer func() { + metrics.EndSpanWithErr(span, err) + }() + + // first, get the corresponding bridge request + bridgeRequest, err := g.db.GetBridgeRequestByID(ctx, proven.TransactionID) + if err != nil { + return fmt.Errorf("could not get bridge request: %w", err) + } + + valid, err := g.isProveValid(ctx, proven, bridgeRequest) + if err != nil { + return fmt.Errorf("could not check prove validity: %w", err) + } + span.SetAttributes(attribute.Bool("valid", valid)) + + //nolint:nestif + if valid { + // mark as validated + err = g.db.UpdatePendingProvenStatus(ctx, proven.TransactionID, guarddb.Validated) + if err != nil { + return fmt.Errorf("could not update pending proven status: %w", err) + } + } else { + // trigger dispute + contract, ok := g.contracts[int(bridgeRequest.Transaction.OriginChainId)] + if !ok { + return fmt.Errorf("could not get contract for chain: %d", bridgeRequest.Transaction.OriginChainId) + } + _, err = g.txSubmitter.SubmitTransaction(ctx, big.NewInt(int64(bridgeRequest.Transaction.OriginChainId)), func(transactor *bind.TransactOpts) (tx *types.Transaction, err error) { + tx, err = contract.Dispute(transactor, proven.TransactionID) + if err != nil { + return nil, fmt.Errorf("could not dispute: %w", err) + } + + return tx, nil + }) + + if err != nil { + return fmt.Errorf("could not dispute: %w", err) + } + + // mark as dispute pending + err = g.db.UpdatePendingProvenStatus(ctx, proven.TransactionID, guarddb.DisputePending) + if err != nil { + return fmt.Errorf("could not update pending proven status: %w", err) + } + } + + return nil +} + +func (g *Guard) isProveValid(ctx context.Context, proven *guarddb.PendingProven, bridgeRequest *guarddb.BridgeRequest) (bool, error) { + // get the receipt for this tx on dest chain + chainClient, err := g.client.GetChainClient(ctx, int(bridgeRequest.Transaction.DestChainId)) + if err != nil { + return false, fmt.Errorf("could not get chain client: %w", err) + } + receipt, err := chainClient.TransactionReceipt(ctx, proven.TxHash) + if errors.Is(err, ethereum.NotFound) { + // if tx hash does not exist, we want to consider the proof invalid + return false, nil + } + if err != nil { + return false, fmt.Errorf("could not get receipt: %w", err) + } + addr, err := g.cfg.GetRFQAddress(int(bridgeRequest.Transaction.DestChainId)) + if err != nil { + return false, fmt.Errorf("could not get rfq address: %w", err) + } + parser, err := fastbridge.NewParser(common.HexToAddress(addr)) + if err != nil { + return false, fmt.Errorf("could not get parser: %w", err) + } + + for _, log := range receipt.Logs { + _, parsedEvent, ok := parser.ParseEvent(*log) + if !ok { + continue + } + + event, ok := parsedEvent.(*fastbridge.FastBridgeBridgeRelayed) + if ok { + return relayMatchesBridgeRequest(event, bridgeRequest), nil + } + } + + return false, nil +} + +func relayMatchesBridgeRequest(event *fastbridge.FastBridgeBridgeRelayed, bridgeRequest *guarddb.BridgeRequest) bool { + //TODO: is this exhaustive? + if event.TransactionId != bridgeRequest.TransactionID { + return false + } + if event.OriginAmount.Cmp(bridgeRequest.Transaction.OriginAmount) != 0 { + return false + } + if event.DestAmount.Cmp(bridgeRequest.Transaction.DestAmount) != 0 { + return false + } + if event.OriginChainId != bridgeRequest.Transaction.OriginChainId { + return false + } + if event.To != bridgeRequest.Transaction.DestRecipient { + return false + } + if event.OriginToken != bridgeRequest.Transaction.OriginToken { + return false + } + if event.DestToken != bridgeRequest.Transaction.DestToken { + return false + } + return true +} diff --git a/services/rfq/relayer/relconfig/config.go b/services/rfq/relayer/relconfig/config.go index 344f2c7b07..5d69fbd38f 100644 --- a/services/rfq/relayer/relconfig/config.go +++ b/services/rfq/relayer/relconfig/config.go @@ -55,6 +55,8 @@ type Config struct { EnableAPIWithdrawals bool `yaml:"enable_api_withdrawals"` // WithdrawalWhitelist is a list of addresses that are allowed to withdraw. WithdrawalWhitelist []string `yaml:"withdrawal_whitelist"` + // UseEmbeddedGuard enables the embedded guard. + UseEmbeddedGuard bool `yaml:"enable_guard"` // SubmitSingleQuotes enables submitting single quotes. SubmitSingleQuotes bool `yaml:"submit_single_quotes"` } diff --git a/services/rfq/relayer/reldb/doc.go b/services/rfq/relayer/reldb/doc.go index eceacfd65d..41125227cc 100644 --- a/services/rfq/relayer/reldb/doc.go +++ b/services/rfq/relayer/reldb/doc.go @@ -1,3 +1,3 @@ -// Package reldb contains the datbaase interface for the rfq relayer. +// Package reldb contains the database interface for the rfq relayer. // All data store types must confrm to this interface. package reldb diff --git a/services/rfq/relayer/service/relayer.go b/services/rfq/relayer/service/relayer.go index ace4355d54..5a95e3fd97 100644 --- a/services/rfq/relayer/service/relayer.go +++ b/services/rfq/relayer/service/relayer.go @@ -26,6 +26,8 @@ import ( omniClient "github.com/synapsecns/sanguine/services/omnirpc/client" rfqAPIClient "github.com/synapsecns/sanguine/services/rfq/api/client" "github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge" + "github.com/synapsecns/sanguine/services/rfq/guard/guardconfig" + serviceGuard "github.com/synapsecns/sanguine/services/rfq/guard/service" "github.com/synapsecns/sanguine/services/rfq/relayer/inventory" "github.com/synapsecns/sanguine/services/rfq/relayer/pricer" "github.com/synapsecns/sanguine/services/rfq/relayer/quoter" @@ -266,6 +268,14 @@ func (r *Relayer) Start(ctx context.Context) (err error) { return nil }) + g.Go(func() error { + err = r.startGuard(ctx) + if err != nil { + return fmt.Errorf("could not start guard: %w", err) + } + return nil + }) + err = g.Wait() if err != nil { return fmt.Errorf("could not start: %w", err) @@ -326,6 +336,26 @@ func (r *Relayer) startCCTPRelayer(ctx context.Context) (err error) { return nil } +// startGuard starts the guard, if specified. +func (r *Relayer) startGuard(ctx context.Context) (err error) { + if !r.cfg.UseEmbeddedGuard { + return nil + } + + guardCfg := guardconfig.NewGuardConfigFromRelayer(r.cfg) + guard, err := serviceGuard.NewGuard(ctx, r.metrics, guardCfg, r.submitter) + if err != nil { + return fmt.Errorf("could not create guard: %w", err) + } + + err = guard.Start(ctx) + if err != nil { + return fmt.Errorf("could not start guard: %w", err) + } + + return nil +} + func (r *Relayer) processDB(ctx context.Context, serial bool, matchStatuses ...reldb.QuoteRequestStatus) (err error) { ctx, span := r.metrics.Tracer().Start(ctx, "processDB", trace.WithAttributes( attribute.Bool("serial", serial),