From a146bf9894887c73cd784fa1dcedd2fd4732a0f9 Mon Sep 17 00:00:00 2001 From: ptrus Date: Tue, 4 Jul 2023 10:39:34 +0200 Subject: [PATCH] tx_volume stats: Produce windowed results, compute more efficiently --- analyzer/aggregate_stats.go | 277 ----------------- analyzer/aggregate_stats/aggregate_stats.go | 286 ++++++++++++++++++ analyzer/aggregate_stats/stats_computation.go | 84 +++++ analyzer/queries/queries.go | 65 +++- api/errors.go | 6 +- api/middleware.go | 9 +- api/spec/v1.yaml | 30 +- cmd/analyzer/analyzer.go | 3 +- storage/client/client.go | 30 +- storage/client/queries/queries.go | 23 +- storage/migrations/03_agg_stats.up.sql | 2 +- .../migrations/08_agg_stats_refactor.up.sql | 49 +++ .../e2e_regression/expected/bucket_size.body | 4 - .../expected/emerald_tx_volume.body | 4 +- .../expected/nonstandard_bucket_size.body | 4 - .../expected/nonstandard_bucket_size.headers | 6 - .../expected/nonstandard_window_size.body | 3 + .../expected/nonstandard_window_size.headers | 7 + tests/e2e_regression/expected/tx_volume.body | 4 +- .../e2e_regression/expected/window_size.body | 4 + ...ucket_size.headers => window_size.headers} | 0 tests/e2e_regression/run.sh | 6 +- 22 files changed, 555 insertions(+), 351 deletions(-) delete mode 100644 analyzer/aggregate_stats.go create mode 100644 analyzer/aggregate_stats/aggregate_stats.go create mode 100644 analyzer/aggregate_stats/stats_computation.go create mode 100644 storage/migrations/08_agg_stats_refactor.up.sql delete mode 100644 tests/e2e_regression/expected/bucket_size.body delete mode 100644 tests/e2e_regression/expected/nonstandard_bucket_size.body delete mode 100644 tests/e2e_regression/expected/nonstandard_bucket_size.headers create mode 100644 tests/e2e_regression/expected/nonstandard_window_size.body create mode 100644 tests/e2e_regression/expected/nonstandard_window_size.headers create mode 100644 tests/e2e_regression/expected/window_size.body rename tests/e2e_regression/expected/{bucket_size.headers => window_size.headers} (100%) diff --git a/analyzer/aggregate_stats.go b/analyzer/aggregate_stats.go deleted file mode 100644 index 6fba3b397..000000000 --- a/analyzer/aggregate_stats.go +++ /dev/null @@ -1,277 +0,0 @@ -package analyzer - -import ( - "context" - "errors" - "fmt" - "sync" - "time" - - "github.com/jackc/pgx/v5" - - "github.com/oasisprotocol/nexus/analyzer/queries" - "github.com/oasisprotocol/nexus/common" - "github.com/oasisprotocol/nexus/config" - "github.com/oasisprotocol/nexus/log" - "github.com/oasisprotocol/nexus/metrics" - "github.com/oasisprotocol/nexus/storage" -) - -const ( - AggregateStatsAnalyzerName = "aggregate_stats" - - // We would ideally recompute tx volume stats (which include 5-min aggregates) every 5 min, - // but the current naive implementation is too inefficient for that. Allow for the update - // to take longer. - txVolumeStatsUpdateTimeout = 10 * time.Minute // as of height 10767524, this takes ~3m15s - - // Name of the consensus layer. - layerConsensus = "consensus" - - // Limit the number of daily active accounts aggregation windows to be computed in a single - // iteration. - // This limits database batch sizes in hypothetical cases if the daily active accounts - // analyzer would be far behind latest indexed block (e.g. if it would not be enabled during initial sync). - // Likely never the case in normal operation. - dailyActiveAccountsBatchLimit = 1_000 - // Interval between daily active accounts iterations. 
- // The worker only computes new data and iteration with no updates are cheap - // so it's fine to run this frequently. - dailyActiveAccountsInterval = 1 * time.Minute - - // 24-hour sliding window with 5 minute step. - dailyActiveAccountsWindowSize = 24 * time.Hour - dailyActiveAccountsWindowStep = 5 * time.Minute -) - -// Layers that are tracked for daily active account stats. -// XXX: Instead of hardcoding, these could also be obtained -// by (periodically) querying the `runtime_related_transactions` table. -var dailyActiveAccountsLayers = []string{ - layerConsensus, - string(common.RuntimeEmerald), - string(common.RuntimeSapphire), - // RuntimeCipher.String(), // Enable once Cipher is supported by Nexus. -} - -type AggregateStatsAnalyzer struct { - target storage.TargetStorage - - txVolumeInterval time.Duration - - logger *log.Logger - metrics metrics.DatabaseMetrics -} - -var _ Analyzer = (*AggregateStatsAnalyzer)(nil) - -func (a *AggregateStatsAnalyzer) Name() string { - return AggregateStatsAnalyzerName -} - -func NewAggregateStatsAnalyzer(cfg *config.AggregateStatsConfig, target storage.TargetStorage, logger *log.Logger) (*AggregateStatsAnalyzer, error) { - logger.Info("starting aggregate_stats analyzer") - return &AggregateStatsAnalyzer{ - target: target, - txVolumeInterval: cfg.TxVolumeInterval, - logger: logger.With("analyzer", AggregateStatsAnalyzerName), - metrics: metrics.NewDefaultDatabaseMetrics(AggregateStatsAnalyzerName), - }, nil -} - -func (a *AggregateStatsAnalyzer) Start(ctx context.Context) { - var wg sync.WaitGroup - wg.Add(2) - - go func() { - defer wg.Done() - a.txVolumeWorker(ctx) - }() - go func() { - defer wg.Done() - a.dailyActiveAccountsWorker(ctx) - }() - - wg.Wait() -} - -// txVolumeWorker periodically updates the tx volume stats. -func (a *AggregateStatsAnalyzer) txVolumeWorker(ctx context.Context) { - for { - a.logger.Info("updating tx volume stats (5-min and daily)") - - // Note: The order matters here! Daily tx volume is a materialized view - // that's instantiated from 5 minute tx volume. - batch := &storage.QueryBatch{} - batch.Queue(queries.RefreshMin5TxVolume) - batch.Queue(queries.RefreshDailyTxVolume) - - ctxWithTimeout, cancelCtx := context.WithTimeout(ctx, txVolumeStatsUpdateTimeout) - if err := a.writeToDB(ctxWithTimeout, batch, "update_tx_volume_stats"); err != nil { - a.logger.Error("failed to trigger tx volume stats updates", "err", err) - } - a.logger.Info("updated tx volume stats", "num_materialized_views", batch.Len()) - cancelCtx() - - select { - case <-time.After(a.txVolumeInterval): - // Update stats. - case <-ctx.Done(): - a.logger.Error("shutting down tx volume worker", "reason", ctx.Err()) - return - } - } -} - -func (a *AggregateStatsAnalyzer) writeToDB(ctx context.Context, batch *storage.QueryBatch, opName string) error { - timer := a.metrics.DatabaseTimer(a.target.Name(), opName) - defer timer.ObserveDuration() - - if err := a.target.SendBatch(ctx, batch); err != nil { - a.metrics.DatabaseCounter(a.target.Name(), opName, "failure").Inc() - return err - } - a.metrics.DatabaseCounter(a.target.Name(), opName, "success").Inc() - - return nil -} - -// dailyActiveAccountsWorker computes a sliding window of daily unique active accounts. -func (a *AggregateStatsAnalyzer) dailyActiveAccountsWorker(ctx context.Context) { - windowSize := dailyActiveAccountsWindowSize - windowStep := dailyActiveAccountsWindowStep - - // Rounds the timestamp down to the nearest windowStep interval. - // e.g. 
if windowStep is 5 minutes, then 12:34:56 will be rounded down to 12:30:00. - floorWindow := func(ts *time.Time) time.Time { - return ts.Truncate(windowStep) - } - - for { - for _, layer := range dailyActiveAccountsLayers { - a.logger.Info("updating daily active account stats", "layer", layer) - - // Query the latest indexed block timestamp. - latestBlockTs, err := a.latestBlockTs(ctx, layer) - if err != nil { - // Log this as info as this can be expected when there's no indexed blocks yet, or if any of the layers are not being indexed. - a.logger.Info("failed querying latest indexed block timestamp, skipping iteration", "layer", layer, "err", err) - continue - } - latestPossibleWindow := floorWindow(latestBlockTs) - - // Start at the latest already computed window, or at the earliest indexed - // block if no stats have been computed yet. - var latestStatsTs time.Time - err = a.target.QueryRow( - ctx, - queries.LatestDailyAccountStats, - layer, - ).Scan(&latestStatsTs) - switch { - case err == nil: - // Continues below. - case errors.Is(pgx.ErrNoRows, err): - // No stats yet. Start at the earliest indexed block. - var earliestBlockTs *time.Time - earliestBlockTs, err = a.earliestBlockTs(ctx, layer) - if err != nil { - a.logger.Error("failed querying earliest indexed block timestamp", "layer", layer, "err", err) - continue - } - latestStatsTs = floorWindow(earliestBlockTs) - default: - a.logger.Error("failed querying latest daily accounts stats window", "layer", layer, "err", err) - continue - } - - // Compute daily unique active accounts for windows until the latest indexed block. - batch := &storage.QueryBatch{} - for { - nextWindow := latestStatsTs.Add(windowStep) - if !latestPossibleWindow.After(nextWindow) { - // Cannot yet compute stats for next window. Stop. - break - } - windowStart := nextWindow.Add(-windowSize) - windowEnd := nextWindow - - // Compute active accounts for the provided time window. - var acctsRow pgx.Row - switch layer { - case layerConsensus: - acctsRow = a.target.QueryRow( - ctx, - queries.ConsensusActiveAccounts, - windowStart, // from - windowEnd, // to - ) - default: - acctsRow = a.target.QueryRow( - ctx, - queries.RuntimeActiveAccounts, - layer, // runtime - windowStart, // from - windowEnd, // to - ) - } - var activeAccounts uint64 - if err := acctsRow.Scan(&activeAccounts); err != nil { - a.logger.Error("failed to compute daily active accounts", "layer", layer, "window_start", windowStart, "window_end", windowEnd, "err", err) - continue - } - - // Insert computed active accounts. - batch.Queue(queries.InsertDailyAccountStats, layer, windowEnd.UTC(), activeAccounts) - if batch.Len() > dailyActiveAccountsBatchLimit { - break - } - latestStatsTs = nextWindow - } - if err := a.writeToDB(ctx, batch, fmt.Sprintf("update_daily_active_account_%s_stats", layer)); err != nil { - a.logger.Error("failed to insert daily active account stats update", "err", err, "layer", layer) - } - a.logger.Info("updated daily active account stats", "layer", layer, "num_inserts", batch.Len()) - } - - select { - case <-time.After(dailyActiveAccountsInterval): - // Update stats again. - case <-ctx.Done(): - a.logger.Error("shutting down daily active accounts worker", "reason", ctx.Err()) - return - } - } -} - -// Queries the latest indexed block of the specified layer. 
-func (a *AggregateStatsAnalyzer) latestBlockTs(ctx context.Context, layer string) (*time.Time, error) {
- var latestBlockTsRow pgx.Row
- switch layer {
- case layerConsensus:
- latestBlockTsRow = a.target.QueryRow(ctx, queries.LatestConsensusBlockTime)
- default:
- latestBlockTsRow = a.target.QueryRow(ctx, queries.LatestRuntimeBlockTime, layer)
- }
- var latestBlockTs *time.Time
- if err := latestBlockTsRow.Scan(&latestBlockTs); err != nil { // Fails with ErrNoRows if no blocks.
- return nil, err
- }
- return latestBlockTs, nil
-}
-
-// Queries the earliest indexed block for the specified layer.
-func (a *AggregateStatsAnalyzer) earliestBlockTs(ctx context.Context, layer string) (*time.Time, error) {
- var earliestBlockTsRow pgx.Row
- switch layer {
- case layerConsensus:
- earliestBlockTsRow = a.target.QueryRow(ctx, queries.EarliestConsensusBlockTime)
- default:
- earliestBlockTsRow = a.target.QueryRow(ctx, queries.EarliestRuntimeBlockTime, layer)
- }
- var earliestBlockTs *time.Time
- if err := earliestBlockTsRow.Scan(&earliestBlockTs); err != nil { // Fails with ErrNoRows if no blocks.
- return nil, err
- }
- return earliestBlockTs, nil
-}
diff --git a/analyzer/aggregate_stats/aggregate_stats.go b/analyzer/aggregate_stats/aggregate_stats.go
new file mode 100644
index 000000000..e8d481fe6
--- /dev/null
+++ b/analyzer/aggregate_stats/aggregate_stats.go
@@ -0,0 +1,286 @@
+package aggregate_stats
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/jackc/pgx/v5"
+
+ "github.com/oasisprotocol/nexus/analyzer"
+ "github.com/oasisprotocol/nexus/analyzer/queries"
+ "github.com/oasisprotocol/nexus/common"
+ "github.com/oasisprotocol/nexus/config"
+ "github.com/oasisprotocol/nexus/log"
+ "github.com/oasisprotocol/nexus/metrics"
+ "github.com/oasisprotocol/nexus/storage"
+)
+
+const (
+ aggregateStatsAnalyzerName = "aggregate_stats"
+
+ // Limit the number of stats aggregation windows to be computed in a single
+ // iteration.
+ // This limits database batch sizes in case the stats analyzer falls far behind the
+ // latest indexed block (e.g. if it was not enabled during the initial sync).
+ // Likely never the case in normal operation.
+ statsBatchLimit = 1_000
+
+ // Name of the consensus layer.
+ layerConsensus = "consensus"
+
+ // Interval between stat computation iterations.
+ // The workers only compute new data, and iterations with no updates are cheap,
+ // so it's fine to run this frequently.
+ statsComputationInterval = 1 * time.Minute
+)
+
+// Layers that are tracked for aggregate stats.
+// XXX: Instead of hardcoding, these could also be obtained
+// by (periodically) querying the `runtime_related_transactions` table.
+var statsLayers = []string{
+ layerConsensus,
+ string(common.RuntimeEmerald),
+ string(common.RuntimeSapphire),
+ // RuntimeCipher.String(), // Enable once Cipher is supported by the indexer.
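For orientation, the contract this type satisfies (via the `var _ analyzer.Analyzer` assertion just below) can be sketched as follows. This is a minimal sketch; the exact definition lives in the analyzer package and is inferred here only from the Name/Start methods this file implements:

```go
package analyzer // sketch only; assumed shape of the real analyzer package

import "context"

// Analyzer is the assumed interface, inferred from the methods implemented
// by aggregateStatsAnalyzer in this patch.
type Analyzer interface {
	// Name returns the analyzer's unique name.
	Name() string
	// Start runs the analyzer until ctx is canceled.
	Start(ctx context.Context)
}
```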
+} + +type aggregateStatsAnalyzer struct { + target storage.TargetStorage + + txVolumeInterval time.Duration + + logger *log.Logger + metrics metrics.DatabaseMetrics +} + +var _ analyzer.Analyzer = (*aggregateStatsAnalyzer)(nil) + +func (a *aggregateStatsAnalyzer) Name() string { + return aggregateStatsAnalyzerName +} + +func NewAggregateStatsAnalyzer(cfg *config.AggregateStatsConfig, target storage.TargetStorage, logger *log.Logger) (analyzer.Analyzer, error) { + logger.Info("starting aggregate_stats analyzer") + return &aggregateStatsAnalyzer{ + target: target, + txVolumeInterval: cfg.TxVolumeInterval, + logger: logger.With("analyzer", aggregateStatsAnalyzerName), + metrics: metrics.NewDefaultDatabaseMetrics(aggregateStatsAnalyzerName), + }, nil +} + +func (a *aggregateStatsAnalyzer) Start(ctx context.Context) { + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + a.aggregateStatsWorker(ctx) + }() + + wg.Wait() +} + +func (a *aggregateStatsAnalyzer) writeToDB(ctx context.Context, batch *storage.QueryBatch, opName string) error { + timer := a.metrics.DatabaseTimer(a.target.Name(), opName) + defer timer.ObserveDuration() + + if err := a.target.SendBatch(ctx, batch); err != nil { + a.metrics.DatabaseCounter(a.target.Name(), opName, "failure").Inc() + return err + } + a.metrics.DatabaseCounter(a.target.Name(), opName, "success").Inc() + + return nil +} + +func (a *aggregateStatsAnalyzer) aggregateStatsWorker(ctx context.Context) { + statsComputations := []*statsComputation{} + + // Compute 5-minute tx volume stats every 5 minutes for all layers. + for _, layer := range statsLayers { + statsComputations = append(statsComputations, &statsComputation{ + name: "min5_tx_volume_" + layer, + layer: layer, + dbTable: "stats.min5_tx_volume", + dbColumn: "tx_volume", + consensusStatsQuery: queries.ConsensusTxVolume, + runtimeStatsQuery: queries.RuntimeTxVolume, + windowSize: 5 * time.Minute, + windowStep: 5 * time.Minute, + target: a.target, + }) + } + // Compute daily tx volume stats every 5 minutes for all layers. + // Uses the stats.min5_tx_volume results so that it is efficient. + for _, layer := range statsLayers { + layer := layer + statsComputations = append(statsComputations, &statsComputation{ + name: "daily_tx_volume_" + layer, + layer: layer, + dbTable: "stats.daily_tx_volume", + dbColumn: "tx_volume", + consensusStatsQuery: queries.ConsensusDailyTxVolume, + runtimeStatsQuery: queries.RuntimeDailyTxVolume, + windowSize: 24 * time.Hour, + windowStep: 5 * time.Minute, + target: a.target, + customLatestAvailableTs: func(ctx context.Context, target storage.TargetStorage) (*time.Time, error) { + // Latest available data is the latest computed 5-minute tx volume window. + latestTsRow := a.target.QueryRow(ctx, queries.LatestMin5TxVolume, layer) + var latestTs *time.Time + if err := latestTsRow.Scan(&latestTs); err != nil { // Fails with ErrNoRows if no stats. + return nil, err + } + return latestTs, nil + }, + }) + } + + // Compute daily active accounts every 5 minutes for all layers. 
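The daily tx volume computations registered above chain off the 5-minute series rather than re-scanning raw transactions. Before the active-accounts registrations that continue below, here is a minimal, self-contained sketch of the window arithmetic behind that chaining (assuming UTC timestamps):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A daily window ending at windowEnd aggregates the 5-minute windows whose
	// own window_end falls in [windowEnd-24h, windowEnd), matching the half-open
	// range used by the ConsensusDailyTxVolume/RuntimeDailyTxVolume queries.
	windowEnd := time.Date(2023, 7, 4, 10, 35, 0, 0, time.UTC)
	windowStart := windowEnd.Add(-24 * time.Hour)
	fmt.Println(int(windowEnd.Sub(windowStart) / (5 * time.Minute))) // 288 five-minute windows
}
```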
+ for _, layer := range statsLayers {
+ statsComputations = append(statsComputations, &statsComputation{
+ name: "daily_active_accounts_" + layer,
+ layer: layer,
+ dbTable: "stats.daily_active_accounts",
+ dbColumn: "active_accounts",
+ consensusStatsQuery: queries.ConsensusActiveAccounts,
+ runtimeStatsQuery: queries.RuntimeActiveAccounts,
+ windowSize: 24 * time.Hour,
+ windowStep: 5 * time.Minute,
+ target: a.target,
+ })
+ }
+
+ for {
+ for _, statsComputation := range statsComputations {
+ logger := a.logger.With("name", statsComputation.Name(), "layer", statsComputation.Layer())
+
+ windowSize := statsComputation.WindowSize()
+ windowStep := statsComputation.WindowStep()
+ // Rounds the timestamp down to the nearest windowStep interval.
+ // e.g. if windowStep is 5 minutes, then 12:34:56 will be rounded down to 12:30:00.
+ floorWindow := func(ts *time.Time) time.Time {
+ return ts.Truncate(windowStep)
+ }
+
+ logger.Info("updating stats")
+
+ // First we find the start and end timestamps for which we can compute stats.
+ // Start at the latest already computed window, or at the earliest indexed
+ // block if no stats have been computed yet.
+ latestComputed, err := statsComputation.LatestComputedTs(ctx)
+ switch {
+ case err == nil:
+ // Continues below.
+ case errors.Is(err, pgx.ErrNoRows):
+ // No stats yet. Start at the earliest indexed block.
+ var earliestBlockTs *time.Time
+ earliestBlockTs, err = a.earliestBlockTs(ctx, statsComputation.Layer())
+ if err != nil {
+ logger.Error("failed querying earliest indexed block timestamp", "err", err)
+ continue
+ }
+ latestComputed = floorWindow(earliestBlockTs)
+ default:
+ logger.Error("failed querying latest computed stats window", "err", err)
+ continue
+ }
+
+ // End at the latest indexed block timestamp rounded down to the windowStep interval,
+ // unless a customLatestAvailableTs function is provided, in which case use that.
+ var latestPossibleWindow time.Time
+ if fn := statsComputation.customLatestAvailableTs; fn == nil {
+ // Use the latest indexed block timestamp for the layer.
+ latestBlockTs, err := a.latestBlockTs(ctx, statsComputation.Layer())
+ if err != nil {
+ // Log this as info, as this can be expected when there are no indexed blocks yet, or if any of the layers are not being indexed.
+ logger.Info("failed querying latest indexed block timestamp, skipping iteration", "err", err)
+ continue
+ }
+ latestPossibleWindow = floorWindow(latestBlockTs)
+ } else {
+ // Compute the latest available window using the provided custom function.
+ ts, err := fn(ctx, a.target)
+ if err != nil {
+ // Log this as info, as this can be expected when the source stats windows have not been computed yet.
+ logger.Info("failed querying the latest possible window, skipping iteration", "err", err)
+ continue
+ }
+ latestPossibleWindow = floorWindow(ts)
+ }
+
+ // Compute stats for all windows between `latestComputed` and `latestPossibleWindow`.
+ batch := &storage.QueryBatch{}
+ for {
+ nextWindow := latestComputed.Add(windowStep)
+ if !latestPossibleWindow.After(nextWindow) {
+ // Cannot yet compute stats for this window. Stop.
+ break
+ }
+ windowStart := nextWindow.Add(-windowSize)
+ windowEnd := nextWindow
+
+ // Compute stats for the provided time window.
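The floorWindow helper above leans on time.Truncate, which for UTC timestamps rounds down exactly as its comment describes; a quick self-contained check (the ComputeStats call for the window follows right after):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Truncate rounds down to the nearest multiple of windowStep.
	windowStep := 5 * time.Minute
	ts := time.Date(2023, 7, 4, 12, 34, 56, 0, time.UTC)
	fmt.Println(ts.Truncate(windowStep)) // 2023-07-04 12:30:00 +0000 UTC
}
```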
+ queries, err := statsComputation.ComputeStats(ctx, windowStart, windowEnd) + if err != nil { + logger.Error("failed to compute stat for the window", "window_start", windowStart, "window_end", windowEnd, "err", err) + break + } + batch.Extend(queries) + + if batch.Len() > statsBatchLimit { + break + } + latestComputed = nextWindow + } + if err := a.writeToDB(ctx, batch, fmt.Sprintf("update_stats_%s", a.Name())); err != nil { + logger.Error("failed to insert computed stats update", "err", err) + } + logger.Info("updated stats", "num_inserts", batch.Len()) + } + + select { + case <-time.After(statsComputationInterval): + // Update stats again. + case <-ctx.Done(): + a.logger.Error("shutting down aggregate stats worker", "reason", ctx.Err()) + return + } + } +} + +// Queries the latest indexed block of the specified layer. +func (a *aggregateStatsAnalyzer) latestBlockTs(ctx context.Context, layer string) (*time.Time, error) { + var latestBlockTsRow pgx.Row + switch layer { + case layerConsensus: + latestBlockTsRow = a.target.QueryRow(ctx, queries.LatestConsensusBlockTime) + default: + latestBlockTsRow = a.target.QueryRow(ctx, queries.LatestRuntimeBlockTime, layer) + } + var latestBlockTs *time.Time + if err := latestBlockTsRow.Scan(&latestBlockTs); err != nil { // Fails with ErrNoRows if no blocks. + return nil, err + } + return latestBlockTs, nil +} + +// Queries the earliest indexed block for the specified layer. +func (a *aggregateStatsAnalyzer) earliestBlockTs(ctx context.Context, layer string) (*time.Time, error) { + var earliestBlockTsRow pgx.Row + switch layer { + case layerConsensus: + earliestBlockTsRow = a.target.QueryRow(ctx, queries.EarliestConsensusBlockTime) + default: + earliestBlockTsRow = a.target.QueryRow(ctx, queries.EarliestRuntimeBlockTime, layer) + } + var earliestBlockTs *time.Time + if err := earliestBlockTsRow.Scan(&earliestBlockTs); err != nil { // Fails with ErrNoRows if no blocks. 
+ return nil, err + } + return earliestBlockTs, nil +} diff --git a/analyzer/aggregate_stats/stats_computation.go b/analyzer/aggregate_stats/stats_computation.go new file mode 100644 index 000000000..3b7839724 --- /dev/null +++ b/analyzer/aggregate_stats/stats_computation.go @@ -0,0 +1,84 @@ +package aggregate_stats + +import ( + "context" + "fmt" + "time" + + "github.com/jackc/pgx/v5" + + "github.com/oasisprotocol/nexus/analyzer/queries" + "github.com/oasisprotocol/nexus/storage" +) + +type statsComputation struct { + name string + target storage.TargetStorage + layer string + dbTable string + dbColumn string + consensusStatsQuery string + runtimeStatsQuery string + windowSize time.Duration + windowStep time.Duration + + customLatestAvailableTs func(ctx context.Context, target storage.TargetStorage) (*time.Time, error) +} + +func (s *statsComputation) WindowSize() time.Duration { + return s.windowSize +} + +func (s *statsComputation) WindowStep() time.Duration { + return s.windowStep +} + +func (s *statsComputation) Name() string { + return s.name +} + +func (s *statsComputation) Layer() string { + return s.layer +} + +func (s *statsComputation) LatestComputedTs(ctx context.Context) (time.Time, error) { + var latestStatsTs time.Time + err := s.target.QueryRow( + ctx, + fmt.Sprintf(queries.LatestStatsComputation, s.dbTable), + s.layer, + ).Scan(&latestStatsTs) + return latestStatsTs, err +} + +func (s *statsComputation) ComputeStats(ctx context.Context, windowStart time.Time, windowEnd time.Time) (*storage.QueryBatch, error) { + batch := &storage.QueryBatch{} + + var row pgx.Row + switch s.layer { + case layerConsensus: + row = s.target.QueryRow( + ctx, + s.consensusStatsQuery, + windowStart, // from + windowEnd, // to + ) + default: + row = s.target.QueryRow( + ctx, + s.runtimeStatsQuery, + s.layer, // runtime + windowStart, // from + windowEnd, // to + ) + } + var result uint64 + if err := row.Scan(&result); err != nil { + return nil, err + } + + // Insert computed stat. + batch.Queue(fmt.Sprintf(queries.InsertStatsComputation, s.dbTable, s.dbColumn), s.layer, windowEnd.UTC(), result) + + return batch, nil +} diff --git a/analyzer/queries/queries.go b/analyzer/queries/queries.go index d88237353..387db2b03 100644 --- a/analyzer/queries/queries.go +++ b/analyzer/queries/queries.go @@ -615,26 +615,18 @@ var ( WHERE runtime = $1 AND contract_address = $2` - RefreshDailyTxVolume = ` - REFRESH MATERIALIZED VIEW CONCURRENTLY stats.daily_tx_volume - ` - - RefreshMin5TxVolume = ` - REFRESH MATERIALIZED VIEW CONCURRENTLY stats.min5_tx_volume - ` - - // LatestDailyAccountStats is the query to get the timestamp of the latest daily active accounts stat. - LatestDailyAccountStats = ` + // LatestStatsComputation is the query to get the timestamp of the latest computed stat window. + LatestStatsComputation = ` SELECT window_end - FROM stats.daily_active_accounts + FROM %s WHERE layer = $1 ORDER BY window_end DESC LIMIT 1 ` - // InsertDailyAccountStats is the query to insert the daily active accounts stat. - InsertDailyAccountStats = ` - INSERT INTO stats.daily_active_accounts (layer, window_end, active_accounts) + // InsertStatsComputation is the query to insert the stats computation. 
+ InsertStatsComputation = `
+ INSERT INTO %s (layer, window_end, %s)
 VALUES ($1, $2, $3)
 `
@@ -692,5 +684,50 @@ var (
 FROM chain.runtime_related_transactions AS rt
 JOIN chain.runtime_blocks AS b ON (rt.runtime = b.runtime AND rt.tx_round = b.round)
 WHERE (rt.runtime = $1 AND b.timestamp >= $2::timestamptz AND b.timestamp < $3::timestamptz)
+ `
+
+ // ConsensusTxVolume is the query to get the number of transactions
+ // in the given window at the consensus layer.
+ ConsensusTxVolume = `
+ SELECT COUNT(*)
+ FROM chain.blocks AS b
+ JOIN chain.transactions AS t ON b.height = t.block
+ WHERE (b.time >= $1::timestamptz AND b.time < $2::timestamptz)
+ `
+
+ // RuntimeTxVolume is the query to get the number of transactions
+ // in the given runtime layer within the given time range.
+ RuntimeTxVolume = `
+ SELECT COUNT(*)
+ FROM chain.runtime_blocks AS b
+ JOIN chain.runtime_transactions AS t ON (b.round = t.round AND b.runtime = t.runtime)
+ WHERE (b.runtime = $1 AND b.timestamp >= $2::timestamptz AND b.timestamp < $3::timestamptz)
+ `
+ // ConsensusDailyTxVolume is the query to get the number of transactions
+ // in the consensus layer within the given time range, by using the already computed
+ // 5 minute windowed data.
+ ConsensusDailyTxVolume = `
+ SELECT COALESCE(SUM(t.tx_volume), 0)
+ FROM stats.min5_tx_volume AS t
+ WHERE (t.layer = 'consensus' AND t.window_end >= $1::timestamptz AND t.window_end < $2::timestamptz)
+ `
+
+ // RuntimeDailyTxVolume is the query to get the number of transactions
+ // in the given runtime layer within the given time range, by using the already computed
+ // 5 minute windowed data.
+ RuntimeDailyTxVolume = `
+ SELECT SUM(t.tx_volume)
+ FROM stats.min5_tx_volume AS t
+ WHERE (t.layer = $1 AND t.window_end >= $2::timestamptz AND t.window_end < $3::timestamptz)
+ `
+
+ // LatestMin5TxVolume is the query to get the timestamp of the latest computed
+ // 5 minute windowed data for the given layer.
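Note that LatestStatsComputation and InsertStatsComputation above interpolate the table and column names with fmt.Sprintf, since Postgres bind parameters cannot stand in for identifiers. A minimal sketch of the intended usage, mirroring stats_computation.go (the LatestMin5TxVolume constant itself follows below); the %s values must come only from the hard-coded dbTable/dbColumn fields, never from user input:

```go
package main

import "fmt"

// insertStatsQuery mirrors how stats_computation.go fills the identifier
// placeholders; row values still flow through bind parameters $1..$3.
func insertStatsQuery(table, column string) string {
	const insertStatsComputation = `
		INSERT INTO %s (layer, window_end, %s)
		VALUES ($1, $2, $3)
	`
	return fmt.Sprintf(insertStatsComputation, table, column)
}

func main() {
	fmt.Println(insertStatsQuery("stats.daily_active_accounts", "active_accounts"))
}
```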
+ LatestMin5TxVolume = ` + SELECT window_end + FROM stats.min5_tx_volume as t + WHERE t.layer = $1 + ORDER BY window_end DESC + LIMIT 1 ` ) diff --git a/api/errors.go b/api/errors.go index 7eadb522a..7aec8b6b4 100644 --- a/api/errors.go +++ b/api/errors.go @@ -44,11 +44,11 @@ func HttpCodeForError(err error) int { errType := errVal.Type() switch { - case err == ErrBadChainID: + case errors.Is(err, ErrBadChainID): return http.StatusNotFound - case err == ErrBadRequest: + case errors.Is(err, ErrBadRequest): return http.StatusBadRequest - case err == ErrNotFound: + case errors.Is(err, ErrNotFound): return http.StatusNotFound case errType == reflect.TypeOf(ErrStorageError{}): return http.StatusInternalServerError diff --git a/api/middleware.go b/api/middleware.go index 56aa2aa05..2c9676f91 100644 --- a/api/middleware.go +++ b/api/middleware.go @@ -19,7 +19,8 @@ import ( var ( defaultOffset = uint64(0) defaultLimit = uint64(100) - defaultBucketSizeSeconds = uint32(3600) + defaultWindowSizeSeconds = uint32(86400) + defaultWindowStepSeconds = uint32(86400) maxLimit = uint64(1000) ) @@ -126,8 +127,10 @@ func fixDefaultsAndLimits(p any) { f.Set(reflect.ValueOf(&defaultLimit)) case "Offset": f.Set(reflect.ValueOf(&defaultOffset)) - case "BucketSizeSeconds": - f.Set(reflect.ValueOf(&defaultBucketSizeSeconds)) + case "WindowSizeSeconds": + f.Set(reflect.ValueOf(&defaultWindowSizeSeconds)) + case "WindowStepSeconds": + f.Set(reflect.ValueOf(&defaultWindowStepSeconds)) } } } diff --git a/api/spec/v1.yaml b/api/spec/v1.yaml index 4ad662721..59520e910 100644 --- a/api/spec/v1.yaml +++ b/api/spec/v1.yaml @@ -50,16 +50,16 @@ x-query-params: $ref: '#/components/schemas/Runtime' description: | The runtime which to query. - - &bucket_size_seconds + - &window_size_seconds in: query - name: bucket_size_seconds + name: window_size_seconds schema: type: integer format: uint32 default: 86400 description: | - The size of buckets into which the statistic is grouped, in seconds. - The backend supports a limited number of bucket sizes: 300 (5 minutes) and + The size of windows into which the statistic is grouped, in seconds. + The backend supports a limited number of window sizes: 300 (5 minutes) and 86400 (1 day). Requests with other values may be rejected. - &window_step_seconds in: query @@ -67,7 +67,6 @@ x-query-params: schema: type: integer format: uint32 - enum: [300, 86400] default: 86400 description: | The size of the step between returned statistic windows, in seconds. @@ -1014,7 +1013,7 @@ paths: /{runtime}/evm_tokens/{address}/holders: get: summary: | - Returns the list of holders of an EVM (ERC-20, ...) token. + Returns the list of holders of an EVM (ERC-20, ...) token. This endpoint does not verify that `address` is actually an EVM token; if it is not, it will simply return an empty list. 
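The window_size_seconds and window_step_seconds parameters defined above travel together, and the backend accepts only a few (size, step) pairs, rejecting the rest; a minimal client-side sketch of querying the tx volume endpoint (the host name here is a hypothetical placeholder):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Hypothetical host; path and query parameters follow this spec.
	// Unsupported (size, step) pairs may be rejected with HTTP 400.
	url := "https://nexus.example.org/v1/consensus/stats/tx_volume" +
		"?window_size_seconds=300&window_step_seconds=300"
	resp, err := http.Get(url)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.StatusCode, string(body))
}
```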
parameters: - *limit @@ -1077,7 +1076,8 @@ paths: parameters: - *limit - *offset - - *bucket_size_seconds + - *window_size_seconds + - *window_step_seconds - in: path name: layer required: true @@ -2525,12 +2525,12 @@ components: TxVolumeList: type: object - required: [bucket_size_seconds, buckets] + required: [window_size_seconds, windows] properties: - bucket_size_seconds: + window_size_seconds: type: integer format: uint32 - buckets: + windows: type: array items: $ref: '#/components/schemas/TxVolume' @@ -2540,17 +2540,17 @@ components: TxVolume: type: object - required: [bucket_start, tx_volume] + required: [window_end, tx_volume] properties: - bucket_start: + window_end: type: string format: date-time - description: The date for this daily transaction volume measurement. + description: The end date for this daily transaction volume measurement. example: *iso_timestamp_1 tx_volume: type: integer format: uint64 - description: The transaction volume on this day. + description: The transaction volume for this window. example: 420 ActiveAccountsList: @@ -2580,7 +2580,7 @@ components: active_accounts: type: integer format: uint64 - description: The number of active accounts for the 24hour window starting at bucket_start. + description: The number of active accounts for the 24hour window ending at window_end. example: 420 responses: diff --git a/cmd/analyzer/analyzer.go b/cmd/analyzer/analyzer.go index b3d548517..df98b2590 100644 --- a/cmd/analyzer/analyzer.go +++ b/cmd/analyzer/analyzer.go @@ -17,6 +17,7 @@ import ( "github.com/spf13/cobra" "github.com/oasisprotocol/nexus/analyzer" + "github.com/oasisprotocol/nexus/analyzer/aggregate_stats" "github.com/oasisprotocol/nexus/analyzer/consensus" "github.com/oasisprotocol/nexus/analyzer/evmcontractcode" "github.com/oasisprotocol/nexus/analyzer/evmtokenbalances" @@ -347,7 +348,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { } if cfg.Analyzers.AggregateStats != nil { analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { - return analyzer.NewAggregateStatsAnalyzer(cfg.Analyzers.AggregateStats, dbClient, logger) + return aggregate_stats.NewAggregateStatsAnalyzer(cfg.Analyzers.AggregateStats, dbClient, logger) }) } if cfg.Analyzers.EVMContractsVerifier != nil { diff --git a/storage/client/client.go b/storage/client/client.go index 3f3b0dcd5..c2d26cee8 100644 --- a/storage/client/client.go +++ b/storage/client/client.go @@ -1586,15 +1586,23 @@ func (c *StorageClient) RuntimeStatus(ctx context.Context) (*RuntimeStatus, erro return &s, nil } -// TxVolumes returns a list of transaction volumes per time bucket. +// TxVolumes returns a list of transaction volumes per time window. func (c *StorageClient) TxVolumes(ctx context.Context, layer apiTypes.Layer, p apiTypes.GetLayerStatsTxVolumeParams) (*TxVolumeList, error) { var query string - if *p.BucketSizeSeconds == 300 { + + switch { + case *p.WindowSizeSeconds == 300 && *p.WindowStepSeconds == 300: + // 5 minute window, 5 minute step. query = queries.FineTxVolumes - } else { - var day uint32 = 86400 - p.BucketSizeSeconds = &day - query = queries.TxVolumes + case *p.WindowSizeSeconds == 86400 && *p.WindowStepSeconds == 86400: + // 1 day window, 1 day step. + query = queries.DailyTxVolumes + case *p.WindowSizeSeconds == 86400 && *p.WindowStepSeconds == 300: + // 1 day window, 5 minute step. 
+ query = queries.FineDailyTxVolumes
+ default:
+ // Unsupported: case *p.WindowSizeSeconds == 300 && *p.WindowStepSeconds == 86400:
+ return nil, fmt.Errorf("invalid window size parameters: %w", apiCommon.ErrBadRequest)
 }
 rows, err := c.db.Query(
@@ -1610,19 +1618,19 @@ func (c *StorageClient) TxVolumes(ctx context.Context, layer apiTypes.Layer, p a
 defer rows.Close()
 ts := TxVolumeList{
- BucketSizeSeconds: *p.BucketSizeSeconds,
- Buckets: []apiTypes.TxVolume{},
+ WindowSizeSeconds: *p.WindowSizeSeconds,
+ Windows: []apiTypes.TxVolume{},
 }
 for rows.Next() {
 var t TxVolume
 if err := rows.Scan(
- &t.BucketStart,
+ &t.WindowEnd,
 &t.TxVolume,
 ); err != nil {
 return nil, wrapError(err)
 }
- t.BucketStart = t.BucketStart.UTC() // Ensure UTC timestamp in response.
- ts.Buckets = append(ts.Buckets, t)
+ t.WindowEnd = t.WindowEnd.UTC() // Ensure UTC timestamp in response.
+ ts.Windows = append(ts.Windows, t)
 }
 return &ts, nil
diff --git a/storage/client/queries/queries.go b/storage/client/queries/queries.go
index 8698d1c7b..b572f2dde 100644
--- a/storage/client/queries/queries.go
+++ b/storage/client/queries/queries.go
@@ -528,22 +528,35 @@ const (
 WHERE runtime_id = $1::text
 `
+ // FineTxVolumes returns the fine-grained query for 5-minute sampled tx volume windows.
 FineTxVolumes = `
- SELECT window_start, tx_volume
+ SELECT window_end, tx_volume
 FROM stats.min5_tx_volume
 WHERE layer = $1::text
 ORDER BY
- window_start DESC
+ window_end DESC
 LIMIT $2::bigint
 OFFSET $3::bigint
 `
- TxVolumes = `
- SELECT window_start, tx_volume
+ // FineDailyTxVolumes returns the query for daily tx volume windows, sampled at 5-minute steps.
+ FineDailyTxVolumes = `
+ SELECT window_end, tx_volume
 FROM stats.daily_tx_volume
 WHERE layer = $1::text
 ORDER BY
- window_start DESC
+ window_end DESC
+ LIMIT $2::bigint
+ OFFSET $3::bigint
+ `
+
+ // DailyTxVolumes returns the query for daily tx volume windows, sampled once per day at UTC midnight.
+ DailyTxVolumes = `
+ SELECT date_trunc('day', window_end) as window_end, tx_volume
+ FROM stats.daily_tx_volume
+ WHERE (layer = $1::text AND (window_end AT TIME ZONE 'UTC')::time = '00:00:00')
+ ORDER BY
+ window_end DESC
 LIMIT $2::bigint
 OFFSET $3::bigint
 `
diff --git a/storage/migrations/03_agg_stats.up.sql b/storage/migrations/03_agg_stats.up.sql
index 9775a0c51..2a563bf2b 100644
--- a/storage/migrations/03_agg_stats.up.sql
+++ b/storage/migrations/03_agg_stats.up.sql
@@ -47,7 +47,7 @@ CREATE MATERIALIZED VIEW stats.daily_tx_volume AS
 GROUP BY 1, 2;
 CREATE UNIQUE INDEX ix_stats_daily_tx_volume_window_start ON stats.daily_tx_volume (layer, window_start); -- A unique index is required for CONCURRENTLY refreshing the view.
--- daily_active_accounts stores the sliding widnow for the number of unique accounts per day
+-- daily_active_accounts stores the sliding window for the number of unique accounts per day
 -- that were involved in transactions.
 CREATE TABLE stats.daily_active_accounts
 (
diff --git a/storage/migrations/08_agg_stats_refactor.up.sql b/storage/migrations/08_agg_stats_refactor.up.sql
new file mode 100644
index 000000000..e37c64bf4
--- /dev/null
+++ b/storage/migrations/08_agg_stats_refactor.up.sql
@@ -0,0 +1,49 @@
+-- Drop stats materialized views and replace with tables.
+-- Also migrate the data from the old materialized views to the new tables.
+
+BEGIN;
+
+-- First create new tables. The _new suffix is used to avoid name conflicts with the old materialized views,
+-- and will be dropped at the end of the migration.
+
+-- min5_tx_volume stores tx volumes in 5-minute windows, per layer.
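The data-migration statements below re-key each row from the window's start to its end; the equivalent arithmetic, as a minimal sketch:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors the INSERT ... SELECT statements below: a row previously keyed
	// by window_start becomes keyed by the end of its window.
	windowStart := time.Date(2023, 7, 4, 10, 30, 0, 0, time.UTC)
	fmt.Println(windowStart.Add(5 * time.Minute)) // min5:  window_start + INTERVAL '5 minute'
	fmt.Println(windowStart.Add(24 * time.Hour))  // daily: window_start + INTERVAL '1 day'
}
```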
+CREATE TABLE stats.min5_tx_volume_new +( + layer TEXT NOT NULL, + window_end TIMESTAMP WITH TIME ZONE NOT NULL, + tx_volume uint63 NOT NULL, + + PRIMARY KEY (layer, window_end) +); +-- daily_tx_volume stores the sliding window for the number of transactions per day per layer. +CREATE TABLE stats.daily_tx_volume_new +( + layer TEXT NOT NULL, + window_end TIMESTAMP WITH TIME ZONE NOT NULL, + tx_volume uint63 NOT NULL, + + PRIMARY KEY (layer, window_end) +); +-- Index for efficient query of the daily samples. +CREATE INDEX ix_stats_daily_tx_volume_daily_windows ON stats.daily_tx_volume_new (layer, window_end) WHERE ((window_end AT TIME ZONE 'UTC')::time = '00:00:00'); + +-- Migrate the data from the old materialized views to the new tables. +INSERT INTO stats.min5_tx_volume_new +SELECT layer, window_start + INTERVAL '5 minute', tx_volume +FROM stats.min5_tx_volume; + +INSERT INTO stats.daily_tx_volume_new +SELECT layer, window_start + INTERVAL '1 day', tx_volume +FROM stats.daily_tx_volume; + +-- Drop the old materialized views. +DROP MATERIALIZED VIEW IF EXISTS stats.daily_tx_volume; +DROP MATERIALIZED VIEW IF EXISTS stats.min5_tx_volume; + +-- Drop the _new suffixes from the new tables. +ALTER TABLE stats.min5_tx_volume_new +RENAME TO min5_tx_volume; +ALTER TABLE stats.daily_tx_volume_new +RENAME TO daily_tx_volume; + +COMMIT; diff --git a/tests/e2e_regression/expected/bucket_size.body b/tests/e2e_regression/expected/bucket_size.body deleted file mode 100644 index 2c4c93492..000000000 --- a/tests/e2e_regression/expected/bucket_size.body +++ /dev/null @@ -1,4 +0,0 @@ -{ - "bucket_size_seconds": 300, - "buckets": [] -} diff --git a/tests/e2e_regression/expected/emerald_tx_volume.body b/tests/e2e_regression/expected/emerald_tx_volume.body index 8673db24c..792a4bf1e 100644 --- a/tests/e2e_regression/expected/emerald_tx_volume.body +++ b/tests/e2e_regression/expected/emerald_tx_volume.body @@ -1,4 +1,4 @@ { - "bucket_size_seconds": 86400, - "buckets": [] + "window_size_seconds": 86400, + "windows": [] } diff --git a/tests/e2e_regression/expected/nonstandard_bucket_size.body b/tests/e2e_regression/expected/nonstandard_bucket_size.body deleted file mode 100644 index 8673db24c..000000000 --- a/tests/e2e_regression/expected/nonstandard_bucket_size.body +++ /dev/null @@ -1,4 +0,0 @@ -{ - "bucket_size_seconds": 86400, - "buckets": [] -} diff --git a/tests/e2e_regression/expected/nonstandard_bucket_size.headers b/tests/e2e_regression/expected/nonstandard_bucket_size.headers deleted file mode 100644 index 1cce01c2d..000000000 --- a/tests/e2e_regression/expected/nonstandard_bucket_size.headers +++ /dev/null @@ -1,6 +0,0 @@ -HTTP/1.1 200 OK -Content-Type: application/json -Vary: Origin -Date: UNINTERESTING -Content-Length: UNINTERESTING - diff --git a/tests/e2e_regression/expected/nonstandard_window_size.body b/tests/e2e_regression/expected/nonstandard_window_size.body new file mode 100644 index 000000000..076180344 --- /dev/null +++ b/tests/e2e_regression/expected/nonstandard_window_size.body @@ -0,0 +1,3 @@ +{ + "msg": "invalid window size parameters: invalid request parameters" +} diff --git a/tests/e2e_regression/expected/nonstandard_window_size.headers b/tests/e2e_regression/expected/nonstandard_window_size.headers new file mode 100644 index 000000000..c52b02ddd --- /dev/null +++ b/tests/e2e_regression/expected/nonstandard_window_size.headers @@ -0,0 +1,7 @@ +HTTP/1.1 400 Bad Request +Content-Type: application/json; charset=utf-8 +Vary: Origin +X-Content-Type-Options: nosniff +Date: UNINTERESTING 
+Content-Length: UNINTERESTING + diff --git a/tests/e2e_regression/expected/tx_volume.body b/tests/e2e_regression/expected/tx_volume.body index 8673db24c..792a4bf1e 100644 --- a/tests/e2e_regression/expected/tx_volume.body +++ b/tests/e2e_regression/expected/tx_volume.body @@ -1,4 +1,4 @@ { - "bucket_size_seconds": 86400, - "buckets": [] + "window_size_seconds": 86400, + "windows": [] } diff --git a/tests/e2e_regression/expected/window_size.body b/tests/e2e_regression/expected/window_size.body new file mode 100644 index 000000000..bed16d725 --- /dev/null +++ b/tests/e2e_regression/expected/window_size.body @@ -0,0 +1,4 @@ +{ + "window_size_seconds": 300, + "windows": [] +} diff --git a/tests/e2e_regression/expected/bucket_size.headers b/tests/e2e_regression/expected/window_size.headers similarity index 100% rename from tests/e2e_regression/expected/bucket_size.headers rename to tests/e2e_regression/expected/window_size.headers diff --git a/tests/e2e_regression/run.sh b/tests/e2e_regression/run.sh index 36ad78b65..031d67e1c 100755 --- a/tests/e2e_regression/run.sh +++ b/tests/e2e_regression/run.sh @@ -54,8 +54,8 @@ testCases=( 'proposal /v1/consensus/proposals/2' 'votes /v1/consensus/proposals/2/votes' 'tx_volume /v1/consensus/stats/tx_volume' - 'bucket_size /v1/consensus/stats/tx_volume?bucket_size_seconds=300' - 'nonstandard_bucket_size /v1/consensus/stats/tx_volume?bucket_size_seconds=301' + 'window_size /v1/consensus/stats/tx_volume?window_size_seconds=300&window_step_seconds=300' + 'nonstandard_window_size /v1/consensus/stats/tx_volume?window_size_seconds=301&window_step_seconds=300' 'active_accounts /v1/consensus/stats/active_accounts' 'active_accounts_window /v1/consensus/stats/active_accounts?window_step_seconds=300' 'active_accounts_emerald /v1/emerald/stats/active_accounts' @@ -127,7 +127,7 @@ diff --recursive "$SCRIPT_DIR/expected" "$outDir" >/dev/null || { # Create a copy of the `expected` dir with the symlink contents materialized; we'll diff against that. rm -rf /tmp/nexus-e2e-expected; cp -r --dereference "$SCRIPT_DIR/expected" /tmp/nexus-e2e-expected; } - if [[ -t 1 ]]; then # Running in a terminal + if [[ -t 1 ]]; then # Running in a terminal echo "Press enter see the diff, or Ctrl-C to abort." read -r git diff --no-index /tmp/nexus-e2e-expected "$SCRIPT_DIR/actual" || true
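For reference, the (window_size_seconds, window_step_seconds) combinations these regression tests exercise, matching the switch in storage/client/client.go; a small self-contained summary:

```go
package main

import "fmt"

func main() {
	// Combinations accepted by the storage client; anything else is rejected
	// with ErrBadRequest (HTTP 400), as the nonstandard_window_size test shows.
	cases := []struct {
		size, step uint32
		ok         bool
	}{
		{300, 300, true},     // 5-minute windows, 5-minute step (FineTxVolumes)
		{86400, 86400, true}, // daily windows, daily step (DailyTxVolumes)
		{86400, 300, true},   // daily windows, 5-minute step (FineDailyTxVolumes)
		{301, 300, false},    // e.g. the nonstandard_window_size test case
	}
	for _, c := range cases {
		fmt.Printf("size=%d step=%d supported=%v\n", c.size, c.step, c.ok)
	}
}
```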