diff --git a/analyzer/consensus/consensus.go b/analyzer/consensus/consensus.go index 85759b418..042f0f849 100644 --- a/analyzer/consensus/consensus.go +++ b/analyzer/consensus/consensus.go @@ -67,27 +67,16 @@ type Main struct { var _ analyzer.Analyzer = (*Main)(nil) // NewMain returns a new main analyzer for the consensus layer. -func NewMain(sourceConfig *config.SourceConfig, cfg *config.BlockBasedAnalyzerConfig, target storage.TargetStorage, logger *log.Logger) (*Main, error) { - ctx := context.Background() - - // Initialize source storage. - client, err := source.NewConsensusClient(ctx, sourceConfig) - if err != nil { - logger.Error("error creating consensus client", - "err", err.Error(), - ) - return nil, err - } - +func NewMain(cfg *config.BlockBasedAnalyzerConfig, genesisChainContext string, sourceClient *source.ConsensusClient, target storage.TargetStorage, logger *log.Logger) (*Main, error) { // Configure analyzer. blockRange := analyzer.BlockRange{ From: cfg.From, To: cfg.To, } ac := analyzer.ConsensusConfig{ - GenesisChainContext: sourceConfig.History().CurrentRecord().ChainContext, + GenesisChainContext: genesisChainContext, Range: blockRange, - Source: client, + Source: sourceClient, } logger.Info("Starting consensus analyzer", "config", ac) @@ -101,8 +90,6 @@ func NewMain(sourceConfig *config.SourceConfig, cfg *config.BlockBasedAnalyzerCo // Start starts the main consensus analyzer. func (m *Main) Start(ctx context.Context) { - defer m.cleanup() - // Get block to be indexed. var height int64 @@ -182,14 +169,6 @@ func (m *Main) Start(ctx context.Context) { m.cfg.Range.From, m.cfg.Range.To)) } -func (m *Main) cleanup() { - if err := m.cfg.Source.Close(); err != nil { - m.logger.Error("failed to cleanly close consensus data source", - "err", err.Error(), - ) - } -} - // Name returns the name of the Main. func (m *Main) Name() string { return ConsensusAnalyzerName diff --git a/analyzer/evmtokenbalances/evm_token_balances.go b/analyzer/evmtokenbalances/evm_token_balances.go index f9f220cb5..c56789cd2 100644 --- a/analyzer/evmtokenbalances/evm_token_balances.go +++ b/analyzer/evmtokenbalances/evm_token_balances.go @@ -13,11 +13,10 @@ import ( "github.com/oasisprotocol/oasis-indexer/analyzer/runtime" "github.com/oasisprotocol/oasis-indexer/analyzer/util" "github.com/oasisprotocol/oasis-indexer/common" - "github.com/oasisprotocol/oasis-indexer/config" "github.com/oasisprotocol/oasis-indexer/log" "github.com/oasisprotocol/oasis-indexer/storage" "github.com/oasisprotocol/oasis-indexer/storage/client" - "github.com/oasisprotocol/oasis-indexer/storage/oasis" + source "github.com/oasisprotocol/oasis-indexer/storage/oasis" ) // Imagine a timeline starting from a `balanceOf` output `v0` followed by @@ -88,23 +87,13 @@ var _ analyzer.Analyzer = (*Main)(nil) func NewMain( runtime common.Runtime, - sourceConfig *config.SourceConfig, + sourceClient *source.RuntimeClient, target storage.TargetStorage, logger *log.Logger, ) (*Main, error) { - ctx := context.Background() - - // Initialize source storage. - client, err := oasis.NewRuntimeClient(ctx, sourceConfig, runtime) - if err != nil { - logger.Error("error creating runtime client", - "err", err, - ) - return nil, err - } ac := analyzer.RuntimeConfig{ RuntimeName: runtime, - Source: client, + Source: sourceClient, } return &Main{ @@ -258,8 +247,6 @@ func (m Main) processBatch(ctx context.Context) (int, error) { } func (m Main) Start(ctx context.Context) { - defer m.cleanup() - backoff, err := util.NewBackoff( 100*time.Millisecond, // Cap the timeout at the expected round time. All runtimes currently have the same round time. @@ -278,7 +265,6 @@ func (m Main) Start(ctx context.Context) { // Process another batch of token balances. case <-ctx.Done(): m.logger.Warn("shutting down evm_token_balances analyzer", "reason", ctx.Err()) - m.cleanup() return } @@ -300,12 +286,6 @@ func (m Main) Start(ctx context.Context) { } } -func (m *Main) cleanup() { - if err := m.cfg.Source.Close(); err != nil { - m.logger.Error("error closing data source", "err", err) - } -} - func (m Main) Name() string { return EvmTokenBalancesAnalyzerPrefix + string(m.cfg.RuntimeName) } diff --git a/analyzer/evmtokens/evm_tokens.go b/analyzer/evmtokens/evm_tokens.go index 5dfbfcc6c..70b4c9cba 100644 --- a/analyzer/evmtokens/evm_tokens.go +++ b/analyzer/evmtokens/evm_tokens.go @@ -12,11 +12,10 @@ import ( "github.com/oasisprotocol/oasis-indexer/analyzer/runtime" "github.com/oasisprotocol/oasis-indexer/analyzer/util" "github.com/oasisprotocol/oasis-indexer/common" - "github.com/oasisprotocol/oasis-indexer/config" "github.com/oasisprotocol/oasis-indexer/log" "github.com/oasisprotocol/oasis-indexer/storage" "github.com/oasisprotocol/oasis-indexer/storage/client" - "github.com/oasisprotocol/oasis-indexer/storage/oasis" + source "github.com/oasisprotocol/oasis-indexer/storage/oasis" ) // The token analyzer (1) gets a list from the database of tokens to download @@ -45,23 +44,13 @@ var _ analyzer.Analyzer = (*Main)(nil) func NewMain( runtime common.Runtime, - sourceConfig *config.SourceConfig, + sourceClient *source.RuntimeClient, target storage.TargetStorage, logger *log.Logger, ) (*Main, error) { - ctx := context.Background() - - // Initialize source storage. - client, err := oasis.NewRuntimeClient(ctx, sourceConfig, runtime) - if err != nil { - logger.Error("error creating runtime client", - "err", err, - ) - return nil, err - } ac := analyzer.RuntimeConfig{ RuntimeName: runtime, - Source: client, + Source: sourceClient, } return &Main{ @@ -200,8 +189,6 @@ func (m Main) processBatch(ctx context.Context) (int, error) { } func (m Main) Start(ctx context.Context) { - defer m.cleanup() - backoff, err := util.NewBackoff( 100*time.Millisecond, // Cap the timeout at the expected round time. All runtimes currently have the same round time. @@ -241,12 +228,6 @@ func (m Main) Start(ctx context.Context) { } } -func (m *Main) cleanup() { - if err := m.cfg.Source.Close(); err != nil { - m.logger.Error("error closing data source", "err", err) - } -} - func (m Main) Name() string { return EvmTokensAnalyzerPrefix + string(m.cfg.RuntimeName) } diff --git a/analyzer/runtime/runtime.go b/analyzer/runtime/runtime.go index a2057b512..daeec5b8f 100644 --- a/analyzer/runtime/runtime.go +++ b/analyzer/runtime/runtime.go @@ -20,7 +20,7 @@ import ( "github.com/oasisprotocol/oasis-indexer/log" "github.com/oasisprotocol/oasis-indexer/metrics" "github.com/oasisprotocol/oasis-indexer/storage" - "github.com/oasisprotocol/oasis-indexer/storage/oasis" + source "github.com/oasisprotocol/oasis-indexer/storage/oasis" ) const ( @@ -40,30 +40,21 @@ var _ analyzer.Analyzer = (*Main)(nil) // NewRuntimeAnalyzer returns a new main analyzer for a runtime. func NewRuntimeAnalyzer( runtime common.Runtime, - sourceConfig *config.SourceConfig, + runtimeMetadata *sdkConfig.ParaTime, cfg *config.BlockBasedAnalyzerConfig, + sourceClient *source.RuntimeClient, target storage.TargetStorage, logger *log.Logger, ) (*Main, error) { - ctx := context.Background() - - // Initialize source storage. - client, err := oasis.NewRuntimeClient(ctx, sourceConfig, runtime) - if err != nil { - logger.Error("error creating runtime client", - "err", err, - ) - return nil, err - } roundRange := analyzer.RoundRange{ From: uint64(cfg.From), To: uint64(cfg.To), } ac := analyzer.RuntimeConfig{ RuntimeName: runtime, - ParaTime: sourceConfig.SDKParaTime(runtime), + ParaTime: runtimeMetadata, Range: roundRange, - Source: client, + Source: sourceClient, } return &Main{ @@ -75,8 +66,6 @@ func NewRuntimeAnalyzer( } func (m *Main) Start(ctx context.Context) { - defer m.cleanup() - if err := m.prework(ctx); err != nil { m.logger.Error("error doing prework", "err", err, @@ -149,14 +138,6 @@ func (m *Main) Start(ctx context.Context) { m.cfg.Range.From, m.cfg.Range.To)) } -func (m *Main) cleanup() { - if err := m.cfg.Source.Close(); err != nil { - m.logger.Error("failed to cleanly close consensus data source", - "err", err.Error(), - ) - } -} - // Name returns the name of the Main. func (m *Main) Name() string { return string(m.cfg.RuntimeName) diff --git a/cmd/analyzer/analyzer.go b/cmd/analyzer/analyzer.go index e2ab1a0fa..7cbb4214f 100644 --- a/cmd/analyzer/analyzer.go +++ b/cmd/analyzer/analyzer.go @@ -3,6 +3,7 @@ package analyzer import ( "context" + "fmt" "os" "os/signal" "sync" @@ -24,6 +25,7 @@ import ( "github.com/oasisprotocol/oasis-indexer/config" "github.com/oasisprotocol/oasis-indexer/log" "github.com/oasisprotocol/oasis-indexer/storage" + source "github.com/oasisprotocol/oasis-indexer/storage/oasis" ) const ( @@ -136,8 +138,67 @@ func wipeStorage(cfg *config.StorageConfig) error { type Service struct { Analyzers map[string]analyzer.Analyzer - target storage.TargetStorage - logger *log.Logger + sources *sourceFactory + target storage.TargetStorage + logger *log.Logger +} + +// sourceFactory stores singletons of the sources used by all the analyzers in a Service. +// This enables re-use of node connections as well as graceful shutdown. +// Note: NOT thread safe. +type sourceFactory struct { + cfg config.SourceConfig + + consensus *source.ConsensusClient + runtimes map[common.Runtime]*source.RuntimeClient +} + +func newSourceFactory(cfg config.SourceConfig) *sourceFactory { + return &sourceFactory{ + cfg: cfg, + runtimes: make(map[common.Runtime]*source.RuntimeClient), + } +} + +func (s *sourceFactory) Close() error { + var firstErr error + if s.consensus != nil { + if err := s.consensus.Close(); err != nil { + firstErr = err + } + } + for _, runtimeClient := range s.runtimes { + if err := runtimeClient.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + + return firstErr +} + +func (s *sourceFactory) Consensus(ctx context.Context) (*source.ConsensusClient, error) { + if s.consensus == nil { + client, err := source.NewConsensusClient(ctx, &s.cfg) + if err != nil { + return nil, fmt.Errorf("error creating consensus client: %w", err) + } + s.consensus = client + } + + return s.consensus, nil +} + +func (s *sourceFactory) Runtime(ctx context.Context, runtime common.Runtime) (*source.RuntimeClient, error) { + _, ok := s.runtimes[runtime] + if !ok { + client, err := source.NewRuntimeClient(ctx, &s.cfg, runtime) + if err != nil { + return nil, fmt.Errorf("error creating %s client: %w", string(runtime), err) + } + s.runtimes[runtime] = client + } + + return s.runtimes[runtime], nil } type A = analyzer.Analyzer @@ -161,10 +222,14 @@ func addAnalyzer(analyzers map[string]A, errSoFar error, analyzerGenerator func( // NewService creates new Service. func NewService(cfg *config.AnalysisConfig) (*Service, error) { + ctx := context.Background() logger := cmdCommon.Logger().WithModule(moduleName) + // Initialize source storage. + sources := newSourceFactory(cfg.Source) + // Initialize target storage. - client, err := cmdCommon.NewClient(cfg.Storage, logger) + dbClient, err := cmdCommon.NewClient(cfg.Storage, logger) if err != nil { return nil, err } @@ -173,52 +238,88 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { analyzers := map[string]A{} if cfg.Analyzers.Consensus != nil { analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { - return consensus.NewMain(&cfg.Source, cfg.Analyzers.Consensus, client, logger) + genesisChainContext := cfg.Source.History().CurrentRecord().ChainContext + sourceClient, err1 := sources.Consensus(ctx) + if err1 != nil { + return nil, err1 + } + return consensus.NewMain(cfg.Analyzers.Consensus, genesisChainContext, sourceClient, dbClient, logger) }) } if cfg.Analyzers.Emerald != nil { analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { - return runtime.NewRuntimeAnalyzer(common.RuntimeEmerald, &cfg.Source, cfg.Analyzers.Emerald, client, logger) + runtimeMetadata := cfg.Source.SDKParaTime(common.RuntimeEmerald) + sourceClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald) + if err1 != nil { + return nil, err1 + } + return runtime.NewRuntimeAnalyzer(common.RuntimeEmerald, runtimeMetadata, cfg.Analyzers.Emerald, sourceClient, dbClient, logger) }) } if cfg.Analyzers.Sapphire != nil { analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { - return runtime.NewRuntimeAnalyzer(common.RuntimeSapphire, &cfg.Source, cfg.Analyzers.Sapphire, client, logger) + runtimeMetadata := cfg.Source.SDKParaTime(common.RuntimeSapphire) + sourceClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire) + if err1 != nil { + return nil, err1 + } + return runtime.NewRuntimeAnalyzer(common.RuntimeSapphire, runtimeMetadata, cfg.Analyzers.Sapphire, sourceClient, dbClient, logger) }) } if cfg.Analyzers.Cipher != nil { analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { - return runtime.NewRuntimeAnalyzer(common.RuntimeCipher, &cfg.Source, cfg.Analyzers.Cipher, client, logger) + runtimeMetadata := cfg.Source.SDKParaTime(common.RuntimeCipher) + sourceClient, err1 := sources.Runtime(ctx, common.RuntimeCipher) + if err1 != nil { + return nil, err1 + } + return runtime.NewRuntimeAnalyzer(common.RuntimeCipher, runtimeMetadata, cfg.Analyzers.Cipher, sourceClient, dbClient, logger) }) } if cfg.Analyzers.EmeraldEvmTokens != nil { analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { - return evmtokens.NewMain(common.RuntimeEmerald, &cfg.Source, client, logger) + sourceClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald) + if err1 != nil { + return nil, err1 + } + return evmtokens.NewMain(common.RuntimeEmerald, sourceClient, dbClient, logger) }) } if cfg.Analyzers.SapphireEvmTokens != nil { analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { - return evmtokens.NewMain(common.RuntimeSapphire, &cfg.Source, client, logger) + sourceClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire) + if err1 != nil { + return nil, err1 + } + return evmtokens.NewMain(common.RuntimeSapphire, sourceClient, dbClient, logger) }) } if cfg.Analyzers.EmeraldEvmTokenBalances != nil { analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { - return evmtokenbalances.NewMain(common.RuntimeEmerald, &cfg.Source, client, logger) + sourceClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald) + if err1 != nil { + return nil, err1 + } + return evmtokenbalances.NewMain(common.RuntimeEmerald, sourceClient, dbClient, logger) }) } if cfg.Analyzers.SapphireEvmTokenBalances != nil { analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { - return evmtokenbalances.NewMain(common.RuntimeSapphire, &cfg.Source, client, logger) + sourceClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire) + if err1 != nil { + return nil, err1 + } + return evmtokenbalances.NewMain(common.RuntimeSapphire, sourceClient, dbClient, logger) }) } if cfg.Analyzers.MetadataRegistry != nil { analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { - return analyzer.NewMetadataRegistryAnalyzer(cfg.Analyzers.MetadataRegistry, client, logger) + return analyzer.NewMetadataRegistryAnalyzer(cfg.Analyzers.MetadataRegistry, dbClient, logger) }) } if cfg.Analyzers.AggregateStats != nil { analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { - return analyzer.NewAggregateStatsAnalyzer(cfg.Analyzers.AggregateStats, client, logger) + return analyzer.NewAggregateStatsAnalyzer(cfg.Analyzers.AggregateStats, dbClient, logger) }) } if err != nil { @@ -230,8 +331,9 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { return &Service{ Analyzers: analyzers, - target: client, - logger: logger, + sources: sources, + target: dbClient, + logger: logger, }, nil } @@ -283,7 +385,14 @@ func (a *Service) Start() { // cleanup cleans up resources used by the service. func (a *Service) cleanup() { + if err := a.sources.Close(); err != nil { + a.logger.Error("failed to cleanly close data source", + "firstErr", err.Error(), + ) + } + a.logger.Info("all source connections have closed cleanly") a.target.Close() + a.logger.Info("indexer db connection closed cleanly") } // Register registers the process sub-command.