Skip to content

Commit

Permalink
analyzer/block: configurable batch size
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed May 16, 2023
1 parent 46e5f0e commit 3ee0373
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 11 deletions.
11 changes: 7 additions & 4 deletions analyzer/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
const (
// Timeout to process a block.
processBlockTimeout = 61 * time.Second
// Number of blocks to be processed in a batch.
blocksBatchSize = 100
// Default number of blocks to be processed in a batch.
defaultBatchSize = 1_000
// Lock expire timeout for blocks (in minutes). Locked blocks not processed within
// this time can be picked again.
lockExpiryMinutes = 5
Expand Down Expand Up @@ -121,7 +121,7 @@ func (b *blockBasedAnalyzer) fetchBatchForProcessing(ctx context.Context, from u
from,
to,
0,
blocksBatchSize,
b.config.BatchSize,
)
case false:
// Fetch and lock blocks for processing.
Expand All @@ -132,7 +132,7 @@ func (b *blockBasedAnalyzer) fetchBatchForProcessing(ctx context.Context, from u
from,
to,
lockExpiryMinutes,
blocksBatchSize,
b.config.BatchSize,
)
}
if err != nil {
Expand Down Expand Up @@ -284,6 +284,9 @@ func NewAnalyzer(
logger *log.Logger,
slowSync bool,
) (analyzer.Analyzer, error) {
if config.BatchSize == 0 {
config.BatchSize = defaultBatchSize
}
return &blockBasedAnalyzer{
config: config,
analyzerName: name,
Expand Down
17 changes: 10 additions & 7 deletions analyzer/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const migrationsPath = "file://../../storage/migrations"

const testsTimeout = 10 * time.Second

// Default block based config used in most tests.
var testBlockBasedConfig = &config.BlockBasedAnalyzerConfig{From: 1, To: 1_000, BatchSize: 100}

type mockProcessor struct {
name string
latestBlockHeight uint64
Expand Down Expand Up @@ -134,7 +137,7 @@ func TestFastSyncBlockAnalyzer(t *testing.T) {

db := setupDB(t)
p := &mockProcessor{name: "test-analyzer", latestBlockHeight: 10_000, storage: db}
analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 1_000}, false)
analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, false)

// Run the analyzer and ensure all blocks are processed.
var wg sync.WaitGroup
Expand Down Expand Up @@ -170,7 +173,7 @@ func TestMultipleFastSyncBlockAnalyzers(t *testing.T) {
as := []analyzer.Analyzer{}
for i := 0; i < numAnalyzers; i++ {
p := &mockProcessor{name: "test-analyzer", latestBlockHeight: 10_000, storage: db}
analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 1_000}, false)
analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, false)
ps = append(ps, p)
as = append(as, analyzer)
}
Expand Down Expand Up @@ -220,7 +223,7 @@ func TestFailingFastSyncBlockAnalyzers(t *testing.T) {
}
}
p := &mockProcessor{name: "test-analyzer", latestBlockHeight: 10_000, storage: db, fail: fail}
analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 1_000}, false)
analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, false)
ps = append(ps, p)
as = append(as, analyzer)
}
Expand Down Expand Up @@ -263,7 +266,7 @@ func TestDistinctFastSyncBlockAnalyzers(t *testing.T) {
as := []analyzer.Analyzer{}
for i := 0; i < numAnalyzers; i++ {
p := &mockProcessor{name: fmt.Sprintf("test-analyzer-%d", i), latestBlockHeight: 1_000, storage: db}
analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 1_000}, false)
analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, false)
ps = append(ps, p)
as = append(as, analyzer)
}
Expand Down Expand Up @@ -300,7 +303,7 @@ func TestSlowSyncBlockAnalyzer(t *testing.T) {

db := setupDB(t)
p := &mockProcessor{name: "test-analyzer", latestBlockHeight: 10_000, storage: db}
analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 1_000}, true)
analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, true)

// Run the analyzer and ensure all blocks are processed.
var wg sync.WaitGroup
Expand Down Expand Up @@ -339,7 +342,7 @@ func TestFailingSlowSyncBlockAnalyzer(t *testing.T) {
}
return nil
}}
analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 100}, true)
analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 100, BatchSize: 100}, true)

// Run the analyzer and ensure all blocks are processed.
var wg sync.WaitGroup
Expand Down Expand Up @@ -376,7 +379,7 @@ func TestDistinctSlowSyncBlockAnalyzers(t *testing.T) {
as := []analyzer.Analyzer{}
for i := 0; i < numAnalyzers; i++ {
p := &mockProcessor{name: fmt.Sprintf("test-analyzer-%d", i), latestBlockHeight: 1_000, storage: db}
analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 1_000}, true)
analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, true)
ps = append(ps, p)
as = append(as, analyzer)
}
Expand Down
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,15 @@ type BlockBasedAnalyzerConfig struct {
// continue processing new blocks until the next breaking
// upgrade.
To uint64 `koanf:"to"`

// BatchSize determines the maximum number of blocks the block analyzer
// processes per batch. processes per batch. This is relevant only when the analyzer is
// still catching up to the latest block.
// Optimal value depends on block processing speed. Ideally, it should
// be set to the number of blocks processed within 30-60 seconds.
//
// Uses default value of 1000 if unset/set to 0.
BatchSize uint64 `koanf:"batch_size"`
}

type IntervalBasedAnalyzerConfig struct {
Expand Down

0 comments on commit 3ee0373

Please sign in to comment.