Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(relayer): Asynchronous message processing, error handling, nonce management, and indexer folder structuring #259

Merged
merged 24 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
6a67af3
wg/errchan handling, block batch size config param for ndexer, .defau…
cyberhorsey Nov 11, 2022
48d0a2d
mysql conn params + block batch size param
cyberhorsey Nov 11, 2022
fd6f108
wip tests fo indexer + go mod tidy
cyberhorsey Nov 11, 2022
5eb0d61
add gosimple linter
cyberhorsey Nov 11, 2022
6ac57c1
refactor handle_even with cleaner handle methods. test for canProcess…
cyberhorsey Nov 14, 2022
426f5ab
Merge branch 'main' into async_relayer
cyberhorsey Nov 14, 2022
1308298
subscribe return error
cyberhorsey Nov 14, 2022
ffe31d3
merge main
cyberhorsey Nov 15, 2022
3c1c78c
Update packages/relayer/cli/cli.go
cyberhorsey Nov 15, 2022
03583c2
check negative ints for configs
cyberhorsey Nov 15, 2022
e98a16b
Merge branch 'async_relayer' of github.com:taikochain/taiko-mono into…
cyberhorsey Nov 15, 2022
03b1aa3
Update packages/relayer/indexer/watch_errors.go
cyberhorsey Nov 15, 2022
2cae802
Defer mutex unlock in process message
cyberhorsey Nov 15, 2022
50243f0
Merge branch 'async_relayer' of github.com:taikochain/taiko-mono into…
cyberhorsey Nov 15, 2022
e3e6cd1
lint
cyberhorsey Nov 15, 2022
69cf857
waitgroup => errgroup
cyberhorsey Nov 15, 2022
3d0879d
use ResubscribeErr
cyberhorsey Nov 15, 2022
a124290
subscription backoff in seconds
cyberhorsey Nov 15, 2022
70a1d33
Update packages/relayer/indexer/filter_then_subscribe.go
cyberhorsey Nov 15, 2022
4239be3
lint
cyberhorsey Nov 15, 2022
3a4423c
Merge branch 'async_relayer' of github.com:taikochain/taiko-mono into…
cyberhorsey Nov 15, 2022
4e3cc85
merge main + resolve conflicts
cyberhorsey Nov 16, 2022
bf0f4e9
lint
cyberhorsey Nov 16, 2022
ec80d8f
bump lint funlen
cyberhorsey Nov 16, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
go.opencensus.io v0.23.0 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/net v0.0.0-20220812174116-3211cb980234 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad // indirect
google.golang.org/grpc v1.47.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
16 changes: 12 additions & 4 deletions packages/relayer/.default.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,16 @@ MYSQL_PASSWORD=root
MYSQL_DATABASE=relayer
MYSQL_HOST=localhost:3306
RELAYER_ECDSA_KEY=
L1_BRIDGE_ADDRESS=0xa566811E9E63e4F573Df89d5453bB89F239F7e10
L2_BRIDGE_ADDRESS=0xa566811E9E63e4F573Df89d5453bB89F239F7e10
L1_RPC_URL="wss://eth-goerli.g.alchemy.com/v2/bPAA5rQ42Zoo4ts9TYnTB2t0cuc5lf7_"
L2_RPC_URL="wss://rinkeby-light.eth.linkpool.io/ws"
L1_BRIDGE_ADDRESS=0xB12d6112D64B213880Fa53F815aF1F29c91CaCe9
L2_BRIDGE_ADDRESS=0x4eA05A0f7713333AeB4bB73F17aEeFE146CF13E3
L1_TAIKO_ADDRESS=0x9b557777Be33A8A2fE6aF93E017A0d139B439E5D
L2_TAIKO_ADDRESS=0x0027f309f7F94A8Efb6A3DBfb30827f1062803F4
L1_RPC_URL=ws://34.132.67.34:8546
L2_RPC_URL=ws://ws.a1.testnet.taiko.xyz
BLOCK_BATCH_SIZE=2
MYSQL_MAX_IDLE_CONNS=
MYSQL_MAX_OPEN_CONNS=
MYSQL_CONN_MAX_LIFETIME_IN_MS=
NUM_GOROUTINES=20
SUBSCRIPTION_BACKOFF_IN_SECONDS=3
CONFIRMATIONS_BEFORE_PROCESSING=15
7 changes: 4 additions & 3 deletions packages/relayer/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ linters:
- gofmt
- golint
- gosec
- gosimple
- lll
- whitespace
- wsl

linters-settings:
funlen:
lines: 105
statements: 45
lines: 116
statements: 48
gocognit:
min-complexity: 32
min-complexity: 35

issues:
exclude-rules:
Expand Down
74 changes: 71 additions & 3 deletions packages/relayer/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"strconv"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
Expand Down Expand Up @@ -36,7 +37,10 @@ var (
"CONFIRMATIONS_BEFORE_PROCESSING",
}

defaultConfirmations = 15
defaultBlockBatchSize = 2
defaultNumGoroutines = 10
defaultSubscriptionBackoff = 2 * time.Second
defaultConfirmations = 15
)

func Run(mode relayer.Mode, layer relayer.Layer) {
Expand Down Expand Up @@ -111,6 +115,25 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(),
return nil, nil, err
}

blockBatchSize, err := strconv.Atoi(os.Getenv("BLOCK_BATCH_SIZE"))
if err != nil || blockBatchSize <= 0 {
blockBatchSize = defaultBlockBatchSize
}

numGoroutines, err := strconv.Atoi(os.Getenv("NUM_GOROUTINES"))
if err != nil || numGoroutines <= 0 {
numGoroutines = defaultNumGoroutines
}

var subscriptionBackoff time.Duration

subscriptionBackoffInSeconds, err := strconv.Atoi(os.Getenv("SUBSCRIPTION_BACKOFF_IN_SECONDS"))
if err != nil || numGoroutines <= 0 {
subscriptionBackoff = defaultSubscriptionBackoff
} else {
subscriptionBackoff = time.Duration(subscriptionBackoffInSeconds) * time.Second
}

confirmations, err := strconv.Atoi(os.Getenv("CONFIRMATIONS_BEFORE_PROCESSING"))
if err != nil || confirmations <= 0 {
confirmations = defaultConfirmations
Expand All @@ -132,7 +155,10 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(),
DestBridgeAddress: common.HexToAddress(os.Getenv("L2_BRIDGE_ADDRESS")),
DestTaikoAddress: common.HexToAddress(os.Getenv("L2_TAIKO_ADDRESS")),

Confirmations: uint64(confirmations),
BlockBatchSize: uint64(blockBatchSize),
NumGoroutines: numGoroutines,
SubscriptionBackoff: subscriptionBackoff,
Confirmations: uint64(confirmations),
})
if err != nil {
log.Fatal(err)
Expand All @@ -155,7 +181,10 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(),
DestBridgeAddress: common.HexToAddress(os.Getenv("L1_BRIDGE_ADDRESS")),
DestTaikoAddress: common.HexToAddress(os.Getenv("L1_TAIKO_ADDRESS")),

Confirmations: uint64(confirmations),
BlockBatchSize: uint64(blockBatchSize),
NumGoroutines: numGoroutines,
SubscriptionBackoff: subscriptionBackoff,
Confirmations: uint64(confirmations),
})
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -200,6 +229,45 @@ func openDBConnection(opts relayer.DBConnectionOpts) *gorm.DB {
log.Fatal(err)
}

sqlDB, err := db.DB()
if err != nil {
log.Fatal(err)
}

var (
defaultMaxIdleConns = 50
defaultMaxOpenConns = 200
defaultConnMaxLifetime = 10 * time.Second
)

maxIdleConns, err := strconv.Atoi(os.Getenv("MYSQL_MAX_IDLE_CONNS"))
if err != nil || maxIdleConns <= 0 {
maxIdleConns = defaultMaxIdleConns
}

maxOpenConns, err := strconv.Atoi(os.Getenv("MYSQL_MAX_OPEN_CONNS"))
if err != nil || maxOpenConns <= 0 {
maxOpenConns = defaultMaxOpenConns
}

var maxLifetime time.Duration

connMaxLifetime, err := strconv.Atoi(os.Getenv("MYSQL_CONN_MAX_LIFETIME_IN_MS"))
if err != nil || connMaxLifetime <= 0 {
maxLifetime = defaultConnMaxLifetime
} else {
maxLifetime = time.Duration(connMaxLifetime)
}

// SetMaxOpenConns sets the maximum number of open connections to the database.
sqlDB.SetMaxOpenConns(maxOpenConns)

// SetMaxIdleConns sets the maximum number of connections in the idle connection pool.
sqlDB.SetMaxIdleConns(maxIdleConns)

// SetConnMaxLifetime sets the maximum amount of time a connection may be reused.
sqlDB.SetConnMaxLifetime(maxLifetime)

return db
}

Expand Down
Loading