Skip to content

Commit

Permalink
feat: migrate old data
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 29, 2024
1 parent 744002c commit de1229c
Show file tree
Hide file tree
Showing 34 changed files with 343 additions and 9 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ require (
golang.org/x/sync v0.8.0
)

require gopkg.in/yaml.v3 v3.0.1 // indirect

require (
github.com/hashicorp/go-hclog v1.6.3 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
8 changes: 4 additions & 4 deletions internal/storage/bucket/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (
var MigrationsFS embed.FS

func GetMigrator(db *bun.DB, name string) *migrations.Migrator {
migrator := migrations.NewMigrator(db, migrations.WithSchema(name, true))
migrations, err := migrations.CollectMigrations(MigrationsFS, name)
migrator := migrations.NewMigrator(db, migrations.WithSchema(name))
_migrations, err := migrations.CollectMigrations(MigrationsFS, name)
if err != nil {
panic(err)
}
migrator.RegisterMigrations(migrations...)
migrator.RegisterMigrations(_migrations...)

return migrator
}
Expand All @@ -27,4 +27,4 @@ func migrate(ctx context.Context, tracer trace.Tracer, db *bun.DB, name string)
defer span.End()

return GetMigrator(db, name).Up(ctx)
}
}
2 changes: 1 addition & 1 deletion internal/storage/bucket/migrations/1-fix-trigger/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ begin
posting ->> 'destination', posting ->> 'asset', (posting ->> 'amount')::numeric, false,
_destination_exists);
end;
$$ set search_path from current;
$$ set search_path from current;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Fill transaction ids of table moves
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
do $$
declare
_batch_size integer := 30;
_max integer;
begin
set search_path = '{{.Schema}}';

select count(seq)
from moves
where transactions_id is null
into _max;

perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _max);
loop

with _outdated_moves as (
select *
from moves
where transactions_id is null
limit _batch_size
)
update moves
set transactions_id = (
select id
from transactions
where seq = moves.transactions_seq
)
from _outdated_moves
where moves.seq in (_outdated_moves.seq);

exit when not found;

perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);

commit ;
end loop;

alter table moves
alter column transactions_id set not null;
end
$$
language plpgsql;
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Fill inserted_at column of transactions table
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
do $$
declare
_batch_size integer := 30;
_date timestamp without time zone;
_count integer;
begin
set search_path = '{{.Schema}}';

-- select the date where the "11-make-stateless" migration has been applied
select tstamp into _date
from _system.goose_db_version
where version_id = 12;

select count(*) into _count
from logs
where date <= _date;

perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count);

for i in 0.._count by _batch_size loop
update transactions
set inserted_at = (
select date
from logs
where transactions.id = (data->'transaction'->>'id')::bigint and transactions.ledger = ledger
)
where id >= i and id < i + _batch_size;

commit;

perform pg_notify('migrations-{{ .Schema }}', 'continue: 1');

end loop;
end
$$;
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Fill post_commit_volumes column of transactions table
55 changes: 55 additions & 0 deletions internal/storage/bucket/migrations/14-transactions-fill-pcv/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
do $$
declare
_batch_size integer := 30;
_count integer;
begin
set search_path = '{{.Schema}}';

select count(id)
from transactions
where post_commit_volumes is null
into _count;

perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count);

loop
with _outdated_transactions as (
select id
from transactions
where post_commit_volumes is null
limit _batch_size
)
update transactions
set post_commit_volumes = (
select public.aggregate_objects(post_commit_volumes::jsonb) as post_commit_volumes
from (
select accounts_address, json_build_object(accounts_address, post_commit_volumes) post_commit_volumes
from (
select accounts_address, json_build_object(asset, post_commit_volumes) as post_commit_volumes
from (
select distinct on (accounts_address, asset)
accounts_address,
asset,
first_value(post_commit_volumes) over (
partition by accounts_address, asset
order by seq desc
) as post_commit_volumes
from moves
where transactions_id = transactions.id and ledger = transactions.ledger
) moves
) values
) values
)
from _outdated_transactions
where transactions.id in (_outdated_transactions.id);

exit when not found;

perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);

end loop;

alter table transactions
alter column post_commit_volumes set not null;
end
$$;
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Populate accounts_volumes table with historic data
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
do $$
declare
_missing record;
_count integer;
begin
set search_path = '{{.Schema}}';

select count(*)
from (
select distinct on (ledger, accounts_address, asset)
ledger,
accounts_address,
asset,
first_value(post_commit_volumes) over (
partition by ledger, accounts_address, asset
order by seq desc
) as post_commit_volumes
from moves
where not exists(
select
from accounts_volumes
where ledger = moves.ledger
and asset = moves.asset
and accounts_address = moves.accounts_address
)
) data
into _count;

perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count);

loop
select distinct on (ledger, accounts_address, asset)
ledger,
accounts_address,
asset,
first_value(post_commit_volumes) over (
partition by ledger, accounts_address, asset
order by seq desc
) as post_commit_volumes
into _missing
from moves
where not exists(
select
from accounts_volumes
where ledger = moves.ledger
and asset = moves.asset
and accounts_address = moves.accounts_address
)
limit 1;

exit when not found;

insert into accounts_volumes (ledger, accounts_address, asset, input, output)
values (
_missing.ledger,
_missing.accounts_address,
_missing.asset,
(_missing.post_commit_volumes).inputs,
(_missing.post_commit_volumes).outputs
)
on conflict do nothing; -- can be inserted by a concurrent transaction

perform pg_notify('migrations-{{ .Schema }}', 'continue: 1');

commit;
end loop;
end
$$;
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Fill transactions_id column of transactions_metadata table
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@

do $$
declare
_batch_size integer := 30;
_count integer;
begin
set search_path = '{{.Schema}}';

select count(seq)
from transactions_metadata
where transactions_id is null
into _count;

perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count);

loop
with _outdated_transactions_metadata as (
select seq
from transactions_metadata
where transactions_id is null
limit _batch_size
)
update transactions_metadata
set transactions_id = (
select id
from transactions
where transactions_metadata.transactions_seq = seq
)
from _outdated_transactions_metadata
where transactions_metadata.seq in (_outdated_transactions_metadata.seq);

exit when not found;

commit;

perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);

end loop;

alter table transactions_metadata
alter column transactions_id set not null ;
end
$$;

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Fill accounts_address column of accounts_metadata table
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@

do $$
declare
_batch_size integer := 30;
_count integer;
begin
set search_path = '{{.Schema}}';

select count(seq)
from accounts_metadata
where accounts_address is null
into _count;

perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count);

loop
with _outdated_accounts_metadata as (
select seq
from accounts_metadata
where accounts_address is null
limit _batch_size
)
update accounts_metadata
set accounts_address = (
select address
from accounts
where accounts_metadata.accounts_seq = seq
)
from _outdated_accounts_metadata
where accounts_metadata.seq in (_outdated_accounts_metadata.seq);

exit when not found;

commit;

perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);

end loop;

alter table accounts_metadata
alter column accounts_address set not null ;
end
$$;

Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Fill memento column of logs table
38 changes: 38 additions & 0 deletions internal/storage/bucket/migrations/18-logs-fill-memento/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
do $$
declare
_batch_size integer := 30;
_count integer;
begin
set search_path = '{{.Schema}}';

select count(seq)
from logs
where memento is null
into _count;

perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count);

loop
with _outdated_logs as (
select seq
from logs
where memento is null
limit _batch_size
)
update logs
set memento = convert_to(data::varchar, 'LATIN1')::bytea
from _outdated_logs
where logs.seq in (_outdated_logs.seq);

exit when not found;

commit;

perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);
end loop;

alter table logs
alter column memento set not null;
end
$$;

Empty file.
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,4 @@ with all_assets as (select v.v as asset
) m on true)
select moves.asset, moves.post_commit_volumes
from moves
$$ set search_path from current;

$$ set search_path from current;
Loading

0 comments on commit de1229c

Please sign in to comment.