Skip to content

Commit

Permalink
do not close source in analyzers; rename vars; update comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew7234 committed May 4, 2023
1 parent eb28a99 commit d6a7fd4
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 54 deletions.
10 changes: 0 additions & 10 deletions analyzer/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ func NewMain(cfg *config.BlockBasedAnalyzerConfig, genesisChainContext string, s

// Start starts the main consensus analyzer.
func (m *Main) Start(ctx context.Context) {
defer m.cleanup()

// Get block to be indexed.
var height int64

Expand Down Expand Up @@ -171,14 +169,6 @@ func (m *Main) Start(ctx context.Context) {
m.cfg.Range.From, m.cfg.Range.To))
}

func (m *Main) cleanup() {
if err := m.cfg.Source.Close(); err != nil {
m.logger.Error("failed to cleanly close consensus data source",
"err", err.Error(),
)
}
}

// Name returns the name of the Main.
func (m *Main) Name() string {
return ConsensusAnalyzerName
Expand Down
9 changes: 0 additions & 9 deletions analyzer/evmtokenbalances/evm_token_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,6 @@ func (m Main) processBatch(ctx context.Context) (int, error) {
}

func (m Main) Start(ctx context.Context) {
defer m.cleanup()

backoff, err := util.NewBackoff(
100*time.Millisecond,
// Cap the timeout at the expected round time. All runtimes currently have the same round time.
Expand All @@ -267,7 +265,6 @@ func (m Main) Start(ctx context.Context) {
// Process another batch of token balances.
case <-ctx.Done():
m.logger.Warn("shutting down evm_token_balances analyzer", "reason", ctx.Err())
m.cleanup()
return
}

Expand All @@ -289,12 +286,6 @@ func (m Main) Start(ctx context.Context) {
}
}

func (m *Main) cleanup() {
if err := m.cfg.Source.Close(); err != nil {
m.logger.Error("error closing data source", "err", err)
}
}

func (m Main) Name() string {
return EvmTokenBalancesAnalyzerPrefix + string(m.cfg.RuntimeName)
}
8 changes: 0 additions & 8 deletions analyzer/evmtokens/evm_tokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,6 @@ func (m Main) processBatch(ctx context.Context) (int, error) {
}

func (m Main) Start(ctx context.Context) {
defer m.cleanup()

backoff, err := util.NewBackoff(
100*time.Millisecond,
// Cap the timeout at the expected round time. All runtimes currently have the same round time.
Expand Down Expand Up @@ -230,12 +228,6 @@ func (m Main) Start(ctx context.Context) {
}
}

func (m *Main) cleanup() {
if err := m.cfg.Source.Close(); err != nil {
m.logger.Error("error closing data source", "err", err)
}
}

func (m Main) Name() string {
return EvmTokensAnalyzerPrefix + string(m.cfg.RuntimeName)
}
14 changes: 2 additions & 12 deletions analyzer/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var _ analyzer.Analyzer = (*Main)(nil)
// NewRuntimeAnalyzer returns a new main analyzer for a runtime.
func NewRuntimeAnalyzer(
runtime common.Runtime,
sdkPT *sdkConfig.ParaTime,
runtimeMetadata *sdkConfig.ParaTime,
cfg *config.BlockBasedAnalyzerConfig,
sourceClient *source.RuntimeClient,
target storage.TargetStorage,
Expand All @@ -52,7 +52,7 @@ func NewRuntimeAnalyzer(
}
ac := analyzer.RuntimeConfig{
RuntimeName: runtime,
ParaTime: sdkPT,
ParaTime: runtimeMetadata,
Range: roundRange,
Source: sourceClient,
}
Expand All @@ -66,8 +66,6 @@ func NewRuntimeAnalyzer(
}

func (m *Main) Start(ctx context.Context) {
defer m.cleanup()

if err := m.prework(ctx); err != nil {
m.logger.Error("error doing prework",
"err", err,
Expand Down Expand Up @@ -140,14 +138,6 @@ func (m *Main) Start(ctx context.Context) {
m.cfg.Range.From, m.cfg.Range.To))
}

func (m *Main) cleanup() {
if err := m.cfg.Source.Close(); err != nil {
m.logger.Error("failed to cleanly close consensus data source",
"err", err.Error(),
)
}
}

// Name returns the name of the Main.
func (m *Main) Name() string {
return string(m.cfg.RuntimeName)
Expand Down
42 changes: 27 additions & 15 deletions cmd/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,27 @@ type Service struct {
logger *log.Logger
}

// sourceFactory stores the sources used by all the analyzers in a Service.
// sourceFactory stores singletons of the sources used by all the analyzers in a Service.
// This enables re-use of node connections as well as graceful shutdown.
// Note: NOT thread safe.
type sourceFactory struct {
cfg config.SourceConfig

consensus *source.ConsensusClient
runtimes map[common.Runtime]*source.RuntimeClient
}

func newSourceFactory(cfg config.SourceConfig) *sourceFactory {
return &sourceFactory{
cfg: cfg,
runtimes: make(map[common.Runtime]*source.RuntimeClient),
}
}

func (s *sourceFactory) Close() error {
var firstErr error
if s.consensus != nil {
if err := s.consensus.Close(); err != nil && firstErr == nil {
if err := s.consensus.Close(); err != nil {
firstErr = err
}
}
Expand Down Expand Up @@ -218,9 +226,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) {
logger := cmdCommon.Logger().WithModule(moduleName)

// Initialize source storage.
sources := sourceFactory{
cfg: cfg.Source,
}
sources := newSourceFactory(cfg.Source)

// Initialize target storage.
dbClient, err := cmdCommon.NewClient(cfg.Storage, logger)
Expand All @@ -232,42 +238,42 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) {
analyzers := map[string]A{}
if cfg.Analyzers.Consensus != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
chainContext := cfg.Source.History().CurrentRecord().ChainContext
genesisChainContext := cfg.Source.History().CurrentRecord().ChainContext
sourceClient, err1 := sources.Consensus(ctx)
if err1 != nil {
return nil, err1
}
return consensus.NewMain(cfg.Analyzers.Consensus, chainContext, sourceClient, dbClient, logger)
return consensus.NewMain(cfg.Analyzers.Consensus, genesisChainContext, sourceClient, dbClient, logger)
})
}
if cfg.Analyzers.Emerald != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
sdkPT := cfg.Source.SDKParaTime(common.RuntimeEmerald)
runtimeMetadata := cfg.Source.SDKParaTime(common.RuntimeEmerald)
sourceClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald)
if err1 != nil {
return nil, err1
}
return runtime.NewRuntimeAnalyzer(common.RuntimeEmerald, sdkPT, cfg.Analyzers.Emerald, sourceClient, dbClient, logger)
return runtime.NewRuntimeAnalyzer(common.RuntimeEmerald, runtimeMetadata, cfg.Analyzers.Emerald, sourceClient, dbClient, logger)
})
}
if cfg.Analyzers.Sapphire != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
sdkPT := cfg.Source.SDKParaTime(common.RuntimeSapphire)
runtimeMetadata := cfg.Source.SDKParaTime(common.RuntimeSapphire)
sourceClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire)
if err1 != nil {
return nil, err1
}
return runtime.NewRuntimeAnalyzer(common.RuntimeSapphire, sdkPT, cfg.Analyzers.Sapphire, sourceClient, dbClient, logger)
return runtime.NewRuntimeAnalyzer(common.RuntimeSapphire, runtimeMetadata, cfg.Analyzers.Sapphire, sourceClient, dbClient, logger)
})
}
if cfg.Analyzers.Cipher != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
sdkPT := cfg.Source.SDKParaTime(common.RuntimeCipher)
runtimeMetadata := cfg.Source.SDKParaTime(common.RuntimeCipher)
sourceClient, err1 := sources.Runtime(ctx, common.RuntimeCipher)
if err1 != nil {
return nil, err1
}
return runtime.NewRuntimeAnalyzer(common.RuntimeCipher, sdkPT, cfg.Analyzers.Cipher, sourceClient, dbClient, logger)
return runtime.NewRuntimeAnalyzer(common.RuntimeCipher, runtimeMetadata, cfg.Analyzers.Cipher, sourceClient, dbClient, logger)
})
}
if cfg.Analyzers.EmeraldEvmTokens != nil {
Expand Down Expand Up @@ -325,7 +331,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) {
return &Service{
Analyzers: analyzers,

sources: &sources,
sources: sources,
target: dbClient,
logger: logger,
}, nil
Expand Down Expand Up @@ -379,8 +385,14 @@ func (a *Service) Start() {

// cleanup cleans up resources used by the service.
func (a *Service) cleanup() {
a.sources.Close()
if err := a.sources.Close(); err != nil {
a.logger.Error("failed to cleanly close data source",
"firstErr", err.Error(),
)
}
a.logger.Info("all source connections have closed cleanly")
a.target.Close()
a.logger.Info("indexer db connection closed cleanly")
}

// Register registers the process sub-command.
Expand Down

0 comments on commit d6a7fd4

Please sign in to comment.