Skip to content

Commit

Permalink
feat: migrations post stateless version (#515)
Browse files Browse the repository at this point in the history
* feat: migrate old data

* fix: dependencies

* chore: some fix

* fix: slow migrations

* chore: reorder migrations and clean next-minor todos

* feat: adapt for v2.2 merge
  • Loading branch information
gfyrag authored Nov 26, 2024
1 parent 30c76ee commit d7ba7d8
Show file tree
Hide file tree
Showing 43 changed files with 449 additions and 133 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ require (
golang.org/x/sync v0.9.0
)

require gopkg.in/yaml.v3 v3.0.1 // indirect

require (
github.com/hashicorp/go-hclog v1.6.3 // indirect
github.com/jackc/pgxlisten v0.0.0-20241005155529-9d952acd6a6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require (
Expand Down
1 change: 1 addition & 0 deletions internal/storage/bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Bucket interface {
Migrate(ctx context.Context, minimalVersionReached chan struct{}, opts ...migrations.Option) error
AddLedger(ctx context.Context, ledger ledger.Ledger) error
HasMinimalVersion(ctx context.Context) (bool, error)
IsUpToDate(ctx context.Context) (bool, error)
GetMigrationsInfo(ctx context.Context) ([]migrations.Info, error)
}

Expand Down
11 changes: 9 additions & 2 deletions internal/storage/bucket/default_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,27 @@ type DefaultBucket struct {
tracer trace.Tracer
}

func (b *DefaultBucket) IsUpToDate(ctx context.Context) (bool, error) {
return GetMigrator(b.db, b.name).IsUpToDate(ctx)
}

func (b *DefaultBucket) Migrate(ctx context.Context, minimalVersionReached chan struct{}, options ...migrations.Option) error {
return migrate(ctx, b.tracer, b.db, b.name, minimalVersionReached, options...)
}

func (b *DefaultBucket) HasMinimalVersion(ctx context.Context) (bool, error) {
migrator := GetMigrator(b.db, b.name)
lastVersion, err := migrator.GetLastVersion(ctx)
lastVersion, err := b.GetLastVersion(ctx)
if err != nil {
return false, err
}

return lastVersion >= MinimalSchemaVersion, nil
}

func (b *DefaultBucket) GetLastVersion(ctx context.Context) (int, error) {
return GetMigrator(b.db, b.name).GetLastVersion(ctx)
}

func (b *DefaultBucket) GetMigrationsInfo(ctx context.Context) ([]migrations.Info, error) {
return GetMigrator(b.db, b.name).GetMigrations(ctx)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/bucket/migrations/1-fix-trigger/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ begin
posting ->> 'destination', posting ->> 'asset', (posting ->> 'amount')::numeric, false,
_destination_exists);
end;
$$ set search_path from current;
$$ set search_path from current;
3 changes: 0 additions & 3 deletions internal/storage/bucket/migrations/11-make-stateless/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,6 @@ execute procedure set_compat_on_transactions_metadata();

alter table transactions
add column post_commit_volumes jsonb,
-- todo: set in subsequent migration `default transaction_date()`,
-- otherwise the function is called for every existing lines
add column inserted_at timestamp without time zone,
alter column timestamp set default transaction_date()
-- todo: we should change the type of this column, but actually it cause a full lock of the table
Expand Down Expand Up @@ -363,7 +361,6 @@ from (select row_number() over () as number, v.value
select null) v) data
$$ set search_path from current;

-- todo(next-minor): remove that on future version when the table will have this default value (need to fill nulls before)
create or replace function set_transaction_inserted_at() returns trigger
security definer
language plpgsql
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Fill transaction ids of table moves
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
do $$
declare
_batch_size integer := 100;
_max integer;
begin
set search_path = '{{.Schema}}';

create index moves_transactions_id on moves(transactions_id);

select count(seq)
from moves
where transactions_id is null
into _max;

perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _max);
loop

with _outdated_moves as (
select *
from moves
where transactions_id is null
limit _batch_size
)
update moves
set transactions_id = (
select id
from transactions
where seq = moves.transactions_seq
)
from _outdated_moves
where moves.seq in (_outdated_moves.seq);

exit when not found;

perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);

commit ;
end loop;

alter table moves
alter column transactions_id set not null;
end
$$
language plpgsql;
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Fill inserted_at column of transactions table
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
do $$
declare
_batch_size integer := 100;
_date timestamp without time zone;
_count integer := 0;
begin
--todo: take explicit advisory lock to avoid concurrent migrations when the service is killed
set search_path = '{{.Schema}}';

-- select the date where the "11-make-stateless" migration has been applied
select tstamp into _date
from _system.goose_db_version
where version_id = 12;

create temporary table logs_transactions as
select id, ledger, date, (data->'transaction'->>'id')::bigint as transaction_id
from logs
where date <= _date;

create index on logs_transactions (ledger, transaction_id) include (id, date);

select count(*) into _count
from logs_transactions;

perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count);

for i in 0.._count by _batch_size loop
-- disable triggers
set session_replication_role = replica;

with _rows as (
select *
from logs_transactions
order by ledger, transaction_id
offset i
limit _batch_size
)
update transactions
set inserted_at = _rows.date
from _rows
where transactions.ledger = _rows.ledger and transactions.id = _rows.transaction_id;

-- enable triggers
set session_replication_role = default;

commit;

perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);
end loop;

drop table logs_transactions;

alter table transactions
alter column inserted_at set default transaction_date();

drop trigger set_transaction_inserted_at on transactions;
drop function set_transaction_inserted_at;
end
$$;
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Fill post_commit_volumes column of transactions table
62 changes: 62 additions & 0 deletions internal/storage/bucket/migrations/18-transactions-fill-pcv/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
do $$
declare
_batch_size integer := 100;
_count integer;
begin
set search_path = '{{.Schema}}';

select count(id)
from transactions
where post_commit_volumes is null
into _count;

perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count);

loop
-- disable triggers
set session_replication_role = replica;

with _outdated_transactions as (
select id
from transactions
where post_commit_volumes is null
limit _batch_size
)
update transactions
set post_commit_volumes = (
select public.aggregate_objects(post_commit_volumes::jsonb) as post_commit_volumes
from (
select accounts_address, json_build_object(accounts_address, post_commit_volumes) post_commit_volumes
from (
select accounts_address, json_build_object(asset, post_commit_volumes) as post_commit_volumes
from (
select distinct on (accounts_address, asset)
accounts_address,
asset,
first_value(post_commit_volumes) over (
partition by accounts_address, asset
order by seq desc
) as post_commit_volumes
from moves
where transactions_id = transactions.id and ledger = transactions.ledger
) moves
) values
) values
)
from _outdated_transactions
where transactions.id in (_outdated_transactions.id);

-- enable triggers
set session_replication_role = default;

exit when not found;

commit;

perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);
end loop;

alter table transactions
alter column post_commit_volumes set not null;
end
$$;
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Populate accounts_volumes table with historic data
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
do $$
declare
_count integer;
_batch_size integer := 100;
begin
set search_path = '{{.Schema}}';

create temporary table tmp_volumes as
select distinct on (ledger, accounts_address, asset)
ledger,
accounts_address,
asset,
first_value(post_commit_volumes) over (
partition by ledger, accounts_address, asset
order by seq desc
) as post_commit_volumes
from moves
where not exists(
select
from accounts_volumes
where ledger = moves.ledger
and asset = moves.asset
and accounts_address = moves.accounts_address
);

select count(*)
from tmp_volumes
into _count;

perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count);

raise info '_count: %', _count;

for i in 0.._count by _batch_size loop
with _rows as (
select *
from tmp_volumes
offset i
limit _batch_size
)
insert into accounts_volumes (ledger, accounts_address, asset, input, output)
select ledger, accounts_address, asset, (post_commit_volumes).inputs, (post_commit_volumes).outputs
from _rows
on conflict do nothing; -- can be inserted by a concurrent transaction

commit;

perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);

end loop;

drop table tmp_volumes;
end
$$;
Empty file.
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,4 @@ with all_assets as (select v.v as asset
) m on true)
select moves.asset, moves.post_commit_volumes
from moves
$$ set search_path from current;

$$ set search_path from current;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Fill transactions_id column of transactions_metadata table
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@

do $$
declare
_batch_size integer := 100;
_count integer;
begin
set search_path = '{{.Schema}}';

select count(seq)
from transactions_metadata
where transactions_id is null
into _count;

perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count);

loop
with _outdated_transactions_metadata as (
select seq
from transactions_metadata
where transactions_id is null
limit _batch_size
)
update transactions_metadata
set transactions_id = (
select id
from transactions
where transactions_metadata.transactions_seq = seq
)
from _outdated_transactions_metadata
where transactions_metadata.seq in (_outdated_transactions_metadata.seq);

exit when not found;

commit;

perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);

end loop;

alter table transactions_metadata
alter column transactions_id set not null ;
end
$$;

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Fill accounts_address column of accounts_metadata table
Loading

0 comments on commit d7ba7d8

Please sign in to comment.