feat(relayer): Asynchronous message processing, error handling, nonce management, and indexer folder structuring #259

Merged
merged 24 commits on Nov 17, 2022
Changes from 7 commits
Commits (24)
6a67af3
wg/errchan handling, block batch size config param for ndexer, .defau…
cyberhorsey Nov 11, 2022
48d0a2d
mysql conn params + block batch size param
cyberhorsey Nov 11, 2022
fd6f108
wip tests fo indexer + go mod tidy
cyberhorsey Nov 11, 2022
5eb0d61
add gosimple linter
cyberhorsey Nov 11, 2022
6ac57c1
refactor handle_even with cleaner handle methods. test for canProcess…
cyberhorsey Nov 14, 2022
426f5ab
Merge branch 'main' into async_relayer
cyberhorsey Nov 14, 2022
1308298
subscribe return error
cyberhorsey Nov 14, 2022
ffe31d3
merge main
cyberhorsey Nov 15, 2022
3c1c78c
Update packages/relayer/cli/cli.go
cyberhorsey Nov 15, 2022
03583c2
check negative ints for configs
cyberhorsey Nov 15, 2022
e98a16b
Merge branch 'async_relayer' of github.com:taikochain/taiko-mono into…
cyberhorsey Nov 15, 2022
03b1aa3
Update packages/relayer/indexer/watch_errors.go
cyberhorsey Nov 15, 2022
2cae802
Defer mutex unlock in process message
cyberhorsey Nov 15, 2022
50243f0
Merge branch 'async_relayer' of github.com:taikochain/taiko-mono into…
cyberhorsey Nov 15, 2022
e3e6cd1
lint
cyberhorsey Nov 15, 2022
69cf857
waitgroup => errgroup
cyberhorsey Nov 15, 2022
3d0879d
use ResubscribeErr
cyberhorsey Nov 15, 2022
a124290
subscription backoff in seconds
cyberhorsey Nov 15, 2022
70a1d33
Update packages/relayer/indexer/filter_then_subscribe.go
cyberhorsey Nov 15, 2022
4239be3
lint
cyberhorsey Nov 15, 2022
3a4423c
Merge branch 'async_relayer' of github.com:taikochain/taiko-mono into…
cyberhorsey Nov 15, 2022
4e3cc85
merge main + resolve conflicts
cyberhorsey Nov 16, 2022
bf0f4e9
lint
cyberhorsey Nov 16, 2022
ec80d8f
bump lint funlen
cyberhorsey Nov 16, 2022
14 changes: 10 additions & 4 deletions packages/relayer/.default.env
@@ -4,7 +4,13 @@ MYSQL_PASSWORD=root
MYSQL_DATABASE=relayer
MYSQL_HOST=localhost:3306
RELAYER_ECDSA_KEY=
L1_BRIDGE_ADDRESS=0xa566811E9E63e4F573Df89d5453bB89F239F7e10
L2_BRIDGE_ADDRESS=0xa566811E9E63e4F573Df89d5453bB89F239F7e10
L1_RPC_URL="wss://eth-goerli.g.alchemy.com/v2/bPAA5rQ42Zoo4ts9TYnTB2t0cuc5lf7_"
L2_RPC_URL="wss://rinkeby-light.eth.linkpool.io/ws"
L1_BRIDGE_ADDRESS=0xB12d6112D64B213880Fa53F815aF1F29c91CaCe9
L2_BRIDGE_ADDRESS=0x4eA05A0f7713333AeB4bB73F17aEeFE146CF13E3
L1_TAIKO_ADDRESS=0x9b557777Be33A8A2fE6aF93E017A0d139B439E5D
L2_TAIKO_ADDRESS=0x0027f309f7F94A8Efb6A3DBfb30827f1062803F4
L1_RPC_URL=ws://34.132.67.34:8546
L2_RPC_URL=ws://ws.a1.testnet.taiko.xyz
BLOCK_BATCH_SIZE=2
MYSQL_MAX_IDLE_CONNS=
MYSQL_MAX_OPEN_CONNS=
MYSQL_CONN_MAX_LIFETIME_IN_MS=
1 change: 1 addition & 0 deletions packages/relayer/.golangci.yml
@@ -19,6 +19,7 @@ linters:
- gofmt
- golint
- gosec
- gosimple
- lll
- whitespace
- wsl
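For context, `gosimple` flags Go constructs that have a simpler equivalent. A representative finding (check S1000) that is easy to hit in channel-heavy code like this relayer — a hypothetical illustration, not taken from this diff:

```go
// Flagged by gosimple (S1000): a loop wrapping a single-case select
// is just a channel receive in disguise.
for {
	select {
	case err := <-errChan:
		log.Error(err)
	}
}

// Simpler equivalent suggested by the linter:
for err := range errChan {
	log.Error(err)
}
```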
52 changes: 52 additions & 0 deletions packages/relayer/cli/cli.go
@@ -4,6 +4,8 @@ import (
"context"
"fmt"
"os"
"strconv"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
@@ -33,6 +35,8 @@
"MYSQL_HOST",
"RELAYER_ECDSA_KEY",
}

defaultBlockBatchSize = 2
)

// TODO: implement `resync` mode to wipe DB and restart from block 0
@@ -108,6 +112,11 @@ func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error)
return nil, nil, err
}

blockBatchSize, err := strconv.Atoi(os.Getenv("BLOCK_BATCH_SIZE"))
if err != nil || blockBatchSize <= 0 {
blockBatchSize = defaultBlockBatchSize
}

indexers := make([]*indexer.Service, 0)

if layer == L1 || layer == Both {
@@ -123,6 +132,8 @@ func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error)
BridgeAddress: common.HexToAddress(os.Getenv("L1_BRIDGE_ADDRESS")),
DestBridgeAddress: common.HexToAddress(os.Getenv("L2_BRIDGE_ADDRESS")),
DestTaikoAddress: common.HexToAddress(os.Getenv("L2_TAIKO_ADDRESS")),

BlockBatchSize: uint64(blockBatchSize),
})
if err != nil {
log.Fatal(err)
@@ -144,6 +155,8 @@ func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error)
BridgeAddress: common.HexToAddress(os.Getenv("L2_BRIDGE_ADDRESS")),
DestBridgeAddress: common.HexToAddress(os.Getenv("L1_BRIDGE_ADDRESS")),
DestTaikoAddress: common.HexToAddress(os.Getenv("L1_TAIKO_ADDRESS")),

BlockBatchSize: uint64(blockBatchSize),
})
if err != nil {
log.Fatal(err)
@@ -188,6 +201,45 @@ func openDBConnection(opts relayer.DBConnectionOpts) *gorm.DB {
log.Fatal(err)
}

sqlDB, err := db.DB()
if err != nil {
log.Fatal(err)
}

var (
defaultMaxIdleConns = 50
defaultMaxOpenConns = 200
defaultConnMaxLifetime = 10 * time.Second
)

maxIdleConns, err := strconv.Atoi(os.Getenv("MYSQL_MAX_IDLE_CONNS"))
if err != nil || maxIdleConns <= 0 {
maxIdleConns = defaultMaxIdleConns
}

maxOpenConns, err := strconv.Atoi(os.Getenv("MYSQL_MAX_OPEN_CONNS"))
if err != nil || maxOpenConns <= 0 {
maxOpenConns = defaultMaxOpenConns
}

var maxLifetime time.Duration

connMaxLifetime, err := strconv.Atoi(os.Getenv("MYSQL_CONN_MAX_LIFETIME_IN_MS"))
if err != nil || connMaxLifetime <= 0 {
maxLifetime = defaultConnMaxLifetime
} else {
maxLifetime = time.Duration(connMaxLifetime) * time.Millisecond // env var is in milliseconds
}

// SetMaxOpenConns sets the maximum number of open connections to the database.
sqlDB.SetMaxOpenConns(maxOpenConns)

// SetMaxIdleConns sets the maximum number of connections in the idle connection pool.
sqlDB.SetMaxIdleConns(maxIdleConns)

// SetConnMaxLifetime sets the maximum amount of time a connection may be reused.
sqlDB.SetConnMaxLifetime(maxLifetime)

return db
}

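The `strconv.Atoi` parse-or-default pattern now appears four times in `cli.go` (batch size plus the three pool settings). A possible consolidation — `envIntOrDefault` is a hypothetical helper sketched here, not part of the PR, and assumes the `os` and `strconv` imports already added above:

```go
// envIntOrDefault reads an integer environment variable, falling back to
// def when the variable is unset, unparsable, or non-positive — the same
// behavior as the inlined checks above.
func envIntOrDefault(key string, def int) int {
	v, err := strconv.Atoi(os.Getenv(key))
	if err != nil || v <= 0 {
		return def
	}
	return v
}
```

With it, the pool setup reduces to calls like `sqlDB.SetMaxIdleConns(envIntOrDefault("MYSQL_MAX_IDLE_CONNS", defaultMaxIdleConns))`.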
199 changes: 15 additions & 184 deletions packages/relayer/indexer/filter_then_subscribe.go
@@ -2,15 +2,12 @@ package indexer

import (
"context"
"encoding/json"
"math/big"
"sync"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/taikochain/taiko-mono/packages/relayer"
"github.com/taikochain/taiko-mono/packages/relayer/contracts"
)

var (
@@ -21,6 +18,8 @@ var (
// up to the latest block. As it goes, it tries to process messages.
// When it catches up, it then starts to Subscribe to latest events as they come in.
func (svc *Service) FilterThenSubscribe(ctx context.Context) error {
go svc.watchErrors()

chainID, err := svc.ethClient.ChainID(ctx)
if err != nil {
return errors.Wrap(err, "s.ethClient.ChainID()")
@@ -35,38 +34,26 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error {
return errors.Wrap(err, "s.blockRepo.GetLatestBlock()")
}

log.Infof("latest processed block: %v", latestProcessedBlock.Height)

if err != nil {
return errors.Wrap(err, "bridge.FilterMessageSent")
}

header, err := svc.ethClient.HeaderByNumber(ctx, nil)
if err != nil {
return errors.Wrap(err, "s.ethClient.HeaderByNumber")
}

// if we have already done the latest block, exit early
// TODO: call SubscribeMessageSent, as we can now just watch the chain for new blocks
// if we have already done the latest block, subscribe to new changes
if latestProcessedBlock.Height == header.Number.Uint64() {
return svc.subscribe(ctx, chainID)
}

const batchSize = 1000

svc.processingBlock = latestProcessedBlock

log.Infof("getting events between %v and %v in batches of %v",
svc.processingBlock.Height,
header.Number.Int64(),
batchSize,
svc.blockBatchSize,
)

// todo: parallelize/concurrently catch up. don't think we need to do this in order.
// use WaitGroup.
// we get a timeout/EOF if we don't batch.
for i := latestProcessedBlock.Height; i < header.Number.Uint64(); i += batchSize {
var end uint64 = svc.processingBlock.Height + batchSize
for i := latestProcessedBlock.Height; i < header.Number.Uint64(); i += svc.blockBatchSize {
var end uint64 = svc.processingBlock.Height + svc.blockBatchSize
// if the end of the batch is greater than the latest block number, set end
// to the latest block number
if end > header.Number.Uint64() {
@@ -94,12 +81,17 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error {

log.Info("found events")

wg := &sync.WaitGroup{}

// TODO: do we want to limit the number of possible goroutines in the waitgroup?
// right now it is dependent on how many events are found in the
// block range. the main concern would be exceeding DB connection pooling limits.
for {
if err := svc.handleEvent(ctx, chainID, events.Event); err != nil {
return errors.Wrap(err, "svc.handleEvent")
}
go svc.handleEvent(ctx, wg, svc.errChan, chainID, events.Event)

if !events.Next() {
wg.Wait()

if err := svc.handleNoEventsRemaining(ctx, chainID, events); err != nil {
return errors.Wrap(err, "svc.handleNoEventsRemaining")
}
@@ -122,164 +114,3 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error {

return svc.subscribe(ctx, chainID)
}

// subscribe subscribes to latest events
func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error {
sink := make(chan *contracts.BridgeMessageSent)

sub, err := svc.bridge.WatchMessageSent(&bind.WatchOpts{}, sink, nil)
if err != nil {
return errors.Wrap(err, "svc.bridge.WatchMessageSent")
}

defer sub.Unsubscribe()

for {
select {
case err := <-sub.Err():
return err
case event := <-sink:
if err := svc.handleEvent(ctx, chainID, event); err != nil {
return errors.Wrap(err, "svc.handleEvent")
}
}
}
}
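This blocking `subscribe` is deleted in the refactor; the commit log ("use ResubscribeErr", "subscription backoff in seconds") indicates the replacement re-establishes the watch after failures. A minimal sketch of that pattern using go-ethereum's `event.ResubscribeErr`, assuming the new code keeps the same sink, the `WatchMessageSent` call, and the `handleEvent` signature introduced above (the function name, backoff value, and the extra `event`/`time`/`sync` imports are illustrative):

```go
func (svc *Service) subscribeWithBackoff(ctx context.Context, chainID *big.Int) error {
	sink := make(chan *contracts.BridgeMessageSent)

	// ResubscribeErr re-invokes the callback after a failure, waiting up
	// to the given maximum backoff between attempts.
	sub := event.ResubscribeErr(
		2*time.Second,
		func(ctx context.Context, err error) (event.Subscription, error) {
			if err != nil {
				log.Errorf("resubscribing after subscription error: %v", err)
			}
			return svc.bridge.WatchMessageSent(&bind.WatchOpts{Context: ctx}, sink, nil)
		},
	)
	defer sub.Unsubscribe()

	wg := &sync.WaitGroup{}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev := <-sink:
			go svc.handleEvent(ctx, wg, svc.errChan, chainID, ev)
		}
	}
}
```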

// handleEvent handles an individual MessageSent event
func (svc *Service) handleEvent(ctx context.Context, chainID *big.Int, event *contracts.BridgeMessageSent) error {
log.Infof("event found. signal:%v for block: %v", common.Hash(event.Signal).Hex(), event.Raw.BlockNumber)

marshaled, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "json.Marshal(event)")
}

raw := event.Raw

// handle chain re-org by checking Removed property, no need to
// return error, just continue and do not process.
if raw.Removed {
return nil
}

// save event to database for later processing outside
// the indexer
log.Info("saving event to database")

eventStatus := relayer.EventStatusNew
// if gasLimit is 0, relayer can not process this.
if event.Message.GasLimit == nil || event.Message.GasLimit.Cmp(common.Big0) == 0 {
eventStatus = relayer.EventStatusNewOnlyOwner
}

e, err := svc.eventRepo.Save(relayer.SaveEventOpts{
Name: eventName,
Data: string(marshaled),
ChainID: chainID,
Status: eventStatus,
})
if err != nil {
return errors.Wrap(err, "svc.eventRepo.Save")
}

// we can not process, exit early
if eventStatus == relayer.EventStatusNewOnlyOwner && event.Message.Owner != svc.relayerAddr {
log.Infof("gasLimit == 0 and owner is not the current relayer key, can not process. continuing loop")
return nil
}

messageStatus, err := svc.destBridge.GetMessageStatus(nil, event.Signal)
if err != nil {
return errors.Wrap(err, "svc.destBridge.GetMessageStatus")
}

if messageStatus == uint8(relayer.EventStatusNew) {
log.Info("message not processed yet, attempting processing")
// process the message
if err := svc.processor.ProcessMessage(ctx, event, e); err != nil {
// TODO: handle error here, update in eventRepo, continue on in processing
return errors.Wrap(err, "svc.processMessage")
}
}

// if the block number of the event is higher than the block we are processing,
// we can now consider that previous block processed. save it to the DB
// and bump the block number.
if raw.BlockNumber > svc.processingBlock.Height {
log.Info("raw blockNumber > processingBlock.height")
log.Infof("saving new latest processed block to DB: %v", raw.BlockNumber)

if err := svc.blockRepo.Save(relayer.SaveBlockOpts{
Height: svc.processingBlock.Height,
Hash: common.HexToHash(svc.processingBlock.Hash),
ChainID: chainID,
EventName: eventName,
}); err != nil {
return errors.Wrap(err, "svc.blockRepo.Save")
}

svc.processingBlock = &relayer.Block{
Height: raw.BlockNumber,
Hash: raw.BlockHash.Hex(),
}
}

return nil
}

// handleNoEventsRemaining is used when the batch had events, but is now finished, and we need to
// update the latest block processed
func (svc *Service) handleNoEventsRemaining(
ctx context.Context,
chainID *big.Int,
events *contracts.BridgeMessageSentIterator,
) error {
log.Info("no events remaining to be processed")

if events.Error() != nil {
return errors.Wrap(events.Error(), "events.Error")
}

log.Infof("saving new latest processed block to DB: %v", events.Event.Raw.BlockNumber)

if err := svc.blockRepo.Save(relayer.SaveBlockOpts{
Height: events.Event.Raw.BlockNumber,
Hash: events.Event.Raw.BlockHash,
ChainID: chainID,
EventName: eventName,
}); err != nil {
return errors.Wrap(err, "svc.blockRepo.Save")
}

return nil
}

// handleNoEventsInBatch is used when an entire batch call has no events in the entire response,
// and we need to update the latest block processed
func (svc *Service) handleNoEventsInBatch(ctx context.Context, chainID *big.Int, blockNumber int64) error {
log.Infof("no events in batch")

header, err := svc.ethClient.HeaderByNumber(ctx, big.NewInt(blockNumber))
if err != nil {
return errors.Wrap(err, "svc.ethClient.HeaderByNumber")
}

log.Infof("setting last processed block to height: %v, hash: %v", blockNumber, header.Hash().Hex())

if err := svc.blockRepo.Save(relayer.SaveBlockOpts{
Height: uint64(blockNumber),
Hash: header.Hash(),
ChainID: chainID,
EventName: eventName,
}); err != nil {
return errors.Wrap(err, "svc.blockRepo.Save")
}

svc.processingBlock = &relayer.Block{
Height: uint64(blockNumber),
Hash: header.Hash().Hex(),
}

return nil
}
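On the TODO above about capping the number of handler goroutines: the later "waitgroup => errgroup" commit points one way to do it. A sketch of the fan-out loop using `golang.org/x/sync/errgroup` with a concurrency limit — `maxConcurrentHandlers` is illustrative, and this assumes `handleEvent` returns an error rather than reporting through `errChan`:

```go
// Bounded concurrent event handling: Go blocks once the limit is
// reached, which also bounds pressure on the MySQL connection pool.
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(maxConcurrentHandlers) // e.g. keep below MYSQL_MAX_OPEN_CONNS

for {
	ev := events.Event // capture before the iterator advances

	g.Go(func() error {
		return svc.handleEvent(gCtx, chainID, ev)
	})

	if !events.Next() {
		break
	}
}

// Wait returns the first non-nil error from any handler.
if err := g.Wait(); err != nil {
	return errors.Wrap(err, "svc.handleEvent")
}
```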