Skip to content

Commit

Permalink
feat(eventindexer): support multiple swap pairs (#14130)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhorsey authored Jul 9, 2023
1 parent 6d7f2e0 commit 2f4a0be
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 24 deletions.
2 changes: 1 addition & 1 deletion packages/eventindexer/.l2.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ MYSQL_MAX_IDLE_CONNS=50
MYSQL_MAX_OPEN_CONNS=3000
MYSQL_CONN_MAX_LIFETIME_IN_MS=100000
PROVER_POOL_ADDRESS=0x7D992599E1B8b4508Ba6E2Ba97893b4C36C23A28
SWAP_ADDRESS=0x501f63210aE6D7Eeb50DaE74DA5Ae407515ee246
SWAP_ADDRESSES=0x501f63210aE6D7Eeb50DaE74DA5Ae407515ee246,0x926815A3fb587DDF5e2d2A03ea235630c0A53a16,0x2223D60359736532958DF6a4E9A5e4A5a71729A1
RPC_URL=wss://ws.test.taiko.xyz
CORS_ORIGINS=*
BLOCK_BATCH_SIZE=1000
Expand Down
14 changes: 13 additions & 1 deletion packages/eventindexer/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func Run(
RPCClient: rpcClient,
SrcTaikoAddress: common.HexToAddress(os.Getenv("L1_TAIKO_ADDRESS")),
SrcBridgeAddress: common.HexToAddress(os.Getenv("BRIDGE_ADDRESS")),
SrcSwapAddress: common.HexToAddress(os.Getenv("SWAP_ADDRESS")),
SrcSwapAddresses: stringsToAddresses(strings.Split(os.Getenv("SWAP_ADDRESSES"), ",")),
BlockBatchSize: uint64(blockBatchSize),
SubscriptionBackoff: subscriptionBackoff,
})
Expand All @@ -158,6 +158,18 @@ func Run(
<-forever
}

func stringsToAddresses(s []string) []common.Address {
a := []common.Address{}

for _, v := range s {
if v != "" {
a = append(a, common.HexToAddress(v))
}
}

return a
}

func openDBConnection(opts eventindexer.DBConnectionOpts) (eventindexer.DB, error) {
dsn := ""
if opts.Password == "" {
Expand Down
20 changes: 11 additions & 9 deletions packages/eventindexer/indexer/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,18 @@ func L2FilterFunc(
svc *Service,
filterOpts *bind.FilterOpts,
) error {
swaps, err := svc.swap.FilterSwap(filterOpts, nil, nil)
if err != nil {
return errors.Wrap(err, "svc.bridge.FilterSwap")
}
for _, s := range svc.swaps {
swaps, err := s.FilterSwap(filterOpts, nil, nil)
if err != nil {
return errors.Wrap(err, "svc.bridge.FilterSwap")
}

// only save ones above 0.01 ETH, this is only for Galaxe
// and we dont care about the rest
err = svc.saveSwapEvents(ctx, chainID, swaps)
if err != nil {
return errors.Wrap(err, "svc.saveSwapEvents")
// only save ones above 0.01 ETH, this is only for Galaxe
// and we dont care about the rest
err = svc.saveSwapEvents(ctx, chainID, swaps)
if err != nil {
return errors.Wrap(err, "svc.saveSwapEvents")
}
}

return nil
Expand Down
22 changes: 22 additions & 0 deletions packages/eventindexer/indexer/save_message_sent_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ import (
"encoding/json"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/taikoxyz/taiko-mono/packages/eventindexer"
"github.com/taikoxyz/taiko-mono/packages/eventindexer/contracts/bridge"
)

var (
minEthAmount = new(big.Int).SetUint64(150000000000000000)
zeroHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
)

func (svc *Service) saveMessageSentEvents(
ctx context.Context,
chainID *big.Int,
Expand Down Expand Up @@ -43,6 +49,22 @@ func (svc *Service) saveMessageSentEvent(
chainID *big.Int,
event *bridge.BridgeMessageSent,
) error {
// only save eth transfers
if event.Message.Data != nil && common.BytesToHash(event.Message.Data) != zeroHash {
log.Info("skipping message sent event, is not eth transfer")
return nil
}

// amount must be >= 0.15 eth
if event.Message.DepositValue.Cmp(minEthAmount) < 0 {
log.Infof("skipping message sent event, value: %v, requiredValue: %v",
event.Message.DepositValue.String(),
minEthAmount.String(),
)

return nil
}

marshaled, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "json.Marshal(event)")
Expand Down
20 changes: 12 additions & 8 deletions packages/eventindexer/indexer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Service struct {

taikol1 *taikol1.TaikoL1
bridge *bridge.Bridge
swap *swap.Swap
swaps []*swap.Swap
}

type NewServiceOpts struct {
Expand All @@ -41,7 +41,7 @@ type NewServiceOpts struct {
RPCClient *rpc.Client
SrcTaikoAddress common.Address
SrcBridgeAddress common.Address
SrcSwapAddress common.Address
SrcSwapAddresses []common.Address
BlockBatchSize uint64
SubscriptionBackoff time.Duration
}
Expand Down Expand Up @@ -79,12 +79,16 @@ func NewService(opts NewServiceOpts) (*Service, error) {
}
}

var swapContract *swap.Swap
var swapContracts []*swap.Swap

if opts.SrcSwapAddress.Hex() != ZeroAddress.Hex() {
swapContract, err = swap.NewSwap(opts.SrcSwapAddress, opts.EthClient)
if err != nil {
return nil, errors.Wrap(err, "contracts.NewBridge")
if opts.SrcSwapAddresses != nil && len(opts.SrcSwapAddresses) > 0 {
for _, v := range opts.SrcSwapAddresses {
swapContract, err := swap.NewSwap(v, opts.EthClient)
if err != nil {
return nil, errors.Wrap(err, "contracts.NewBridge")
}

swapContracts = append(swapContracts, swapContract)
}
}

Expand All @@ -95,7 +99,7 @@ func NewService(opts NewServiceOpts) (*Service, error) {
ethClient: opts.EthClient,
taikol1: taikoL1,
bridge: bridgeContract,
swap: swapContract,
swaps: swapContracts,

blockBatchSize: opts.BlockBatchSize,
subscriptionBackoff: opts.SubscriptionBackoff,
Expand Down
12 changes: 7 additions & 5 deletions packages/eventindexer/indexer/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error {
go svc.subscribeMessageSent(ctx, chainID, errChan)
}

if svc.swap != nil {
go svc.subscribeSwap(ctx, chainID, errChan)
if svc.swaps != nil {
for _, swap := range svc.swaps {
go svc.subscribeSwap(ctx, swap, chainID, errChan)
}
}

// nolint: gosimple
Expand Down Expand Up @@ -306,16 +308,16 @@ func (svc *Service) subscribeMessageSent(ctx context.Context, chainID *big.Int,
}
}

func (svc *Service) subscribeSwap(ctx context.Context, chainID *big.Int, errChan chan error) {
func (svc *Service) subscribeSwap(ctx context.Context, s *swap.Swap, chainID *big.Int, errChan chan error) {
sink := make(chan *swap.SwapSwap)

sub := event.ResubscribeErr(svc.subscriptionBackoff, func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil {
log.Errorf("svc.swap.WatchSwap: %v", err)
log.Errorf("s.WatchSwap: %v", err)
}
log.Info("resubscribing to Swap events")

return svc.swap.WatchSwap(&bind.WatchOpts{
return s.WatchSwap(&bind.WatchOpts{
Context: ctx,
}, sink, nil, nil)
})
Expand Down

0 comments on commit 2f4a0be

Please sign in to comment.