Skip to content

Commit

Permalink
fix: slow migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 29, 2024
1 parent 95fe7c0 commit ebfcc41
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 68 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10
github.com/bluele/gcache v0.0.2
github.com/dop251/goja v0.0.0-20241009100908-5f46f2705ca3
github.com/formancehq/go-libs/v2 v2.0.1-0.20241028161955-be554f388de7
github.com/formancehq/go-libs/v2 v2.0.1-0.20241029111513-edb146ee0db7
github.com/formancehq/ledger/pkg/client v0.0.0-00010101000000-000000000000
github.com/go-chi/chi/v5 v5.1.0
github.com/go-chi/cors v1.2.1
Expand Down Expand Up @@ -54,7 +54,6 @@ require gopkg.in/yaml.v3 v3.0.1 // indirect
require (
github.com/hashicorp/go-hclog v1.6.3 // indirect
github.com/jackc/pgxlisten v0.0.0-20241005155529-9d952acd6a6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require (
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,18 @@ github.com/formancehq/go-libs/v2 v2.0.1-0.20241028161858-847ed1327254 h1:KukRMj3
github.com/formancehq/go-libs/v2 v2.0.1-0.20241028161858-847ed1327254/go.mod h1:DTqSp28pYPZa4O1WrOg3kobhgTHdk9geGtxnws9EViM=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241028161955-be554f388de7 h1:PGSYMvrTg5CWhhfJYONfUBybFsCa+NKx6J1MIXDi9tw=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241028161955-be554f388de7/go.mod h1:DTqSp28pYPZa4O1WrOg3kobhgTHdk9geGtxnws9EViM=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241029104753-6ce97ad02c66 h1:RUT4O/agbd0B/zLkhZczb9mDl94YRLQBLQbeKC7WdYA=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241029104753-6ce97ad02c66/go.mod h1:DTqSp28pYPZa4O1WrOg3kobhgTHdk9geGtxnws9EViM=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241029105339-57d5de8a7888 h1:ZYGF0OoJuXpBkiYyQjXel8w+iyZOrRiBrUVl4VpIwow=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241029105339-57d5de8a7888/go.mod h1:DTqSp28pYPZa4O1WrOg3kobhgTHdk9geGtxnws9EViM=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241029110203-2b35329b59c2 h1:T6l0klF21gEjP8q2DwaRDhUtGKaS/vysOxTCpKnFb7E=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241029110203-2b35329b59c2/go.mod h1:DTqSp28pYPZa4O1WrOg3kobhgTHdk9geGtxnws9EViM=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241029110402-5763c135e073 h1:Eb1fOVBuAxbJKc8g1A/VyRJ662rWBiNSnjxP89+mV6s=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241029110402-5763c135e073/go.mod h1:DTqSp28pYPZa4O1WrOg3kobhgTHdk9geGtxnws9EViM=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241029110646-2504d85e4258 h1:L0af9QJR2XS23KMXj6fOaRwK4xVGq0RYpUJ4zdED6vY=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241029110646-2504d85e4258/go.mod h1:DTqSp28pYPZa4O1WrOg3kobhgTHdk9geGtxnws9EViM=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241029111513-edb146ee0db7 h1:OZz4N9nIj814aIgpqIvojndtae+N9Vqj5MJgKPIzJ5Y=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241029111513-edb146ee0db7/go.mod h1:DTqSp28pYPZa4O1WrOg3kobhgTHdk9geGtxnws9EViM=
github.com/formancehq/numscript v0.0.9-0.20241009144012-1150c14a1417 h1:LOd5hxnXDIBcehFrpW1OnXk+VSs0yJXeu1iAOO+Hji4=
github.com/formancehq/numscript v0.0.9-0.20241009144012-1150c14a1417/go.mod h1:btuSv05cYwi9BvLRxVs5zrunU+O1vTgigG1T6UsawcY=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
do $$
declare
_batch_size integer := 30;
_batch_size integer := 100;
_max integer;
begin
set search_path = '{{.Schema}}';

create index moves_transactions_id on moves(transactions_id);

select count(seq)
from moves
where transactions_id is null
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,53 @@
-- Backfills transactions.inserted_at from the dates recorded in logs, batch by batch.
-- NOTE(review): this text comes from a diff hunk rendered without +/- markers, so
-- removed and added lines sit side by side (two _batch_size declarations, two loop
-- headers, two update statements). Confirm against the repository which lines are live.
do $$
declare
-- number of rows handled per loop iteration (both 30 and 100 appear; presumably 100 is the new value)
_batch_size integer := 30;
_batch_size integer := 100;
-- receives tstamp from _system.goose_db_version; used as the upper bound on logs.date
_date timestamp without time zone;
-- row count used as the loop bound and sent in the 'init: ' notification
_count integer := 0;
begin
--todo: take explicit advisory lock to avoid concurrent migrations when the service is killed
set search_path = '{{.Schema}}';

-- select the date where the "11-make-stateless" migration has been applied
-- NOTE(review): the comment above names migration 11 while the filter uses version_id = 12;
-- presumably the goose version is offset by one from the file prefix -- confirm.
select tstamp into _date
from _system.goose_db_version
where version_id = 12;

-- NOTE(review): the next line looks like the start of the previous count query
-- (it counted straight from logs); the newer count over logs_transactions follows further down.
select count(*) into _count
-- Copy the logs dated up to _date into a temporary table, extracting the transaction id
-- from the JSON payload (data->'transaction'->>'id') as a bigint.
-- NOTE(review): a log whose payload has no transaction.id gets a NULL transaction_id and
-- can never match the update below, yet it still counts toward _count -- confirm whether
-- such logs exist and should be filtered out.
create temporary table logs_transactions as
select id, ledger, date, (data->'transaction'->>'id')::bigint as transaction_id
from logs
where date <= _date;

-- index keyed on (ledger, transaction_id), the same columns the batch query orders by;
-- id and date are carried as included columns
create index on logs_transactions (ledger, transaction_id) include (id, date);

-- total number of rows to process
select count(*) into _count
from logs_transactions;

-- announce the total on the 'migrations-<schema>' channel with an 'init: <count>' payload
perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count);

-- Earlier form of the loop: started at 1 and updated transactions whose id fell in
-- [i, i + _batch_size), reading the date through a correlated subquery on logs.
for i in 1.._count by _batch_size loop
-- todo: disable triggers!
update transactions
set inserted_at = (
select date
from logs
where transactions.id = (data->'transaction'->>'id')::bigint and transactions.ledger = ledger
-- Newer form of the loop: i runs from 0 to _count in steps of _batch_size and is used
-- as the row offset of the batch.
for i in 0.._count by _batch_size loop
-- disable triggers
set session_replication_role = replica;

-- _rows is one batch of logs_transactions, taken in (ledger, transaction_id) order.
-- NOTE(review): offset-based batching re-reads the skipped rows on every iteration;
-- keyset pagination on (ledger, transaction_id) may be cheaper -- measure before changing.
with _rows as (
select *
from logs_transactions
order by ledger, transaction_id
offset i
limit _batch_size
)
where id >= i and id < i + _batch_size;
-- copy the log date onto the matching transaction (same ledger, same transaction id)
update transactions
set inserted_at = _rows.date
from _rows
where transactions.ledger = _rows.ledger and transactions.id = _rows.transaction_id;

-- enable triggers
set session_replication_role = default;

-- commit after each batch
commit;

-- progress notification; the payload is 'continue: 1' in one variant and
-- 'continue: <_batch_size>' in the other. The latter is sent even when the final
-- batch holds fewer than _batch_size rows (or none).
perform pg_notify('migrations-{{ .Schema }}', 'continue: 1');
perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);
end loop;

-- the temporary table is no longer needed once every batch has been applied
drop table logs_transactions;
end
$$;
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
do $$
declare
_batch_size integer := 30;
_batch_size integer := 100;
_count integer;
begin
set search_path = '{{.Schema}}';
Expand All @@ -13,6 +13,9 @@ do $$
perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count);

loop
-- disable triggers
set session_replication_role = replica;

with _outdated_transactions as (
select id
from transactions
Expand Down Expand Up @@ -43,10 +46,14 @@ do $$
from _outdated_transactions
where transactions.id in (_outdated_transactions.id);

-- enable triggers
set session_replication_role = default;

exit when not found;

perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);
commit;

perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);
end loop;

alter table transactions
Expand Down
Original file line number Diff line number Diff line change
@@ -1,68 +1,54 @@
-- Creates the accounts_volumes rows that are missing for (ledger, accounts_address, asset)
-- combinations present in moves, using the post_commit_volumes of the move with the highest seq.
-- NOTE(review): this text comes from a diff hunk rendered without +/- markers, so
-- removed and added lines sit side by side (e.g. the inline subquery next to tmp_volumes,
-- the plain loop next to the for loop). Confirm against the repository which lines are live.
do $$
declare
-- receives a single missing row in the row-at-a-time variant of the loop
_missing record;
-- number of missing combinations; loop bound and 'init: ' notification payload
_count integer;
-- number of rows inserted per iteration in the batched variant
_batch_size integer := 100;
begin
set search_path = '{{.Schema}}';

-- Materialize the missing combinations once: one row per (ledger, accounts_address, asset)
-- that has no accounts_volumes row, with post_commit_volumes taken from the move
-- having the greatest seq in that partition.
create temporary table tmp_volumes as
select distinct on (ledger, accounts_address, asset)
ledger,
accounts_address,
asset,
first_value(post_commit_volumes) over (
partition by ledger, accounts_address, asset
order by seq desc
) as post_commit_volumes
from moves
where not exists(
select
from accounts_volumes
where ledger = moves.ledger
and asset = moves.asset
and accounts_address = moves.accounts_address
);

-- Count the rows to process. Two sources are shown: the inline subquery ("data")
-- repeating the query above, and the tmp_volumes table.
select count(*)
from (
select distinct on (ledger, accounts_address, asset)
ledger,
accounts_address,
asset,
first_value(post_commit_volumes) over (
partition by ledger, accounts_address, asset
order by seq desc
) as post_commit_volumes
from moves
where not exists(
select
from accounts_volumes
where ledger = moves.ledger
and asset = moves.asset
and accounts_address = moves.accounts_address
)
) data
from tmp_volumes
into _count;

-- announce the total on the 'migrations-<schema>' channel with an 'init: <count>' payload
perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count);

-- Row-at-a-time variant: re-runs the full missing-rows query with limit 1 on every
-- iteration and stops when nothing is found.
loop
select distinct on (ledger, accounts_address, asset)
ledger,
accounts_address,
asset,
first_value(post_commit_volumes) over (
partition by ledger, accounts_address, asset
order by seq desc
) as post_commit_volumes
into _missing
from moves
where not exists(
select
from accounts_volumes
where ledger = moves.ledger
and asset = moves.asset
and accounts_address = moves.accounts_address
)
limit 1;
-- log the number of rows to process at INFO level
raise info '_count: %', _count;

exit when not found;

-- insert the single row fetched into _missing
insert into accounts_volumes (ledger, accounts_address, asset, input, output)
values (
_missing.ledger,
_missing.accounts_address,
_missing.asset,
(_missing.post_commit_volumes).inputs,
(_missing.post_commit_volumes).outputs
-- Batched variant: i runs from 0 to _count in steps of _batch_size and is used as the
-- row offset into tmp_volumes.
for i in 0.._count by _batch_size loop
-- NOTE(review): this offset/limit has no order by, and PostgreSQL does not guarantee a
-- stable row order without one, so consecutive batches could in principle overlap or
-- skip rows. "on conflict do nothing" absorbs duplicates but not skipped rows; consider
-- ordering by (ledger, accounts_address, asset) -- confirm.
with _rows as (
select *
from tmp_volumes
offset i
limit _batch_size
)
-- the inputs/outputs fields of post_commit_volumes populate the input/output columns
insert into accounts_volumes (ledger, accounts_address, asset, input, output)
select ledger, accounts_address, asset, (post_commit_volumes).inputs, (post_commit_volumes).outputs
from _rows
on conflict do nothing; -- can be inserted by a concurrent transaction

-- progress notification of the row-at-a-time variant (one row handled)
perform pg_notify('migrations-{{ .Schema }}', 'continue: 1');

-- commit after each iteration
commit;

-- progress notification of the batched variant; the payload is always _batch_size,
-- even when the final batch holds fewer rows (or none)
perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);

end loop;

-- the temporary table is no longer needed once every batch has been inserted
drop table tmp_volumes;
end
$$;
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

do $$
declare
_batch_size integer := 30;
_batch_size integer := 100;
_count integer;
begin
set search_path = '{{.Schema}}';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

do $$
declare
_batch_size integer := 30;
_batch_size integer := 100;
_count integer;
begin
set search_path = '{{.Schema}}';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
do $$
declare
_batch_size integer := 30;
_batch_size integer := 100;
_count integer;
begin
set search_path = '{{.Schema}}';
Expand Down

0 comments on commit ebfcc41

Please sign in to comment.