From b5d81802e32b038b5bcdd26f233b0cd4b3eca3fa Mon Sep 17 00:00:00 2001 From: jeff <113397187+cyberhorsey@users.noreply.github.com> Date: Mon, 12 Dec 2022 07:43:45 -0800 Subject: [PATCH] feat(relayer): only process profitable transactions (#408) --- packages/relayer/.golangci.yml | 2 +- packages/relayer/cli/cli.go | 18 ++++- packages/relayer/cli/cli_test.go | 2 +- packages/relayer/cmd/main.go | 7 ++ packages/relayer/errors.go | 7 +- packages/relayer/event.go | 10 ++- packages/relayer/flags.go | 2 + .../relayer/http/get_events_by_address.go | 23 ++++-- .../http/get_events_by_address_test.go | 8 ++ .../relayer/indexer/filter_then_subscribe.go | 10 ++- packages/relayer/indexer/service.go | 2 + packages/relayer/message/get_latest_nonce.go | 23 ++++++ .../relayer/message/get_latest_nonce_test.go | 19 +++++ packages/relayer/message/is_profitable.go | 41 +++++++++++ .../relayer/message/is_profitable_test.go | 73 +++++++++++++++++++ packages/relayer/message/process_message.go | 57 ++++----------- .../relayer/message/process_message_test.go | 51 +++++++------ packages/relayer/message/processor.go | 5 ++ packages/relayer/message/processor_test.go | 3 +- .../relayer/message/wait_for_confirmations.go | 28 +++++++ .../message/wait_for_confirmations_test.go | 16 ++++ packages/relayer/mock/bridge.go | 18 +++-- packages/relayer/mock/event_repository.go | 36 ++++++++- packages/relayer/repo/event.go | 19 ++++- packages/relayer/repo/event_test.go | 64 +++++++++++++++- 25 files changed, 445 insertions(+), 99 deletions(-) create mode 100644 packages/relayer/message/get_latest_nonce.go create mode 100644 packages/relayer/message/get_latest_nonce_test.go create mode 100644 packages/relayer/message/is_profitable.go create mode 100644 packages/relayer/message/is_profitable_test.go create mode 100644 packages/relayer/message/wait_for_confirmations.go create mode 100644 packages/relayer/message/wait_for_confirmations_test.go diff --git a/packages/relayer/.golangci.yml b/packages/relayer/.golangci.yml index aab7bf85581..8408925a1d2 100644 --- a/packages/relayer/.golangci.yml +++ b/packages/relayer/.golangci.yml @@ -28,7 +28,7 @@ linters: linters-settings: funlen: - lines: 117 + lines: 123 statements: 50 gocognit: min-complexity: 37 diff --git a/packages/relayer/cli/cli.go b/packages/relayer/cli/cli.go index 68508524382..9a14f230964 100644 --- a/packages/relayer/cli/cli.go +++ b/packages/relayer/cli/cli.go @@ -48,7 +48,13 @@ var ( defaultConfirmations = 15 ) -func Run(mode relayer.Mode, watchMode relayer.WatchMode, layer relayer.Layer, httpOnly relayer.HTTPOnly) { +func Run( + mode relayer.Mode, + watchMode relayer.WatchMode, + layer relayer.Layer, + httpOnly relayer.HTTPOnly, + profitableOnly relayer.ProfitableOnly, +) { if err := loadAndValidateEnv(); err != nil { log.Fatal(err) } @@ -95,7 +101,7 @@ func Run(mode relayer.Mode, watchMode relayer.WatchMode, layer relayer.Layer, ht }() if !httpOnly { - indexers, closeFunc, err := makeIndexers(layer, db) + indexers, closeFunc, err := makeIndexers(layer, db, profitableOnly) if err != nil { sqlDB.Close() log.Fatal(err) @@ -116,7 +122,11 @@ func Run(mode relayer.Mode, watchMode relayer.WatchMode, layer relayer.Layer, ht <-forever } -func makeIndexers(layer relayer.Layer, db relayer.DB) ([]*indexer.Service, func(), error) { +func makeIndexers( + layer relayer.Layer, + db relayer.DB, + profitableOnly relayer.ProfitableOnly, +) ([]*indexer.Service, func(), error) { eventRepository, err := repo.NewEventRepository(db) if err != nil { return nil, nil, err @@ -192,6 +202,7 @@ func makeIndexers(layer relayer.Layer, db relayer.DB) ([]*indexer.Service, func( NumGoroutines: numGoroutines, SubscriptionBackoff: subscriptionBackoff, Confirmations: uint64(confirmations), + ProfitableOnly: profitableOnly, }) if err != nil { log.Fatal(err) @@ -218,6 +229,7 @@ func makeIndexers(layer relayer.Layer, db relayer.DB) ([]*indexer.Service, func( NumGoroutines: numGoroutines, SubscriptionBackoff: subscriptionBackoff, Confirmations: uint64(confirmations), + ProfitableOnly: profitableOnly, }) if err != nil { log.Fatal(err) diff --git a/packages/relayer/cli/cli_test.go b/packages/relayer/cli/cli_test.go index 55a3769c7da..ecb41afb718 100644 --- a/packages/relayer/cli/cli_test.go +++ b/packages/relayer/cli/cli_test.go @@ -203,7 +203,7 @@ func Test_makeIndexers(t *testing.T) { defer reset() } - indexers, cancel, err := makeIndexers(tt.layer, tt.dbFunc(t)) + indexers, cancel, err := makeIndexers(tt.layer, tt.dbFunc(t), relayer.ProfitableOnly(true)) if cancel != nil { defer cancel() } diff --git a/packages/relayer/cmd/main.go b/packages/relayer/cmd/main.go index 8b5b85dc464..e83c3888da3 100644 --- a/packages/relayer/cmd/main.go +++ b/packages/relayer/cmd/main.go @@ -36,6 +36,12 @@ func main() { false: run an http server and index blocks `) + profitableOnlyPtr := flag.Bool("profitable-only", false, `only process profitable transactions. + options: + true: + false: + `) + flag.Parse() if !relayer.IsInSlice(relayer.Mode(*modePtr), relayer.Modes) { @@ -51,5 +57,6 @@ func main() { relayer.WatchMode(*watchModePtr), relayer.Layer(*layersPtr), relayer.HTTPOnly(*httpOnlyPtr), + relayer.ProfitableOnly(*profitableOnlyPtr), ) } diff --git a/packages/relayer/errors.go b/packages/relayer/errors.go index db7911e06b4..934e32c822a 100644 --- a/packages/relayer/errors.go +++ b/packages/relayer/errors.go @@ -21,5 +21,10 @@ var ( "ERR_INVALID_CONFIRMATIONS", "Confirmations amount is invalid, must be numerical and > 0", ) - ErrInvalidMode = errors.Validation.NewWithKeyAndDetail("ERR_INVALID_MODE", "Mode not supported") + ErrInvalidMode = errors.Validation.NewWithKeyAndDetail("ERR_INVALID_MODE", "Mode not supported") + ErrUnprofitable = errors.Validation.NewWithKeyAndDetail("ERR_UNPROFITABLE", "Transaction is unprofitable to process") + ErrNotReceived = errors.BadRequest.NewWithKeyAndDetail( + "ERR_NOT_RECEIVED", + "Message not received on destination chain", + ) ) diff --git a/packages/relayer/event.go b/packages/relayer/event.go index 374a679ee94..96bda10f9a2 100644 --- a/packages/relayer/event.go +++ b/packages/relayer/event.go @@ -51,5 +51,13 @@ type SaveEventOpts struct { type EventRepository interface { Save(ctx context.Context, opts SaveEventOpts) (*Event, error) UpdateStatus(ctx context.Context, id int, status EventStatus) error - FindAllByAddress(ctx context.Context, chainID *big.Int, address common.Address) ([]*Event, error) + FindAllByAddressAndChainID( + ctx context.Context, + chainID *big.Int, + address common.Address, + ) ([]*Event, error) + FindAllByAddress( + ctx context.Context, + address common.Address, + ) ([]*Event, error) } diff --git a/packages/relayer/flags.go b/packages/relayer/flags.go index 89378354b65..59cebcb1117 100644 --- a/packages/relayer/flags.go +++ b/packages/relayer/flags.go @@ -27,3 +27,5 @@ var ( ) type HTTPOnly bool + +type ProfitableOnly bool diff --git a/packages/relayer/http/get_events_by_address.go b/packages/relayer/http/get_events_by_address.go index 580793e910b..ada66ed007d 100644 --- a/packages/relayer/http/get_events_by_address.go +++ b/packages/relayer/http/get_events_by_address.go @@ -1,7 +1,6 @@ package http import ( - "errors" "html" "math/big" "net/http" @@ -9,17 +8,31 @@ import ( "github.com/cyberhorsey/webutils" "github.com/ethereum/go-ethereum/common" "github.com/labstack/echo/v4" + "github.com/taikoxyz/taiko-mono/packages/relayer" ) func (srv *Server) GetEventsByAddress(c echo.Context) error { chainID, ok := new(big.Int).SetString(c.QueryParam("chainID"), 10) - if !ok { - return webutils.LogAndRenderErrors(c, http.StatusUnprocessableEntity, errors.New("invalid chain id")) - } address := html.EscapeString(c.QueryParam("address")) - events, err := srv.eventRepo.FindAllByAddress(c.Request().Context(), chainID, common.HexToAddress(address)) + var events []*relayer.Event + + var err error + + if ok { + events, err = srv.eventRepo.FindAllByAddressAndChainID( + c.Request().Context(), + chainID, + common.HexToAddress(address), + ) + } else { + events, err = srv.eventRepo.FindAllByAddress( + c.Request().Context(), + common.HexToAddress(address), + ) + } + if err != nil { return webutils.LogAndRenderErrors(c, http.StatusUnprocessableEntity, err) } diff --git a/packages/relayer/http/get_events_by_address_test.go b/packages/relayer/http/get_events_by_address_test.go index 4121dc674aa..fa93b8b6071 100644 --- a/packages/relayer/http/get_events_by_address_test.go +++ b/packages/relayer/http/get_events_by_address_test.go @@ -48,6 +48,14 @@ func Test_GetEventsByAddress(t *testing.T) { []string{`[{"id":780800018316137516,"name":"name", "data":{"Owner":"0x0000000000000000000000000000000000000123"},"status":0,"chainID":167001}]`}, }, + { + "successNoChainID", + "0x0000000000000000000000000000000000000123", + "", + http.StatusOK, + []string{`[{"id":780800018316137516,"name":"name", + "data":{"Owner":"0x0000000000000000000000000000000000000123"},"status":0,"chainID":167001}]`}, + }, } for _, tt := range tests { diff --git a/packages/relayer/indexer/filter_then_subscribe.go b/packages/relayer/indexer/filter_then_subscribe.go index 208b1cd4903..9fb5dbc05f4 100644 --- a/packages/relayer/indexer/filter_then_subscribe.go +++ b/packages/relayer/indexer/filter_then_subscribe.go @@ -42,11 +42,12 @@ func (svc *Service) FilterThenSubscribe( } if svc.processingBlockHeight == header.Number.Uint64() { - log.Info("caught up, subscribing to new incoming events") + log.Infof("chain ID %v caught up, subscribing to new incoming events", chainID.Uint64()) return svc.subscribe(ctx, chainID) } - log.Infof("getting events between %v and %v in batches of %v", + log.Infof("chain ID %v getting events between %v and %v in batches of %v", + chainID.Uint64(), svc.processingBlockHeight, header.Number.Int64(), svc.blockBatchSize, @@ -111,7 +112,10 @@ func (svc *Service) FilterThenSubscribe( } } - log.Info("indexer fully caught up, checking latest block number to see if it's advanced") + log.Infof( + "chain id %v indexer fully caught up, checking latest block number to see if it's advanced", + chainID.Uint64(), + ) latestBlock, err := svc.ethClient.HeaderByNumber(ctx, nil) if err != nil { diff --git a/packages/relayer/indexer/service.go b/packages/relayer/indexer/service.go index ea313f9c47e..aa69bd1227d 100644 --- a/packages/relayer/indexer/service.go +++ b/packages/relayer/indexer/service.go @@ -65,6 +65,7 @@ type NewServiceOpts struct { NumGoroutines int SubscriptionBackoff time.Duration Confirmations uint64 + ProfitableOnly relayer.ProfitableOnly } func NewService(opts NewServiceOpts) (*Service, error) { @@ -153,6 +154,7 @@ func NewService(opts NewServiceOpts) (*Service, error) { RelayerAddress: relayerAddr, Confirmations: opts.Confirmations, SrcETHClient: opts.EthClient, + ProfitableOnly: opts.ProfitableOnly, }) if err != nil { return nil, errors.Wrap(err, "message.NewProcessor") diff --git a/packages/relayer/message/get_latest_nonce.go b/packages/relayer/message/get_latest_nonce.go new file mode 100644 index 00000000000..8dbba0bddef --- /dev/null +++ b/packages/relayer/message/get_latest_nonce.go @@ -0,0 +1,23 @@ +package message + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" +) + +func (p *Processor) getLatestNonce(ctx context.Context, auth *bind.TransactOpts) error { + pendingNonce, err := p.destEthClient.PendingNonceAt(ctx, p.relayerAddr) + if err != nil { + return err + } + + if pendingNonce > p.destNonce { + p.setLatestNonce(pendingNonce) + } + + auth.Nonce = big.NewInt(int64(p.destNonce)) + + return nil +} diff --git a/packages/relayer/message/get_latest_nonce_test.go b/packages/relayer/message/get_latest_nonce_test.go new file mode 100644 index 00000000000..720ddbd316b --- /dev/null +++ b/packages/relayer/message/get_latest_nonce_test.go @@ -0,0 +1,19 @@ +package message + +import ( + "context" + "testing" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/stretchr/testify/assert" + "github.com/taikoxyz/taiko-mono/packages/relayer/mock" +) + +func Test_getLatestNonce(t *testing.T) { + p := newTestProcessor(true) + + err := p.getLatestNonce(context.Background(), &bind.TransactOpts{}) + assert.Nil(t, err) + + assert.Equal(t, p.destNonce, mock.PendingNonce) +} diff --git a/packages/relayer/message/is_profitable.go b/packages/relayer/message/is_profitable.go new file mode 100644 index 00000000000..49c29fcd16c --- /dev/null +++ b/packages/relayer/message/is_profitable.go @@ -0,0 +1,41 @@ +package message + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/pkg/errors" + "github.com/taikoxyz/taiko-mono/packages/relayer/contracts" +) + +func (p *Processor) isProfitable(ctx context.Context, message contracts.IBridgeMessage, proof []byte) (bool, error) { + processingFee := message.ProcessingFee + + if processingFee == nil || processingFee.Cmp(big.NewInt(0)) != 1 { + return false, nil + } + + auth, err := bind.NewKeyedTransactorWithChainID(p.ecdsaKey, message.DestChainId) + if err != nil { + return false, errors.Wrap(err, "bind.NewKeyedTransactorWithChainID") + } + + auth.NoSend = true + + auth.Context = ctx + + // estimate gas with auth.NoSend set to true + tx, err := p.destBridge.ProcessMessage(auth, message, proof) + if err != nil { + return false, errors.Wrap(err, "p.destBridge.ProcessMessage") + } + + cost := tx.Cost() + + if processingFee.Cmp(cost) != 1 { + return false, nil + } + + return true, nil +} diff --git a/packages/relayer/message/is_profitable_test.go b/packages/relayer/message/is_profitable_test.go new file mode 100644 index 00000000000..23c9855244f --- /dev/null +++ b/packages/relayer/message/is_profitable_test.go @@ -0,0 +1,73 @@ +package message + +import ( + "context" + "math/big" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/taikoxyz/taiko-mono/packages/relayer/contracts" + "github.com/taikoxyz/taiko-mono/packages/relayer/mock" +) + +func Test_isProfitable(t *testing.T) { + p := newTestProcessor(true) + + tests := []struct { + name string + message contracts.IBridgeMessage + proof []byte + wantProfitable bool + wantErr error + }{ + { + "zeroProcessingFee", + contracts.IBridgeMessage{ + ProcessingFee: big.NewInt(0), + }, + nil, + false, + nil, + }, + { + "nilProcessingFee", + contracts.IBridgeMessage{}, + nil, + false, + nil, + }, + { + "lowProcessingFee", + contracts.IBridgeMessage{ + ProcessingFee: new(big.Int).Sub(mock.ProcessMessageTx.Cost(), big.NewInt(1)), + DestChainId: big.NewInt(167001), + }, + nil, + false, + nil, + }, + { + "profitableProcessingFee", + contracts.IBridgeMessage{ + ProcessingFee: new(big.Int).Add(mock.ProcessMessageTx.Cost(), big.NewInt(1)), + DestChainId: big.NewInt(167001), + }, + nil, + true, + nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + profitable, err := p.isProfitable( + context.Background(), + tt.message, + tt.proof, + ) + + assert.Equal(t, tt.wantProfitable, profitable) + assert.Equal(t, tt.wantErr, err) + }) + } +} diff --git a/packages/relayer/message/process_message.go b/packages/relayer/message/process_message.go index abfd6e5fa73..0e06a94d232 100644 --- a/packages/relayer/message/process_message.go +++ b/packages/relayer/message/process_message.go @@ -3,8 +3,6 @@ package message import ( "context" "encoding/hex" - "math/big" - "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -26,9 +24,6 @@ func (p *Processor) ProcessMessage( event *contracts.BridgeMessageSent, e *relayer.Event, ) error { - log.Infof("processing message for signal: %v", common.Hash(event.Signal).Hex()) - - // TODO: if relayer can not process, save this to DB with status Unprocessable if event.Message.GasLimit == nil || event.Message.GasLimit.Cmp(common.Big0) == 0 { return errors.New("only user can process this, gasLimit set to 0") } @@ -72,8 +67,7 @@ func (p *Processor) ProcessMessage( return errors.Wrap(err, "p.destBridge.IsMessageReceived") } - // message will fail when we try to process is - // TODO: update status in db + // message will fail when we try to process it if !received { return errors.New("message not received") } @@ -119,10 +113,6 @@ func (p *Processor) sendProcessMessageCall( auth.Context = ctx - // uncomment to skip `eth_estimateGas` - auth.GasLimit = 2000000 - auth.GasPrice = new(big.Int).SetUint64(500000000) - p.mu.Lock() defer p.mu.Unlock() @@ -130,6 +120,18 @@ func (p *Processor) sendProcessMessageCall( if err != nil { return nil, errors.New("p.getLatestNonce") } + + if p.profitableOnly { + profitable, err := p.isProfitable(ctx, event.Message, proof) + if err != nil { + return nil, errors.Wrap(err, "p.isProfitable") + } + + if !profitable { + return nil, relayer.ErrUnprofitable + } + } + // process the message on the destination bridge. tx, err := p.destBridge.ProcessMessage(auth, event.Message, proof) if err != nil { @@ -144,36 +146,3 @@ func (p *Processor) sendProcessMessageCall( func (p *Processor) setLatestNonce(nonce uint64) { p.destNonce = nonce } - -func (p *Processor) getLatestNonce(ctx context.Context, auth *bind.TransactOpts) error { - pendingNonce, err := p.destEthClient.PendingNonceAt(ctx, p.relayerAddr) - if err != nil { - return err - } - - if pendingNonce > p.destNonce { - p.setLatestNonce(pendingNonce) - } - - auth.Nonce = big.NewInt(int64(p.destNonce)) - - return nil -} - -func (p *Processor) waitForConfirmations(ctx context.Context, txHash common.Hash, blockNumber uint64) error { - // TODO: make timeout a config var - ctx, cancelFunc := context.WithTimeout(ctx, 2*time.Minute) - - defer cancelFunc() - - if err := relayer.WaitConfirmations( - ctx, - p.srcEthClient, - p.confirmations, - txHash, - ); err != nil { - return errors.Wrap(err, "relayer.WaitConfirmations") - } - - return nil -} diff --git a/packages/relayer/message/process_message_test.go b/packages/relayer/message/process_message_test.go index f65732b9731..79c445e00b9 100644 --- a/packages/relayer/message/process_message_test.go +++ b/packages/relayer/message/process_message_test.go @@ -5,7 +5,6 @@ import ( "math/big" "testing" - "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/taikoxyz/taiko-mono/packages/relayer" @@ -13,30 +12,15 @@ import ( "github.com/taikoxyz/taiko-mono/packages/relayer/mock" ) -func Test_getLatestNonce(t *testing.T) { - p := newTestProcessor() - - err := p.getLatestNonce(context.Background(), &bind.TransactOpts{}) - assert.Nil(t, err) - - assert.Equal(t, p.destNonce, mock.PendingNonce) -} - -func Test_waitForConfirmations(t *testing.T) { - p := newTestProcessor() - - err := p.waitForConfirmations(context.TODO(), mock.SucceedTxHash, uint64(mock.BlockNum)) - assert.Nil(t, err) -} - func Test_sendProcessMessageCall(t *testing.T) { - p := newTestProcessor() + p := newTestProcessor(true) _, err := p.sendProcessMessageCall( context.Background(), &contracts.BridgeMessageSent{ Message: contracts.IBridgeMessage{ - DestChainId: mock.MockChainID, + DestChainId: mock.MockChainID, + ProcessingFee: new(big.Int).Add(mock.ProcessMessageTx.Cost(), big.NewInt(1)), }, }, []byte{}) @@ -46,7 +30,7 @@ func Test_sendProcessMessageCall(t *testing.T) { } func Test_ProcessMessage_messageNotReceived(t *testing.T) { - p := newTestProcessor() + p := newTestProcessor(true) err := p.ProcessMessage(context.Background(), &contracts.BridgeMessageSent{ Message: contracts.IBridgeMessage{ @@ -57,14 +41,14 @@ func Test_ProcessMessage_messageNotReceived(t *testing.T) { } func Test_ProcessMessage_gasLimit0(t *testing.T) { - p := newTestProcessor() + p := newTestProcessor(true) err := p.ProcessMessage(context.Background(), &contracts.BridgeMessageSent{}, &relayer.Event{}) assert.EqualError(t, errors.New("only user can process this, gasLimit set to 0"), err.Error()) } func Test_ProcessMessage_noChainId(t *testing.T) { - p := newTestProcessor() + p := newTestProcessor(true) err := p.ProcessMessage(context.Background(), &contracts.BridgeMessageSent{ Message: contracts.IBridgeMessage{ @@ -76,7 +60,25 @@ func Test_ProcessMessage_noChainId(t *testing.T) { } func Test_ProcessMessage(t *testing.T) { - p := newTestProcessor() + p := newTestProcessor(true) + + err := p.ProcessMessage(context.Background(), &contracts.BridgeMessageSent{ + Message: contracts.IBridgeMessage{ + GasLimit: big.NewInt(1), + DestChainId: mock.MockChainID, + ProcessingFee: big.NewInt(1000000000), + }, + Signal: mock.SuccessSignal, + }, &relayer.Event{}) + + assert.Nil( + t, + err, + ) +} + +func Test_ProcessMessage_unprofitable(t *testing.T) { + p := newTestProcessor(true) err := p.ProcessMessage(context.Background(), &contracts.BridgeMessageSent{ Message: contracts.IBridgeMessage{ @@ -86,8 +88,9 @@ func Test_ProcessMessage(t *testing.T) { Signal: mock.SuccessSignal, }, &relayer.Event{}) - assert.Nil( + assert.EqualError( t, err, + "p.sendProcessMessageCall: "+relayer.ErrUnprofitable.Error(), ) } diff --git a/packages/relayer/message/processor.go b/packages/relayer/message/processor.go index 697251e5a55..f3d472a493e 100644 --- a/packages/relayer/message/processor.go +++ b/packages/relayer/message/processor.go @@ -34,6 +34,8 @@ type Processor struct { destNonce uint64 relayerAddr common.Address confirmations uint64 + + profitableOnly relayer.ProfitableOnly } type NewProcessorOpts struct { @@ -47,6 +49,7 @@ type NewProcessorOpts struct { DestHeaderSyncer relayer.HeaderSyncer RelayerAddress common.Address Confirmations uint64 + ProfitableOnly relayer.ProfitableOnly } func NewProcessor(opts NewProcessorOpts) (*Processor, error) { @@ -103,5 +106,7 @@ func NewProcessor(opts NewProcessorOpts) (*Processor, error) { destNonce: 0, relayerAddr: opts.RelayerAddress, confirmations: opts.Confirmations, + + profitableOnly: opts.ProfitableOnly, }, nil } diff --git a/packages/relayer/message/processor_test.go b/packages/relayer/message/processor_test.go index 120450d57bc..1125a050aef 100644 --- a/packages/relayer/message/processor_test.go +++ b/packages/relayer/message/processor_test.go @@ -18,7 +18,7 @@ import ( var dummyEcdsaKey = "8da4ef21b864d2cc526dbdb2a120bd2874c36c9d0a1fb7f8c63d7f7a8b41de8f" -func newTestProcessor() *Processor { +func newTestProcessor(profitableOnly relayer.ProfitableOnly) *Processor { privateKey, _ := crypto.HexToECDSA(dummyEcdsaKey) prover, _ := proof.New( @@ -35,6 +35,7 @@ func newTestProcessor() *Processor { destHeaderSyncer: &mock.HeaderSyncer{}, prover: prover, rpc: &mock.Caller{}, + profitableOnly: profitableOnly, } } func Test_NewProcessor(t *testing.T) { diff --git a/packages/relayer/message/wait_for_confirmations.go b/packages/relayer/message/wait_for_confirmations.go new file mode 100644 index 00000000000..f27333df304 --- /dev/null +++ b/packages/relayer/message/wait_for_confirmations.go @@ -0,0 +1,28 @@ +package message + +import ( + "context" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + "github.com/taikoxyz/taiko-mono/packages/relayer" +) + +func (p *Processor) waitForConfirmations(ctx context.Context, txHash common.Hash, blockNumber uint64) error { + // TODO: make timeout a config var + ctx, cancelFunc := context.WithTimeout(ctx, 2*time.Minute) + + defer cancelFunc() + + if err := relayer.WaitConfirmations( + ctx, + p.srcEthClient, + p.confirmations, + txHash, + ); err != nil { + return errors.Wrap(err, "relayer.WaitConfirmations") + } + + return nil +} diff --git a/packages/relayer/message/wait_for_confirmations_test.go b/packages/relayer/message/wait_for_confirmations_test.go new file mode 100644 index 00000000000..6b8f9a0e1be --- /dev/null +++ b/packages/relayer/message/wait_for_confirmations_test.go @@ -0,0 +1,16 @@ +package message + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/taikoxyz/taiko-mono/packages/relayer/mock" +) + +func Test_waitForConfirmations(t *testing.T) { + p := newTestProcessor(true) + + err := p.waitForConfirmations(context.TODO(), mock.SucceedTxHash, uint64(mock.BlockNum)) + assert.Nil(t, err) +} diff --git a/packages/relayer/mock/bridge.go b/packages/relayer/mock/bridge.go index 414d587009f..16c7c3357c7 100644 --- a/packages/relayer/mock/bridge.go +++ b/packages/relayer/mock/bridge.go @@ -20,6 +20,15 @@ var ( var dummyAddress = "0x63FaC9201494f0bd17B9892B9fae4d52fe3BD377" +var ProcessMessageTx = types.NewTransaction( + PendingNonce, + common.HexToAddress(dummyAddress), + big.NewInt(1), + 100, + big.NewInt(10), + nil, +) + type Bridge struct { MessagesSent int ErrorsSent int @@ -84,14 +93,7 @@ func (b *Bridge) ProcessMessage( message contracts.IBridgeMessage, proof []byte, ) (*types.Transaction, error) { - return types.NewTransaction( - PendingNonce, - common.HexToAddress(dummyAddress), - big.NewInt(1), - 100, - big.NewInt(10), - nil, - ), nil + return ProcessMessageTx, nil } func (b *Bridge) IsMessageReceived(opts *bind.CallOpts, signal [32]byte, srcChainId *big.Int, proof []byte) (bool, error) { // nolint diff --git a/packages/relayer/mock/event_repository.go b/packages/relayer/mock/event_repository.go index 3145a81497e..b0dd7aa4162 100644 --- a/packages/relayer/mock/event_repository.go +++ b/packages/relayer/mock/event_repository.go @@ -57,7 +57,7 @@ func (r *EventRepository) UpdateStatus(ctx context.Context, id int, status relay return nil } -func (r *EventRepository) FindAllByAddress( +func (r *EventRepository) FindAllByAddressAndChainID( ctx context.Context, chainID *big.Int, address common.Address, @@ -68,6 +68,40 @@ func (r *EventRepository) FindAllByAddress( events := make([]*relayer.Event, 0) + for _, e := range r.events { + if e.ChainID != int64(chainID.Uint64()) { + continue + } + + m, err := e.Data.MarshalJSON() + if err != nil { + return nil, err + } + + data := &d{} + if err := json.Unmarshal(m, data); err != nil { + return nil, err + } + + if data.Owner == address.Hex() { + events = append(events, e) + break + } + } + + return events, nil +} + +func (r *EventRepository) FindAllByAddress( + ctx context.Context, + address common.Address, +) ([]*relayer.Event, error) { + type d struct { + Owner string `json:"Owner"` + } + + events := make([]*relayer.Event, 0) + for _, e := range r.events { m, err := e.Data.MarshalJSON() if err != nil { diff --git a/packages/relayer/repo/event.go b/packages/relayer/repo/event.go index 13eedbc4834..58bb8c163a6 100644 --- a/packages/relayer/repo/event.go +++ b/packages/relayer/repo/event.go @@ -3,6 +3,7 @@ package repo import ( "context" "math/big" + "strings" "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" @@ -52,7 +53,7 @@ func (r *EventRepository) UpdateStatus(ctx context.Context, id int, status relay return nil } -func (r *EventRepository) FindAllByAddress( +func (r *EventRepository) FindAllByAddressAndChainID( ctx context.Context, chainID *big.Int, address common.Address, @@ -60,7 +61,21 @@ func (r *EventRepository) FindAllByAddress( e := make([]*relayer.Event, 0) if err := r.db.GormDB().Where("chain_id = ?", chainID.Int64()). Find(&e, datatypes.JSONQuery("data"). - Equals(address.Hex(), "Owner")).Error; err != nil { + Equals(strings.ToLower(address.Hex()), "Message", "Owner")).Error; err != nil { + return nil, errors.Wrap(err, "r.db.Find") + } + + return e, nil +} + +func (r *EventRepository) FindAllByAddress( + ctx context.Context, + address common.Address, +) ([]*relayer.Event, error) { + e := make([]*relayer.Event, 0) + if err := r.db.GormDB(). + Find(&e, datatypes.JSONQuery("data"). + Equals(strings.ToLower(address.Hex()), "Message", "Owner")).Error; err != nil { return nil, errors.Wrap(err, "r.db.Find") } diff --git a/packages/relayer/repo/event_test.go b/packages/relayer/repo/event_test.go index dfdd98f081d..4cb68f197ae 100644 --- a/packages/relayer/repo/event_test.go +++ b/packages/relayer/repo/event_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/big" + "strings" "testing" "github.com/ethereum/go-ethereum/common" @@ -118,7 +119,7 @@ func TestIntegration_Event_UpdateStatus(t *testing.T) { } } -func TestIntegration_Event_FindAllByAddress(t *testing.T) { +func TestIntegration_Event_FindAllByAddressAndChainID(t *testing.T) { db, close, err := testMysql(t) assert.Equal(t, nil, err) @@ -131,7 +132,7 @@ func TestIntegration_Event_FindAllByAddress(t *testing.T) { _, err = eventRepo.Save(context.Background(), relayer.SaveEventOpts{ Name: "name", - Data: fmt.Sprintf(`{"Owner":"%s"}`, addr.Hex()), + Data: fmt.Sprintf(`{"Message": {"Owner": "%s"}}`, strings.ToLower(addr.Hex())), ChainID: big.NewInt(1), Status: relayer.EventStatusDone, }) @@ -151,7 +152,7 @@ func TestIntegration_Event_FindAllByAddress(t *testing.T) { { ID: 1, Name: "name", - Data: datatypes.JSON([]byte(fmt.Sprintf(`{"Owner": "%s"}`, addr.Hex()))), + Data: datatypes.JSON([]byte(fmt.Sprintf(`{"Message": {"Owner": "%s"}}`, strings.ToLower(addr.Hex())))), ChainID: 1, Status: relayer.EventStatusDone, }, @@ -176,7 +177,62 @@ func TestIntegration_Event_FindAllByAddress(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - resp, err := eventRepo.FindAllByAddress(context.Background(), tt.chainID, tt.address) + resp, err := eventRepo.FindAllByAddressAndChainID(context.Background(), tt.chainID, tt.address) + assert.Equal(t, tt.wantResp, resp) + assert.Equal(t, tt.wantErr, err) + }) + } +} + +func TestIntegration_Event_FindAllByAddress(t *testing.T) { + db, close, err := testMysql(t) + assert.Equal(t, nil, err) + + defer close() + + eventRepo, err := NewEventRepository(db) + assert.Equal(t, nil, err) + + addr := common.HexToAddress("0x71C7656EC7ab88b098defB751B7401B5f6d8976F") + + _, err = eventRepo.Save(context.Background(), relayer.SaveEventOpts{ + Name: "name", + Data: fmt.Sprintf(`{"Message": {"Owner": "%s"}}`, strings.ToLower(addr.Hex())), + ChainID: big.NewInt(1), + Status: relayer.EventStatusDone, + }) + assert.Equal(t, nil, err) + tests := []struct { + name string + address common.Address + wantResp []*relayer.Event + wantErr error + }{ + { + "success", + addr, + []*relayer.Event{ + { + ID: 1, + Name: "name", + Data: datatypes.JSON([]byte(fmt.Sprintf(`{"Message": {"Owner": "%s"}}`, strings.ToLower(addr.Hex())))), + ChainID: 1, + Status: relayer.EventStatusDone, + }, + }, + nil, + }, + { + "noneByAddr", + common.HexToAddress("0x165CD37b4C644C2921454429E7F9358d18A45e14"), + []*relayer.Event{}, + nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resp, err := eventRepo.FindAllByAddress(context.Background(), tt.address) assert.Equal(t, tt.wantResp, resp) assert.Equal(t, tt.wantErr, err) })