Skip to content

Commit

Permalink
mysql + refresh rate
Browse files Browse the repository at this point in the history
  • Loading branch information
nautsimon committed Oct 31, 2023
1 parent 5a2512d commit 90dcf30
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 97 deletions.
10 changes: 5 additions & 5 deletions services/sinner/service/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ type ChainIndexer struct {
fetcher fetcher.ScribeFetcher
// config is the config for the backfiller.
config indexerConfig.ChainConfig
// default refresh rate
refreshRate time.Duration
}

// NewChainIndexer creates a new chain indexer.
func NewChainIndexer(eventDB db.EventDB, parsers Parsers, fetcher fetcher.ScribeFetcher, config indexerConfig.ChainConfig) *ChainIndexer {
func NewChainIndexer(eventDB db.EventDB, parsers Parsers, fetcher fetcher.ScribeFetcher, config indexerConfig.ChainConfig, refreshRate time.Duration) *ChainIndexer {
if config.FetchBlockIncrement < 1 {
config.FetchBlockIncrement = 10000
}

Check warning on line 39 in services/sinner/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

services/sinner/service/indexer.go#L38-L39

Added lines #L38 - L39 were not covered by tests
Expand All @@ -40,6 +42,7 @@ func NewChainIndexer(eventDB db.EventDB, parsers Parsers, fetcher fetcher.Scribe
parsers,
fetcher,
config,
refreshRate,
}
return &chainIndexer
}
Expand Down Expand Up @@ -120,18 +123,15 @@ func (c ChainIndexer) createEventParser(contract indexerConfig.ContractConfig) (

// indexContractEvents indexes all events for a contract.
func (c ChainIndexer) indexContractEvents(contractCtx context.Context, contract indexerConfig.ContractConfig, eventParser types.EventParser) error {
refreshRate := time.Duration(1)

for {
select {
case <-contractCtx.Done():
return fmt.Errorf("could not index contract. Error: %w", contractCtx.Err())
case <-time.After(refreshRate):
case <-time.After(c.refreshRate):
startHeight, endHeight, err := c.fetchBlockRange(contractCtx, contract)
if err != nil {
return err
}

Check warning on line 134 in services/sinner/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

services/sinner/service/indexer.go#L133-L134

Added lines #L133 - L134 were not covered by tests
fmt.Println("SSS")
if err := c.processBlocksInRange(contractCtx, startHeight, endHeight, contract, eventParser); err != nil {
return err
}

Check warning on line 137 in services/sinner/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

services/sinner/service/indexer.go#L136-L137

Added lines #L136 - L137 were not covered by tests
Expand Down
2 changes: 1 addition & 1 deletion services/sinner/service/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (t *ServiceSuite) TestChainIndexer() {
originParser, err := origin.NewParser(common.HexToAddress(config.Contracts[0].Address), testDB, t.originChainID)
Nil(t.T(), err)
parsers.OriginParser = originParser
chainIndexer := service.NewChainIndexer(testDB, parsers, t.scribeFetcher, config)
chainIndexer := service.NewChainIndexer(testDB, parsers, t.scribeFetcher, config, 1*time.Second)
originEvent := model.OriginSent{}
indexingCtx, cancelIndexing := context.WithCancel(ctx)
go func() {
Expand Down
166 changes: 85 additions & 81 deletions services/sinner/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service_test

import (
"context"
"github.com/synapsecns/sanguine/services/sinner/db"

. "github.com/stretchr/testify/assert"
graphqlModel "github.com/synapsecns/sanguine/services/sinner/graphql/server/graph/model"
Expand All @@ -16,88 +17,91 @@ import (

// TestSinner tests Sinner.
func (t *ServiceSuite) TestSinner() {
// Store test logs and txs
err := t.scribeDB.StoreLogs(t.GetTestContext(), t.originChainID, t.originTestLog)
Nil(t.T(), err)
err = t.scribeDB.StoreEthTx(t.GetTestContext(), t.originTestTx, t.originChainID, t.originTestLog.BlockHash, t.originTestLog.BlockNumber, uint64(t.originTestLog.TxIndex))
Nil(t.T(), err)
err = t.scribeDB.StoreLastIndexed(t.GetTestContext(), t.originTestLog.Address, t.originChainID, 625782, false)
Nil(t.T(), err)

err = t.scribeDB.StoreLogs(t.GetTestContext(), t.destinationChainID, t.destinationTestLog)
Nil(t.T(), err)
err = t.scribeDB.StoreEthTx(t.GetTestContext(), t.destinationTestTx, t.destinationChainID, t.destinationTestLog.BlockHash, t.destinationTestLog.BlockNumber, uint64(t.destinationTestLog.TxIndex))
Nil(t.T(), err)
err = t.scribeDB.StoreLastIndexed(t.GetTestContext(), t.destinationTestLog.Address, t.destinationChainID, 1975780, false)
Nil(t.T(), err)

originConfig := indexerConfig.ChainConfig{
ChainID: t.originChainID,
FetchBlockIncrement: 10,
Contracts: []indexerConfig.ContractConfig{{
ContractType: "origin",
Address: t.originTestLog.Address.String(),
StartBlock: 625780,
}},
}

destinationConfig := indexerConfig.ChainConfig{
ChainID: t.destinationChainID,
FetchBlockIncrement: 10,
Contracts: []indexerConfig.ContractConfig{{
ContractType: "execution_hub",
Address: t.destinationTestLog.Address.String(),
StartBlock: 1975778,
}},
}

config := indexerConfig.Config{
ScribeURL: t.scribeFetcherPath,
DBPath: t.scribeDBPath,
DefaultRefreshRate: 1,
DBType: "sqlite",
SkipMigrations: true,
Chains: []indexerConfig.ChainConfig{originConfig, destinationConfig},
}

sinner, err := service.NewSinner(t.sqlite, config, t.metrics)
Nil(t.T(), err)

originEvent := model.OriginSent{}
destinationEvent := model.Executed{}

indexingCtx, cancelIndexing := context.WithCancel(t.GetTestContext())
go func() {
err = sinner.Index(indexingCtx)
t.RunOnAllDBs(func(testDB db.TestEventDB) {

// Store test logs and txs
err := t.scribeDB.StoreLogs(t.GetTestContext(), t.originChainID, t.originTestLog)
Nil(t.T(), err)
err = t.scribeDB.StoreEthTx(t.GetTestContext(), t.originTestTx, t.originChainID, t.originTestLog.BlockHash, t.originTestLog.BlockNumber, uint64(t.originTestLog.TxIndex))
Nil(t.T(), err)
err = t.scribeDB.StoreLastIndexed(t.GetTestContext(), t.originTestLog.Address, t.originChainID, 625782, false)
Nil(t.T(), err)

err = t.scribeDB.StoreLogs(t.GetTestContext(), t.destinationChainID, t.destinationTestLog)
Nil(t.T(), err)
err = t.scribeDB.StoreEthTx(t.GetTestContext(), t.destinationTestTx, t.destinationChainID, t.destinationTestLog.BlockHash, t.destinationTestLog.BlockNumber, uint64(t.destinationTestLog.TxIndex))
Nil(t.T(), err)
err = t.scribeDB.StoreLastIndexed(t.GetTestContext(), t.destinationTestLog.Address, t.destinationChainID, 1975780, false)
Nil(t.T(), err)

originConfig := indexerConfig.ChainConfig{
ChainID: t.originChainID,
FetchBlockIncrement: 10,
Contracts: []indexerConfig.ContractConfig{{
ContractType: "origin",
Address: t.originTestLog.Address.String(),
StartBlock: 625780,
}},
}

destinationConfig := indexerConfig.ChainConfig{
ChainID: t.destinationChainID,
FetchBlockIncrement: 10,
Contracts: []indexerConfig.ContractConfig{{
ContractType: "execution_hub",
Address: t.destinationTestLog.Address.String(),
StartBlock: 1975778,
}},
}

config := indexerConfig.Config{
ScribeURL: t.scribeFetcherPath,
DBPath: t.scribeDBPath,
DefaultRefreshRate: 1,
DBType: "sqlite",
SkipMigrations: true,
Chains: []indexerConfig.ChainConfig{originConfig, destinationConfig},
}

sinner, err := service.NewSinner(testDB, config, t.metrics)
Nil(t.T(), err)
}()

timeout := 2 * time.Second
go func() {
for {
select {
case <-t.GetTestContext().Done():
return
case <-time.After(timeout):
// check db
t.sqlite.UNSAFE_DB().WithContext(t.GetTestContext()).Find(&model.OriginSent{}).First(&originEvent)
t.sqlite.UNSAFE_DB().WithContext(t.GetTestContext()).Find(&model.Executed{}).First(&destinationEvent)
if len(originEvent.MessageHash) > 0 && len(destinationEvent.MessageHash) > 0 {
// cancel if message stored
cancelIndexing()

originEvent := model.OriginSent{}
destinationEvent := model.Executed{}

indexingCtx, cancelIndexing := context.WithCancel(t.GetTestContext())
go func() {
err = sinner.Index(indexingCtx)
Nil(t.T(), err)
}()

timeout := 2 * time.Second
go func() {
for {
select {
case <-t.GetTestContext().Done():
return
case <-time.After(timeout):
// check db
testDB.UNSAFE_DB().WithContext(t.GetTestContext()).Find(&model.OriginSent{}).First(&originEvent)
testDB.UNSAFE_DB().WithContext(t.GetTestContext()).Find(&model.Executed{}).First(&destinationEvent)
if len(originEvent.MessageHash) > 0 && len(destinationEvent.MessageHash) > 0 {
// cancel if message stored
cancelIndexing()
}
}
}
}
}()
<-indexingCtx.Done()

// Check parity of events
Equal(t.T(), t.originTestLog.TxHash.String(), originEvent.TxHash)
Equal(t.T(), t.destinationTestLog.TxHash.String(), destinationEvent.TxHash)

// Get and check the message status
messageStatus, err := t.sqlite.RetrieveMessageStatus(t.GetTestContext(), originEvent.MessageHash)
Nil(t.T(), err)
Equal(t.T(), t.originTestLog.TxHash.String(), *messageStatus.OriginTxHash)
Equal(t.T(), graphqlModel.MessageStateLastSeenDestination, *messageStatus.LastSeen)
}()
<-indexingCtx.Done()

// Check parity of events
Equal(t.T(), t.originTestLog.TxHash.String(), originEvent.TxHash)
Equal(t.T(), t.destinationTestLog.TxHash.String(), destinationEvent.TxHash)

// Get and check the message status
messageStatus, err := testDB.RetrieveMessageStatus(t.GetTestContext(), originEvent.MessageHash)
Nil(t.T(), err)
Equal(t.T(), t.originTestLog.TxHash.String(), *messageStatus.OriginTxHash)
Equal(t.T(), graphqlModel.MessageStateLastSeenDestination, *messageStatus.LastSeen)
})
}
8 changes: 4 additions & 4 deletions services/sinner/service/sinner.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ func NewSinner(eventDB db.EventDB, config indexerConfig.Config, handler metrics.
}

fetcher := fetcherpkg.NewFetcher(gqlClient.NewClient(&httpClient, config.ScribeURL), handler)

refreshRate := time.Duration(config.DefaultRefreshRate) * time.Second
// Initialize each chain backfiller.
for _, chainConfig := range config.Chains {
chainIndexer, err := getChainIndexer(eventDB, chainConfig.ChainID, fetcher, chainConfig)
chainIndexer, err := getChainIndexer(eventDB, chainConfig.ChainID, fetcher, chainConfig, refreshRate)
if err != nil {
return nil, fmt.Errorf("could not get chain indexer: %w", err)
}

Check warning on line 62 in services/sinner/service/sinner.go

View check run for this annotation

Codecov / codecov/patch

services/sinner/service/sinner.go#L61-L62

Added lines #L61 - L62 were not covered by tests
Expand Down Expand Up @@ -119,7 +119,7 @@ func (e Sinner) Index(ctx context.Context) error {
}

// nolint gocognit,cyclop
func getChainIndexer(eventDB db.EventDB, chainID uint32, fetcher fetcherpkg.ScribeFetcher, chainConfig indexerConfig.ChainConfig) (*ChainIndexer, error) {
func getChainIndexer(eventDB db.EventDB, chainID uint32, fetcher fetcherpkg.ScribeFetcher, chainConfig indexerConfig.ChainConfig, refreshRate time.Duration) (*ChainIndexer, error) {
parsers := Parsers{
ChainID: chainID,
}
Expand All @@ -140,7 +140,7 @@ func getChainIndexer(eventDB db.EventDB, chainID uint32, fetcher fetcherpkg.Scri
}
}

chainIndexer := NewChainIndexer(eventDB, parsers, fetcher, chainConfig)
chainIndexer := NewChainIndexer(eventDB, parsers, fetcher, chainConfig, refreshRate)

return chainIndexer, nil
}
18 changes: 12 additions & 6 deletions services/sinner/service/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package service_test
import (
"context"
"database/sql"
"os"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/suite"
Expand All @@ -20,7 +21,6 @@ import (

"math/big"
"net/http"
"os"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -50,7 +50,6 @@ import (
type ServiceSuite struct {
*testsuite.TestSuite
dbs []db.TestEventDB
sqlite db.TestEventDB
logIndex atomic.Int64
scribeDB scribedb.EventDB
scribeDBPath string
Expand All @@ -77,6 +76,16 @@ func NewEventServiceSuite(tb testing.TB) *ServiceSuite {
}
}

func (t *ServiceSuite) SetupTest() {
t.TestSuite.SetupTest()

sqliteStore, err := sqlite.NewSqliteStore(t.GetTestContext(), filet.TmpDir(t.T(), ""), t.metrics, false)
Nil(t.T(), err)

t.dbs = []db.TestEventDB{sqliteStore}
t.setupMysqlDB()
}

func (t *ServiceSuite) SetupSuite() {
t.TestSuite.SetupSuite()
t.logIndex.Store(0)
Expand All @@ -99,8 +108,6 @@ func (t *ServiceSuite) SetupSuite() {
Nil(t.T(), err)

t.dbs = []db.TestEventDB{sqliteStore}
t.sqlite = sqliteStore
t.setupMysqlDB()
t.originChainID = 421614
t.destinationChainID = 444
t.scribeDB, t.scribeFetcher = t.CreateScribeFetcher(t.GetSuiteContext())
Expand All @@ -110,7 +117,6 @@ func (t *ServiceSuite) SetupSuite() {
}

func (t *ServiceSuite) setupMysqlDB() {
t.T().Helper()
// skip if mysql test disabled, this really only needs to be run in ci
// skip if mysql test disabled
if os.Getenv(dbcommon.EnableMysqlTestVar) == "" {
Expand All @@ -136,7 +142,7 @@ func (t *ServiceSuite) setupMysqlDB() {
mysql.MaxOpenConns = 10

// create the sql store
mysqlStore, err := mysql.NewMysqlStore(t.GetSuiteContext(), connString, t.metrics, false)
mysqlStore, err := mysql.NewMysqlStore(t.GetTestContext(), connString, t.metrics, false)
Nil(t.T(), err)
// add the db
t.dbs = append(t.dbs, mysqlStore)
Expand Down

0 comments on commit 90dcf30

Please sign in to comment.