-
Notifications
You must be signed in to change notification settings - Fork 101
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
54 additions
and
52 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
95 changes: 47 additions & 48 deletions
95
internal/storage/bucket/migrations/19-transactions-fill-pcv/up.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,64 +1,63 @@ | ||
do $$ | ||
declare | ||
_batch_size integer := 100; | ||
_count integer; | ||
_offset integer := 0; | ||
_batch_size integer := 1000; | ||
begin | ||
set search_path = '{{.Schema}}'; | ||
set search_path = '{{ .Schema }}'; | ||
|
||
drop table if exists moves_view; | ||
|
||
create temp table moves_view as | ||
select transactions_id::numeric, public.aggregate_objects(json_build_object(accounts_address, json_build_object(asset, post_commit_volumes))::jsonb) as volumes | ||
from ( | ||
SELECT DISTINCT ON (moves.transactions_id, accounts_address, asset) moves.transactions_id, accounts_address, asset, | ||
first_value(post_commit_volumes) OVER ( | ||
PARTITION BY moves.transactions_id, accounts_address, asset | ||
ORDER BY seq DESC | ||
) AS post_commit_volumes | ||
FROM moves | ||
where insertion_date < ( | ||
select tstamp from goose_db_version where version_id = 12 | ||
) | ||
) moves | ||
group by transactions_id; | ||
|
||
select count(id) | ||
from transactions | ||
where post_commit_volumes is null | ||
into _count; | ||
perform pg_notify('migrations-{{ .Schema }}', 'init: ' || (select count(*) from moves_view)); | ||
|
||
perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count); | ||
create index moves_view_idx on moves_view(transactions_id); | ||
|
||
loop | ||
-- disable triggers | ||
set session_replication_role = replica; | ||
-- disable triggers | ||
set session_replication_role = replica; | ||
|
||
with _outdated_transactions as ( | ||
select id | ||
from transactions | ||
where post_commit_volumes is null | ||
limit _batch_size | ||
loop | ||
with data as ( | ||
select transactions_id, volumes | ||
from moves_view | ||
-- play better than offset/limit | ||
where transactions_id >= _offset and transactions_id < _offset + _batch_size | ||
) | ||
update transactions | ||
set post_commit_volumes = ( | ||
select public.aggregate_objects(post_commit_volumes::jsonb) as post_commit_volumes | ||
from ( | ||
select accounts_address, json_build_object(accounts_address, post_commit_volumes) post_commit_volumes | ||
from ( | ||
select accounts_address, json_build_object(asset, post_commit_volumes) as post_commit_volumes | ||
from ( | ||
select distinct on (accounts_address, asset) | ||
accounts_address, | ||
asset, | ||
first_value(post_commit_volumes) over ( | ||
partition by accounts_address, asset | ||
order by seq desc | ||
) as post_commit_volumes | ||
from moves | ||
where transactions_id = transactions.id and ledger = transactions.ledger | ||
) moves | ||
) values | ||
) values | ||
) | ||
from _outdated_transactions | ||
where transactions.id in (_outdated_transactions.id); | ||
|
||
-- enable triggers | ||
set session_replication_role = default; | ||
set post_commit_volumes = data.volumes | ||
from data | ||
where transactions.id = data.transactions_id; | ||
|
||
exit when not found; | ||
|
||
commit; | ||
_offset = _offset + _batch_size; | ||
|
||
perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size); | ||
|
||
commit; | ||
end loop; | ||
|
||
alter table transactions | ||
add constraint post_commit_volumes_not_null | ||
check (post_commit_volumes is not null) | ||
not valid; | ||
|
||
-- enable triggers | ||
set session_replication_role = default; | ||
|
||
drop table if exists moves_view; | ||
|
||
alter table transactions | ||
add constraint post_commit_volumes_not_null | ||
check (post_commit_volumes is not null) | ||
not valid; | ||
end | ||
$$; | ||
$$; |