From ee2815f388d4834302e07dc7beebe534453ed1d9 Mon Sep 17 00:00:00 2001 From: ptrus Date: Tue, 4 Jul 2023 10:39:34 +0200 Subject: [PATCH 1/2] tx_volume stats: Produce windowed results, compute more efficiently --- analyzer/aggregate_stats.go | 277 ----------------- analyzer/aggregate_stats/aggregate_stats.go | 280 ++++++++++++++++++ analyzer/aggregate_stats/queries.go | 109 +++++++ analyzer/aggregate_stats/stats_computation.go | 64 ++++ analyzer/queries/queries.go | 81 +---- api/errors.go | 6 +- api/middleware.go | 9 +- api/spec/v1.yaml | 30 +- cmd/analyzer/analyzer.go | 3 +- config/config.go | 11 +- config/docker-dev.yml | 3 +- config/local-dev.yml | 3 +- storage/client/client.go | 30 +- storage/client/queries/queries.go | 27 +- storage/migrations/03_agg_stats.up.sql | 2 +- .../migrations/11_agg_stats_refactor.up.sql | 73 +++++ tests/e2e_regression/e2e_config.yml | 3 +- .../e2e_regression/expected/bucket_size.body | 4 - .../expected/emerald_tx_volume.body | 4 +- .../expected/nonstandard_bucket_size.body | 4 - .../expected/nonstandard_bucket_size.headers | 6 - .../expected/nonstandard_window_size.body | 3 + .../expected/nonstandard_window_size.headers | 7 + tests/e2e_regression/expected/tx_volume.body | 4 +- .../e2e_regression/expected/window_size.body | 4 + ...ucket_size.headers => window_size.headers} | 0 tests/e2e_regression/run.sh | 6 +- 27 files changed, 618 insertions(+), 435 deletions(-) delete mode 100644 analyzer/aggregate_stats.go create mode 100644 analyzer/aggregate_stats/aggregate_stats.go create mode 100644 analyzer/aggregate_stats/queries.go create mode 100644 analyzer/aggregate_stats/stats_computation.go create mode 100644 storage/migrations/11_agg_stats_refactor.up.sql delete mode 100644 tests/e2e_regression/expected/bucket_size.body delete mode 100644 tests/e2e_regression/expected/nonstandard_bucket_size.body delete mode 100644 tests/e2e_regression/expected/nonstandard_bucket_size.headers create mode 100644 tests/e2e_regression/expected/nonstandard_window_size.body create mode 100644 tests/e2e_regression/expected/nonstandard_window_size.headers create mode 100644 tests/e2e_regression/expected/window_size.body rename tests/e2e_regression/expected/{bucket_size.headers => window_size.headers} (100%) diff --git a/analyzer/aggregate_stats.go b/analyzer/aggregate_stats.go deleted file mode 100644 index 6fba3b397..000000000 --- a/analyzer/aggregate_stats.go +++ /dev/null @@ -1,277 +0,0 @@ -package analyzer - -import ( - "context" - "errors" - "fmt" - "sync" - "time" - - "github.com/jackc/pgx/v5" - - "github.com/oasisprotocol/nexus/analyzer/queries" - "github.com/oasisprotocol/nexus/common" - "github.com/oasisprotocol/nexus/config" - "github.com/oasisprotocol/nexus/log" - "github.com/oasisprotocol/nexus/metrics" - "github.com/oasisprotocol/nexus/storage" -) - -const ( - AggregateStatsAnalyzerName = "aggregate_stats" - - // We would ideally recompute tx volume stats (which include 5-min aggregates) every 5 min, - // but the current naive implementation is too inefficient for that. Allow for the update - // to take longer. - txVolumeStatsUpdateTimeout = 10 * time.Minute // as of height 10767524, this takes ~3m15s - - // Name of the consensus layer. - layerConsensus = "consensus" - - // Limit the number of daily active accounts aggregation windows to be computed in a single - // iteration. - // This limits database batch sizes in hypothetical cases if the daily active accounts - // analyzer would be far behind latest indexed block (e.g. if it would not be enabled during initial sync). - // Likely never the case in normal operation. - dailyActiveAccountsBatchLimit = 1_000 - // Interval between daily active accounts iterations. - // The worker only computes new data and iteration with no updates are cheap - // so it's fine to run this frequently. - dailyActiveAccountsInterval = 1 * time.Minute - - // 24-hour sliding window with 5 minute step. - dailyActiveAccountsWindowSize = 24 * time.Hour - dailyActiveAccountsWindowStep = 5 * time.Minute -) - -// Layers that are tracked for daily active account stats. -// XXX: Instead of hardcoding, these could also be obtained -// by (periodically) querying the `runtime_related_transactions` table. -var dailyActiveAccountsLayers = []string{ - layerConsensus, - string(common.RuntimeEmerald), - string(common.RuntimeSapphire), - // RuntimeCipher.String(), // Enable once Cipher is supported by Nexus. -} - -type AggregateStatsAnalyzer struct { - target storage.TargetStorage - - txVolumeInterval time.Duration - - logger *log.Logger - metrics metrics.DatabaseMetrics -} - -var _ Analyzer = (*AggregateStatsAnalyzer)(nil) - -func (a *AggregateStatsAnalyzer) Name() string { - return AggregateStatsAnalyzerName -} - -func NewAggregateStatsAnalyzer(cfg *config.AggregateStatsConfig, target storage.TargetStorage, logger *log.Logger) (*AggregateStatsAnalyzer, error) { - logger.Info("starting aggregate_stats analyzer") - return &AggregateStatsAnalyzer{ - target: target, - txVolumeInterval: cfg.TxVolumeInterval, - logger: logger.With("analyzer", AggregateStatsAnalyzerName), - metrics: metrics.NewDefaultDatabaseMetrics(AggregateStatsAnalyzerName), - }, nil -} - -func (a *AggregateStatsAnalyzer) Start(ctx context.Context) { - var wg sync.WaitGroup - wg.Add(2) - - go func() { - defer wg.Done() - a.txVolumeWorker(ctx) - }() - go func() { - defer wg.Done() - a.dailyActiveAccountsWorker(ctx) - }() - - wg.Wait() -} - -// txVolumeWorker periodically updates the tx volume stats. -func (a *AggregateStatsAnalyzer) txVolumeWorker(ctx context.Context) { - for { - a.logger.Info("updating tx volume stats (5-min and daily)") - - // Note: The order matters here! Daily tx volume is a materialized view - // that's instantiated from 5 minute tx volume. - batch := &storage.QueryBatch{} - batch.Queue(queries.RefreshMin5TxVolume) - batch.Queue(queries.RefreshDailyTxVolume) - - ctxWithTimeout, cancelCtx := context.WithTimeout(ctx, txVolumeStatsUpdateTimeout) - if err := a.writeToDB(ctxWithTimeout, batch, "update_tx_volume_stats"); err != nil { - a.logger.Error("failed to trigger tx volume stats updates", "err", err) - } - a.logger.Info("updated tx volume stats", "num_materialized_views", batch.Len()) - cancelCtx() - - select { - case <-time.After(a.txVolumeInterval): - // Update stats. - case <-ctx.Done(): - a.logger.Error("shutting down tx volume worker", "reason", ctx.Err()) - return - } - } -} - -func (a *AggregateStatsAnalyzer) writeToDB(ctx context.Context, batch *storage.QueryBatch, opName string) error { - timer := a.metrics.DatabaseTimer(a.target.Name(), opName) - defer timer.ObserveDuration() - - if err := a.target.SendBatch(ctx, batch); err != nil { - a.metrics.DatabaseCounter(a.target.Name(), opName, "failure").Inc() - return err - } - a.metrics.DatabaseCounter(a.target.Name(), opName, "success").Inc() - - return nil -} - -// dailyActiveAccountsWorker computes a sliding window of daily unique active accounts. -func (a *AggregateStatsAnalyzer) dailyActiveAccountsWorker(ctx context.Context) { - windowSize := dailyActiveAccountsWindowSize - windowStep := dailyActiveAccountsWindowStep - - // Rounds the timestamp down to the nearest windowStep interval. - // e.g. if windowStep is 5 minutes, then 12:34:56 will be rounded down to 12:30:00. - floorWindow := func(ts *time.Time) time.Time { - return ts.Truncate(windowStep) - } - - for { - for _, layer := range dailyActiveAccountsLayers { - a.logger.Info("updating daily active account stats", "layer", layer) - - // Query the latest indexed block timestamp. - latestBlockTs, err := a.latestBlockTs(ctx, layer) - if err != nil { - // Log this as info as this can be expected when there's no indexed blocks yet, or if any of the layers are not being indexed. - a.logger.Info("failed querying latest indexed block timestamp, skipping iteration", "layer", layer, "err", err) - continue - } - latestPossibleWindow := floorWindow(latestBlockTs) - - // Start at the latest already computed window, or at the earliest indexed - // block if no stats have been computed yet. - var latestStatsTs time.Time - err = a.target.QueryRow( - ctx, - queries.LatestDailyAccountStats, - layer, - ).Scan(&latestStatsTs) - switch { - case err == nil: - // Continues below. - case errors.Is(pgx.ErrNoRows, err): - // No stats yet. Start at the earliest indexed block. - var earliestBlockTs *time.Time - earliestBlockTs, err = a.earliestBlockTs(ctx, layer) - if err != nil { - a.logger.Error("failed querying earliest indexed block timestamp", "layer", layer, "err", err) - continue - } - latestStatsTs = floorWindow(earliestBlockTs) - default: - a.logger.Error("failed querying latest daily accounts stats window", "layer", layer, "err", err) - continue - } - - // Compute daily unique active accounts for windows until the latest indexed block. - batch := &storage.QueryBatch{} - for { - nextWindow := latestStatsTs.Add(windowStep) - if !latestPossibleWindow.After(nextWindow) { - // Cannot yet compute stats for next window. Stop. - break - } - windowStart := nextWindow.Add(-windowSize) - windowEnd := nextWindow - - // Compute active accounts for the provided time window. - var acctsRow pgx.Row - switch layer { - case layerConsensus: - acctsRow = a.target.QueryRow( - ctx, - queries.ConsensusActiveAccounts, - windowStart, // from - windowEnd, // to - ) - default: - acctsRow = a.target.QueryRow( - ctx, - queries.RuntimeActiveAccounts, - layer, // runtime - windowStart, // from - windowEnd, // to - ) - } - var activeAccounts uint64 - if err := acctsRow.Scan(&activeAccounts); err != nil { - a.logger.Error("failed to compute daily active accounts", "layer", layer, "window_start", windowStart, "window_end", windowEnd, "err", err) - continue - } - - // Insert computed active accounts. - batch.Queue(queries.InsertDailyAccountStats, layer, windowEnd.UTC(), activeAccounts) - if batch.Len() > dailyActiveAccountsBatchLimit { - break - } - latestStatsTs = nextWindow - } - if err := a.writeToDB(ctx, batch, fmt.Sprintf("update_daily_active_account_%s_stats", layer)); err != nil { - a.logger.Error("failed to insert daily active account stats update", "err", err, "layer", layer) - } - a.logger.Info("updated daily active account stats", "layer", layer, "num_inserts", batch.Len()) - } - - select { - case <-time.After(dailyActiveAccountsInterval): - // Update stats again. - case <-ctx.Done(): - a.logger.Error("shutting down daily active accounts worker", "reason", ctx.Err()) - return - } - } -} - -// Queries the latest indexed block of the specified layer. -func (a *AggregateStatsAnalyzer) latestBlockTs(ctx context.Context, layer string) (*time.Time, error) { - var latestBlockTsRow pgx.Row - switch layer { - case layerConsensus: - latestBlockTsRow = a.target.QueryRow(ctx, queries.LatestConsensusBlockTime) - default: - latestBlockTsRow = a.target.QueryRow(ctx, queries.LatestRuntimeBlockTime, layer) - } - var latestBlockTs *time.Time - if err := latestBlockTsRow.Scan(&latestBlockTs); err != nil { // Fails with ErrNoRows if no blocks. - return nil, err - } - return latestBlockTs, nil -} - -// Queries the earliest indexed block for the specified layer. -func (a *AggregateStatsAnalyzer) earliestBlockTs(ctx context.Context, layer string) (*time.Time, error) { - var earliestBlockTsRow pgx.Row - switch layer { - case layerConsensus: - earliestBlockTsRow = a.target.QueryRow(ctx, queries.EarliestConsensusBlockTime) - default: - earliestBlockTsRow = a.target.QueryRow(ctx, queries.EarliestRuntimeBlockTime, layer) - } - var earliestBlockTs *time.Time - if err := earliestBlockTsRow.Scan(&earliestBlockTs); err != nil { // Fails with ErrNoRows if no blocks. - return nil, err - } - return earliestBlockTs, nil -} diff --git a/analyzer/aggregate_stats/aggregate_stats.go b/analyzer/aggregate_stats/aggregate_stats.go new file mode 100644 index 000000000..8a16f4722 --- /dev/null +++ b/analyzer/aggregate_stats/aggregate_stats.go @@ -0,0 +1,280 @@ +package aggregate_stats + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/jackc/pgx/v5" + + "github.com/oasisprotocol/nexus/analyzer" + "github.com/oasisprotocol/nexus/common" + "github.com/oasisprotocol/nexus/log" + "github.com/oasisprotocol/nexus/metrics" + "github.com/oasisprotocol/nexus/storage" +) + +const ( + aggregateStatsAnalyzerName = "aggregate_stats" + + // Limit the number of stats aggregation windows to be computed in a single + // iteration. + // This limits database batch sizes in cases if the stats analyzer would be far behind + // latest indexed block (e.g. if it would not be enabled during initial sync). + // Likely never the case in normal operation. + statsBatchLimit = 1_000 + + // Name of the consensus layer. + layerConsensus = "consensus" + + // Interval between stat computation iterations. + // The workers only computes new data and iteration with no updates are cheap + // so it's fine to run this frequently. + statsComputationInterval = 1 * time.Minute +) + +// Layers that are tracked for aggregate stats. +// XXX: Instead of hardcoding, these could also be obtained +// by (periodically) querying the `runtime_related_transactions` table. +var statsLayers = []string{ + layerConsensus, + string(common.RuntimeEmerald), + string(common.RuntimeSapphire), + string(common.RuntimeCipher), +} + +type aggregateStatsAnalyzer struct { + target storage.TargetStorage + + logger *log.Logger + metrics metrics.DatabaseMetrics +} + +var _ analyzer.Analyzer = (*aggregateStatsAnalyzer)(nil) + +func (a *aggregateStatsAnalyzer) Name() string { + return aggregateStatsAnalyzerName +} + +func NewAggregateStatsAnalyzer(target storage.TargetStorage, logger *log.Logger) (analyzer.Analyzer, error) { + logger.Info("starting aggregate_stats analyzer") + return &aggregateStatsAnalyzer{ + target: target, + logger: logger.With("analyzer", aggregateStatsAnalyzerName), + metrics: metrics.NewDefaultDatabaseMetrics(aggregateStatsAnalyzerName), + }, nil +} + +func (a *aggregateStatsAnalyzer) Start(ctx context.Context) { + a.aggregateStatsWorker(ctx) +} + +func (a *aggregateStatsAnalyzer) writeToDB(ctx context.Context, batch *storage.QueryBatch, opName string) error { + timer := a.metrics.DatabaseTimer(a.target.Name(), opName) + defer timer.ObserveDuration() + + if err := a.target.SendBatch(ctx, batch); err != nil { + a.metrics.DatabaseCounter(a.target.Name(), opName, "failure").Inc() + return err + } + a.metrics.DatabaseCounter(a.target.Name(), opName, "success").Inc() + + return nil +} + +func (a *aggregateStatsAnalyzer) aggregateStatsWorker(ctx context.Context) { + statsComputations := []*statsComputation{} + + // Compute 5-minute tx volume stats every 5 minutes for all layers. + for _, layer := range statsLayers { + sc := &statsComputation{ + target: a.target, + name: "min5_tx_volume_" + layer, + layer: layer, + outputTable: "stats.min5_tx_volume", + outputColumn: "tx_volume", + windowSize: 5 * time.Minute, + windowStep: 5 * time.Minute, + } + if layer == layerConsensus { + // Consensus layer queries. + sc.statsQuery = QueryConsensusTxVolume + sc.latestAvailableDataTs = func(ctx context.Context, target storage.TargetStorage) (*time.Time, error) { + var latestBlockTs *time.Time + err := target.QueryRow(ctx, QueryLatestConsensusBlockTime).Scan(&latestBlockTs) + return latestBlockTs, err + } + } else { + // Runtime layer queries. + sc.statsQuery = QueryRuntimeTxVolume + sc.latestAvailableDataTs = func(ctx context.Context, target storage.TargetStorage) (*time.Time, error) { + var latestBlockTs *time.Time + err := target.QueryRow(ctx, QueryLatestRuntimeBlockTime, sc.layer).Scan(&latestBlockTs) + return latestBlockTs, err + } + } + statsComputations = append(statsComputations, sc) + } + + // Compute daily tx volume stats every 5 minutes for all layers. + // Uses the stats.min5_tx_volume results so that it is efficient. + for _, layer := range statsLayers { + layer := layer + sc := &statsComputation{ + target: a.target, + name: "daily_tx_volume_" + layer, + layer: layer, + outputTable: "stats.daily_tx_volume", + outputColumn: "tx_volume", + windowSize: 24 * time.Hour, + windowStep: 5 * time.Minute, + // Latest available data is the latest computed 5-minute tx volume window. + latestAvailableDataTs: func(ctx context.Context, target storage.TargetStorage) (*time.Time, error) { + var latestTs *time.Time + return latestTs, a.target.QueryRow(ctx, QueryLatestMin5TxVolume, layer).Scan(&latestTs) + }, + statsQuery: QueryDailyTxVolume, + } + statsComputations = append(statsComputations, sc) + } + + // Compute daily active accounts every 5 minutes for all layers. + for _, layer := range statsLayers { + sc := &statsComputation{ + target: a.target, + name: "daily_active_accounts" + "_" + layer, + layer: layer, + outputTable: "stats.daily_active_accounts", + outputColumn: "active_accounts", + windowSize: 24 * time.Hour, + windowStep: 5 * time.Minute, + } + if layer == layerConsensus { + // Consensus layer queries. + sc.statsQuery = QueryConsensusActiveAccounts + sc.latestAvailableDataTs = func(ctx context.Context, target storage.TargetStorage) (*time.Time, error) { + var latestBlockTs *time.Time + return latestBlockTs, target.QueryRow(ctx, QueryLatestConsensusBlockTime).Scan(&latestBlockTs) + } + } else { + // Runtime layer queries. + sc.statsQuery = QueryRuntimeActiveAccounts + sc.latestAvailableDataTs = func(ctx context.Context, target storage.TargetStorage) (*time.Time, error) { + var latestBlockTs *time.Time + return latestBlockTs, target.QueryRow(ctx, QueryLatestRuntimeBlockTime, sc.layer).Scan(&latestBlockTs) + } + } + statsComputations = append(statsComputations, sc) + } + + for { + for _, statsComputation := range statsComputations { + logger := a.logger.With("name", statsComputation.name, "layer", statsComputation.layer) + + windowSize := statsComputation.windowSize + windowStep := statsComputation.windowStep + // Rounds the timestamp down to the nearest windowStep interval. + // e.g. if windowStep is 5 minutes, then 12:34:56 will be rounded down to 12:30:00. + floorWindow := func(ts *time.Time) time.Time { + return ts.Truncate(windowStep) + } + + logger.Info("updating stats") + + // First we find the start and end timestamps for which we can compute stats. + // Start at the latest already computed window, or at the earliest indexed + // block if no stats have been computed yet. + latestComputed, err := statsComputation.LatestComputedTs(ctx) + switch { + case err == nil: + // Continues below. + case errors.Is(pgx.ErrNoRows, err): + // No stats yet. Start at the earliest indexed block. + var earliestBlockTs *time.Time + earliestBlockTs, err = a.earliestBlockTs(ctx, statsComputation.layer) + switch { + case err == nil: + latestComputed = floorWindow(earliestBlockTs) + case errors.Is(pgx.ErrNoRows, err): + // No data log a debug only log. + logger.Debug("no stats available yet, skipping iteration") + continue + default: + logger.Error("failed querying earliest indexed block timestamp", "err", err) + continue + } + default: + logger.Error("failed querying latest computed stats window", "err", err) + continue + } + + // End at the latest available data timestamp rounded down to the windowStep interval. + latestPossibleWindow, err := statsComputation.latestAvailableDataTs(ctx, a.target) + switch { + case err == nil: + // Continues below. + case errors.Is(pgx.ErrNoRows, err): + logger.Debug("no stats available yet, skipping iteration") + continue + default: + logger.Error("failed querying the latest possible window, skipping iteration", "err", err) + continue + } + latestPossibleWindow = common.Ptr(floorWindow(latestPossibleWindow)) + + // Compute stats for all windows between `latestComputed` and `latestPossibleWindow`. + batch := &storage.QueryBatch{} + for { + nextWindow := latestComputed.Add(windowStep) + if !latestPossibleWindow.After(nextWindow) { + // Cannot yet compute stats for this window. Stop. + break + } + windowStart := nextWindow.Add(-windowSize) + windowEnd := nextWindow + + // Compute stats for the provided time window. + queries, err := statsComputation.ComputeStats(ctx, windowStart, windowEnd) + if err != nil { + logger.Error("failed to compute stat for the window", "window_start", windowStart, "window_end", windowEnd, "err", err) + break + } + batch.Extend(queries) + + if batch.Len() > statsBatchLimit { + break + } + latestComputed = nextWindow + } + if err := a.writeToDB(ctx, batch, fmt.Sprintf("update_stats_%s", a.Name())); err != nil { + logger.Error("failed to insert computed stats update", "err", err) + } + logger.Info("updated stats", "num_inserts", batch.Len()) + } + + select { + case <-time.After(statsComputationInterval): + // Update stats again. + case <-ctx.Done(): + a.logger.Error("shutting down aggregate stats worker", "reason", ctx.Err()) + return + } + } +} + +// Queries the earliest indexed block for the specified layer. +func (a *aggregateStatsAnalyzer) earliestBlockTs(ctx context.Context, layer string) (*time.Time, error) { + var earliestBlockTsRow pgx.Row + switch layer { + case layerConsensus: + earliestBlockTsRow = a.target.QueryRow(ctx, QueryEarliestConsensusBlockTime) + default: + earliestBlockTsRow = a.target.QueryRow(ctx, QueryEarliestRuntimeBlockTime, layer) + } + var earliestBlockTs *time.Time + if err := earliestBlockTsRow.Scan(&earliestBlockTs); err != nil { // Fails with ErrNoRows if no blocks. + return nil, err + } + return earliestBlockTs, nil +} diff --git a/analyzer/aggregate_stats/queries.go b/analyzer/aggregate_stats/queries.go new file mode 100644 index 000000000..966de2bfd --- /dev/null +++ b/analyzer/aggregate_stats/queries.go @@ -0,0 +1,109 @@ +package aggregate_stats + +const ( + // QueryEarliestConsensusBlockTime is the query to get the timestamp of the earliest + // indexed consensus block. + QueryEarliestConsensusBlockTime = ` + SELECT time + FROM chain.blocks + ORDER BY height + LIMIT 1 + ` + + // QueryEarliestRuntimeBlockTime is the query to get the timestamp of the earliest + // indexed runtime block. + QueryEarliestRuntimeBlockTime = ` + SELECT timestamp + FROM chain.runtime_blocks + WHERE (runtime = $1) + ORDER BY round + LIMIT 1 + ` + // QueryLatestConsensusBlockTime is the query to get the timestamp of the latest + // indexed consensus block. + QueryLatestConsensusBlockTime = ` + SELECT time + FROM chain.blocks + ORDER BY height DESC + LIMIT 1 + ` + // QueryLatestRuntimeBlockTime is the query to get the timestamp of the latest + // indexed runtime block. + QueryLatestRuntimeBlockTime = ` + SELECT timestamp + FROM chain.runtime_blocks + WHERE (runtime = $1) + ORDER BY round DESC + LIMIT 1 + ` + + // QueryConsensusTxVolume is the query to get the number the number of transactions + // in the given window at the consensus layer. + QueryConsensusTxVolume = ` + WITH dummy AS (SELECT $1::text) -- Dummy select that uses parameter $1 so that the query parameters are compatible with QueryRuntimeTxVolume. + SELECT COUNT(*) + FROM chain.blocks AS b + JOIN chain.transactions AS t ON b.height = t.block + WHERE (b.time >= $2::timestamptz AND b.time < $3::timestamptz) + ` + + // QueryRuntimeTxVolume is the query to get the number of of transactions + // in the runtime layer within the given time range. + QueryRuntimeTxVolume = ` + SELECT COUNT(*) + FROM chain.runtime_transactions AS t + WHERE (t.runtime = $1 AND t.timestamp >= $2::timestamptz AND t.timestamp < $3::timestamptz) + ` + + // QueryLatestStatsComputation is the query to get the timestamp of the latest computed stat window. + QueryLatestStatsComputation = ` + SELECT window_end + FROM %s + WHERE layer = $1 + ORDER BY window_end DESC + LIMIT 1 + ` + + // QueryInsertStatsComputation is the query to insert the stats computation. + QueryInsertStatsComputation = ` + INSERT INTO %s (layer, window_end, %s) + VALUES ($1, $2, $3) + ` + + // QueryConsensusActiveAccounts is the query to get the number of + // active accounts in the consensus layer within the given time range. + QueryConsensusActiveAccounts = ` + WITH dummy AS (SELECT $1::text) -- Dummy select that uses parameter $1 so that the query parameters are compatible with QueryRuntimeActiveAccounts. + SELECT COUNT(DISTINCT account_address) + FROM chain.accounts_related_transactions AS art + JOIN chain.blocks AS b ON art.tx_block = b.height + WHERE (b.time >= $2::timestamptz AND b.time < $3::timestamptz) + ` + + // QueryRuntimeActiveAccounts is the query to get the number of + // active accounts in the runtime layer within the given time range. + QueryRuntimeActiveAccounts = ` + SELECT COUNT(DISTINCT account_address) + FROM chain.runtime_related_transactions AS rt + JOIN chain.runtime_blocks AS b ON (rt.runtime = b.runtime AND rt.tx_round = b.round) + WHERE (rt.runtime = $1 AND b.timestamp >= $2::timestamptz AND b.timestamp < $3::timestamptz) + ` + + // QueryDailyTxVolume is the query to get the number of transactions within the given + // time range, by using the already computed 5 minute windowed data. + QueryDailyTxVolume = ` + SELECT COALESCE(SUM(t.tx_volume), 0) + FROM stats.min5_tx_volume AS t + WHERE (t.layer = $1 AND t.window_end >= $2::timestamptz AND t.window_end < $3::timestamptz) + ` + + // QueryLatestMin5TxVolume is the query to get the timestamp of the latest computed + // 5 minute windowed data for the given layer. + QueryLatestMin5TxVolume = ` + SELECT window_end + FROM stats.min5_tx_volume as t + WHERE t.layer = $1 + ORDER BY window_end DESC + LIMIT 1 + ` +) diff --git a/analyzer/aggregate_stats/stats_computation.go b/analyzer/aggregate_stats/stats_computation.go new file mode 100644 index 000000000..4424cffff --- /dev/null +++ b/analyzer/aggregate_stats/stats_computation.go @@ -0,0 +1,64 @@ +package aggregate_stats + +import ( + "context" + "fmt" + "time" + + "github.com/oasisprotocol/nexus/storage" +) + +type statsComputation struct { + target storage.TargetStorage + + // Stats name. + name string + // Layer (e.g. consensus, emerald...) for which the stats are computed. + layer string + + outputTable string + outputColumn string + + // Stat window configurations. + windowSize time.Duration + windowStep time.Duration + + // Return the latest timestamp up until the data is available. + latestAvailableDataTs func(ctx context.Context, target storage.TargetStorage) (*time.Time, error) + // Query that computes the stats. The query should expect 3 arguments: + // $1 - layer name + // $2 - start of window timestamp + // $3 - end of window timestamp + // The result should be a uint64 compatible number - the computed stat. + statsQuery string +} + +func (s *statsComputation) LatestComputedTs(ctx context.Context) (time.Time, error) { + var latestStatsTs time.Time + err := s.target.QueryRow( + ctx, + fmt.Sprintf(QueryLatestStatsComputation, s.outputTable), + s.layer, + ).Scan(&latestStatsTs) + return latestStatsTs, err +} + +func (s *statsComputation) ComputeStats(ctx context.Context, windowStart time.Time, windowEnd time.Time) (*storage.QueryBatch, error) { + batch := &storage.QueryBatch{} + row := s.target.QueryRow( + ctx, + s.statsQuery, + s.layer, + windowStart, + windowEnd, + ) + var result uint64 + if err := row.Scan(&result); err != nil { + return nil, err + } + + // Insert computed stat. + batch.Queue(fmt.Sprintf(QueryInsertStatsComputation, s.outputTable, s.outputColumn), s.layer, windowEnd.UTC(), result) + + return batch, nil +} diff --git a/analyzer/queries/queries.go b/analyzer/queries/queries.go index 90bbd70af..ef66ec3ce 100644 --- a/analyzer/queries/queries.go +++ b/analyzer/queries/queries.go @@ -475,7 +475,7 @@ var ( INSERT INTO analysis.evm_tokens (runtime, token_address, total_supply, last_mutate_round) VALUES ($1, $2, $3, $4) ON CONFLICT (runtime, token_address) DO - UPDATE SET + UPDATE SET total_supply = analysis.evm_tokens.total_supply + $3, last_mutate_round = excluded.last_mutate_round` @@ -625,83 +625,4 @@ var ( source_files = $5 WHERE runtime = $1 AND contract_address = $2` - - RefreshDailyTxVolume = ` - REFRESH MATERIALIZED VIEW CONCURRENTLY stats.daily_tx_volume - ` - - RefreshMin5TxVolume = ` - REFRESH MATERIALIZED VIEW CONCURRENTLY stats.min5_tx_volume - ` - - // LatestDailyAccountStats is the query to get the timestamp of the latest daily active accounts stat. - LatestDailyAccountStats = ` - SELECT window_end - FROM stats.daily_active_accounts - WHERE layer = $1 - ORDER BY window_end DESC - LIMIT 1 - ` - - // InsertDailyAccountStats is the query to insert the daily active accounts stat. - InsertDailyAccountStats = ` - INSERT INTO stats.daily_active_accounts (layer, window_end, active_accounts) - VALUES ($1, $2, $3) - ` - - // EarliestConsensusBlockTime is the query to get the timestamp of the earliest - // indexed consensus block. - EarliestConsensusBlockTime = ` - SELECT time - FROM chain.blocks - ORDER BY height - LIMIT 1 - ` - - // LatestConsensusBlockTime is the query to get the timestamp of the latest - // indexed consensus block. - LatestConsensusBlockTime = ` - SELECT time - FROM chain.blocks - ORDER BY height DESC - LIMIT 1 - ` - - // EarliestRuntimeBlockTime is the query to get the timestamp of the earliest - // indexed runtime block. - EarliestRuntimeBlockTime = ` - SELECT timestamp - FROM chain.runtime_blocks - WHERE (runtime = $1) - ORDER BY round - LIMIT 1 - ` - - // LatestRuntimeBlockTime is the query to get the timestamp of the latest - // indexed runtime block. - LatestRuntimeBlockTime = ` - SELECT timestamp - FROM chain.runtime_blocks - WHERE (runtime = $1) - ORDER BY round DESC - LIMIT 1 - ` - - // ConsensusActiveAccounts is the query to get the number of - // active accounts in the consensus layer within the given time range. - ConsensusActiveAccounts = ` - SELECT COUNT(DISTINCT account_address) - FROM chain.accounts_related_transactions AS art - JOIN chain.blocks AS b ON art.tx_block = b.height - WHERE (b.time >= $1::timestamptz AND b.time < $2::timestamptz) - ` - - // RuntimeActiveAccounts is the query to get the number of - // active accounts in the runtime layer within the given time range. - RuntimeActiveAccounts = ` - SELECT COUNT(DISTINCT account_address) - FROM chain.runtime_related_transactions AS rt - JOIN chain.runtime_blocks AS b ON (rt.runtime = b.runtime AND rt.tx_round = b.round) - WHERE (rt.runtime = $1 AND b.timestamp >= $2::timestamptz AND b.timestamp < $3::timestamptz) - ` ) diff --git a/api/errors.go b/api/errors.go index 7eadb522a..7aec8b6b4 100644 --- a/api/errors.go +++ b/api/errors.go @@ -44,11 +44,11 @@ func HttpCodeForError(err error) int { errType := errVal.Type() switch { - case err == ErrBadChainID: + case errors.Is(err, ErrBadChainID): return http.StatusNotFound - case err == ErrBadRequest: + case errors.Is(err, ErrBadRequest): return http.StatusBadRequest - case err == ErrNotFound: + case errors.Is(err, ErrNotFound): return http.StatusNotFound case errType == reflect.TypeOf(ErrStorageError{}): return http.StatusInternalServerError diff --git a/api/middleware.go b/api/middleware.go index 56aa2aa05..2c9676f91 100644 --- a/api/middleware.go +++ b/api/middleware.go @@ -19,7 +19,8 @@ import ( var ( defaultOffset = uint64(0) defaultLimit = uint64(100) - defaultBucketSizeSeconds = uint32(3600) + defaultWindowSizeSeconds = uint32(86400) + defaultWindowStepSeconds = uint32(86400) maxLimit = uint64(1000) ) @@ -126,8 +127,10 @@ func fixDefaultsAndLimits(p any) { f.Set(reflect.ValueOf(&defaultLimit)) case "Offset": f.Set(reflect.ValueOf(&defaultOffset)) - case "BucketSizeSeconds": - f.Set(reflect.ValueOf(&defaultBucketSizeSeconds)) + case "WindowSizeSeconds": + f.Set(reflect.ValueOf(&defaultWindowSizeSeconds)) + case "WindowStepSeconds": + f.Set(reflect.ValueOf(&defaultWindowStepSeconds)) } } } diff --git a/api/spec/v1.yaml b/api/spec/v1.yaml index f17b5cfe7..edaa09532 100644 --- a/api/spec/v1.yaml +++ b/api/spec/v1.yaml @@ -50,16 +50,16 @@ x-query-params: $ref: '#/components/schemas/Runtime' description: | The runtime which to query. - - &bucket_size_seconds + - &window_size_seconds in: query - name: bucket_size_seconds + name: window_size_seconds schema: type: integer format: uint32 default: 86400 description: | - The size of buckets into which the statistic is grouped, in seconds. - The backend supports a limited number of bucket sizes: 300 (5 minutes) and + The size of windows into which the statistic is grouped, in seconds. + The backend supports a limited number of window sizes: 300 (5 minutes) and 86400 (1 day). Requests with other values may be rejected. - &window_step_seconds in: query @@ -67,7 +67,6 @@ x-query-params: schema: type: integer format: uint32 - enum: [300, 86400] default: 86400 description: | The size of the step between returned statistic windows, in seconds. @@ -1014,7 +1013,7 @@ paths: /{runtime}/evm_tokens/{address}/holders: get: summary: | - Returns the list of holders of an EVM (ERC-20, ...) token. + Returns the list of holders of an EVM (ERC-20, ...) token. This endpoint does not verify that `address` is actually an EVM token; if it is not, it will simply return an empty list. parameters: - *limit @@ -1077,7 +1076,8 @@ paths: parameters: - *limit - *offset - - *bucket_size_seconds + - *window_size_seconds + - *window_step_seconds - in: path name: layer required: true @@ -2563,12 +2563,12 @@ components: TxVolumeList: type: object - required: [bucket_size_seconds, buckets] + required: [window_size_seconds, windows] properties: - bucket_size_seconds: + window_size_seconds: type: integer format: uint32 - buckets: + windows: type: array items: $ref: '#/components/schemas/TxVolume' @@ -2578,17 +2578,17 @@ components: TxVolume: type: object - required: [bucket_start, tx_volume] + required: [window_end, tx_volume] properties: - bucket_start: + window_end: type: string format: date-time - description: The date for this daily transaction volume measurement. + description: The end timestamp for this daily transaction volume measurement. example: *iso_timestamp_1 tx_volume: type: integer format: uint64 - description: The transaction volume on this day. + description: The transaction volume for this window. example: 420 ActiveAccountsList: @@ -2618,7 +2618,7 @@ components: active_accounts: type: integer format: uint64 - description: The number of active accounts for the 24hour window starting at bucket_start. + description: The number of active accounts for the 24hour window ending at window_end. example: 420 responses: diff --git a/cmd/analyzer/analyzer.go b/cmd/analyzer/analyzer.go index b3d548517..fbfd19c59 100644 --- a/cmd/analyzer/analyzer.go +++ b/cmd/analyzer/analyzer.go @@ -17,6 +17,7 @@ import ( "github.com/spf13/cobra" "github.com/oasisprotocol/nexus/analyzer" + "github.com/oasisprotocol/nexus/analyzer/aggregate_stats" "github.com/oasisprotocol/nexus/analyzer/consensus" "github.com/oasisprotocol/nexus/analyzer/evmcontractcode" "github.com/oasisprotocol/nexus/analyzer/evmtokenbalances" @@ -347,7 +348,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { } if cfg.Analyzers.AggregateStats != nil { analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { - return analyzer.NewAggregateStatsAnalyzer(cfg.Analyzers.AggregateStats, dbClient, logger) + return aggregate_stats.NewAggregateStatsAnalyzer(dbClient, logger) }) } if cfg.Analyzers.EVMContractsVerifier != nil { diff --git a/config/config.go b/config/config.go index b3cb02c14..b31fb6bd0 100644 --- a/config/config.go +++ b/config/config.go @@ -328,18 +328,9 @@ func (cfg *MetadataRegistryConfig) Validate() error { } // AggregateStatsConfig is the configuration for the aggregate stats analyzer. -type AggregateStatsConfig struct { - // TxVolumeInterval is the time interval for recomputing the tx volume aggregate stats. - // We would ideally recompute stats (which include 5-min aggregates) every 5 min, but the - // current naive implementation is too inefficient for that. - TxVolumeInterval time.Duration `koanf:"tx_volume_interval"` -} +type AggregateStatsConfig struct{} -// Validate validates the aggregate stats config. func (cfg *AggregateStatsConfig) Validate() error { - if cfg.TxVolumeInterval < 5*time.Minute { - return fmt.Errorf("tx_volume_interval must be at least 5 minutes") - } return nil } diff --git a/config/docker-dev.yml b/config/docker-dev.yml index 6284cef54..6f5341a3b 100644 --- a/config/docker-dev.yml +++ b/config/docker-dev.yml @@ -8,8 +8,7 @@ analysis: analyzers: metadata_registry: interval: 1h - aggregate_stats: - tx_volume_interval: 10m + aggregate_stats: {} consensus: from: 8_048_956 # Damask genesis emerald: diff --git a/config/local-dev.yml b/config/local-dev.yml index da42ce19c..0765656f9 100644 --- a/config/local-dev.yml +++ b/config/local-dev.yml @@ -8,8 +8,7 @@ analysis: analyzers: metadata_registry: interval: 1h - aggregate_stats: - tx_volume_interval: 10m + aggregate_stats: {} consensus: from: 8_048_956 # Damask genesis emerald: diff --git a/storage/client/client.go b/storage/client/client.go index ba7766d1a..1d2a7b55e 100644 --- a/storage/client/client.go +++ b/storage/client/client.go @@ -1598,15 +1598,23 @@ func (c *StorageClient) RuntimeStatus(ctx context.Context) (*RuntimeStatus, erro return &s, nil } -// TxVolumes returns a list of transaction volumes per time bucket. +// TxVolumes returns a list of transaction volumes per time window. func (c *StorageClient) TxVolumes(ctx context.Context, layer apiTypes.Layer, p apiTypes.GetLayerStatsTxVolumeParams) (*TxVolumeList, error) { var query string - if *p.BucketSizeSeconds == 300 { + + switch { + case *p.WindowSizeSeconds == 300 && *p.WindowStepSeconds == 300: + // 5 minute window, 5 minute step. query = queries.FineTxVolumes - } else { - var day uint32 = 86400 - p.BucketSizeSeconds = &day - query = queries.TxVolumes + case *p.WindowSizeSeconds == 86400 && *p.WindowStepSeconds == 86400: + // 1 day window, 1 day step. + query = queries.DailyTxVolumes + case *p.WindowSizeSeconds == 86400 && *p.WindowStepSeconds == 300: + // 1 day window, 5 minute step. + query = queries.FineDailyTxVolumes + default: + // Unsupported: case *p.WindowSizeSeconds == 300 && *p.WindowStepSeconds == 86400: + return nil, fmt.Errorf("invalid window size parameters: %w", apiCommon.ErrBadRequest) } rows, err := c.db.Query( @@ -1622,19 +1630,19 @@ func (c *StorageClient) TxVolumes(ctx context.Context, layer apiTypes.Layer, p a defer rows.Close() ts := TxVolumeList{ - BucketSizeSeconds: *p.BucketSizeSeconds, - Buckets: []apiTypes.TxVolume{}, + WindowSizeSeconds: *p.WindowSizeSeconds, + Windows: []apiTypes.TxVolume{}, } for rows.Next() { var t TxVolume if err := rows.Scan( - &t.BucketStart, + &t.WindowEnd, &t.TxVolume, ); err != nil { return nil, wrapError(err) } - t.BucketStart = t.BucketStart.UTC() // Ensure UTC timestamp in response. - ts.Buckets = append(ts.Buckets, t) + t.WindowEnd = t.WindowEnd.UTC() // Ensure UTC timestamp in response. + ts.Windows = append(ts.Windows, t) } return &ts, nil diff --git a/storage/client/queries/queries.go b/storage/client/queries/queries.go index d72b877a8..38da4cbb0 100644 --- a/storage/client/queries/queries.go +++ b/storage/client/queries/queries.go @@ -422,7 +422,7 @@ const ( LEFT JOIN chain.evm_tokens as tokens ON (evs.runtime=tokens.runtime) AND (preimages.address=tokens.token_address) - WHERE + WHERE (evs.runtime = $1) AND ($2::bigint IS NULL OR evs.round = $2::bigint) AND ($3::integer IS NULL OR evs.tx_index = $3::integer) AND @@ -558,22 +558,35 @@ const ( WHERE runtime_id = $1::text ` + // FineTxVolumes returns the fine-grained query for 5-minute sampled tx volume windows. FineTxVolumes = ` - SELECT window_start, tx_volume + SELECT window_end, tx_volume FROM stats.min5_tx_volume WHERE layer = $1::text ORDER BY - window_start DESC + window_end DESC LIMIT $2::bigint OFFSET $3::bigint ` - TxVolumes = ` - SELECT window_start, tx_volume + // FineDailyTxVolumes returns the query for daily tx volume windows. + FineDailyTxVolumes = ` + SELECT window_end, tx_volume FROM stats.daily_tx_volume WHERE layer = $1::text ORDER BY - window_start DESC + window_end DESC + LIMIT $2::bigint + OFFSET $3::bigint + ` + + // DailyTxVolumes returns the query for daily sampled daily tx volume windows. + DailyTxVolumes = ` + SELECT window_end, tx_volume + FROM stats.daily_tx_volume + WHERE (layer = $1::text AND (window_end AT TIME ZONE 'UTC')::time = '00:00:00') + ORDER BY + window_end DESC LIMIT $2::bigint OFFSET $3::bigint ` @@ -591,7 +604,7 @@ const ( // DailyActiveAccounts returns the query for daily sampled daily active account windows. DailyActiveAccounts = ` - SELECT date_trunc('day', window_end) as window_end, active_accounts + SELECT window_end, active_accounts FROM stats.daily_active_accounts WHERE (layer = $1::text AND (window_end AT TIME ZONE 'UTC')::time = '00:00:00') ORDER BY diff --git a/storage/migrations/03_agg_stats.up.sql b/storage/migrations/03_agg_stats.up.sql index 9775a0c51..2a563bf2b 100644 --- a/storage/migrations/03_agg_stats.up.sql +++ b/storage/migrations/03_agg_stats.up.sql @@ -47,7 +47,7 @@ CREATE MATERIALIZED VIEW stats.daily_tx_volume AS GROUP BY 1, 2; CREATE UNIQUE INDEX ix_stats_daily_tx_volume_window_start ON stats.daily_tx_volume (layer, window_start); -- A unique index is required for CONCURRENTLY refreshing the view. --- daily_active_accounts stores the sliding widnow for the number of unique accounts per day +-- daily_active_accounts stores the sliding window for the number of unique accounts per day -- that were involved in transactions. CREATE TABLE stats.daily_active_accounts ( diff --git a/storage/migrations/11_agg_stats_refactor.up.sql b/storage/migrations/11_agg_stats_refactor.up.sql new file mode 100644 index 000000000..032c23849 --- /dev/null +++ b/storage/migrations/11_agg_stats_refactor.up.sql @@ -0,0 +1,73 @@ +-- Drop stats materialized views and replace with tables. +-- Also migrate the data from the old materialized views to the new tables. + +BEGIN; + +-- First create new tables. The _new suffix is used to avoid name conflicts with the old materialized views, +-- and will be dropped at the end of the migration. + +-- min5_tx_volume stores the 5-minute tx volumes in 5-minute windows, per layer. +CREATE TABLE stats.min5_tx_volume_new +( + layer TEXT NOT NULL, + window_end TIMESTAMP WITH TIME ZONE NOT NULL, + tx_volume uint63 NOT NULL, + + PRIMARY KEY (layer, window_end) +); +-- daily_tx_volume stores the sliding window for the number of transactions per day per layer. +CREATE TABLE stats.daily_tx_volume_new +( + layer TEXT NOT NULL, + window_end TIMESTAMP WITH TIME ZONE NOT NULL, + tx_volume uint63 NOT NULL, + + PRIMARY KEY (layer, window_end) +); +-- Index for efficient query of the daily samples. +CREATE INDEX ix_stats_daily_tx_volume_daily_windows ON stats.daily_tx_volume_new (layer, window_end) WHERE ((window_end AT TIME ZONE 'UTC')::time = '00:00:00'); + +-- Migrate the data from the old materialized views to the new tables. +INSERT INTO stats.min5_tx_volume_new +SELECT layer, window_start + INTERVAL '5 minute', tx_volume +FROM stats.min5_tx_volume; + +INSERT INTO stats.daily_tx_volume_new +SELECT layer, window_start + INTERVAL '1 day', tx_volume +FROM stats.daily_tx_volume; + +-- Drop the old materialized views. +DROP MATERIALIZED VIEW IF EXISTS stats.daily_tx_volume; +DROP MATERIALIZED VIEW IF EXISTS stats.min5_tx_volume; + +-- Drop the last window for each of the daily_tx_volume layers, since that window can be incomplete. +WITH RankedRows AS ( + SELECT + layer, + window_end, + ROW_NUMBER() OVER (PARTITION BY layer ORDER BY window_end DESC) AS rnum + FROM + stats.daily_tx_volume_new +) +DELETE FROM stats.daily_tx_volume_new +WHERE (layer, window_end) IN (SELECT layer, window_end FROM RankedRows WHERE rnum = 1); + +-- Drop the last window for each of the min5_tx_volume layers, since that window can be incomplete. +WITH RankedRows AS ( + SELECT + layer, + window_end, + ROW_NUMBER() OVER (PARTITION BY layer ORDER BY window_end DESC) AS rnum + FROM + stats.min5_tx_volume_new +) +DELETE FROM stats.min5_tx_volume_new +WHERE (layer, window_end) IN (SELECT layer, window_end FROM RankedRows WHERE rnum = 1); + +-- Drop the _new suffixes from the new tables. +ALTER TABLE stats.min5_tx_volume_new +RENAME TO min5_tx_volume; +ALTER TABLE stats.daily_tx_volume_new +RENAME TO daily_tx_volume; + +COMMIT; diff --git a/tests/e2e_regression/e2e_config.yml b/tests/e2e_regression/e2e_config.yml index 1ebbdc391..372d9a45b 100644 --- a/tests/e2e_regression/e2e_config.yml +++ b/tests/e2e_regression/e2e_config.yml @@ -11,8 +11,7 @@ analysis: analyzers: # metadata_registry: # interval: 5m - # aggregate_stats: - # tx_volume_interval: 5m + # aggregate_stats: {} consensus: from: 8_048_956 # Damask genesis to: 8_049_056 # 100 blocks; fast enough for early testing diff --git a/tests/e2e_regression/expected/bucket_size.body b/tests/e2e_regression/expected/bucket_size.body deleted file mode 100644 index 2c4c93492..000000000 --- a/tests/e2e_regression/expected/bucket_size.body +++ /dev/null @@ -1,4 +0,0 @@ -{ - "bucket_size_seconds": 300, - "buckets": [] -} diff --git a/tests/e2e_regression/expected/emerald_tx_volume.body b/tests/e2e_regression/expected/emerald_tx_volume.body index 8673db24c..792a4bf1e 100644 --- a/tests/e2e_regression/expected/emerald_tx_volume.body +++ b/tests/e2e_regression/expected/emerald_tx_volume.body @@ -1,4 +1,4 @@ { - "bucket_size_seconds": 86400, - "buckets": [] + "window_size_seconds": 86400, + "windows": [] } diff --git a/tests/e2e_regression/expected/nonstandard_bucket_size.body b/tests/e2e_regression/expected/nonstandard_bucket_size.body deleted file mode 100644 index 8673db24c..000000000 --- a/tests/e2e_regression/expected/nonstandard_bucket_size.body +++ /dev/null @@ -1,4 +0,0 @@ -{ - "bucket_size_seconds": 86400, - "buckets": [] -} diff --git a/tests/e2e_regression/expected/nonstandard_bucket_size.headers b/tests/e2e_regression/expected/nonstandard_bucket_size.headers deleted file mode 100644 index 1cce01c2d..000000000 --- a/tests/e2e_regression/expected/nonstandard_bucket_size.headers +++ /dev/null @@ -1,6 +0,0 @@ -HTTP/1.1 200 OK -Content-Type: application/json -Vary: Origin -Date: UNINTERESTING -Content-Length: UNINTERESTING - diff --git a/tests/e2e_regression/expected/nonstandard_window_size.body b/tests/e2e_regression/expected/nonstandard_window_size.body new file mode 100644 index 000000000..076180344 --- /dev/null +++ b/tests/e2e_regression/expected/nonstandard_window_size.body @@ -0,0 +1,3 @@ +{ + "msg": "invalid window size parameters: invalid request parameters" +} diff --git a/tests/e2e_regression/expected/nonstandard_window_size.headers b/tests/e2e_regression/expected/nonstandard_window_size.headers new file mode 100644 index 000000000..c52b02ddd --- /dev/null +++ b/tests/e2e_regression/expected/nonstandard_window_size.headers @@ -0,0 +1,7 @@ +HTTP/1.1 400 Bad Request +Content-Type: application/json; charset=utf-8 +Vary: Origin +X-Content-Type-Options: nosniff +Date: UNINTERESTING +Content-Length: UNINTERESTING + diff --git a/tests/e2e_regression/expected/tx_volume.body b/tests/e2e_regression/expected/tx_volume.body index 8673db24c..792a4bf1e 100644 --- a/tests/e2e_regression/expected/tx_volume.body +++ b/tests/e2e_regression/expected/tx_volume.body @@ -1,4 +1,4 @@ { - "bucket_size_seconds": 86400, - "buckets": [] + "window_size_seconds": 86400, + "windows": [] } diff --git a/tests/e2e_regression/expected/window_size.body b/tests/e2e_regression/expected/window_size.body new file mode 100644 index 000000000..bed16d725 --- /dev/null +++ b/tests/e2e_regression/expected/window_size.body @@ -0,0 +1,4 @@ +{ + "window_size_seconds": 300, + "windows": [] +} diff --git a/tests/e2e_regression/expected/bucket_size.headers b/tests/e2e_regression/expected/window_size.headers similarity index 100% rename from tests/e2e_regression/expected/bucket_size.headers rename to tests/e2e_regression/expected/window_size.headers diff --git a/tests/e2e_regression/run.sh b/tests/e2e_regression/run.sh index 36ad78b65..031d67e1c 100755 --- a/tests/e2e_regression/run.sh +++ b/tests/e2e_regression/run.sh @@ -54,8 +54,8 @@ testCases=( 'proposal /v1/consensus/proposals/2' 'votes /v1/consensus/proposals/2/votes' 'tx_volume /v1/consensus/stats/tx_volume' - 'bucket_size /v1/consensus/stats/tx_volume?bucket_size_seconds=300' - 'nonstandard_bucket_size /v1/consensus/stats/tx_volume?bucket_size_seconds=301' + 'window_size /v1/consensus/stats/tx_volume?window_size_seconds=300&window_step_seconds=300' + 'nonstandard_window_size /v1/consensus/stats/tx_volume?window_size_seconds=301&window_step_seconds=300' 'active_accounts /v1/consensus/stats/active_accounts' 'active_accounts_window /v1/consensus/stats/active_accounts?window_step_seconds=300' 'active_accounts_emerald /v1/emerald/stats/active_accounts' @@ -127,7 +127,7 @@ diff --recursive "$SCRIPT_DIR/expected" "$outDir" >/dev/null || { # Create a copy of the `expected` dir with the symlink contents materialized; we'll diff against that. rm -rf /tmp/nexus-e2e-expected; cp -r --dereference "$SCRIPT_DIR/expected" /tmp/nexus-e2e-expected; } - if [[ -t 1 ]]; then # Running in a terminal + if [[ -t 1 ]]; then # Running in a terminal echo "Press enter see the diff, or Ctrl-C to abort." read -r git diff --no-index /tmp/nexus-e2e-expected "$SCRIPT_DIR/actual" || true From a3544b615a50659257e82862b1b379c7a1f8a40d Mon Sep 17 00:00:00 2001 From: ptrus Date: Tue, 8 Aug 2023 11:46:54 +0200 Subject: [PATCH 2/2] aggregate_stats: don't migrate data from views --- analyzer/aggregate_stats/aggregate_stats.go | 14 ++++- .../migrations/11_agg_stats_refactor.up.sql | 58 +++---------------- 2 files changed, 21 insertions(+), 51 deletions(-) diff --git a/analyzer/aggregate_stats/aggregate_stats.go b/analyzer/aggregate_stats/aggregate_stats.go index 8a16f4722..fa1237436 100644 --- a/analyzer/aggregate_stats/aggregate_stats.go +++ b/analyzer/aggregate_stats/aggregate_stats.go @@ -32,6 +32,10 @@ const ( // The workers only computes new data and iteration with no updates are cheap // so it's fine to run this frequently. statsComputationInterval = 1 * time.Minute + + // Interval between stat computation iterations if the aggregate stats worker is catching up + // (indicated by the fact that the per iteration batch limit was reached). + statsComputationIntervalCatchup = 5 * time.Second ) // Layers that are tracked for aggregate stats. @@ -169,6 +173,8 @@ func (a *aggregateStatsAnalyzer) aggregateStatsWorker(ctx context.Context) { } for { + // If batch limit was reached, start the next iteration sooner. + var batchLimitReached bool for _, statsComputation := range statsComputations { logger := a.logger.With("name", statsComputation.name, "layer", statsComputation.layer) @@ -243,6 +249,7 @@ func (a *aggregateStatsAnalyzer) aggregateStatsWorker(ctx context.Context) { batch.Extend(queries) if batch.Len() > statsBatchLimit { + batchLimitReached = true break } latestComputed = nextWindow @@ -253,8 +260,13 @@ func (a *aggregateStatsAnalyzer) aggregateStatsWorker(ctx context.Context) { logger.Info("updated stats", "num_inserts", batch.Len()) } + timeout := statsComputationInterval + if batchLimitReached { + // Batch limit reached, use the shorter stats-catchup timeout. + timeout = statsComputationIntervalCatchup + } select { - case <-time.After(statsComputationInterval): + case <-time.After(timeout): // Update stats again. case <-ctx.Done(): a.logger.Error("shutting down aggregate stats worker", "reason", ctx.Err()) diff --git a/storage/migrations/11_agg_stats_refactor.up.sql b/storage/migrations/11_agg_stats_refactor.up.sql index 032c23849..552bf4e6f 100644 --- a/storage/migrations/11_agg_stats_refactor.up.sql +++ b/storage/migrations/11_agg_stats_refactor.up.sql @@ -1,13 +1,14 @@ -- Drop stats materialized views and replace with tables. --- Also migrate the data from the old materialized views to the new tables. - BEGIN; --- First create new tables. The _new suffix is used to avoid name conflicts with the old materialized views, --- and will be dropped at the end of the migration. +-- Drop the old materialized views. +DROP MATERIALIZED VIEW IF EXISTS stats.daily_tx_volume; +DROP MATERIALIZED VIEW IF EXISTS stats.min5_tx_volume; + +-- Create new tables. -- min5_tx_volume stores the 5-minute tx volumes in 5-minute windows, per layer. -CREATE TABLE stats.min5_tx_volume_new +CREATE TABLE stats.min5_tx_volume ( layer TEXT NOT NULL, window_end TIMESTAMP WITH TIME ZONE NOT NULL, @@ -16,7 +17,7 @@ CREATE TABLE stats.min5_tx_volume_new PRIMARY KEY (layer, window_end) ); -- daily_tx_volume stores the sliding window for the number of transactions per day per layer. -CREATE TABLE stats.daily_tx_volume_new +CREATE TABLE stats.daily_tx_volume ( layer TEXT NOT NULL, window_end TIMESTAMP WITH TIME ZONE NOT NULL, @@ -25,49 +26,6 @@ CREATE TABLE stats.daily_tx_volume_new PRIMARY KEY (layer, window_end) ); -- Index for efficient query of the daily samples. -CREATE INDEX ix_stats_daily_tx_volume_daily_windows ON stats.daily_tx_volume_new (layer, window_end) WHERE ((window_end AT TIME ZONE 'UTC')::time = '00:00:00'); - --- Migrate the data from the old materialized views to the new tables. -INSERT INTO stats.min5_tx_volume_new -SELECT layer, window_start + INTERVAL '5 minute', tx_volume -FROM stats.min5_tx_volume; - -INSERT INTO stats.daily_tx_volume_new -SELECT layer, window_start + INTERVAL '1 day', tx_volume -FROM stats.daily_tx_volume; - --- Drop the old materialized views. -DROP MATERIALIZED VIEW IF EXISTS stats.daily_tx_volume; -DROP MATERIALIZED VIEW IF EXISTS stats.min5_tx_volume; - --- Drop the last window for each of the daily_tx_volume layers, since that window can be incomplete. -WITH RankedRows AS ( - SELECT - layer, - window_end, - ROW_NUMBER() OVER (PARTITION BY layer ORDER BY window_end DESC) AS rnum - FROM - stats.daily_tx_volume_new -) -DELETE FROM stats.daily_tx_volume_new -WHERE (layer, window_end) IN (SELECT layer, window_end FROM RankedRows WHERE rnum = 1); - --- Drop the last window for each of the min5_tx_volume layers, since that window can be incomplete. -WITH RankedRows AS ( - SELECT - layer, - window_end, - ROW_NUMBER() OVER (PARTITION BY layer ORDER BY window_end DESC) AS rnum - FROM - stats.min5_tx_volume_new -) -DELETE FROM stats.min5_tx_volume_new -WHERE (layer, window_end) IN (SELECT layer, window_end FROM RankedRows WHERE rnum = 1); - --- Drop the _new suffixes from the new tables. -ALTER TABLE stats.min5_tx_volume_new -RENAME TO min5_tx_volume; -ALTER TABLE stats.daily_tx_volume_new -RENAME TO daily_tx_volume; +CREATE INDEX ix_stats_daily_tx_volume_daily_windows ON stats.daily_tx_volume (layer, window_end) WHERE ((window_end AT TIME ZONE 'UTC')::time = '00:00:00'); COMMIT;