Skip to content

Commit

Permalink
Merge pull request #401 from oasisprotocol/andrew7234/unify-analyzer-…
Browse files Browse the repository at this point in the history
…source

cache analyzer sources
  • Loading branch information
Andrew7234 authored May 4, 2023
2 parents ff7dd31 + 4b75ba3 commit 6b99c7b
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 108 deletions.
27 changes: 3 additions & 24 deletions analyzer/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,27 +67,16 @@ type Main struct {
var _ analyzer.Analyzer = (*Main)(nil)

// NewMain returns a new main analyzer for the consensus layer.
func NewMain(sourceConfig *config.SourceConfig, cfg *config.BlockBasedAnalyzerConfig, target storage.TargetStorage, logger *log.Logger) (*Main, error) {
ctx := context.Background()

// Initialize source storage.
client, err := source.NewConsensusClient(ctx, sourceConfig)
if err != nil {
logger.Error("error creating consensus client",
"err", err.Error(),
)
return nil, err
}

func NewMain(cfg *config.BlockBasedAnalyzerConfig, genesisChainContext string, sourceClient *source.ConsensusClient, target storage.TargetStorage, logger *log.Logger) (*Main, error) {
// Configure analyzer.
blockRange := analyzer.BlockRange{
From: cfg.From,
To: cfg.To,
}
ac := analyzer.ConsensusConfig{
GenesisChainContext: sourceConfig.History().CurrentRecord().ChainContext,
GenesisChainContext: genesisChainContext,
Range: blockRange,
Source: client,
Source: sourceClient,
}

logger.Info("Starting consensus analyzer", "config", ac)
Expand All @@ -101,8 +90,6 @@ func NewMain(sourceConfig *config.SourceConfig, cfg *config.BlockBasedAnalyzerCo

// Start starts the main consensus analyzer.
func (m *Main) Start(ctx context.Context) {
defer m.cleanup()

// Get block to be indexed.
var height int64

Expand Down Expand Up @@ -182,14 +169,6 @@ func (m *Main) Start(ctx context.Context) {
m.cfg.Range.From, m.cfg.Range.To))
}

func (m *Main) cleanup() {
if err := m.cfg.Source.Close(); err != nil {
m.logger.Error("failed to cleanly close consensus data source",
"err", err.Error(),
)
}
}

// Name returns the name of the Main.
func (m *Main) Name() string {
return ConsensusAnalyzerName
Expand Down
26 changes: 3 additions & 23 deletions analyzer/evmtokenbalances/evm_token_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import (
"github.com/oasisprotocol/oasis-indexer/analyzer/runtime"
"github.com/oasisprotocol/oasis-indexer/analyzer/util"
"github.com/oasisprotocol/oasis-indexer/common"
"github.com/oasisprotocol/oasis-indexer/config"
"github.com/oasisprotocol/oasis-indexer/log"
"github.com/oasisprotocol/oasis-indexer/storage"
"github.com/oasisprotocol/oasis-indexer/storage/client"
"github.com/oasisprotocol/oasis-indexer/storage/oasis"
source "github.com/oasisprotocol/oasis-indexer/storage/oasis"
)

// Imagine a timeline starting from a `balanceOf` output `v0` followed by
Expand Down Expand Up @@ -88,23 +87,13 @@ var _ analyzer.Analyzer = (*Main)(nil)

func NewMain(
runtime common.Runtime,
sourceConfig *config.SourceConfig,
sourceClient *source.RuntimeClient,
target storage.TargetStorage,
logger *log.Logger,
) (*Main, error) {
ctx := context.Background()

// Initialize source storage.
client, err := oasis.NewRuntimeClient(ctx, sourceConfig, runtime)
if err != nil {
logger.Error("error creating runtime client",
"err", err,
)
return nil, err
}
ac := analyzer.RuntimeConfig{
RuntimeName: runtime,
Source: client,
Source: sourceClient,
}

return &Main{
Expand Down Expand Up @@ -258,8 +247,6 @@ func (m Main) processBatch(ctx context.Context) (int, error) {
}

func (m Main) Start(ctx context.Context) {
defer m.cleanup()

backoff, err := util.NewBackoff(
100*time.Millisecond,
// Cap the timeout at the expected round time. All runtimes currently have the same round time.
Expand All @@ -278,7 +265,6 @@ func (m Main) Start(ctx context.Context) {
// Process another batch of token balances.
case <-ctx.Done():
m.logger.Warn("shutting down evm_token_balances analyzer", "reason", ctx.Err())
m.cleanup()
return
}

Expand All @@ -300,12 +286,6 @@ func (m Main) Start(ctx context.Context) {
}
}

func (m *Main) cleanup() {
if err := m.cfg.Source.Close(); err != nil {
m.logger.Error("error closing data source", "err", err)
}
}

func (m Main) Name() string {
return EvmTokenBalancesAnalyzerPrefix + string(m.cfg.RuntimeName)
}
25 changes: 3 additions & 22 deletions analyzer/evmtokens/evm_tokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ import (
"github.com/oasisprotocol/oasis-indexer/analyzer/runtime"
"github.com/oasisprotocol/oasis-indexer/analyzer/util"
"github.com/oasisprotocol/oasis-indexer/common"
"github.com/oasisprotocol/oasis-indexer/config"
"github.com/oasisprotocol/oasis-indexer/log"
"github.com/oasisprotocol/oasis-indexer/storage"
"github.com/oasisprotocol/oasis-indexer/storage/client"
"github.com/oasisprotocol/oasis-indexer/storage/oasis"
source "github.com/oasisprotocol/oasis-indexer/storage/oasis"
)

// The token analyzer (1) gets a list from the database of tokens to download
Expand Down Expand Up @@ -45,23 +44,13 @@ var _ analyzer.Analyzer = (*Main)(nil)

func NewMain(
runtime common.Runtime,
sourceConfig *config.SourceConfig,
sourceClient *source.RuntimeClient,
target storage.TargetStorage,
logger *log.Logger,
) (*Main, error) {
ctx := context.Background()

// Initialize source storage.
client, err := oasis.NewRuntimeClient(ctx, sourceConfig, runtime)
if err != nil {
logger.Error("error creating runtime client",
"err", err,
)
return nil, err
}
ac := analyzer.RuntimeConfig{
RuntimeName: runtime,
Source: client,
Source: sourceClient,
}

return &Main{
Expand Down Expand Up @@ -200,8 +189,6 @@ func (m Main) processBatch(ctx context.Context) (int, error) {
}

func (m Main) Start(ctx context.Context) {
defer m.cleanup()

backoff, err := util.NewBackoff(
100*time.Millisecond,
// Cap the timeout at the expected round time. All runtimes currently have the same round time.
Expand Down Expand Up @@ -241,12 +228,6 @@ func (m Main) Start(ctx context.Context) {
}
}

func (m *Main) cleanup() {
if err := m.cfg.Source.Close(); err != nil {
m.logger.Error("error closing data source", "err", err)
}
}

func (m Main) Name() string {
return EvmTokensAnalyzerPrefix + string(m.cfg.RuntimeName)
}
29 changes: 5 additions & 24 deletions analyzer/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/oasisprotocol/oasis-indexer/log"
"github.com/oasisprotocol/oasis-indexer/metrics"
"github.com/oasisprotocol/oasis-indexer/storage"
"github.com/oasisprotocol/oasis-indexer/storage/oasis"
source "github.com/oasisprotocol/oasis-indexer/storage/oasis"
)

const (
Expand All @@ -40,30 +40,21 @@ var _ analyzer.Analyzer = (*Main)(nil)
// NewRuntimeAnalyzer returns a new main analyzer for a runtime.
func NewRuntimeAnalyzer(
runtime common.Runtime,
sourceConfig *config.SourceConfig,
runtimeMetadata *sdkConfig.ParaTime,
cfg *config.BlockBasedAnalyzerConfig,
sourceClient *source.RuntimeClient,
target storage.TargetStorage,
logger *log.Logger,
) (*Main, error) {
ctx := context.Background()

// Initialize source storage.
client, err := oasis.NewRuntimeClient(ctx, sourceConfig, runtime)
if err != nil {
logger.Error("error creating runtime client",
"err", err,
)
return nil, err
}
roundRange := analyzer.RoundRange{
From: uint64(cfg.From),
To: uint64(cfg.To),
}
ac := analyzer.RuntimeConfig{
RuntimeName: runtime,
ParaTime: sourceConfig.SDKParaTime(runtime),
ParaTime: runtimeMetadata,
Range: roundRange,
Source: client,
Source: sourceClient,
}

return &Main{
Expand All @@ -75,8 +66,6 @@ func NewRuntimeAnalyzer(
}

func (m *Main) Start(ctx context.Context) {
defer m.cleanup()

if err := m.prework(ctx); err != nil {
m.logger.Error("error doing prework",
"err", err,
Expand Down Expand Up @@ -149,14 +138,6 @@ func (m *Main) Start(ctx context.Context) {
m.cfg.Range.From, m.cfg.Range.To))
}

func (m *Main) cleanup() {
if err := m.cfg.Source.Close(); err != nil {
m.logger.Error("failed to cleanly close consensus data source",
"err", err.Error(),
)
}
}

// Name returns the name of the Main.
func (m *Main) Name() string {
return string(m.cfg.RuntimeName)
Expand Down
Loading

0 comments on commit 6b99c7b

Please sign in to comment.