aggregate_stats: don't migrate data from views
ptrus committed Aug 8, 2023
1 parent ee2815f commit e9fbabb
Showing 2 changed files with 20 additions and 50 deletions.
14 changes: 13 additions & 1 deletion analyzer/aggregate_stats/aggregate_stats.go
@@ -32,6 +32,10 @@ const (
 	// The workers only computes new data and iteration with no updates are cheap
 	// so it's fine to run this frequently.
 	statsComputationInterval = 1 * time.Minute
+
+	// Interval between stat computation iterations if the aggregate stats worker is catching up
+	// (indicated by the fact that the per iteration batch limit was reached).
+	statsComputationIntervalCatchup = 5 * time.Second
 )

@@ -169,6 +173,8 @@ func (a *aggregateStatsAnalyzer) aggregateStatsWorker(ctx context.Context) {
 	}

 	for {
+		// If batch limit was reached, start the next iteration sooner.
+		var batchLimitReached bool
 		for _, statsComputation := range statsComputations {
 			logger := a.logger.With("name", statsComputation.name, "layer", statsComputation.layer)

@@ -243,6 +249,7 @@ func (a *aggregateStatsAnalyzer) aggregateStatsWorker(ctx context.Context) {
 			batch.Extend(queries)

 			if batch.Len() > statsBatchLimit {
+				batchLimitReached = true
 				break
 			}
 			latestComputed = nextWindow

@@ -253,8 +260,13 @@ func (a *aggregateStatsAnalyzer) aggregateStatsWorker(ctx context.Context) {
 			logger.Info("updated stats", "num_inserts", batch.Len())
 		}

+		timeout := statsComputationInterval
+		if batchLimitReached {
+			// Batch limit reached, use the shorter stats-catchup timeout.
+			timeout = statsComputationIntervalCatchup
+		}
 		select {
-		case <-time.After(statsComputationInterval):
+		case <-time.After(timeout):
 			// Update stats again.
 		case <-ctx.Done():
 			a.logger.Error("shutting down aggregate stats worker", "reason", ctx.Err())
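The Go change gives the stats worker an adaptive poll interval: each pass queues at most statsBatchLimit inserts, and when that cap is hit the pass is known to be incomplete, so the next one starts after the short catch-up interval instead of the usual minute. That matters for this commit because the migration below no longer carries data over, leaving the worker to recompute all historical windows. A minimal standalone sketch of the pattern, assuming a hypothetical computeStats stand-in for the analyzer's real computation loop:

package main

import (
	"context"
	"log"
	"time"
)

const (
	statsComputationInterval        = 1 * time.Minute
	statsComputationIntervalCatchup = 5 * time.Second
)

// computeStats stands in for one full stats-computation pass. It returns
// true when the pass stopped early because the per-iteration batch limit
// was reached, i.e. more windows are still waiting to be computed.
func computeStats(ctx context.Context) (batchLimitReached bool) {
	// ... compute and insert the missing stats windows here ...
	return false
}

func aggregateStatsWorker(ctx context.Context) {
	for {
		batchLimitReached := computeStats(ctx)

		// Poll at the normal rate, but only sleep briefly while catching up.
		timeout := statsComputationInterval
		if batchLimitReached {
			timeout = statsComputationIntervalCatchup
		}
		select {
		case <-time.After(timeout):
			// Update stats again.
		case <-ctx.Done():
			log.Println("shutting down aggregate stats worker:", ctx.Err())
			return
		}
	}
}

func main() {
	// Run the worker briefly, then shut it down via the context.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	aggregateStatsWorker(ctx)
}

In the real analyzer the flag is set inside the per-computation loop, as the diff shows; the sketch collapses that into a single call.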
56 changes: 7 additions & 49 deletions storage/migrations/11_agg_stats_refactor.up.sql
@@ -1,13 +1,14 @@
 -- Drop stats materialized views and replace with tables.
--- Also migrate the data from the old materialized views to the new tables.

 BEGIN;

--- First create new tables. The _new suffix is used to avoid name conflicts with the old materialized views,
--- and will be dropped at the end of the migration.
-
+-- Drop the old materialized views.
+DROP MATERIALIZED VIEW IF EXISTS stats.daily_tx_volume;
+DROP MATERIALIZED VIEW IF EXISTS stats.min5_tx_volume;
+
+-- Create new tables.
 -- min5_tx_volume stores the 5-minute tx volumes in 5-minute windows, per layer.
-CREATE TABLE stats.min5_tx_volume_new
+CREATE TABLE stats.min5_tx_volume
 (
     layer TEXT NOT NULL,
     window_end TIMESTAMP WITH TIME ZONE NOT NULL,

@@ -16,7 +17,7 @@ CREATE TABLE stats.min5_tx_volume_new
     PRIMARY KEY (layer, window_end)
 );
 -- daily_tx_volume stores the sliding window for the number of transactions per day per layer.
-CREATE TABLE stats.daily_tx_volume_new
+CREATE TABLE stats.daily_tx_volume
 (
     layer TEXT NOT NULL,
     window_end TIMESTAMP WITH TIME ZONE NOT NULL,

@@ -27,47 +28,4 @@
 -- Index for efficient query of the daily samples.
 CREATE INDEX ix_stats_daily_tx_volume_daily_windows ON stats.daily_tx_volume_new (layer, window_end) WHERE ((window_end AT TIME ZONE 'UTC')::time = '00:00:00');

--- Migrate the data from the old materialized views to the new tables.
-INSERT INTO stats.min5_tx_volume_new
-SELECT layer, window_start + INTERVAL '5 minute', tx_volume
-FROM stats.min5_tx_volume;
-
-INSERT INTO stats.daily_tx_volume_new
-SELECT layer, window_start + INTERVAL '1 day', tx_volume
-FROM stats.daily_tx_volume;
-
--- Drop the old materialized views.
-DROP MATERIALIZED VIEW IF EXISTS stats.daily_tx_volume;
-DROP MATERIALIZED VIEW IF EXISTS stats.min5_tx_volume;
-
--- Drop the last window for each of the daily_tx_volume layers, since that window can be incomplete.
-WITH RankedRows AS (
-    SELECT
-        layer,
-        window_end,
-        ROW_NUMBER() OVER (PARTITION BY layer ORDER BY window_end DESC) AS rnum
-    FROM
-        stats.daily_tx_volume_new
-)
-DELETE FROM stats.daily_tx_volume_new
-WHERE (layer, window_end) IN (SELECT layer, window_end FROM RankedRows WHERE rnum = 1);
-
--- Drop the last window for each of the min5_tx_volume layers, since that window can be incomplete.
-WITH RankedRows AS (
-    SELECT
-        layer,
-        window_end,
-        ROW_NUMBER() OVER (PARTITION BY layer ORDER BY window_end DESC) AS rnum
-    FROM
-        stats.min5_tx_volume_new
-)
-DELETE FROM stats.min5_tx_volume_new
-WHERE (layer, window_end) IN (SELECT layer, window_end FROM RankedRows WHERE rnum = 1);
-
--- Drop the _new suffixes from the new tables.
-ALTER TABLE stats.min5_tx_volume_new
-    RENAME TO min5_tx_volume;
-ALTER TABLE stats.daily_tx_volume_new
-    RENAME TO daily_tx_volume;
-
 COMMIT;
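With the data migration gone, the new tables start empty and are refilled by the aggregate stats worker. The index kept above is a partial index: it only covers rows whose window_end falls exactly on a UTC midnight, i.e. the completed daily windows, and the planner will only use it for queries whose WHERE clause implies that predicate. A hypothetical read (the layer value 'consensus' is illustrative, not part of this commit):

-- Hypothetical query for the last 30 completed daily windows of one layer.
-- Repeating the index predicate verbatim lets the planner use
-- ix_stats_daily_tx_volume_daily_windows rather than scanning the table.
SELECT window_end, tx_volume
FROM stats.daily_tx_volume
WHERE layer = 'consensus'
  AND (window_end AT TIME ZONE 'UTC')::time = '00:00:00'
ORDER BY window_end DESC
LIMIT 30;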
