From ebfcc4171c707d34bd34d240fc5cc4abdcf639e4 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Tue, 29 Oct 2024 15:28:41 +0100 Subject: [PATCH] fix: slow migrations --- go.mod | 3 +- go.sum | 12 +++ .../12-moves-fill-transaction-id/up.sql | 4 +- .../13-transactions-fill-inserted-at/up.sql | 40 ++++++--- .../14-transactions-fill-pcv/up.sql | 11 ++- .../15-accounts-volumes-fill-history/up.sql | 84 ++++++++----------- .../up.sql | 2 +- .../17-accounts-metadata-fill-address/up.sql | 2 +- .../migrations/18-logs-fill-memento/up.sql | 2 +- 9 files changed, 92 insertions(+), 68 deletions(-) diff --git a/go.mod b/go.mod index 27f4a979f..1acb5b47f 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 github.com/bluele/gcache v0.0.2 github.com/dop251/goja v0.0.0-20241009100908-5f46f2705ca3 - github.com/formancehq/go-libs/v2 v2.0.1-0.20241028161955-be554f388de7 + github.com/formancehq/go-libs/v2 v2.0.1-0.20241029111513-edb146ee0db7 github.com/formancehq/ledger/pkg/client v0.0.0-00010101000000-000000000000 github.com/go-chi/chi/v5 v5.1.0 github.com/go-chi/cors v1.2.1 @@ -54,7 +54,6 @@ require gopkg.in/yaml.v3 v3.0.1 // indirect require ( github.com/hashicorp/go-hclog v1.6.3 // indirect github.com/jackc/pgxlisten v0.0.0-20241005155529-9d952acd6a6c // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) require ( diff --git a/go.sum b/go.sum index f85cc5863..9eb3bfa3c 100644 --- a/go.sum +++ b/go.sum @@ -124,6 +124,18 @@ github.com/formancehq/go-libs/v2 v2.0.1-0.20241028161858-847ed1327254 h1:KukRMj3 github.com/formancehq/go-libs/v2 v2.0.1-0.20241028161858-847ed1327254/go.mod h1:DTqSp28pYPZa4O1WrOg3kobhgTHdk9geGtxnws9EViM= github.com/formancehq/go-libs/v2 v2.0.1-0.20241028161955-be554f388de7 h1:PGSYMvrTg5CWhhfJYONfUBybFsCa+NKx6J1MIXDi9tw= github.com/formancehq/go-libs/v2 v2.0.1-0.20241028161955-be554f388de7/go.mod h1:DTqSp28pYPZa4O1WrOg3kobhgTHdk9geGtxnws9EViM= +github.com/formancehq/go-libs/v2 v2.0.1-0.20241029104753-6ce97ad02c66 h1:RUT4O/agbd0B/zLkhZczb9mDl94YRLQBLQbeKC7WdYA= +github.com/formancehq/go-libs/v2 v2.0.1-0.20241029104753-6ce97ad02c66/go.mod h1:DTqSp28pYPZa4O1WrOg3kobhgTHdk9geGtxnws9EViM= +github.com/formancehq/go-libs/v2 v2.0.1-0.20241029105339-57d5de8a7888 h1:ZYGF0OoJuXpBkiYyQjXel8w+iyZOrRiBrUVl4VpIwow= +github.com/formancehq/go-libs/v2 v2.0.1-0.20241029105339-57d5de8a7888/go.mod h1:DTqSp28pYPZa4O1WrOg3kobhgTHdk9geGtxnws9EViM= +github.com/formancehq/go-libs/v2 v2.0.1-0.20241029110203-2b35329b59c2 h1:T6l0klF21gEjP8q2DwaRDhUtGKaS/vysOxTCpKnFb7E= +github.com/formancehq/go-libs/v2 v2.0.1-0.20241029110203-2b35329b59c2/go.mod h1:DTqSp28pYPZa4O1WrOg3kobhgTHdk9geGtxnws9EViM= +github.com/formancehq/go-libs/v2 v2.0.1-0.20241029110402-5763c135e073 h1:Eb1fOVBuAxbJKc8g1A/VyRJ662rWBiNSnjxP89+mV6s= +github.com/formancehq/go-libs/v2 v2.0.1-0.20241029110402-5763c135e073/go.mod h1:DTqSp28pYPZa4O1WrOg3kobhgTHdk9geGtxnws9EViM= +github.com/formancehq/go-libs/v2 v2.0.1-0.20241029110646-2504d85e4258 h1:L0af9QJR2XS23KMXj6fOaRwK4xVGq0RYpUJ4zdED6vY= +github.com/formancehq/go-libs/v2 v2.0.1-0.20241029110646-2504d85e4258/go.mod h1:DTqSp28pYPZa4O1WrOg3kobhgTHdk9geGtxnws9EViM= +github.com/formancehq/go-libs/v2 v2.0.1-0.20241029111513-edb146ee0db7 h1:OZz4N9nIj814aIgpqIvojndtae+N9Vqj5MJgKPIzJ5Y= +github.com/formancehq/go-libs/v2 v2.0.1-0.20241029111513-edb146ee0db7/go.mod h1:DTqSp28pYPZa4O1WrOg3kobhgTHdk9geGtxnws9EViM= github.com/formancehq/numscript v0.0.9-0.20241009144012-1150c14a1417 h1:LOd5hxnXDIBcehFrpW1OnXk+VSs0yJXeu1iAOO+Hji4= github.com/formancehq/numscript v0.0.9-0.20241009144012-1150c14a1417/go.mod h1:btuSv05cYwi9BvLRxVs5zrunU+O1vTgigG1T6UsawcY= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= diff --git a/internal/storage/bucket/migrations/12-moves-fill-transaction-id/up.sql b/internal/storage/bucket/migrations/12-moves-fill-transaction-id/up.sql index ab0db072e..5c486cf6e 100644 --- a/internal/storage/bucket/migrations/12-moves-fill-transaction-id/up.sql +++ b/internal/storage/bucket/migrations/12-moves-fill-transaction-id/up.sql @@ -1,10 +1,12 @@ do $$ declare - _batch_size integer := 30; + _batch_size integer := 100; _max integer; begin set search_path = '{{.Schema}}'; + create index moves_transactions_id on moves(transactions_id); + select count(seq) from moves where transactions_id is null diff --git a/internal/storage/bucket/migrations/13-transactions-fill-inserted-at/up.sql b/internal/storage/bucket/migrations/13-transactions-fill-inserted-at/up.sql index 58fac0dc4..d7e500d80 100644 --- a/internal/storage/bucket/migrations/13-transactions-fill-inserted-at/up.sql +++ b/internal/storage/bucket/migrations/13-transactions-fill-inserted-at/up.sql @@ -1,9 +1,10 @@ do $$ declare - _batch_size integer := 30; + _batch_size integer := 100; _date timestamp without time zone; _count integer := 0; begin + --todo: take explicit advisory lock to avoid concurrent migrations when the service is killed set search_path = '{{.Schema}}'; -- select the date where the "11-make-stateless" migration has been applied @@ -11,25 +12,42 @@ do $$ from _system.goose_db_version where version_id = 12; - select count(*) into _count + create temporary table logs_transactions as + select id, ledger, date, (data->'transaction'->>'id')::bigint as transaction_id from logs where date <= _date; + create index on logs_transactions (ledger, transaction_id) include (id, date); + + select count(*) into _count + from logs_transactions; + perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count); - for i in 1.._count by _batch_size loop - -- todo: disable triggers! - update transactions - set inserted_at = ( - select date - from logs - where transactions.id = (data->'transaction'->>'id')::bigint and transactions.ledger = ledger + for i in 0.._count by _batch_size loop + -- disable triggers + set session_replication_role = replica; + + with _rows as ( + select * + from logs_transactions + order by ledger, transaction_id + offset i + limit _batch_size ) - where id >= i and id < i + _batch_size; + update transactions + set inserted_at = _rows.date + from _rows + where transactions.ledger = _rows.ledger and transactions.id = _rows.transaction_id; + + -- enable triggers + set session_replication_role = default; commit; - perform pg_notify('migrations-{{ .Schema }}', 'continue: 1'); + perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size); end loop; + + drop table logs_transactions; end $$; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/14-transactions-fill-pcv/up.sql b/internal/storage/bucket/migrations/14-transactions-fill-pcv/up.sql index a806b6933..39dd9e9f4 100644 --- a/internal/storage/bucket/migrations/14-transactions-fill-pcv/up.sql +++ b/internal/storage/bucket/migrations/14-transactions-fill-pcv/up.sql @@ -1,6 +1,6 @@ do $$ declare - _batch_size integer := 30; + _batch_size integer := 100; _count integer; begin set search_path = '{{.Schema}}'; @@ -13,6 +13,9 @@ do $$ perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count); loop + -- disable triggers + set session_replication_role = replica; + with _outdated_transactions as ( select id from transactions @@ -43,10 +46,14 @@ do $$ from _outdated_transactions where transactions.id in (_outdated_transactions.id); + -- enable triggers + set session_replication_role = default; + exit when not found; - perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size); + commit; + perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size); end loop; alter table transactions diff --git a/internal/storage/bucket/migrations/15-accounts-volumes-fill-history/up.sql b/internal/storage/bucket/migrations/15-accounts-volumes-fill-history/up.sql index 071e5513c..f77f2a0ec 100644 --- a/internal/storage/bucket/migrations/15-accounts-volumes-fill-history/up.sql +++ b/internal/storage/bucket/migrations/15-accounts-volumes-fill-history/up.sql @@ -1,68 +1,54 @@ do $$ declare - _missing record; _count integer; + _batch_size integer := 100; begin set search_path = '{{.Schema}}'; + create temporary table tmp_volumes as + select distinct on (ledger, accounts_address, asset) + ledger, + accounts_address, + asset, + first_value(post_commit_volumes) over ( + partition by ledger, accounts_address, asset + order by seq desc + ) as post_commit_volumes + from moves + where not exists( + select + from accounts_volumes + where ledger = moves.ledger + and asset = moves.asset + and accounts_address = moves.accounts_address + ); + select count(*) - from ( - select distinct on (ledger, accounts_address, asset) - ledger, - accounts_address, - asset, - first_value(post_commit_volumes) over ( - partition by ledger, accounts_address, asset - order by seq desc - ) as post_commit_volumes - from moves - where not exists( - select - from accounts_volumes - where ledger = moves.ledger - and asset = moves.asset - and accounts_address = moves.accounts_address - ) - ) data + from tmp_volumes into _count; perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count); - loop - select distinct on (ledger, accounts_address, asset) - ledger, - accounts_address, - asset, - first_value(post_commit_volumes) over ( - partition by ledger, accounts_address, asset - order by seq desc - ) as post_commit_volumes - into _missing - from moves - where not exists( - select - from accounts_volumes - where ledger = moves.ledger - and asset = moves.asset - and accounts_address = moves.accounts_address - ) - limit 1; + raise info '_count: %', _count; - exit when not found; - - insert into accounts_volumes (ledger, accounts_address, asset, input, output) - values ( - _missing.ledger, - _missing.accounts_address, - _missing.asset, - (_missing.post_commit_volumes).inputs, - (_missing.post_commit_volumes).outputs + for i in 0.._count by _batch_size loop + with _rows as ( + select * + from tmp_volumes + offset i + limit _batch_size ) + insert into accounts_volumes (ledger, accounts_address, asset, input, output) + select ledger, accounts_address, asset, (post_commit_volumes).inputs, (post_commit_volumes).outputs + from _rows on conflict do nothing; -- can be inserted by a concurrent transaction - perform pg_notify('migrations-{{ .Schema }}', 'continue: 1'); - commit; + + perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size); + end loop; + + drop table tmp_volumes; end $$; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/16-transactions-metadata-fill-transaction-id/up.sql b/internal/storage/bucket/migrations/16-transactions-metadata-fill-transaction-id/up.sql index 86019a0e5..7823fa915 100644 --- a/internal/storage/bucket/migrations/16-transactions-metadata-fill-transaction-id/up.sql +++ b/internal/storage/bucket/migrations/16-transactions-metadata-fill-transaction-id/up.sql @@ -1,7 +1,7 @@ do $$ declare - _batch_size integer := 30; + _batch_size integer := 100; _count integer; begin set search_path = '{{.Schema}}'; diff --git a/internal/storage/bucket/migrations/17-accounts-metadata-fill-address/up.sql b/internal/storage/bucket/migrations/17-accounts-metadata-fill-address/up.sql index 24887b1b9..752ef3cfd 100644 --- a/internal/storage/bucket/migrations/17-accounts-metadata-fill-address/up.sql +++ b/internal/storage/bucket/migrations/17-accounts-metadata-fill-address/up.sql @@ -1,7 +1,7 @@ do $$ declare - _batch_size integer := 30; + _batch_size integer := 100; _count integer; begin set search_path = '{{.Schema}}'; diff --git a/internal/storage/bucket/migrations/18-logs-fill-memento/up.sql b/internal/storage/bucket/migrations/18-logs-fill-memento/up.sql index dabaf8033..7923084b3 100644 --- a/internal/storage/bucket/migrations/18-logs-fill-memento/up.sql +++ b/internal/storage/bucket/migrations/18-logs-fill-memento/up.sql @@ -1,6 +1,6 @@ do $$ declare - _batch_size integer := 30; + _batch_size integer := 100; _count integer; begin set search_path = '{{.Schema}}';