From 3ee0373dc2e28535033f4110109db35ec4349f9c Mon Sep 17 00:00:00 2001 From: ptrus Date: Sun, 14 May 2023 11:53:29 +0200 Subject: [PATCH] analyzer/block: configurable batch size --- analyzer/block/block.go | 11 +++++++---- analyzer/block/block_test.go | 17 ++++++++++------- config/config.go | 9 +++++++++ 3 files changed, 26 insertions(+), 11 deletions(-) diff --git a/analyzer/block/block.go b/analyzer/block/block.go index efdee7b83..a356db9c9 100644 --- a/analyzer/block/block.go +++ b/analyzer/block/block.go @@ -22,8 +22,8 @@ import ( const ( // Timeout to process a block. processBlockTimeout = 61 * time.Second - // Number of blocks to be processed in a batch. - blocksBatchSize = 100 + // Default number of blocks to be processed in a batch. + defaultBatchSize = 1_000 // Lock expire timeout for blocks (in minutes). Locked blocks not processed within // this time can be picked again. lockExpiryMinutes = 5 @@ -121,7 +121,7 @@ func (b *blockBasedAnalyzer) fetchBatchForProcessing(ctx context.Context, from u from, to, 0, - blocksBatchSize, + b.config.BatchSize, ) case false: // Fetch and lock blocks for processing. @@ -132,7 +132,7 @@ func (b *blockBasedAnalyzer) fetchBatchForProcessing(ctx context.Context, from u from, to, lockExpiryMinutes, - blocksBatchSize, + b.config.BatchSize, ) } if err != nil { @@ -284,6 +284,9 @@ func NewAnalyzer( logger *log.Logger, slowSync bool, ) (analyzer.Analyzer, error) { + if config.BatchSize == 0 { + config.BatchSize = defaultBatchSize + } return &blockBasedAnalyzer{ config: config, analyzerName: name, diff --git a/analyzer/block/block_test.go b/analyzer/block/block_test.go index cc2a8003e..554501049 100644 --- a/analyzer/block/block_test.go +++ b/analyzer/block/block_test.go @@ -28,6 +28,9 @@ const migrationsPath = "file://../../storage/migrations" const testsTimeout = 10 * time.Second +// Default block based config used in most tests. +var testBlockBasedConfig = &config.BlockBasedAnalyzerConfig{From: 1, To: 1_000, BatchSize: 100} + type mockProcessor struct { name string latestBlockHeight uint64 @@ -134,7 +137,7 @@ func TestFastSyncBlockAnalyzer(t *testing.T) { db := setupDB(t) p := &mockProcessor{name: "test-analyzer", latestBlockHeight: 10_000, storage: db} - analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 1_000}, false) + analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, false) // Run the analyzer and ensure all blocks are processed. var wg sync.WaitGroup @@ -170,7 +173,7 @@ func TestMultipleFastSyncBlockAnalyzers(t *testing.T) { as := []analyzer.Analyzer{} for i := 0; i < numAnalyzers; i++ { p := &mockProcessor{name: "test-analyzer", latestBlockHeight: 10_000, storage: db} - analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 1_000}, false) + analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, false) ps = append(ps, p) as = append(as, analyzer) } @@ -220,7 +223,7 @@ func TestFailingFastSyncBlockAnalyzers(t *testing.T) { } } p := &mockProcessor{name: "test-analyzer", latestBlockHeight: 10_000, storage: db, fail: fail} - analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 1_000}, false) + analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, false) ps = append(ps, p) as = append(as, analyzer) } @@ -263,7 +266,7 @@ func TestDistinctFastSyncBlockAnalyzers(t *testing.T) { as := []analyzer.Analyzer{} for i := 0; i < numAnalyzers; i++ { p := &mockProcessor{name: fmt.Sprintf("test-analyzer-%d", i), latestBlockHeight: 1_000, storage: db} - analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 1_000}, false) + analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, false) ps = append(ps, p) as = append(as, analyzer) } @@ -300,7 +303,7 @@ func TestSlowSyncBlockAnalyzer(t *testing.T) { db := setupDB(t) p := &mockProcessor{name: "test-analyzer", latestBlockHeight: 10_000, storage: db} - analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 1_000}, true) + analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, true) // Run the analyzer and ensure all blocks are processed. var wg sync.WaitGroup @@ -339,7 +342,7 @@ func TestFailingSlowSyncBlockAnalyzer(t *testing.T) { } return nil }} - analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 100}, true) + analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 100, BatchSize: 100}, true) // Run the analyzer and ensure all blocks are processed. var wg sync.WaitGroup @@ -376,7 +379,7 @@ func TestDistinctSlowSyncBlockAnalyzers(t *testing.T) { as := []analyzer.Analyzer{} for i := 0; i < numAnalyzers; i++ { p := &mockProcessor{name: fmt.Sprintf("test-analyzer-%d", i), latestBlockHeight: 1_000, storage: db} - analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 1_000}, true) + analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, true) ps = append(ps, p) as = append(as, analyzer) } diff --git a/config/config.go b/config/config.go index 5f307e79d..13c1d80ea 100644 --- a/config/config.go +++ b/config/config.go @@ -250,6 +250,15 @@ type BlockBasedAnalyzerConfig struct { // continue processing new blocks until the next breaking // upgrade. To uint64 `koanf:"to"` + + // BatchSize determines the maximum number of blocks the block analyzer + // processes per batch. processes per batch. This is relevant only when the analyzer is + // still catching up to the latest block. + // Optimal value depends on block processing speed. Ideally, it should + // be set to the number of blocks processed within 30-60 seconds. + // + // Uses default value of 1000 if unset/set to 0. + BatchSize uint64 `koanf:"batch_size"` } type IntervalBasedAnalyzerConfig struct {