diff --git a/related-repositories/README.md b/related-repositories/README.md index d82c53389c3d..28121ee48907 100644 --- a/related-repositories/README.md +++ b/related-repositories/README.md @@ -9,7 +9,8 @@ The folder will allow developers to clone/move related repositories to this dire It is recommended that you move the following repositories under this folder. Keep the repository names! - `vulcanize/foundry-tests` -- `vulcanize/hive` - `vulcanize/ipld-eth-db` ## Symlinks + +You can also create symlinks in this folder with the location of your repositories. diff --git a/statediff/known_gaps.go b/statediff/known_gaps.go index f94b59262b20..46d5c6a11cc5 100644 --- a/statediff/known_gaps.go +++ b/statediff/known_gaps.go @@ -19,8 +19,10 @@ package statediff import ( "context" "fmt" + "io/ioutil" "math/big" "os" + "strings" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" @@ -36,11 +38,6 @@ var ( defaultWriteFilePath = "./known_gaps.sql" ) -type KnownGaps interface { - PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error - FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error -} - type KnownGapsState struct { // Should we check for gaps by looking at the DB and comparing the latest block with head checkForGaps bool @@ -61,10 +58,15 @@ type KnownGapsState struct { writeFilePath string // DB object to use for reading and writing to the DB db sql.Database + //Do we have entries in the local sql file that need to be written to the DB + sqlFileWaitingForWrite bool + // Metrics object used to track metrics. + statediffMetrics statediffMetricsHandles } +// Unused func NewKnownGapsState(checkForGaps bool, processingKey int64, expectedDifference *big.Int, - errorState bool, writeFilePath string, db sql.Database) *KnownGapsState { + errorState bool, writeFilePath string, db sql.Database, statediffMetrics statediffMetricsHandles) *KnownGapsState { return &KnownGapsState{ checkForGaps: checkForGaps, @@ -73,11 +75,12 @@ func NewKnownGapsState(checkForGaps bool, processingKey int64, expectedDifferenc errorState: errorState, writeFilePath: writeFilePath, db: db, + statediffMetrics: statediffMetrics, } } -func MinMax(array []*big.Int) (*big.Int, *big.Int) { +func minMax(array []*big.Int) (*big.Int, *big.Int) { var max *big.Int = array[0] var min *big.Int = array[0] for _, value := range array { @@ -96,7 +99,7 @@ func MinMax(array []*big.Int) (*big.Int, *big.Int) { // 2. Write to sql file locally. // 3. Write to prometheus directly. // 4. Logs and error. -func (kg *KnownGapsState) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error { +func (kg *KnownGapsState) pushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error { if startingBlockNumber.Cmp(endingBlockNumber) != -1 { return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber) } @@ -106,9 +109,13 @@ func (kg *KnownGapsState) PushKnownGaps(startingBlockNumber *big.Int, endingBloc CheckedOut: checkedOut, ProcessingKey: processingKey, } - log.Info("Writing known gaps to the DB") + + log.Info("Updating Metrics for the start and end block") + //kg.statediffMetrics.knownGapStart.Update(startingBlockNumber.Int64()) + //kg.statediffMetrics.knownGapEnd.Update(endingBlockNumber.Int64()) var writeErr error + log.Info("Writing known gaps to the DB") if kg.db != nil { dbErr := kg.upsertKnownGaps(knownGap) if dbErr != nil { @@ -117,22 +124,24 @@ func (kg *KnownGapsState) PushKnownGaps(startingBlockNumber *big.Int, endingBloc } } else { writeErr = kg.upsertKnownGapsFile(knownGap) - } if writeErr != nil { - log.Info("Unsuccessful when writing to a file", "Error", writeErr) - return writeErr + log.Error("Unsuccessful when writing to a file", "Error", writeErr) + log.Error("Updating Metrics for the start and end error block") + log.Error("Unable to write the following Gaps to DB or File", "startBlock", startingBlockNumber, "endBlock", endingBlockNumber) + kg.statediffMetrics.knownGapErrorStart.Update(startingBlockNumber.Int64()) + kg.statediffMetrics.knownGapErrorEnd.Update(endingBlockNumber.Int64()) } return nil } // This is a simple wrapper function to write gaps from a knownErrorBlocks array. func (kg *KnownGapsState) captureErrorBlocks(knownErrorBlocks []*big.Int) { - startErrorBlock, endErrorBlock := MinMax(knownErrorBlocks) + startErrorBlock, endErrorBlock := minMax(knownErrorBlocks) log.Warn("The following Gaps were found", "knownErrorBlocks", knownErrorBlocks) log.Warn("Updating known Gaps table", "startErrorBlock", startErrorBlock, "endErrorBlock", endErrorBlock, "processingKey", kg.processingKey) - kg.PushKnownGaps(startErrorBlock, endErrorBlock, false, kg.processingKey) + kg.pushKnownGaps(startErrorBlock, endErrorBlock, false, kg.processingKey) } @@ -156,9 +165,9 @@ func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDiffer // TODO: // REmove the return value // Write to file if err in writing to DB -func (kg *KnownGapsState) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error { +func (kg *KnownGapsState) findAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error { // Make this global - latestBlockInDb, err := kg.QueryDbToBigInt(dbQueryString) + latestBlockInDb, err := kg.queryDbToBigInt(dbQueryString) if err != nil { return err } @@ -171,7 +180,7 @@ func (kg *KnownGapsState) FindAndUpdateGaps(latestBlockOnChain *big.Int, expecte endBlock.Sub(latestBlockOnChain, expectedDifference) log.Warn("Found Gaps starting at", "startBlock", startBlock, "endingBlock", endBlock) - err := kg.PushKnownGaps(startBlock, endBlock, false, processingKey) + err := kg.pushKnownGaps(startBlock, endBlock, false, processingKey) if err != nil { log.Error("We were unable to write the following gap to the DB", "start Block", startBlock, "endBlock", endBlock, "error", err) return err @@ -212,11 +221,37 @@ func (kg *KnownGapsState) upsertKnownGapsFile(knownGaps models.KnownGapsModel) e return err } log.Info("Wrote the gaps to a local SQL file") + kg.sqlFileWaitingForWrite = true + return nil +} + +func (kg *KnownGapsState) writeSqlFileStmtToDb() error { + log.Info("Writing the local SQL file for KnownGaps to the DB") + file, ioErr := ioutil.ReadFile(kg.writeFilePath) + + if ioErr != nil { + log.Error("Unable to open local SQL File for writing") + return ioErr + } + + requests := strings.Split(string(file), ";") + + for _, request := range requests { + _, err := kg.db.Exec(context.Background(), request) + if err != nil { + log.Error("Unable to run insert statement from file to the DB") + return err + } + } + if err := os.Truncate(kg.writeFilePath, 0); err != nil { + log.Info("Failed to empty knownGaps file after inserting statements to the DB", "error", err) + } + kg.sqlFileWaitingForWrite = false return nil } // This is a simple wrapper function which will run QueryRow on the DB -func (kg *KnownGapsState) QueryDb(queryString string) (string, error) { +func (kg *KnownGapsState) queryDb(queryString string) (string, error) { var ret string err := kg.db.QueryRow(context.Background(), queryString).Scan(&ret) if err != nil { @@ -228,9 +263,9 @@ func (kg *KnownGapsState) QueryDb(queryString string) (string, error) { // This function is a simple wrapper which will call QueryDb but the return value will be // a big int instead of a string -func (kg *KnownGapsState) QueryDbToBigInt(queryString string) (*big.Int, error) { +func (kg *KnownGapsState) queryDbToBigInt(queryString string) (*big.Int, error) { ret := new(big.Int) - res, err := kg.QueryDb(queryString) + res, err := kg.queryDb(queryString) if err != nil { return ret, err } diff --git a/statediff/known_gaps_test.go b/statediff/known_gaps_test.go index d7be1339c011..b1d951add062 100644 --- a/statediff/known_gaps_test.go +++ b/statediff/known_gaps_test.go @@ -3,12 +3,11 @@ package statediff import ( "context" "fmt" - "io/ioutil" "math/big" "os" - "strings" "testing" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" @@ -68,6 +67,7 @@ func testWriteToDb(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) { processingKey: tc.processingKey, expectedDifference: big.NewInt(tc.expectedDif), db: db, + statediffMetrics: RegisterStatediffMetrics(metrics.DefaultRegistry), } service := &Service{ KnownGaps: knownGaps, @@ -106,6 +106,7 @@ func testWriteToFile(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) { processingKey: tc.processingKey, expectedDifference: big.NewInt(tc.expectedDif), writeFilePath: knownGapsFilePath, + statediffMetrics: RegisterStatediffMetrics(metrics.DefaultRegistry), db: nil, // Only set to nil to be verbose that we can't use it } service := &Service{ @@ -116,18 +117,13 @@ func testWriteToFile(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) { service.KnownGaps.knownErrorBlocks = knownErrorBlocks testCaptureErrorBlocks(t, service) - - file, ioErr := ioutil.ReadFile(knownGapsFilePath) - require.NoError(t, ioErr) - - requests := strings.Split(string(file), ";") - newDb := setupDb(t) service.KnownGaps.db = newDb - for _, request := range requests { - _, err := newDb.Exec(context.Background(), request) - require.NoError(t, err) + if service.KnownGaps.sqlFileWaitingForWrite { + writeErr := service.KnownGaps.writeSqlFileStmtToDb() + require.NoError(t, writeErr) } + // Validate that the upsert was done correctly. validateUpsert(t, service, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd) tearDown(t, newDb) @@ -145,12 +141,13 @@ func testFindAndUpdateGaps(t *testing.T, wipeDbBeforeStart bool) { processingKey: 1, expectedDifference: big.NewInt(1), db: db, + statediffMetrics: RegisterStatediffMetrics(metrics.DefaultRegistry), } service := &Service{ KnownGaps: knownGaps, } - latestBlockInDb, err := service.KnownGaps.QueryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids") + latestBlockInDb, err := service.KnownGaps.queryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids") if err != nil { t.Skip("Can't find a block in the eth.header_cids table.. Please put one there") } @@ -165,7 +162,7 @@ func testFindAndUpdateGaps(t *testing.T, wipeDbBeforeStart bool) { t.Log("The latest block on the chain is: ", latestBlockOnChain) t.Log("The latest block on the DB is: ", latestBlockInDb) - gapUpsertErr := service.KnownGaps.FindAndUpdateGaps(latestBlockOnChain, expectedDifference, 0) + gapUpsertErr := service.KnownGaps.findAndUpdateGaps(latestBlockOnChain, expectedDifference, 0) require.NoError(t, gapUpsertErr) startBlock := big.NewInt(0) @@ -195,7 +192,7 @@ func validateUpsert(t *testing.T, service *Service, startingBlock int64, endingB t.Logf("Starting to query blocks: %d - %d", startingBlock, endingBlock) queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startingBlock, endingBlock) - _, queryErr := service.KnownGaps.QueryDb(queryString) // Figure out the string. + _, queryErr := service.KnownGaps.queryDb(queryString) // Figure out the string. t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startingBlock, endingBlock) require.NoError(t, queryErr) } diff --git a/statediff/metrics.go b/statediff/metrics.go index afc80e40e5bb..e67499b94328 100644 --- a/statediff/metrics.go +++ b/statediff/metrics.go @@ -50,6 +50,14 @@ type statediffMetricsHandles struct { // Current length of chainEvent channels serviceLoopChannelLen metrics.Gauge writeLoopChannelLen metrics.Gauge + // The start block of the known gap + knownGapStart metrics.Gauge + // The end block of the known gap + knownGapEnd metrics.Gauge + // A known gaps start block which had an error being written to the DB + knownGapErrorStart metrics.Gauge + // A known gaps end block which had an error being written to the DB + knownGapErrorEnd metrics.Gauge } func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles { @@ -59,6 +67,10 @@ func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles { lastStatediffHeight: metrics.NewGauge(), serviceLoopChannelLen: metrics.NewGauge(), writeLoopChannelLen: metrics.NewGauge(), + knownGapStart: metrics.NewGauge(), + knownGapEnd: metrics.NewGauge(), + knownGapErrorStart: metrics.NewGauge(), + knownGapErrorEnd: metrics.NewGauge(), } subsys := "service" reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight) @@ -66,5 +78,9 @@ func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles { reg.Register(metricName(subsys, "last_statediff_height"), ctx.lastStatediffHeight) reg.Register(metricName(subsys, "service_loop_channel_len"), ctx.serviceLoopChannelLen) reg.Register(metricName(subsys, "write_loop_channel_len"), ctx.writeLoopChannelLen) + reg.Register(metricName(subsys, "known_gaps_start"), ctx.knownGapStart) + reg.Register(metricName(subsys, "known_gaps_end"), ctx.knownGapEnd) + reg.Register(metricName(subsys, "known_gaps_error_start"), ctx.knownGapErrorStart) + reg.Register(metricName(subsys, "known_gaps_error_end"), ctx.knownGapErrorEnd) return ctx } diff --git a/statediff/service.go b/statediff/service.go index 5bacd9ad77bc..3e3cbb9b3a49 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -185,11 +185,13 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params // If we ever have multiple processingKeys we can update them here // along with the expectedDifference knownGaps := &KnownGapsState{ - processingKey: 0, - expectedDifference: big.NewInt(1), - errorState: false, - writeFilePath: params.KnownGapsFilePath, - db: db, + processingKey: 0, + expectedDifference: big.NewInt(1), + errorState: false, + writeFilePath: params.KnownGapsFilePath, + db: db, + statediffMetrics: statediffMetrics, + sqlFileWaitingForWrite: false, } if params.IndexerConfig.Type() == shared.POSTGRES { knownGaps.checkForGaps = true @@ -336,7 +338,7 @@ func (sds *Service) writeLoopWorker(params workerParams) { // Check and update the gaps table. if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState { log.Info("Checking for Gaps at", "current block", currentBlock.Number()) - go sds.KnownGaps.FindAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey) + go sds.KnownGaps.findAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey) sds.KnownGaps.checkForGaps = false } @@ -357,11 +359,17 @@ func (sds *Service) writeLoopWorker(params workerParams) { staticKnownErrorBlocks := make([]*big.Int, len(sds.KnownGaps.knownErrorBlocks)) copy(staticKnownErrorBlocks, sds.KnownGaps.knownErrorBlocks) sds.KnownGaps.knownErrorBlocks = nil - - log.Debug("Starting capturedMissedBlocks") go sds.KnownGaps.captureErrorBlocks(staticKnownErrorBlocks) } + if sds.KnownGaps.sqlFileWaitingForWrite { + log.Info("There are entries in the SQL file for knownGaps that should be written") + err := sds.KnownGaps.writeSqlFileStmtToDb() + if err != nil { + log.Error("Unable to write KnownGap sql file to DB") + } + } + // TODO: how to handle with concurrent workers statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64())) case <-sds.QuitChan: