diff --git a/services/sinner/service/indexer.go b/services/sinner/service/indexer.go index 662f7264be..727ade4174 100644 --- a/services/sinner/service/indexer.go +++ b/services/sinner/service/indexer.go @@ -28,10 +28,12 @@ type ChainIndexer struct { fetcher fetcher.ScribeFetcher // config is the config for the backfiller. config indexerConfig.ChainConfig + // default refresh rate + refreshRate time.Duration } // NewChainIndexer creates a new chain indexer. -func NewChainIndexer(eventDB db.EventDB, parsers Parsers, fetcher fetcher.ScribeFetcher, config indexerConfig.ChainConfig) *ChainIndexer { +func NewChainIndexer(eventDB db.EventDB, parsers Parsers, fetcher fetcher.ScribeFetcher, config indexerConfig.ChainConfig, refreshRate time.Duration) *ChainIndexer { if config.FetchBlockIncrement < 1 { config.FetchBlockIncrement = 10000 } @@ -40,6 +42,7 @@ func NewChainIndexer(eventDB db.EventDB, parsers Parsers, fetcher fetcher.Scribe parsers, fetcher, config, + refreshRate, } return &chainIndexer } @@ -120,18 +123,15 @@ func (c ChainIndexer) createEventParser(contract indexerConfig.ContractConfig) ( // indexContractEvents indexes all events for a contract. func (c ChainIndexer) indexContractEvents(contractCtx context.Context, contract indexerConfig.ContractConfig, eventParser types.EventParser) error { - refreshRate := time.Duration(1) - for { select { case <-contractCtx.Done(): return fmt.Errorf("could not index contract. Error: %w", contractCtx.Err()) - case <-time.After(refreshRate): + case <-time.After(c.refreshRate): startHeight, endHeight, err := c.fetchBlockRange(contractCtx, contract) if err != nil { return err } - fmt.Println("SSS") if err := c.processBlocksInRange(contractCtx, startHeight, endHeight, contract, eventParser); err != nil { return err } diff --git a/services/sinner/service/indexer_test.go b/services/sinner/service/indexer_test.go index 253a39ac1a..1ecf8ed7e3 100644 --- a/services/sinner/service/indexer_test.go +++ b/services/sinner/service/indexer_test.go @@ -66,7 +66,7 @@ func (t *ServiceSuite) TestChainIndexer() { originParser, err := origin.NewParser(common.HexToAddress(config.Contracts[0].Address), testDB, t.originChainID) Nil(t.T(), err) parsers.OriginParser = originParser - chainIndexer := service.NewChainIndexer(testDB, parsers, t.scribeFetcher, config) + chainIndexer := service.NewChainIndexer(testDB, parsers, t.scribeFetcher, config, 1*time.Second) originEvent := model.OriginSent{} indexingCtx, cancelIndexing := context.WithCancel(ctx) go func() { diff --git a/services/sinner/service/service_test.go b/services/sinner/service/service_test.go index da6ec1a82a..a06671d16a 100644 --- a/services/sinner/service/service_test.go +++ b/services/sinner/service/service_test.go @@ -2,6 +2,7 @@ package service_test import ( "context" + "github.com/synapsecns/sanguine/services/sinner/db" . "github.com/stretchr/testify/assert" graphqlModel "github.com/synapsecns/sanguine/services/sinner/graphql/server/graph/model" @@ -16,88 +17,91 @@ import ( // TestSinner tests Sinner. func (t *ServiceSuite) TestSinner() { - // Store test logs and txs - err := t.scribeDB.StoreLogs(t.GetTestContext(), t.originChainID, t.originTestLog) - Nil(t.T(), err) - err = t.scribeDB.StoreEthTx(t.GetTestContext(), t.originTestTx, t.originChainID, t.originTestLog.BlockHash, t.originTestLog.BlockNumber, uint64(t.originTestLog.TxIndex)) - Nil(t.T(), err) - err = t.scribeDB.StoreLastIndexed(t.GetTestContext(), t.originTestLog.Address, t.originChainID, 625782, false) - Nil(t.T(), err) - - err = t.scribeDB.StoreLogs(t.GetTestContext(), t.destinationChainID, t.destinationTestLog) - Nil(t.T(), err) - err = t.scribeDB.StoreEthTx(t.GetTestContext(), t.destinationTestTx, t.destinationChainID, t.destinationTestLog.BlockHash, t.destinationTestLog.BlockNumber, uint64(t.destinationTestLog.TxIndex)) - Nil(t.T(), err) - err = t.scribeDB.StoreLastIndexed(t.GetTestContext(), t.destinationTestLog.Address, t.destinationChainID, 1975780, false) - Nil(t.T(), err) - - originConfig := indexerConfig.ChainConfig{ - ChainID: t.originChainID, - FetchBlockIncrement: 10, - Contracts: []indexerConfig.ContractConfig{{ - ContractType: "origin", - Address: t.originTestLog.Address.String(), - StartBlock: 625780, - }}, - } - - destinationConfig := indexerConfig.ChainConfig{ - ChainID: t.destinationChainID, - FetchBlockIncrement: 10, - Contracts: []indexerConfig.ContractConfig{{ - ContractType: "execution_hub", - Address: t.destinationTestLog.Address.String(), - StartBlock: 1975778, - }}, - } - - config := indexerConfig.Config{ - ScribeURL: t.scribeFetcherPath, - DBPath: t.scribeDBPath, - DefaultRefreshRate: 1, - DBType: "sqlite", - SkipMigrations: true, - Chains: []indexerConfig.ChainConfig{originConfig, destinationConfig}, - } - - sinner, err := service.NewSinner(t.sqlite, config, t.metrics) - Nil(t.T(), err) - - originEvent := model.OriginSent{} - destinationEvent := model.Executed{} - - indexingCtx, cancelIndexing := context.WithCancel(t.GetTestContext()) - go func() { - err = sinner.Index(indexingCtx) + t.RunOnAllDBs(func(testDB db.TestEventDB) { + + // Store test logs and txs + err := t.scribeDB.StoreLogs(t.GetTestContext(), t.originChainID, t.originTestLog) + Nil(t.T(), err) + err = t.scribeDB.StoreEthTx(t.GetTestContext(), t.originTestTx, t.originChainID, t.originTestLog.BlockHash, t.originTestLog.BlockNumber, uint64(t.originTestLog.TxIndex)) + Nil(t.T(), err) + err = t.scribeDB.StoreLastIndexed(t.GetTestContext(), t.originTestLog.Address, t.originChainID, 625782, false) + Nil(t.T(), err) + + err = t.scribeDB.StoreLogs(t.GetTestContext(), t.destinationChainID, t.destinationTestLog) + Nil(t.T(), err) + err = t.scribeDB.StoreEthTx(t.GetTestContext(), t.destinationTestTx, t.destinationChainID, t.destinationTestLog.BlockHash, t.destinationTestLog.BlockNumber, uint64(t.destinationTestLog.TxIndex)) + Nil(t.T(), err) + err = t.scribeDB.StoreLastIndexed(t.GetTestContext(), t.destinationTestLog.Address, t.destinationChainID, 1975780, false) + Nil(t.T(), err) + + originConfig := indexerConfig.ChainConfig{ + ChainID: t.originChainID, + FetchBlockIncrement: 10, + Contracts: []indexerConfig.ContractConfig{{ + ContractType: "origin", + Address: t.originTestLog.Address.String(), + StartBlock: 625780, + }}, + } + + destinationConfig := indexerConfig.ChainConfig{ + ChainID: t.destinationChainID, + FetchBlockIncrement: 10, + Contracts: []indexerConfig.ContractConfig{{ + ContractType: "execution_hub", + Address: t.destinationTestLog.Address.String(), + StartBlock: 1975778, + }}, + } + + config := indexerConfig.Config{ + ScribeURL: t.scribeFetcherPath, + DBPath: t.scribeDBPath, + DefaultRefreshRate: 1, + DBType: "sqlite", + SkipMigrations: true, + Chains: []indexerConfig.ChainConfig{originConfig, destinationConfig}, + } + + sinner, err := service.NewSinner(testDB, config, t.metrics) Nil(t.T(), err) - }() - - timeout := 2 * time.Second - go func() { - for { - select { - case <-t.GetTestContext().Done(): - return - case <-time.After(timeout): - // check db - t.sqlite.UNSAFE_DB().WithContext(t.GetTestContext()).Find(&model.OriginSent{}).First(&originEvent) - t.sqlite.UNSAFE_DB().WithContext(t.GetTestContext()).Find(&model.Executed{}).First(&destinationEvent) - if len(originEvent.MessageHash) > 0 && len(destinationEvent.MessageHash) > 0 { - // cancel if message stored - cancelIndexing() + + originEvent := model.OriginSent{} + destinationEvent := model.Executed{} + + indexingCtx, cancelIndexing := context.WithCancel(t.GetTestContext()) + go func() { + err = sinner.Index(indexingCtx) + Nil(t.T(), err) + }() + + timeout := 2 * time.Second + go func() { + for { + select { + case <-t.GetTestContext().Done(): + return + case <-time.After(timeout): + // check db + testDB.UNSAFE_DB().WithContext(t.GetTestContext()).Find(&model.OriginSent{}).First(&originEvent) + testDB.UNSAFE_DB().WithContext(t.GetTestContext()).Find(&model.Executed{}).First(&destinationEvent) + if len(originEvent.MessageHash) > 0 && len(destinationEvent.MessageHash) > 0 { + // cancel if message stored + cancelIndexing() + } } } - } - }() - <-indexingCtx.Done() - - // Check parity of events - Equal(t.T(), t.originTestLog.TxHash.String(), originEvent.TxHash) - Equal(t.T(), t.destinationTestLog.TxHash.String(), destinationEvent.TxHash) - - // Get and check the message status - messageStatus, err := t.sqlite.RetrieveMessageStatus(t.GetTestContext(), originEvent.MessageHash) - Nil(t.T(), err) - Equal(t.T(), t.originTestLog.TxHash.String(), *messageStatus.OriginTxHash) - Equal(t.T(), graphqlModel.MessageStateLastSeenDestination, *messageStatus.LastSeen) + }() + <-indexingCtx.Done() + + // Check parity of events + Equal(t.T(), t.originTestLog.TxHash.String(), originEvent.TxHash) + Equal(t.T(), t.destinationTestLog.TxHash.String(), destinationEvent.TxHash) + + // Get and check the message status + messageStatus, err := testDB.RetrieveMessageStatus(t.GetTestContext(), originEvent.MessageHash) + Nil(t.T(), err) + Equal(t.T(), t.originTestLog.TxHash.String(), *messageStatus.OriginTxHash) + Equal(t.T(), graphqlModel.MessageStateLastSeenDestination, *messageStatus.LastSeen) + }) } diff --git a/services/sinner/service/sinner.go b/services/sinner/service/sinner.go index 98f5bd0df4..a676afa4c3 100644 --- a/services/sinner/service/sinner.go +++ b/services/sinner/service/sinner.go @@ -53,10 +53,10 @@ func NewSinner(eventDB db.EventDB, config indexerConfig.Config, handler metrics. } fetcher := fetcherpkg.NewFetcher(gqlClient.NewClient(&httpClient, config.ScribeURL), handler) - + refreshRate := time.Duration(config.DefaultRefreshRate) * time.Second // Initialize each chain backfiller. for _, chainConfig := range config.Chains { - chainIndexer, err := getChainIndexer(eventDB, chainConfig.ChainID, fetcher, chainConfig) + chainIndexer, err := getChainIndexer(eventDB, chainConfig.ChainID, fetcher, chainConfig, refreshRate) if err != nil { return nil, fmt.Errorf("could not get chain indexer: %w", err) } @@ -119,7 +119,7 @@ func (e Sinner) Index(ctx context.Context) error { } // nolint gocognit,cyclop -func getChainIndexer(eventDB db.EventDB, chainID uint32, fetcher fetcherpkg.ScribeFetcher, chainConfig indexerConfig.ChainConfig) (*ChainIndexer, error) { +func getChainIndexer(eventDB db.EventDB, chainID uint32, fetcher fetcherpkg.ScribeFetcher, chainConfig indexerConfig.ChainConfig, refreshRate time.Duration) (*ChainIndexer, error) { parsers := Parsers{ ChainID: chainID, } @@ -140,7 +140,7 @@ func getChainIndexer(eventDB db.EventDB, chainID uint32, fetcher fetcherpkg.Scri } } - chainIndexer := NewChainIndexer(eventDB, parsers, fetcher, chainConfig) + chainIndexer := NewChainIndexer(eventDB, parsers, fetcher, chainConfig, refreshRate) return chainIndexer, nil } diff --git a/services/sinner/service/suite_test.go b/services/sinner/service/suite_test.go index 312e1a38d8..6059640c4e 100644 --- a/services/sinner/service/suite_test.go +++ b/services/sinner/service/suite_test.go @@ -3,6 +3,7 @@ package service_test import ( "context" "database/sql" + "os" "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/suite" @@ -20,7 +21,6 @@ import ( "math/big" "net/http" - "os" "sync" "sync/atomic" "testing" @@ -50,7 +50,6 @@ import ( type ServiceSuite struct { *testsuite.TestSuite dbs []db.TestEventDB - sqlite db.TestEventDB logIndex atomic.Int64 scribeDB scribedb.EventDB scribeDBPath string @@ -77,6 +76,16 @@ func NewEventServiceSuite(tb testing.TB) *ServiceSuite { } } +func (t *ServiceSuite) SetupTest() { + t.TestSuite.SetupTest() + + sqliteStore, err := sqlite.NewSqliteStore(t.GetTestContext(), filet.TmpDir(t.T(), ""), t.metrics, false) + Nil(t.T(), err) + + t.dbs = []db.TestEventDB{sqliteStore} + t.setupMysqlDB() +} + func (t *ServiceSuite) SetupSuite() { t.TestSuite.SetupSuite() t.logIndex.Store(0) @@ -99,8 +108,6 @@ func (t *ServiceSuite) SetupSuite() { Nil(t.T(), err) t.dbs = []db.TestEventDB{sqliteStore} - t.sqlite = sqliteStore - t.setupMysqlDB() t.originChainID = 421614 t.destinationChainID = 444 t.scribeDB, t.scribeFetcher = t.CreateScribeFetcher(t.GetSuiteContext()) @@ -110,7 +117,6 @@ func (t *ServiceSuite) SetupSuite() { } func (t *ServiceSuite) setupMysqlDB() { - t.T().Helper() // skip if mysql test disabled, this really only needs to be run in ci // skip if mysql test disabled if os.Getenv(dbcommon.EnableMysqlTestVar) == "" { @@ -136,7 +142,7 @@ func (t *ServiceSuite) setupMysqlDB() { mysql.MaxOpenConns = 10 // create the sql store - mysqlStore, err := mysql.NewMysqlStore(t.GetSuiteContext(), connString, t.metrics, false) + mysqlStore, err := mysql.NewMysqlStore(t.GetTestContext(), connString, t.metrics, false) Nil(t.T(), err) // add the db t.dbs = append(t.dbs, mysqlStore)