From e27b8724353dc20f50531ad0ef35343e1edec39c Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Thu, 17 Oct 2024 17:27:03 +0200 Subject: [PATCH] feat: simplify migration by removing templating --- internal/storage/bucket/migrations.go | 16 +- .../bucket/migrations/0-init-schema.sql | 358 +++++++++--------- .../bucket/migrations/1-fix-trigger.sql | 16 +- .../migrations/10-fillfactor-on-moves.sql | 2 +- .../bucket/migrations/11-drop-triggers.sql | 64 ++-- .../12-moves-add-transaction-id.sql | 10 +- ...-set-transaction-timestamp-default-utc.sql | 4 +- .../14-rename-address-moves-column.sql | 4 +- .../15-moves-remove-accounts-seq.sql | 2 +- .../16-moves-change-pvc-column-type.sql | 16 +- .../17-transactions-fix-reference.sql | 4 +- .../migrations/18-transactions-add-pvc.sql | 8 +- .../19-logs-add-idempotency-hash.sql | 2 +- .../migrations/2-fix-volumes-aggregation.sql | 10 +- .../20-moves-drop-accounts-address-array.sql | 2 +- .../21-add-accounts-volumes-table.sql | 6 +- ...ansactions-metadata-add-transaction-id.sql | 10 +- .../23-accounts-metadata-add-address.sql | 10 +- .../24-transactions-clean-table.sql | 4 +- .../25-accounts-set-array-not-null.sql | 2 +- .../migrations/26-logs-set-hash-nullable.sql | 2 +- .../bucket/migrations/27-clean-index.sql | 20 +- .../migrations/28-add-features-functions.sql | 60 +-- .../bucket/migrations/29-logs-add-memento.sql | 6 +- ...igger-inserting-backdated-transactions.sql | 26 +- .../migrations/30-logs-hash-in-database.sql | 6 +- .../bucket/migrations/31-logs-assign-date.sql | 2 +- .../migrations/32-accounts-assign-date.sql | 2 +- .../migrations/33-moves-assign-date.sql | 2 +- .../migrations/34-set-ledger-specifics.sql | 44 +-- .../4-add-account-first-usage-column.sql | 86 ++--- .../5-add-idempotency-key-index.sql | 2 +- .../migrations/6-add-reference-index.sql | 2 +- .../migrations/7-add-ik-unique-index.sql | 10 +- .../migrations/8-ik-ledger-unique-index.sql | 4 +- .../9-fix-incorrect-volumes-aggregation.sql | 20 +- 36 files changed, 416 insertions(+), 428 deletions(-) diff --git a/internal/storage/bucket/migrations.go b/internal/storage/bucket/migrations.go index 87d80b9d8..faf3a3aff 100644 --- a/internal/storage/bucket/migrations.go +++ b/internal/storage/bucket/migrations.go @@ -1,14 +1,11 @@ package bucket import ( - "bytes" "context" "embed" - "go.opentelemetry.io/otel/trace" - "text/template" - "github.com/formancehq/go-libs/v2/migrations" "github.com/uptrace/bun" + "go.opentelemetry.io/otel/trace" ) //go:embed migrations @@ -18,16 +15,7 @@ func GetMigrator(name string) *migrations.Migrator { migrator := migrations.NewMigrator(migrations.WithSchema(name, true)) migrator.RegisterMigrationsFromFileSystem(migrationsDir, "migrations", func(s string) string { - buf := bytes.NewBufferString("") - - t := template.Must(template.New("migration").Parse(s)) - if err := t.Execute(buf, map[string]any{ - "Bucket": name, - }); err != nil { - panic(err) - } - - return buf.String() + return s }) return migrator diff --git a/internal/storage/bucket/migrations/0-init-schema.sql b/internal/storage/bucket/migrations/0-init-schema.sql index 79271e043..51d30bfa1 100644 --- a/internal/storage/bucket/migrations/0-init-schema.sql +++ b/internal/storage/bucket/migrations/0-init-schema.sql @@ -1,10 +1,10 @@ -create aggregate "{{.Bucket}}".aggregate_objects(jsonb) ( +create aggregate aggregate_objects(jsonb) ( sfunc = jsonb_concat, stype = jsonb, initcond = '{}' ); -create function "{{.Bucket}}".first_agg(anyelement, anyelement) +create function 
first_agg(anyelement, anyelement) returns anyelement language sql immutable @@ -15,13 +15,13 @@ $$ select $1 $$; -create aggregate "{{.Bucket}}".first (anyelement) ( - sfunc = "{{.Bucket}}".first_agg, +create aggregate first (anyelement) ( + sfunc = first_agg, stype = anyelement, parallel = safe ); -create function "{{.Bucket}}".array_distinct(anyarray) +create function array_distinct(anyarray) returns anyarray language sql immutable @@ -32,27 +32,27 @@ from unnest($1) t(x); $$; /** Define types **/ -create type "{{.Bucket}}".account_with_volumes as +create type account_with_volumes as ( address varchar, metadata jsonb, volumes jsonb ); -create type "{{.Bucket}}".volumes as +create type volumes as ( inputs numeric, outputs numeric ); -create type "{{.Bucket}}".volumes_with_asset as +create type volumes_with_asset as ( asset varchar, - volumes "{{.Bucket}}".volumes + volumes volumes ); /** Define tables **/ -create table "{{.Bucket}}".transactions +create table transactions ( seq bigserial primary key, ledger varchar not null, @@ -69,19 +69,19 @@ create table "{{.Bucket}}".transactions metadata jsonb not null default '{}'::jsonb ); -create unique index transactions_ledger on "{{.Bucket}}".transactions (ledger, id); -create index transactions_date on "{{.Bucket}}".transactions (timestamp); -create index transactions_metadata_index on "{{.Bucket}}".transactions using gin (metadata jsonb_path_ops); -create index transactions_sources on "{{.Bucket}}".transactions using gin (sources jsonb_path_ops); -create index transactions_destinations on "{{.Bucket}}".transactions using gin (destinations jsonb_path_ops); -create index transactions_sources_arrays on "{{.Bucket}}".transactions using gin (sources_arrays jsonb_path_ops); -create index transactions_destinations_arrays on "{{.Bucket}}".transactions using gin (destinations_arrays jsonb_path_ops); +create unique index transactions_ledger on transactions (ledger, id); +create index transactions_date on transactions (timestamp); +create index transactions_metadata_index on transactions using gin (metadata jsonb_path_ops); +create index transactions_sources on transactions using gin (sources jsonb_path_ops); +create index transactions_destinations on transactions using gin (destinations jsonb_path_ops); +create index transactions_sources_arrays on transactions using gin (sources_arrays jsonb_path_ops); +create index transactions_destinations_arrays on transactions using gin (destinations_arrays jsonb_path_ops); -create table "{{.Bucket}}".transactions_metadata +create table transactions_metadata ( seq bigserial, ledger varchar not null, - transactions_seq bigint references "{{.Bucket}}".transactions (seq), + transactions_seq bigint references transactions (seq), revision numeric default 0 not null, date timestamp not null, metadata jsonb not null default '{}'::jsonb, @@ -89,11 +89,11 @@ create table "{{.Bucket}}".transactions_metadata primary key (seq) ); -create index transactions_metadata_metadata on "{{.Bucket}}".transactions_metadata using gin (metadata jsonb_path_ops); -create unique index transactions_metadata_ledger on "{{.Bucket}}".transactions_metadata (ledger, transactions_seq, revision); -create index transactions_metadata_revisions on "{{.Bucket}}".transactions_metadata(transactions_seq asc, revision desc) include (metadata, date); +create index transactions_metadata_metadata on transactions_metadata using gin (metadata jsonb_path_ops); +create unique index transactions_metadata_ledger on transactions_metadata (ledger, transactions_seq, 
revision); +create index transactions_metadata_revisions on transactions_metadata(transactions_seq asc, revision desc) include (metadata, date); -create table "{{.Bucket}}".accounts +create table accounts ( seq bigserial primary key, ledger varchar not null, @@ -104,75 +104,75 @@ create table "{{.Bucket}}".accounts metadata jsonb not null default '{}'::jsonb ); -create unique index accounts_ledger on "{{.Bucket}}".accounts (ledger, address) include (seq); -create index accounts_address_array on "{{.Bucket}}".accounts using gin (address_array jsonb_ops); -create index accounts_address_array_length on "{{.Bucket}}".accounts (jsonb_array_length(address_array)); +create unique index accounts_ledger on accounts (ledger, address) include (seq); +create index accounts_address_array on accounts using gin (address_array jsonb_ops); +create index accounts_address_array_length on accounts (jsonb_array_length(address_array)); -create table "{{.Bucket}}".accounts_metadata +create table accounts_metadata ( seq bigserial primary key, ledger varchar not null, - accounts_seq bigint references "{{.Bucket}}".accounts (seq), + accounts_seq bigint references accounts (seq), metadata jsonb not null default '{}'::jsonb, revision numeric default 0, date timestamp ); -create unique index accounts_metadata_ledger on "{{.Bucket}}".accounts_metadata (ledger, accounts_seq, revision); -create index accounts_metadata_metadata on "{{.Bucket}}".accounts_metadata using gin (metadata jsonb_path_ops); -create index accounts_metadata_revisions on "{{.Bucket}}".accounts_metadata(accounts_seq asc, revision desc) include (metadata, date); +create unique index accounts_metadata_ledger on accounts_metadata (ledger, accounts_seq, revision); +create index accounts_metadata_metadata on accounts_metadata using gin (metadata jsonb_path_ops); +create index accounts_metadata_revisions on accounts_metadata(accounts_seq asc, revision desc) include (metadata, date); -create table "{{.Bucket}}".moves +create table moves ( seq bigserial not null primary key, ledger varchar not null, - transactions_seq bigint not null references "{{.Bucket}}".transactions (seq), - accounts_seq bigint not null references "{{.Bucket}}".accounts (seq), + transactions_seq bigint not null references transactions (seq), + accounts_seq bigint not null references accounts (seq), account_address varchar not null, account_address_array jsonb not null, asset varchar not null, amount numeric not null, insertion_date timestamp not null, effective_date timestamp not null, - post_commit_volumes "{{.Bucket}}".volumes not null, - post_commit_effective_volumes "{{.Bucket}}".volumes default null, + post_commit_volumes volumes not null, + post_commit_effective_volumes volumes default null, is_source boolean not null ); -create index moves_ledger on "{{.Bucket}}".moves (ledger); -create index moves_range_dates on "{{.Bucket}}".moves (account_address, asset, effective_date); -create index moves_account_address on "{{.Bucket}}".moves (account_address); -create index moves_account_address_array on "{{.Bucket}}".moves using gin (account_address_array jsonb_ops); -create index moves_account_address_array_length on "{{.Bucket}}".moves (jsonb_array_length(account_address_array)); -create index moves_date on "{{.Bucket}}".moves (effective_date); -create index moves_asset on "{{.Bucket}}".moves (asset); -create index moves_post_commit_volumes on "{{.Bucket}}".moves (accounts_seq, asset, seq); -create index moves_effective_post_commit_volumes on "{{.Bucket}}".moves (accounts_seq, asset, 
effective_date desc); - -create type "{{.Bucket}}".log_type as enum +create index moves_ledger on moves (ledger); +create index moves_range_dates on moves (account_address, asset, effective_date); +create index moves_account_address on moves (account_address); +create index moves_account_address_array on moves using gin (account_address_array jsonb_ops); +create index moves_account_address_array_length on moves (jsonb_array_length(account_address_array)); +create index moves_date on moves (effective_date); +create index moves_asset on moves (asset); +create index moves_post_commit_volumes on moves (accounts_seq, asset, seq); +create index moves_effective_post_commit_volumes on moves (accounts_seq, asset, effective_date desc); + +create type log_type as enum ('NEW_TRANSACTION', 'REVERTED_TRANSACTION', 'SET_METADATA', 'DELETE_METADATA' ); -create table "{{.Bucket}}".logs +create table logs ( seq bigserial primary key, ledger varchar not null, id numeric not null, - type "{{.Bucket}}".log_type not null, + type log_type not null, hash bytea not null, date timestamp not null, data jsonb not null, idempotency_key varchar(255) ); -create unique index logs_ledger on "{{.Bucket}}".logs (ledger, id); +create unique index logs_ledger on logs (ledger, id); /** Define index **/ -create function "{{.Bucket}}".balance_from_volumes(v "{{.Bucket}}".volumes) +create function balance_from_volumes(v volumes) returns numeric language sql immutable @@ -184,49 +184,49 @@ $$; /** Define write functions **/ -- given the input : "a:b:c", the function will produce : '{"0": "a", "1": "b", "2": "c", "3": null}' -create function "{{.Bucket}}".explode_address(_address varchar) +create function explode_address(_address varchar) returns jsonb language sql immutable as $$ -select "{{.Bucket}}".aggregate_objects(jsonb_build_object(data.number - 1, data.value)) +select aggregate_objects(jsonb_build_object(data.number - 1, data.value)) from (select row_number() over () as number, v.value from (select unnest(string_to_array(_address, ':')) as value union all select null) v) data $$; -create function "{{.Bucket}}".get_transaction(_ledger varchar, _id numeric, _before timestamp default null) - returns setof "{{.Bucket}}".transactions +create function get_transaction(_ledger varchar, _id numeric, _before timestamp default null) + returns setof transactions language sql stable as $$ select * -from "{{.Bucket}}".transactions t +from transactions t where (_before is null or t.timestamp <= _before) and t.id = _id and ledger = _ledger order by id desc limit 1; -$$; +$$ set search_path from current; -- a simple 'select distinct asset from moves' would be more simple -- but Postgres is extremely inefficient with distinct -- so the query implementation use a "hack" to emulate skip scan feature which Postgres lack natively -- see https://wiki.postgresql.org/wiki/Loose_indexscan for more information -create function "{{.Bucket}}".get_all_assets(_ledger varchar) +create function get_all_assets(_ledger varchar) returns setof varchar language sql as $$ with recursive t as (select min(asset) as asset - from "{{.Bucket}}".moves + from moves where ledger = _ledger union all select (select min(asset) - from "{{.Bucket}}".moves + from moves where asset > t.asset and ledger = _ledger) from t @@ -236,96 +236,96 @@ from t where asset is not null union all select null -where exists(select 1 from "{{.Bucket}}".moves where asset is null and ledger = _ledger) -$$; +where exists(select 1 from moves where asset is null and ledger = _ledger) +$$ set 
search_path from current; -create function "{{.Bucket}}".get_latest_move_for_account_and_asset(_ledger varchar, _account_address varchar, _asset varchar, +create function get_latest_move_for_account_and_asset(_ledger varchar, _account_address varchar, _asset varchar, _before timestamp default null) - returns setof "{{.Bucket}}".moves + returns setof moves language sql stable as $$ select * -from "{{.Bucket}}".moves s +from moves s where (_before is null or s.effective_date <= _before) and s.account_address = _account_address and s.asset = _asset and ledger = _ledger order by effective_date desc, seq desc limit 1; -$$; +$$ set search_path from current; -create function "{{.Bucket}}".upsert_account(_ledger varchar, _address varchar, _metadata jsonb, _date timestamp) +create function upsert_account(_ledger varchar, _address varchar, _metadata jsonb, _date timestamp) returns void language plpgsql as $$ begin - insert into "{{.Bucket}}".accounts(ledger, address, address_array, insertion_date, metadata, updated_at) + insert into accounts(ledger, address, address_array, insertion_date, metadata, updated_at) values (_ledger, _address, to_json(string_to_array(_address, ':')), _date, coalesce(_metadata, '{}'::jsonb), _date) on conflict (ledger, address) do update set metadata = accounts.metadata || coalesce(_metadata, '{}'::jsonb), updated_at = _date where not accounts.metadata @> coalesce(_metadata, '{}'::jsonb); end; -$$; +$$ set search_path from current; -create function "{{.Bucket}}".delete_account_metadata(_ledger varchar, _address varchar, _key varchar, _date timestamp) +create function delete_account_metadata(_ledger varchar, _address varchar, _key varchar, _date timestamp) returns void language plpgsql as $$ begin - update "{{.Bucket}}".accounts + update accounts set metadata = metadata - _key, updated_at = _date where address = _address and ledger = _ledger; end -$$; +$$ set search_path from current; -create function "{{.Bucket}}".update_transaction_metadata(_ledger varchar, _id numeric, _metadata jsonb, _date timestamp) +create function update_transaction_metadata(_ledger varchar, _id numeric, _metadata jsonb, _date timestamp) returns void language plpgsql as $$ begin - update "{{.Bucket}}".transactions + update transactions set metadata = metadata || _metadata, updated_at = _date where id = _id and ledger = _ledger; end; -$$; +$$ set search_path from current; -create function "{{.Bucket}}".delete_transaction_metadata(_ledger varchar, _id numeric, _key varchar, _date timestamp) +create function delete_transaction_metadata(_ledger varchar, _id numeric, _key varchar, _date timestamp) returns void language plpgsql as $$ begin - update "{{.Bucket}}".transactions + update transactions set metadata = metadata - _key, updated_at = _date where id = _id and ledger = _ledger; end; -$$; +$$ set search_path from current; -create function "{{.Bucket}}".revert_transaction(_ledger varchar, _id numeric, _date timestamp) +create function revert_transaction(_ledger varchar, _id numeric, _date timestamp) returns void language sql as $$ -update "{{.Bucket}}".transactions +update transactions set reverted_at = _date where id = _id and ledger = _ledger; -$$; +$$ set search_path from current; -create or replace function "{{.Bucket}}".insert_move( +create or replace function insert_move( _transactions_seq bigint, _ledger varchar, _insertion_date timestamp without time zone, @@ -340,28 +340,28 @@ create or replace function "{{.Bucket}}".insert_move( as $$ declare - _post_commit_volumes "{{.Bucket}}".volumes = (0, 
0)::"{{.Bucket}}".volumes; - _effective_post_commit_volumes "{{.Bucket}}".volumes = (0, 0)::"{{.Bucket}}".volumes; + _post_commit_volumes volumes = (0, 0)::volumes; + _effective_post_commit_volumes volumes = (0, 0)::volumes; _seq bigint; _account_seq bigint; begin - select seq from "{{.Bucket}}".accounts where ledger = _ledger and address = _account_address into _account_seq; + select seq from accounts where ledger = _ledger and address = _account_address into _account_seq; if _account_exists then select (post_commit_volumes).inputs, (post_commit_volumes).outputs into _post_commit_volumes - from "{{.Bucket}}".moves + from moves where accounts_seq = _account_seq and asset = _asset order by seq desc limit 1; if not found then - _post_commit_volumes = (0, 0)::"{{.Bucket}}".volumes; - _effective_post_commit_volumes = (0, 0)::"{{.Bucket}}".volumes; + _post_commit_volumes = (0, 0)::volumes; + _effective_post_commit_volumes = (0, 0)::volumes; else select (post_commit_effective_volumes).inputs, (post_commit_effective_volumes).outputs into _effective_post_commit_volumes - from "{{.Bucket}}".moves + from moves where accounts_seq = _account_seq and asset = _asset and effective_date <= _effective_date @@ -378,7 +378,7 @@ begin _effective_post_commit_volumes.inputs = _effective_post_commit_volumes.inputs + _amount; end if; - insert into "{{.Bucket}}".moves (ledger, + insert into moves (ledger, insertion_date, effective_date, accounts_seq, @@ -405,7 +405,7 @@ begin returning seq into _seq; if _account_exists then - update "{{.Bucket}}".moves + update moves set post_commit_effective_volumes = ((post_commit_effective_volumes).inputs + case when _is_source then 0 else _amount end, (post_commit_effective_volumes).outputs + case when _is_source then _amount else 0 end @@ -414,7 +414,7 @@ begin and asset = _asset and effective_date > _effective_date; - update "{{.Bucket}}".moves + update moves set post_commit_effective_volumes = ((post_commit_effective_volumes).inputs + case when _is_source then 0 else _amount end, (post_commit_effective_volumes).outputs + case when _is_source then _amount else 0 end @@ -425,9 +425,9 @@ begin and seq > _seq; end if; end; -$$; +$$ set search_path from current; -create function "{{.Bucket}}".insert_posting(_transaction_seq bigint, _ledger varchar, _insertion_date timestamp without time zone, +create function insert_posting(_transaction_seq bigint, _ledger varchar, _insertion_date timestamp without time zone, _effective_date timestamp without time zone, posting jsonb, _account_metadata jsonb) returns void language plpgsql @@ -438,23 +438,23 @@ declare _destination_exists bool; begin - select true from "{{.Bucket}}".accounts where ledger = _ledger and address = posting ->> 'source' into _source_exists; - select true from "{{.Bucket}}".accounts where ledger = _ledger and address = posting ->> 'destination' into _destination_exists; + select true from accounts where ledger = _ledger and address = posting ->> 'source' into _source_exists; + select true from accounts where ledger = _ledger and address = posting ->> 'destination' into _destination_exists; - perform "{{.Bucket}}".upsert_account(_ledger, posting ->> 'source', _account_metadata -> (posting ->> 'source'), _insertion_date); - perform "{{.Bucket}}".upsert_account(_ledger, posting ->> 'destination', _account_metadata -> (posting ->> 'destination'), + perform upsert_account(_ledger, posting ->> 'source', _account_metadata -> (posting ->> 'source'), _insertion_date); + perform upsert_account(_ledger, posting ->> 'destination', 
_account_metadata -> (posting ->> 'destination'), _insertion_date); - perform "{{.Bucket}}".insert_move(_transaction_seq, _ledger, _insertion_date, _effective_date, + perform insert_move(_transaction_seq, _ledger, _insertion_date, _effective_date, posting ->> 'source', posting ->> 'asset', (posting ->> 'amount')::numeric, true, _source_exists); - perform "{{.Bucket}}".insert_move(_transaction_seq, _ledger, _insertion_date, _effective_date, + perform insert_move(_transaction_seq, _ledger, _insertion_date, _effective_date, posting ->> 'destination', posting ->> 'asset', (posting ->> 'amount')::numeric, false, _destination_exists); end; -$$; +$$ set search_path from current; -create function "{{.Bucket}}".insert_transaction(_ledger varchar, data jsonb, _date timestamp without time zone, +create function insert_transaction(_ledger varchar, data jsonb, _date timestamp without time zone, _account_metadata jsonb) returns void language plpgsql @@ -464,7 +464,7 @@ declare posting jsonb; _seq bigint; begin - insert into "{{.Bucket}}".transactions (ledger, id, timestamp, updated_at, reference, postings, sources, + insert into transactions (ledger, id, timestamp, updated_at, reference, postings, sources, destinations, sources_arrays, destinations_arrays, metadata) values (_ledger, (data ->> 'id')::numeric, @@ -476,21 +476,21 @@ begin from jsonb_array_elements(data -> 'postings') v), (select to_jsonb(array_agg(v ->> 'destination')) as value from jsonb_array_elements(data -> 'postings') v), - (select to_jsonb(array_agg("{{.Bucket}}".explode_address(v ->> 'source'))) as value + (select to_jsonb(array_agg(explode_address(v ->> 'source'))) as value from jsonb_array_elements(data -> 'postings') v), - (select to_jsonb(array_agg("{{.Bucket}}".explode_address(v ->> 'destination'))) as value + (select to_jsonb(array_agg(explode_address(v ->> 'destination'))) as value from jsonb_array_elements(data -> 'postings') v), coalesce(data -> 'metadata', '{}'::jsonb)) returning seq into _seq; for posting in (select jsonb_array_elements(data -> 'postings')) loop - perform "{{.Bucket}}".insert_posting(_seq, _ledger, _date, (data ->> 'timestamp')::timestamp without time zone, posting, + perform insert_posting(_seq, _ledger, _date, (data ->> 'timestamp')::timestamp without time zone, posting, _account_metadata); end loop; if data -> 'metadata' is not null and data ->> 'metadata' <> '()' then - insert into "{{.Bucket}}".transactions_metadata (ledger, transactions_seq, revision, date, metadata) + insert into transactions_metadata (ledger, transactions_seq, revision, date, metadata) values (_ledger, _seq, 0, @@ -498,9 +498,9 @@ begin coalesce(data -> 'metadata', '{}'::jsonb)); end if; end -$$; +$$ set search_path from current; -create function "{{.Bucket}}".handle_log() returns trigger +create function handle_log() returns trigger security definer language plpgsql as @@ -510,50 +510,50 @@ declare _value jsonb; begin if new.type = 'NEW_TRANSACTION' then - perform "{{.Bucket}}".insert_transaction(new.ledger, new.data -> 'transaction', new.date, new.data -> 'accountMetadata'); + perform insert_transaction(new.ledger, new.data -> 'transaction', new.date, new.data -> 'accountMetadata'); for _key, _value in (select * from jsonb_each_text(new.data -> 'accountMetadata')) loop - perform "{{.Bucket}}".upsert_account(new.ledger, _key, _value, + perform upsert_account(new.ledger, _key, _value, (new.data -> 'transaction' ->> 'timestamp')::timestamp); end loop; end if; if new.type = 'REVERTED_TRANSACTION' then - perform 
"{{.Bucket}}".insert_transaction(new.ledger, new.data -> 'transaction', new.date, '{}'::jsonb); - perform "{{.Bucket}}".revert_transaction(new.ledger, (new.data ->> 'revertedTransactionID')::numeric, + perform insert_transaction(new.ledger, new.data -> 'transaction', new.date, '{}'::jsonb); + perform revert_transaction(new.ledger, (new.data ->> 'revertedTransactionID')::numeric, (new.data -> 'transaction' ->> 'timestamp')::timestamp); end if; if new.type = 'SET_METADATA' then if new.data ->> 'targetType' = 'TRANSACTION' then - perform "{{.Bucket}}".update_transaction_metadata(new.ledger, (new.data ->> 'targetId')::numeric, new.data -> 'metadata', + perform update_transaction_metadata(new.ledger, (new.data ->> 'targetId')::numeric, new.data -> 'metadata', new.date); else - perform "{{.Bucket}}".upsert_account(new.ledger, (new.data ->> 'targetId')::varchar, new.data -> 'metadata', new.date); + perform upsert_account(new.ledger, (new.data ->> 'targetId')::varchar, new.data -> 'metadata', new.date); end if; end if; if new.type = 'DELETE_METADATA' then if new.data ->> 'targetType' = 'TRANSACTION' then - perform "{{.Bucket}}".delete_transaction_metadata(new.ledger, (new.data ->> 'targetId')::numeric, new.data ->> 'key', + perform delete_transaction_metadata(new.ledger, (new.data ->> 'targetId')::numeric, new.data ->> 'key', new.date); else - perform "{{.Bucket}}".delete_account_metadata(new.ledger, (new.data ->> 'targetId')::varchar, new.data ->> 'key', + perform delete_account_metadata(new.ledger, (new.data ->> 'targetId')::varchar, new.data ->> 'key', new.date); end if; end if; return new; end; -$$; +$$ set search_path from current; -create function "{{.Bucket}}".update_account_metadata_history() returns trigger +create function update_account_metadata_history() returns trigger security definer language plpgsql as $$ begin - insert into "{{.Bucket}}".accounts_metadata (ledger, accounts_seq, revision, date, metadata) + insert into accounts_metadata (ledger, accounts_seq, revision, date, metadata) values (new.ledger, new.seq, ( select revision + 1 - from "{{.Bucket}}".accounts_metadata + from accounts_metadata where accounts_metadata.accounts_seq = new.seq order by revision desc limit 1 @@ -561,64 +561,64 @@ begin return new; end; -$$; +$$ set search_path from current; -create function "{{.Bucket}}".insert_account_metadata_history() returns trigger +create function insert_account_metadata_history() returns trigger security definer language plpgsql as $$ begin - insert into "{{.Bucket}}".accounts_metadata (ledger, accounts_seq, revision, date, metadata) + insert into accounts_metadata (ledger, accounts_seq, revision, date, metadata) values (new.ledger, new.seq, 1, new.insertion_date, new.metadata); return new; end; -$$; +$$ set search_path from current; -create function "{{.Bucket}}".update_transaction_metadata_history() returns trigger +create function update_transaction_metadata_history() returns trigger security definer language plpgsql as $$ begin - insert into "{{.Bucket}}".transactions_metadata (ledger, transactions_seq, revision, date, metadata) + insert into transactions_metadata (ledger, transactions_seq, revision, date, metadata) values (new.ledger, new.seq, (select revision + 1 - from "{{.Bucket}}".transactions_metadata + from transactions_metadata where transactions_metadata.transactions_seq = new.seq order by revision desc limit 1), new.updated_at, new.metadata); return new; end; -$$; +$$ set search_path from current; -create function 
"{{.Bucket}}".insert_transaction_metadata_history() returns trigger +create function insert_transaction_metadata_history() returns trigger security definer language plpgsql as $$ begin - insert into "{{.Bucket}}".transactions_metadata (ledger, transactions_seq, revision, date, metadata) + insert into transactions_metadata (ledger, transactions_seq, revision, date, metadata) values (new.ledger, new.seq, 1, new.timestamp, new.metadata); return new; end; -$$; +$$ set search_path from current; -create or replace function "{{.Bucket}}".get_all_account_effective_volumes(_ledger varchar, _account varchar, _before timestamp default null) - returns setof "{{.Bucket}}".volumes_with_asset +create or replace function get_all_account_effective_volumes(_ledger varchar, _account varchar, _before timestamp default null) + returns setof volumes_with_asset language sql stable as $$ with all_assets as (select v.v as asset - from "{{.Bucket}}".get_all_assets(_ledger) v), + from get_all_assets(_ledger) v), moves as (select m.* from all_assets assets join lateral ( select * - from "{{.Bucket}}".moves s + from moves s where (_before is null or s.effective_date <= _before) and s.account_address = _account and s.asset = assets.asset @@ -628,21 +628,21 @@ with all_assets as (select v.v as asset ) m on true) select moves.asset, moves.post_commit_effective_volumes from moves -$$; +$$ set search_path from current; -create or replace function "{{.Bucket}}".get_all_account_volumes(_ledger varchar, _account varchar, _before timestamp default null) - returns setof "{{.Bucket}}".volumes_with_asset +create or replace function get_all_account_volumes(_ledger varchar, _account varchar, _before timestamp default null) + returns setof volumes_with_asset language sql stable as $$ with all_assets as (select v.v as asset - from "{{.Bucket}}".get_all_assets(_ledger) v), + from get_all_assets(_ledger) v), moves as (select m.* from all_assets assets join lateral ( select * - from "{{.Bucket}}".moves s + from moves s where (_before is null or s.insertion_date <= _before) and s.account_address = _account and s.asset = assets.asset @@ -652,29 +652,29 @@ with all_assets as (select v.v as asset ) m on true) select moves.asset, moves.post_commit_volumes from moves -$$; +$$ set search_path from current; -create function "{{.Bucket}}".volumes_to_jsonb(v "{{.Bucket}}".volumes_with_asset) +create function volumes_to_jsonb(v volumes_with_asset) returns jsonb language sql immutable as $$ select ('{"' || v.asset || '": {"input": ' || (v.volumes).inputs || ', "output": ' || (v.volumes).outputs || '}}')::jsonb -$$; +$$ set search_path from current; -create function "{{.Bucket}}".get_account_aggregated_effective_volumes(_ledger varchar, _account_address varchar, +create function get_account_aggregated_effective_volumes(_ledger varchar, _account_address varchar, _before timestamp default null) returns jsonb language sql stable as $$ -select "{{.Bucket}}".aggregate_objects("{{.Bucket}}".volumes_to_jsonb(volumes_with_asset)) -from "{{.Bucket}}".get_all_account_effective_volumes(_ledger, _account_address, _before := _before) volumes_with_asset -$$; +select aggregate_objects(volumes_to_jsonb(volumes_with_asset)) +from get_all_account_effective_volumes(_ledger, _account_address, _before := _before) volumes_with_asset +$$ set search_path from current; -create function "{{.Bucket}}".get_account_aggregated_volumes(_ledger varchar, _account_address varchar, +create function get_account_aggregated_volumes(_ledger varchar, _account_address varchar, _before 
timestamp default null) returns jsonb language sql @@ -682,39 +682,39 @@ create function "{{.Bucket}}".get_account_aggregated_volumes(_ledger varchar, _a parallel safe as $$ -select "{{.Bucket}}".aggregate_objects("{{.Bucket}}".volumes_to_jsonb(volumes_with_asset)) -from "{{.Bucket}}".get_all_account_volumes(_ledger, _account_address, _before := _before) volumes_with_asset -$$; +select aggregate_objects(volumes_to_jsonb(volumes_with_asset)) +from get_all_account_volumes(_ledger, _account_address, _before := _before) volumes_with_asset +$$ set search_path from current; -create function "{{.Bucket}}".get_account_balance(_ledger varchar, _account varchar, _asset varchar, _before timestamp default null) +create function get_account_balance(_ledger varchar, _account varchar, _asset varchar, _before timestamp default null) returns numeric language sql stable as $$ select (post_commit_volumes).inputs - (post_commit_volumes).outputs -from "{{.Bucket}}".moves s +from moves s where (_before is null or s.effective_date <= _before) and s.account_address = _account and s.asset = _asset and s.ledger = _ledger order by seq desc limit 1 -$$; +$$ set search_path from current; -create function "{{.Bucket}}".aggregate_ledger_volumes( +create function aggregate_ledger_volumes( _ledger varchar, _before timestamp default null, _accounts varchar[] default null, _assets varchar[] default null ) - returns setof "{{.Bucket}}".volumes_with_asset + returns setof volumes_with_asset language sql stable as $$ with moves as (select distinct on (m.account_address, m.asset) m.* - from "{{.Bucket}}".moves m + from moves m where (_before is null or m.effective_date <= _before) and (_accounts is null or account_address = any (_accounts)) and (_assets is null or asset = any (_assets)) @@ -724,62 +724,62 @@ select v.asset, (sum((v.post_commit_effective_volumes).inputs), sum((v.post_commit_effective_volumes).outputs)) from moves v group by v.asset -$$; +$$ set search_path from current; -create function "{{.Bucket}}".get_aggregated_effective_volumes_for_transaction(_ledger varchar, tx numeric) returns jsonb +create function get_aggregated_effective_volumes_for_transaction(_ledger varchar, tx numeric) returns jsonb stable language sql as $$ -select "{{.Bucket}}".aggregate_objects(jsonb_build_object(data.account_address, data.aggregated)) +select aggregate_objects(jsonb_build_object(data.account_address, data.aggregated)) from (select distinct on (move.account_address, move.asset) move.account_address, - "{{.Bucket}}".volumes_to_jsonb((move.asset, "{{.Bucket}}".first(move.post_commit_effective_volumes))) as aggregated - from "{{.Bucket}}".moves move + volumes_to_jsonb((move.asset, first(move.post_commit_effective_volumes))) as aggregated + from moves move where move.transactions_seq = tx and ledger = _ledger group by move.account_address, move.asset) data -$$; +$$ set search_path from current; -create function "{{.Bucket}}".get_aggregated_volumes_for_transaction(_ledger varchar, tx numeric) returns jsonb +create function get_aggregated_volumes_for_transaction(_ledger varchar, tx numeric) returns jsonb stable language sql as $$ -select "{{.Bucket}}".aggregate_objects(jsonb_build_object(data.account_address, data.aggregated)) +select aggregate_objects(jsonb_build_object(data.account_address, data.aggregated)) from (select distinct on (move.account_address, move.asset) move.account_address, - "{{.Bucket}}".volumes_to_jsonb((move.asset, "{{.Bucket}}".first(move.post_commit_volumes))) as aggregated - from "{{.Bucket}}".moves move + 
volumes_to_jsonb((move.asset, first(move.post_commit_volumes))) as aggregated + from moves move where move.transactions_seq = tx and ledger = _ledger group by move.account_address, move.asset) data -$$; +$$ set search_path from current; create trigger "insert_log" after insert -on "{{.Bucket}}"."logs" +on "logs" for each row -execute procedure "{{.Bucket}}".handle_log(); +execute procedure handle_log(); create trigger "update_account" after update -on "{{.Bucket}}"."accounts" +on "accounts" for each row -execute procedure "{{.Bucket}}".update_account_metadata_history(); +execute procedure update_account_metadata_history(); create trigger "insert_account" after insert -on "{{.Bucket}}"."accounts" +on "accounts" for each row -execute procedure "{{.Bucket}}".insert_account_metadata_history(); +execute procedure insert_account_metadata_history(); create trigger "update_transaction" after update -on "{{.Bucket}}"."transactions" +on "transactions" for each row -execute procedure "{{.Bucket}}".update_transaction_metadata_history(); +execute procedure update_transaction_metadata_history(); create trigger "insert_transaction" after insert -on "{{.Bucket}}"."transactions" +on "transactions" for each row -execute procedure "{{.Bucket}}".insert_transaction_metadata_history(); \ No newline at end of file +execute procedure insert_transaction_metadata_history(); \ No newline at end of file diff --git a/internal/storage/bucket/migrations/1-fix-trigger.sql b/internal/storage/bucket/migrations/1-fix-trigger.sql index 7ca47b0df..af127449c 100644 --- a/internal/storage/bucket/migrations/1-fix-trigger.sql +++ b/internal/storage/bucket/migrations/1-fix-trigger.sql @@ -1,4 +1,4 @@ -create or replace function "{{.Bucket}}".insert_posting(_transaction_seq bigint, _ledger varchar, _insertion_date timestamp without time zone, +create or replace function insert_posting(_transaction_seq bigint, _ledger varchar, _insertion_date timestamp without time zone, _effective_date timestamp without time zone, posting jsonb, _account_metadata jsonb) returns void language plpgsql @@ -9,22 +9,22 @@ declare _destination_exists bool; begin - select true from "{{.Bucket}}".accounts where ledger = _ledger and address = posting ->> 'source' into _source_exists; + select true from accounts where ledger = _ledger and address = posting ->> 'source' into _source_exists; if posting ->>'source' = posting->>'destination' then _destination_exists = true; else - select true from "{{.Bucket}}".accounts where ledger = _ledger and address = posting ->> 'destination' into _destination_exists; + select true from accounts where ledger = _ledger and address = posting ->> 'destination' into _destination_exists; end if; - perform "{{.Bucket}}".upsert_account(_ledger, posting ->> 'source', _account_metadata -> (posting ->> 'source'), _insertion_date); - perform "{{.Bucket}}".upsert_account(_ledger, posting ->> 'destination', _account_metadata -> (posting ->> 'destination'), + perform upsert_account(_ledger, posting ->> 'source', _account_metadata -> (posting ->> 'source'), _insertion_date); + perform upsert_account(_ledger, posting ->> 'destination', _account_metadata -> (posting ->> 'destination'), _insertion_date); - perform "{{.Bucket}}".insert_move(_transaction_seq, _ledger, _insertion_date, _effective_date, + perform insert_move(_transaction_seq, _ledger, _insertion_date, _effective_date, posting ->> 'source', posting ->> 'asset', (posting ->> 'amount')::numeric, true, _source_exists); - perform "{{.Bucket}}".insert_move(_transaction_seq, _ledger, 
_insertion_date, _effective_date, + perform insert_move(_transaction_seq, _ledger, _insertion_date, _effective_date, posting ->> 'destination', posting ->> 'asset', (posting ->> 'amount')::numeric, false, _destination_exists); end; -$$; \ No newline at end of file +$$ set search_path from current; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/10-fillfactor-on-moves.sql b/internal/storage/bucket/migrations/10-fillfactor-on-moves.sql index ebd5cea50..689434e0f 100644 --- a/internal/storage/bucket/migrations/10-fillfactor-on-moves.sql +++ b/internal/storage/bucket/migrations/10-fillfactor-on-moves.sql @@ -1 +1 @@ -alter table "{{.Bucket}}".moves set (fillfactor = 80); \ No newline at end of file +alter table moves set (fillfactor = 80); \ No newline at end of file diff --git a/internal/storage/bucket/migrations/11-drop-triggers.sql b/internal/storage/bucket/migrations/11-drop-triggers.sql index f176aea95..6944b1e1b 100644 --- a/internal/storage/bucket/migrations/11-drop-triggers.sql +++ b/internal/storage/bucket/migrations/11-drop-triggers.sql @@ -1,34 +1,34 @@ -drop trigger "insert_account" on "{{.Bucket}}".accounts; -drop trigger "update_account" on "{{.Bucket}}".accounts; -drop trigger "insert_transaction" on "{{.Bucket}}".transactions; -drop trigger "update_transaction" on "{{.Bucket}}".transactions; -drop trigger "insert_log" on "{{.Bucket}}".logs; +drop trigger "insert_account" on accounts; +drop trigger "update_account" on accounts; +drop trigger "insert_transaction" on transactions; +drop trigger "update_transaction" on transactions; +drop trigger "insert_log" on logs; -drop aggregate "{{.Bucket}}".aggregate_objects(jsonb); -drop aggregate "{{.Bucket}}".first(anyelement); +drop aggregate aggregate_objects(jsonb); +drop aggregate first(anyelement); -drop function "{{.Bucket}}".array_distinct(anyarray); -drop function "{{.Bucket}}".insert_posting(_transaction_seq bigint, _ledger character varying, _insertion_date timestamp without time zone, _effective_date timestamp without time zone, posting jsonb, _account_metadata jsonb); -drop function "{{.Bucket}}".upsert_account(_ledger character varying, _address character varying, _metadata jsonb, _date timestamp without time zone, _first_usage timestamp without time zone); -drop function "{{.Bucket}}".get_latest_move_for_account_and_asset(_ledger character varying, _account_address character varying, _asset character varying, _before timestamp without time zone); -drop function "{{.Bucket}}".update_transaction_metadata(_ledger character varying, _id numeric, _metadata jsonb, _date timestamp without time zone); -drop function "{{.Bucket}}".delete_account_metadata(_ledger character varying, _address character varying, _key character varying, _date timestamp without time zone); -drop function "{{.Bucket}}".delete_transaction_metadata(_ledger character varying, _id numeric, _key character varying, _date timestamp without time zone); -drop function "{{.Bucket}}".balance_from_volumes(v "{{.Bucket}}".volumes); -drop function "{{.Bucket}}".get_all_account_volumes(_ledger character varying, _account character varying, _before timestamp without time zone); -drop function "{{.Bucket}}".first_agg(anyelement, anyelement); -drop function "{{.Bucket}}".volumes_to_jsonb(v "{{.Bucket}}".volumes_with_asset); -drop function "{{.Bucket}}".get_account_aggregated_effective_volumes(_ledger character varying, _account_address character varying, _before timestamp without time zone); -drop function "{{.Bucket}}".handle_log(); -drop 
function "{{.Bucket}}".get_account_aggregated_volumes(_ledger character varying, _account_address character varying, _before timestamp without time zone); -drop function "{{.Bucket}}".get_aggregated_volumes_for_transaction(_ledger character varying, tx numeric); -drop function "{{.Bucket}}".insert_move(_transactions_seq bigint, _ledger character varying, _insertion_date timestamp without time zone, _effective_date timestamp without time zone, _account_address character varying, _asset character varying, _amount numeric, _is_source boolean, _account_exists boolean); -drop function "{{.Bucket}}".get_all_assets(_ledger character varying); -drop function "{{.Bucket}}".insert_transaction(_ledger character varying, data jsonb, _date timestamp without time zone, _account_metadata jsonb); -drop function "{{.Bucket}}".get_all_account_effective_volumes(_ledger character varying, _account character varying, _before timestamp without time zone); -drop function "{{.Bucket}}".get_account_balance(_ledger character varying, _account character varying, _asset character varying, _before timestamp without time zone); -drop function "{{.Bucket}}".get_aggregated_effective_volumes_for_transaction(_ledger character varying, tx numeric); -drop function "{{.Bucket}}".aggregate_ledger_volumes(_ledger character varying, _before timestamp without time zone, _accounts character varying[], _assets character varying[] ); -drop function "{{.Bucket}}".get_transaction(_ledger character varying, _id numeric, _before timestamp without time zone); ---drop function "{{.Bucket}}".explode_address(_address character varying); -drop function "{{.Bucket}}".revert_transaction(_ledger character varying, _id numeric, _date timestamp without time zone); \ No newline at end of file +drop function array_distinct(anyarray); +drop function insert_posting(_transaction_seq bigint, _ledger character varying, _insertion_date timestamp without time zone, _effective_date timestamp without time zone, posting jsonb, _account_metadata jsonb); +drop function upsert_account(_ledger character varying, _address character varying, _metadata jsonb, _date timestamp without time zone, _first_usage timestamp without time zone); +drop function get_latest_move_for_account_and_asset(_ledger character varying, _account_address character varying, _asset character varying, _before timestamp without time zone); +drop function update_transaction_metadata(_ledger character varying, _id numeric, _metadata jsonb, _date timestamp without time zone); +drop function delete_account_metadata(_ledger character varying, _address character varying, _key character varying, _date timestamp without time zone); +drop function delete_transaction_metadata(_ledger character varying, _id numeric, _key character varying, _date timestamp without time zone); +drop function balance_from_volumes(v volumes); +drop function get_all_account_volumes(_ledger character varying, _account character varying, _before timestamp without time zone); +drop function first_agg(anyelement, anyelement); +drop function volumes_to_jsonb(v volumes_with_asset); +drop function get_account_aggregated_effective_volumes(_ledger character varying, _account_address character varying, _before timestamp without time zone); +drop function handle_log(); +drop function get_account_aggregated_volumes(_ledger character varying, _account_address character varying, _before timestamp without time zone); +drop function get_aggregated_volumes_for_transaction(_ledger character varying, tx numeric); +drop function 
insert_move(_transactions_seq bigint, _ledger character varying, _insertion_date timestamp without time zone, _effective_date timestamp without time zone, _account_address character varying, _asset character varying, _amount numeric, _is_source boolean, _account_exists boolean); +drop function get_all_assets(_ledger character varying); +drop function insert_transaction(_ledger character varying, data jsonb, _date timestamp without time zone, _account_metadata jsonb); +drop function get_all_account_effective_volumes(_ledger character varying, _account character varying, _before timestamp without time zone); +drop function get_account_balance(_ledger character varying, _account character varying, _asset character varying, _before timestamp without time zone); +drop function get_aggregated_effective_volumes_for_transaction(_ledger character varying, tx numeric); +drop function aggregate_ledger_volumes(_ledger character varying, _before timestamp without time zone, _accounts character varying[], _assets character varying[] ); +drop function get_transaction(_ledger character varying, _id numeric, _before timestamp without time zone); +--drop function explode_address(_address character varying); +drop function revert_transaction(_ledger character varying, _id numeric, _date timestamp without time zone); \ No newline at end of file diff --git a/internal/storage/bucket/migrations/12-moves-add-transaction-id.sql b/internal/storage/bucket/migrations/12-moves-add-transaction-id.sql index 2bd6209f8..4b2bac471 100644 --- a/internal/storage/bucket/migrations/12-moves-add-transaction-id.sql +++ b/internal/storage/bucket/migrations/12-moves-add-transaction-id.sql @@ -1,15 +1,15 @@ -alter table "{{.Bucket}}".moves +alter table moves add column transactions_id bigint; -update "{{.Bucket}}".moves +update moves set transactions_id = ( select id - from "{{.Bucket}}".transactions + from transactions where seq = transactions_seq ); -alter table "{{.Bucket}}".moves +alter table moves alter column transactions_id set not null; -alter table "{{.Bucket}}".moves +alter table moves drop column transactions_seq; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/13-set-transaction-timestamp-default-utc.sql b/internal/storage/bucket/migrations/13-set-transaction-timestamp-default-utc.sql index f955ef437..bf865da01 100644 --- a/internal/storage/bucket/migrations/13-set-transaction-timestamp-default-utc.sql +++ b/internal/storage/bucket/migrations/13-set-transaction-timestamp-default-utc.sql @@ -1,6 +1,6 @@ -alter table "{{.Bucket}}".transactions +alter table transactions add column inserted_at timestamp without time zone default (now() at time zone 'utc'); -alter table "{{.Bucket}}".transactions +alter table transactions alter column timestamp set default (now() at time zone 'utc'); \ No newline at end of file diff --git a/internal/storage/bucket/migrations/14-rename-address-moves-column.sql b/internal/storage/bucket/migrations/14-rename-address-moves-column.sql index 6b3626d7b..f1b76b860 100644 --- a/internal/storage/bucket/migrations/14-rename-address-moves-column.sql +++ b/internal/storage/bucket/migrations/14-rename-address-moves-column.sql @@ -1,5 +1,5 @@ -alter table "{{.Bucket}}".moves +alter table moves rename column account_address to accounts_address; -alter table "{{.Bucket}}".moves +alter table moves rename column account_address_array to accounts_address_array; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/15-moves-remove-accounts-seq.sql 
b/internal/storage/bucket/migrations/15-moves-remove-accounts-seq.sql
index a684830d7..2c4d62772 100644
--- a/internal/storage/bucket/migrations/15-moves-remove-accounts-seq.sql
+++ b/internal/storage/bucket/migrations/15-moves-remove-accounts-seq.sql
@@ -1,2 +1,2 @@
-alter table "{{.Bucket}}".moves
+alter table moves
 drop column accounts_seq;
\ No newline at end of file
diff --git a/internal/storage/bucket/migrations/16-moves-change-pvc-column-type.sql b/internal/storage/bucket/migrations/16-moves-change-pvc-column-type.sql
index 8be078add..f8869301a 100644
--- a/internal/storage/bucket/migrations/16-moves-change-pvc-column-type.sql
+++ b/internal/storage/bucket/migrations/16-moves-change-pvc-column-type.sql
@@ -1,25 +1,25 @@
 -- update post_commit_volumes of table moves to jsonb
-alter table "{{.Bucket}}".moves
+alter table moves
 add column post_commit_volumes_jsonb jsonb;
 
-update "{{.Bucket}}".moves
+update moves
 set post_commit_volumes_jsonb = json_build_object('input', (post_commit_volumes).inputs, 'output', (post_commit_volumes).outputs);
 
-alter table "{{.Bucket}}".moves
+alter table moves
 drop column post_commit_volumes;
 
-alter table "{{.Bucket}}".moves
+alter table moves
 rename post_commit_volumes_jsonb to post_commit_volumes;
 
 -- update post_commit_volumes of table moves to jsonb
-alter table "{{.Bucket}}".moves
+alter table moves
 add column post_commit_effective_volumes_jsonb jsonb;
 
-update "{{.Bucket}}".moves
+update moves
 set post_commit_effective_volumes_jsonb = json_build_object('input', (post_commit_effective_volumes).inputs, 'output', (post_commit_effective_volumes).outputs);
 
-alter table "{{.Bucket}}".moves
+alter table moves
 drop column post_commit_effective_volumes;
 
-alter table "{{.Bucket}}".moves
+alter table moves
 rename post_commit_effective_volumes_jsonb to post_commit_effective_volumes;
\ No newline at end of file
diff --git a/internal/storage/bucket/migrations/17-transactions-fix-reference.sql b/internal/storage/bucket/migrations/17-transactions-fix-reference.sql
index 4b63156e3..e998b699a 100644
--- a/internal/storage/bucket/migrations/17-transactions-fix-reference.sql
+++ b/internal/storage/bucket/migrations/17-transactions-fix-reference.sql
@@ -1,2 +1,2 @@
-drop index "{{.Bucket}}".transactions_reference;
-create unique index transactions_reference on "{{.Bucket}}".transactions (ledger, reference);
\ No newline at end of file
+drop index transactions_reference;
+create unique index transactions_reference on transactions (ledger, reference);
\ No newline at end of file
diff --git a/internal/storage/bucket/migrations/18-transactions-add-pvc.sql b/internal/storage/bucket/migrations/18-transactions-add-pvc.sql
index abd437598..7a7bc8f52 100644
--- a/internal/storage/bucket/migrations/18-transactions-add-pvc.sql
+++ b/internal/storage/bucket/migrations/18-transactions-add-pvc.sql
@@ -1,8 +1,8 @@
 -- add post_commit_volumes column on transactions table
-alter table "{{.Bucket}}".transactions
+alter table transactions
 add column post_commit_volumes jsonb;
 
-update "{{.Bucket}}".transactions
+update transactions
 set
 post_commit_volumes = (
 select public.aggregate_objects(post_commit_volumes::jsonb) as post_commit_volumes
@@ -12,12 +12,12 @@ set
 select accounts_address, json_build_object(asset, post_commit_volumes) as post_commit_volumes
 from (
 select distinct on (accounts_address, asset) accounts_address, asset, first_value(post_commit_volumes) over (partition by accounts_address, asset order by seq desc) as post_commit_volumes
-from "{{.Bucket}}".moves
+from moves
 ) moves
 ) values
 where transactions.sources ? accounts_address or transactions.destinations ? accounts_address
 );
 
-alter table "{{.Bucket}}".transactions
+alter table transactions
 alter column post_commit_volumes set not null
 ;
\ No newline at end of file
diff --git a/internal/storage/bucket/migrations/19-logs-add-idempotency-hash.sql b/internal/storage/bucket/migrations/19-logs-add-idempotency-hash.sql
index fe7709ff5..7c17afe34 100644
--- a/internal/storage/bucket/migrations/19-logs-add-idempotency-hash.sql
+++ b/internal/storage/bucket/migrations/19-logs-add-idempotency-hash.sql
@@ -1,2 +1,2 @@
-alter table "{{.Bucket}}".logs
+alter table logs
 add column idempotency_hash bytea;
\ No newline at end of file
diff --git a/internal/storage/bucket/migrations/2-fix-volumes-aggregation.sql b/internal/storage/bucket/migrations/2-fix-volumes-aggregation.sql
index f74997157..334ee11b7 100644
--- a/internal/storage/bucket/migrations/2-fix-volumes-aggregation.sql
+++ b/internal/storage/bucket/migrations/2-fix-volumes-aggregation.sql
@@ -1,16 +1,16 @@
-create or replace function "{{.Bucket}}".get_all_account_volumes(_ledger varchar, _account varchar, _before timestamp default null)
-    returns setof "{{.Bucket}}".volumes_with_asset
+create or replace function get_all_account_volumes(_ledger varchar, _account varchar, _before timestamp default null)
+    returns setof volumes_with_asset
     language sql
     stable
 as
 $$
 with all_assets as (select v.v as asset
-                    from "{{.Bucket}}".get_all_assets(_ledger) v),
+                    from get_all_assets(_ledger) v),
      moves as (select m.*
               from all_assets assets
                        join lateral (
                   select *
-                  from "{{.Bucket}}".moves s
+                  from moves s
                   where (_before is null or s.effective_date <= _before)
                     and s.account_address = _account
                     and s.asset = assets.asset
@@ -20,4 +20,4 @@ with all_assets as (select v.v as asset
                   ) m on true)
 select moves.asset, moves.post_commit_volumes
 from moves
-$$;
\ No newline at end of file
+$$ set search_path from current;
\ No newline at end of file
diff --git a/internal/storage/bucket/migrations/20-moves-drop-accounts-address-array.sql b/internal/storage/bucket/migrations/20-moves-drop-accounts-address-array.sql
index ac71ff830..852de4cc5 100644
--- a/internal/storage/bucket/migrations/20-moves-drop-accounts-address-array.sql
+++ b/internal/storage/bucket/migrations/20-moves-drop-accounts-address-array.sql
@@ -1,3 +1,3 @@
 -- drop accounts_address_array from moves
-alter table "{{.Bucket}}".moves
+alter table moves
 drop column accounts_address_array;
\ No newline at end of file
diff --git a/internal/storage/bucket/migrations/21-add-accounts-volumes-table.sql b/internal/storage/bucket/migrations/21-add-accounts-volumes-table.sql
index 7d33713e5..8384d572b 100644
--- a/internal/storage/bucket/migrations/21-add-accounts-volumes-table.sql
+++ b/internal/storage/bucket/migrations/21-add-accounts-volumes-table.sql
@@ -1,4 +1,4 @@
-create table "{{.Bucket}}".accounts_volumes (
+create table accounts_volumes (
     ledger varchar not null,
     accounts_address varchar not null,
     asset varchar not null,
@@ -8,7 +8,7 @@ create table "{{.Bucket}}".accounts_volumes (
     primary key (ledger, accounts_address, asset)
 );
 
-insert into "{{.Bucket}}".accounts_volumes (ledger, accounts_address, asset, input, output)
+insert into accounts_volumes (ledger, accounts_address, asset, input, output)
 select distinct on (ledger, accounts_address, asset)
     ledger,
     accounts_address,
@@ -21,5 +21,5 @@ from (
         accounts_address,
         asset,
         first_value(post_commit_volumes) over (partition by (accounts_address, asset) order by 
seq desc) as post_commit_volumes - from "{{.Bucket}}".moves + from moves ) moves; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/22-transactions-metadata-add-transaction-id.sql b/internal/storage/bucket/migrations/22-transactions-metadata-add-transaction-id.sql index 1106541a1..aaca9fba0 100644 --- a/internal/storage/bucket/migrations/22-transactions-metadata-add-transaction-id.sql +++ b/internal/storage/bucket/migrations/22-transactions-metadata-add-transaction-id.sql @@ -1,16 +1,16 @@ -alter table "{{.Bucket}}".transactions_metadata +alter table transactions_metadata add column transactions_id bigint; -update "{{.Bucket}}".transactions_metadata +update transactions_metadata set transactions_id = ( select id - from "{{.Bucket}}".transactions + from transactions where transactions_metadata.transactions_seq = transactions.seq ); -alter table "{{.Bucket}}".transactions_metadata +alter table transactions_metadata drop column transactions_seq; -alter table "{{.Bucket}}".transactions_metadata +alter table transactions_metadata alter column transactions_id set not null; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/23-accounts-metadata-add-address.sql b/internal/storage/bucket/migrations/23-accounts-metadata-add-address.sql index 2cfd22bf9..0a2a5daee 100644 --- a/internal/storage/bucket/migrations/23-accounts-metadata-add-address.sql +++ b/internal/storage/bucket/migrations/23-accounts-metadata-add-address.sql @@ -1,15 +1,15 @@ -alter table "{{.Bucket}}".accounts_metadata +alter table accounts_metadata add column accounts_address varchar; -update "{{.Bucket}}".accounts_metadata +update accounts_metadata set accounts_address = ( select address - from "{{.Bucket}}".accounts + from accounts where accounts_metadata.accounts_seq = seq ); -alter table "{{.Bucket}}".accounts_metadata +alter table accounts_metadata drop column accounts_seq; -alter table "{{.Bucket}}".accounts_metadata +alter table accounts_metadata alter column accounts_address set not null; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/24-transactions-clean-table.sql b/internal/storage/bucket/migrations/24-transactions-clean-table.sql index 3d333b11c..d41c1aa04 100644 --- a/internal/storage/bucket/migrations/24-transactions-clean-table.sql +++ b/internal/storage/bucket/migrations/24-transactions-clean-table.sql @@ -1,6 +1,6 @@ -alter table "{{.Bucket}}".transactions +alter table transactions alter column id type bigint; -alter table "{{.Bucket}}".transactions +alter table transactions drop column seq; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/25-accounts-set-array-not-null.sql b/internal/storage/bucket/migrations/25-accounts-set-array-not-null.sql index dbfee3ec1..b4004b5ce 100644 --- a/internal/storage/bucket/migrations/25-accounts-set-array-not-null.sql +++ b/internal/storage/bucket/migrations/25-accounts-set-array-not-null.sql @@ -1,2 +1,2 @@ -alter table "{{.Bucket}}".accounts +alter table accounts alter column address_array drop not null; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/26-logs-set-hash-nullable.sql b/internal/storage/bucket/migrations/26-logs-set-hash-nullable.sql index 38c4b57d9..78b9f8359 100644 --- a/internal/storage/bucket/migrations/26-logs-set-hash-nullable.sql +++ b/internal/storage/bucket/migrations/26-logs-set-hash-nullable.sql @@ -1,3 +1,3 @@ -alter table "{{.Bucket}}".logs +alter table logs alter column hash drop not null; \ No newline at end of file diff 
--git a/internal/storage/bucket/migrations/27-clean-index.sql b/internal/storage/bucket/migrations/27-clean-index.sql index 445c26260..21d929fda 100644 --- a/internal/storage/bucket/migrations/27-clean-index.sql +++ b/internal/storage/bucket/migrations/27-clean-index.sql @@ -1,12 +1,12 @@ -create unique index accounts_metadata_ledger on "{{.Bucket}}".accounts_metadata (ledger, accounts_address, revision); -create index accounts_metadata_revisions on "{{.Bucket}}".accounts_metadata(accounts_address asc, revision desc) include (metadata, date); +create unique index accounts_metadata_ledger on accounts_metadata (ledger, accounts_address, revision); +create index accounts_metadata_revisions on accounts_metadata(accounts_address asc, revision desc) include (metadata, date); -create unique index transactions_metadata_ledger on "{{.Bucket}}".transactions_metadata (ledger, transactions_id, revision); -create index transactions_metadata_revisions on "{{.Bucket}}".transactions_metadata(transactions_id asc, revision desc) include (metadata, date); +create unique index transactions_metadata_ledger on transactions_metadata (ledger, transactions_id, revision); +create index transactions_metadata_revisions on transactions_metadata(transactions_id asc, revision desc) include (metadata, date); -drop index "{{.Bucket}}".transactions_sources_arrays; -drop index "{{.Bucket}}".transactions_destinations_arrays; -drop index "{{.Bucket}}".accounts_address_array; -drop index "{{.Bucket}}".accounts_address_array_length; -drop index "{{.Bucket}}".transactions_sources; -drop index "{{.Bucket}}".transactions_destinations; +drop index transactions_sources_arrays; +drop index transactions_destinations_arrays; +drop index accounts_address_array; +drop index accounts_address_array_length; +drop index transactions_sources; +drop index transactions_destinations; diff --git a/internal/storage/bucket/migrations/28-add-features-functions.sql b/internal/storage/bucket/migrations/28-add-features-functions.sql index cc108e73f..1e38de09b 100644 --- a/internal/storage/bucket/migrations/28-add-features-functions.sql +++ b/internal/storage/bucket/migrations/28-add-features-functions.sql @@ -1,4 +1,4 @@ -create function "{{.Bucket}}".set_effective_volumes() +create function set_effective_volumes() returns trigger security definer language plpgsql @@ -10,7 +10,7 @@ begin 'input', (post_commit_effective_volumes->>'input')::numeric + case when new.is_source then 0 else new.amount end, 'output', (post_commit_effective_volumes->>'output')::numeric + case when new.is_source then new.amount else 0 end ) - from "{{.Bucket}}".moves + from moves where accounts_address = new.accounts_address and asset = new.asset and ledger = new.ledger @@ -24,16 +24,16 @@ begin return new; end; -$$; +$$ set search_path from current; -create function "{{.Bucket}}".update_effective_volumes() +create function update_effective_volumes() returns trigger security definer language plpgsql as $$ begin - update "{{.Bucket}}".moves + update moves set post_commit_effective_volumes = json_build_object( 'input', (post_commit_effective_volumes->>'input')::numeric + case when new.is_source then 0 else new.amount end, 'output', (post_commit_effective_volumes->>'output')::numeric + case when new.is_source then new.amount else 0 end @@ -45,18 +45,18 @@ begin return new; end; -$$; +$$ set search_path from current; -create or replace function "{{.Bucket}}".update_transaction_metadata_history() returns trigger +create or replace function update_transaction_metadata_history() 
returns trigger security definer language plpgsql as $$ begin - insert into "{{.Bucket}}".transactions_metadata (ledger, transactions_id, revision, date, metadata) + insert into transactions_metadata (ledger, transactions_id, revision, date, metadata) values (new.ledger, new.id, ( select revision + 1 - from "{{.Bucket}}".transactions_metadata + from transactions_metadata where transactions_metadata.transactions_id = new.id and transactions_metadata.ledger = new.ledger order by revision desc limit 1 @@ -64,31 +64,31 @@ begin return new; end; -$$; +$$ set search_path from current; -create or replace function "{{.Bucket}}".insert_transaction_metadata_history() returns trigger +create or replace function insert_transaction_metadata_history() returns trigger security definer language plpgsql as $$ begin - insert into "{{.Bucket}}".transactions_metadata (ledger, transactions_id, revision, date, metadata) + insert into transactions_metadata (ledger, transactions_id, revision, date, metadata) values (new.ledger, new.id, 1, new.timestamp, new.metadata); return new; end; -$$; +$$ set search_path from current; -create or replace function "{{.Bucket}}".update_account_metadata_history() returns trigger +create or replace function update_account_metadata_history() returns trigger security definer language plpgsql as $$ begin - insert into "{{.Bucket}}".accounts_metadata (ledger, accounts_address, revision, date, metadata) + insert into accounts_metadata (ledger, accounts_address, revision, date, metadata) values (new.ledger, new.address, ( select revision + 1 - from "{{.Bucket}}".accounts_metadata + from accounts_metadata where accounts_metadata.accounts_address = new.address order by revision desc limit 1 @@ -96,22 +96,22 @@ begin return new; end; -$$; +$$ set search_path from current; -create or replace function "{{.Bucket}}".insert_account_metadata_history() returns trigger +create or replace function insert_account_metadata_history() returns trigger security definer language plpgsql as $$ begin - insert into "{{.Bucket}}".accounts_metadata (ledger, accounts_address, revision, date, metadata) + insert into accounts_metadata (ledger, accounts_address, revision, date, metadata) values (new.ledger, new.address, 1, new.insertion_date, new.metadata); return new; end; -$$; +$$ set search_path from current; -create or replace function "{{.Bucket}}".explode_address(_address varchar) +create or replace function explode_address(_address varchar) returns jsonb language sql immutable @@ -122,9 +122,9 @@ from (select row_number() over () as number, v.value from (select unnest(string_to_array(_address, ':')) as value union all select null) v) data -$$; +$$ set search_path from current; -create or replace function "{{.Bucket}}".set_transaction_addresses() returns trigger +create or replace function set_transaction_addresses() returns trigger security definer language plpgsql as @@ -142,28 +142,28 @@ begin return new; end -$$; +$$ set search_path from current; -create or replace function "{{.Bucket}}".set_transaction_addresses_segments() returns trigger +create or replace function set_transaction_addresses_segments() returns trigger security definer language plpgsql as $$ begin new.sources_arrays = ( - select to_jsonb(array_agg("{{.Bucket}}".explode_address(v ->> 'source'))) as value + select to_jsonb(array_agg(explode_address(v ->> 'source'))) as value from jsonb_array_elements(new.postings::jsonb) v ); new.destinations_arrays = ( - select to_jsonb(array_agg("{{.Bucket}}".explode_address(v ->> 'destination'))) as 
value + select to_jsonb(array_agg(explode_address(v ->> 'destination'))) as value from jsonb_array_elements(new.postings::jsonb) v ); return new; end -$$; +$$ set search_path from current; -create or replace function "{{.Bucket}}".set_address_array_for_account() returns trigger +create or replace function set_address_array_for_account() returns trigger security definer language plpgsql as @@ -173,4 +173,4 @@ begin return new; end -$$; \ No newline at end of file +$$ set search_path from current; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/29-logs-add-memento.sql b/internal/storage/bucket/migrations/29-logs-add-memento.sql index 68e165f2e..ff0b2ce6b 100644 --- a/internal/storage/bucket/migrations/29-logs-add-memento.sql +++ b/internal/storage/bucket/migrations/29-logs-add-memento.sql @@ -1,8 +1,8 @@ -alter table "{{.Bucket}}".logs +alter table logs add column memento bytea; -update "{{.Bucket}}".logs +update logs set memento = convert_to(data::varchar, 'LATIN1')::bytea; -alter table "{{.Bucket}}".logs +alter table logs alter column memento set not null; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/3-fix-trigger-inserting-backdated-transactions.sql b/internal/storage/bucket/migrations/3-fix-trigger-inserting-backdated-transactions.sql index 853990d26..668771559 100644 --- a/internal/storage/bucket/migrations/3-fix-trigger-inserting-backdated-transactions.sql +++ b/internal/storage/bucket/migrations/3-fix-trigger-inserting-backdated-transactions.sql @@ -1,4 +1,4 @@ -create or replace function "{{.Bucket}}".insert_move( +create or replace function insert_move( _transactions_seq bigint, _ledger varchar, _insertion_date timestamp without time zone, @@ -13,28 +13,28 @@ create or replace function "{{.Bucket}}".insert_move( as $$ declare - _post_commit_volumes "{{.Bucket}}".volumes = (0, 0)::"{{.Bucket}}".volumes; - _effective_post_commit_volumes "{{.Bucket}}".volumes = (0, 0)::"{{.Bucket}}".volumes; + _post_commit_volumes volumes = (0, 0)::volumes; + _effective_post_commit_volumes volumes = (0, 0)::volumes; _seq bigint; _account_seq bigint; begin - select seq from "{{.Bucket}}".accounts where ledger = _ledger and address = _account_address into _account_seq; + select seq from accounts where ledger = _ledger and address = _account_address into _account_seq; if _account_exists then select (post_commit_volumes).inputs, (post_commit_volumes).outputs into _post_commit_volumes - from "{{.Bucket}}".moves + from moves where accounts_seq = _account_seq and asset = _asset order by seq desc limit 1; if not found then - _post_commit_volumes = (0, 0)::"{{.Bucket}}".volumes; - _effective_post_commit_volumes = (0, 0)::"{{.Bucket}}".volumes; + _post_commit_volumes = (0, 0)::volumes; + _effective_post_commit_volumes = (0, 0)::volumes; else select (post_commit_effective_volumes).inputs, (post_commit_effective_volumes).outputs into _effective_post_commit_volumes - from "{{.Bucket}}".moves + from moves where accounts_seq = _account_seq and asset = _asset and effective_date <= _effective_date @@ -42,7 +42,7 @@ begin limit 1; if not found then - _effective_post_commit_volumes = (0, 0)::"{{.Bucket}}".volumes; + _effective_post_commit_volumes = (0, 0)::volumes; end if; end if; end if; @@ -55,7 +55,7 @@ begin _effective_post_commit_volumes.inputs = _effective_post_commit_volumes.inputs + _amount; end if; - insert into "{{.Bucket}}".moves (ledger, + insert into moves (ledger, insertion_date, effective_date, accounts_seq, @@ -82,7 +82,7 @@ begin returning seq 
into _seq; if _account_exists then - update "{{.Bucket}}".moves + update moves set post_commit_effective_volumes = ((post_commit_effective_volumes).inputs + case when _is_source then 0 else _amount end, (post_commit_effective_volumes).outputs + case when _is_source then _amount else 0 end @@ -91,7 +91,7 @@ begin and asset = _asset and effective_date > _effective_date; - update "{{.Bucket}}".moves + update moves set post_commit_effective_volumes = ((post_commit_effective_volumes).inputs + case when _is_source then 0 else _amount end, (post_commit_effective_volumes).outputs + case when _is_source then _amount else 0 end @@ -102,4 +102,4 @@ begin and seq < _seq; end if; end; -$$; \ No newline at end of file +$$ set search_path from current; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/30-logs-hash-in-database.sql b/internal/storage/bucket/migrations/30-logs-hash-in-database.sql index c6b0fa249..f76ea5ab4 100644 --- a/internal/storage/bucket/migrations/30-logs-hash-in-database.sql +++ b/internal/storage/bucket/migrations/30-logs-hash-in-database.sql @@ -1,4 +1,4 @@ -create function "{{.Bucket}}".set_log_hash() +create function set_log_hash() returns trigger security definer language plpgsql @@ -9,7 +9,7 @@ declare marshalledAsJSON varchar; begin select hash into previousHash - from "{{.Bucket}}".logs + from logs where ledger = new.ledger order by seq desc limit 1; @@ -36,4 +36,4 @@ begin return new; end; -$$; \ No newline at end of file +$$ set search_path from current; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/31-logs-assign-date.sql b/internal/storage/bucket/migrations/31-logs-assign-date.sql index 5de43d87e..b58e5d741 100644 --- a/internal/storage/bucket/migrations/31-logs-assign-date.sql +++ b/internal/storage/bucket/migrations/31-logs-assign-date.sql @@ -1,2 +1,2 @@ -alter table "{{.Bucket}}".logs +alter table logs alter column date set default (now() at time zone 'utc'); \ No newline at end of file diff --git a/internal/storage/bucket/migrations/32-accounts-assign-date.sql b/internal/storage/bucket/migrations/32-accounts-assign-date.sql index df0f93fe9..f7e919681 100644 --- a/internal/storage/bucket/migrations/32-accounts-assign-date.sql +++ b/internal/storage/bucket/migrations/32-accounts-assign-date.sql @@ -1,4 +1,4 @@ -alter table "{{.Bucket}}".accounts +alter table accounts alter column first_usage set default (now() at time zone 'utc'), alter column insertion_date set default (now() at time zone 'utc'), alter column updated_at set default (now() at time zone 'utc') diff --git a/internal/storage/bucket/migrations/33-moves-assign-date.sql b/internal/storage/bucket/migrations/33-moves-assign-date.sql index 51d2e467d..6b4214c52 100644 --- a/internal/storage/bucket/migrations/33-moves-assign-date.sql +++ b/internal/storage/bucket/migrations/33-moves-assign-date.sql @@ -1,4 +1,4 @@ -alter table "{{.Bucket}}".moves +alter table moves alter column insertion_date set default (now() at time zone 'utc'), alter column effective_date set default (now() at time zone 'utc') ; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/34-set-ledger-specifics.sql b/internal/storage/bucket/migrations/34-set-ledger-specifics.sql index a909691da..073cdb185 100644 --- a/internal/storage/bucket/migrations/34-set-ledger-specifics.sql +++ b/internal/storage/bucket/migrations/34-set-ledger-specifics.sql @@ -4,75 +4,75 @@ $do$ ledger record; vsql text; BEGIN - for ledger in select * from _system.ledgers where bucket = 
'{{.Bucket}}' loop + for ledger in select * from _system.ledgers where bucket = current_schema loop -- create a sequence for transactions by ledger instead of a sequence of the table as we want to have contiguous ids -- notes: we can still have "holes" on ids since a sql transaction can be reverted after a usage of the sequence - vsql = 'create sequence "{{.Bucket}}"."transaction_id_' || ledger.id || '" owned by "{{.Bucket}}".transactions.id'; + vsql = 'create sequence "transaction_id_' || ledger.id || '" owned by transactions.id'; execute vsql; - vsql = 'select setval("{{.Bucket}}"."transaction_id_' || ledger.id || '", coalesce((select max(id) + 1 from "{{.Bucket}}".transactions where ledger = ledger.name), 1)::bigint, false)'; + vsql = 'select setval(''"transaction_id_' || ledger.id || '"'', coalesce((select max(id) + 1 from transactions where ledger = ''' || ledger.name || '''), 1)::bigint, false)'; execute vsql; -- create a sequence for logs by ledger instead of a sequence of the table as we want to have contiguous ids -- notes: we can still have "holes" on id since a sql transaction can be reverted after a usage of the sequence - vsql = 'create sequence "{{.Bucket}}"."log_id_' || ledger.id || '" owned by "{{.Bucket}}".logs.id'; + vsql = 'create sequence "log_id_' || ledger.id || '" owned by logs.id'; execute vsql; - vsql = 'select setval("{{.Bucket}}"."log_id_' || ledger.id || '", coalesce((select max(id) + 1 from "{{.Bucket}}".logs where ledger = ledger.name), 1)::bigint, false)'; + vsql = 'select setval(''"log_id_' || ledger.id || '"'', coalesce((select max(id) + 1 from logs where ledger = ''' || ledger.name || '''), 1)::bigint, false)'; execute vsql; -- enable post commit effective volumes synchronously - vsql = 'create index "pcev_' || ledger.id || '" on "{{.Bucket}}".moves (accounts_address, asset, effective_date desc) where ledger = ledger.name'; + vsql = 'create index "pcev_' || ledger.id || '" on moves (accounts_address, asset, effective_date desc) where ledger = ''' || ledger.name || ''''; execute vsql; - vsql = 'create trigger "set_effective_volumes_' || ledger.id || '" before insert on "{{.Bucket}}".moves for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".set_effective_volumes()'; + vsql = 'create trigger "set_effective_volumes_' || ledger.id || '" before insert on moves for each row when (new.ledger = ''' || ledger.name || ''') execute procedure set_effective_volumes()'; execute vsql; - vsql = 'create trigger "update_effective_volumes_' || ledger.id || '" after insert on "{{.Bucket}}".moves for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".update_effective_volumes()'; + vsql = 'create trigger "update_effective_volumes_' || ledger.id || '" after insert on moves for each row when (new.ledger = ''' || ledger.name || ''') execute procedure update_effective_volumes()'; execute vsql; -- logs hash - vsql = 'create trigger "set_log_hash_' || ledger.id || '" before insert on "{{.Bucket}}".logs for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".set_log_hash()'; + vsql = 'create trigger "set_log_hash_' || ledger.id || '" before insert on logs for each row when (new.ledger = ''' || ledger.name || ''') execute procedure set_log_hash()'; execute vsql; - vsql = 'create trigger "update_account_metadata_history_' || ledger.id || '" after update on "{{.Bucket}}"."accounts" for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".update_account_metadata_history()'; + vsql = 'create trigger "update_account_metadata_history_' || ledger.id || '" after update on "accounts" for each row
when (new.ledger = ''' || ledger.name || ''') execute procedure update_account_metadata_history()'; execute vsql; - vsql = 'create trigger "insert_account_metadata_history_' || ledger.id || '" after insert on "{{.Bucket}}"."accounts" for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".insert_account_metadata_history()'; + vsql = 'create trigger "insert_account_metadata_history_' || ledger.id || '" after insert on "accounts" for each row when (new.ledger = ''' || ledger.name || ''') execute procedure insert_account_metadata_history()'; execute vsql; - vsql = 'create trigger "update_transaction_metadata_history_' || ledger.id || '" after update on "{{.Bucket}}"."transactions" for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".update_transaction_metadata_history()'; + vsql = 'create trigger "update_transaction_metadata_history_' || ledger.id || '" after update on "transactions" for each row when (new.ledger = ''' || ledger.name || ''') execute procedure update_transaction_metadata_history()'; execute vsql; - vsql = 'create trigger "insert_transaction_metadata_history_' || ledger.id || '" after insert on "{{.Bucket}}"."transactions" for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".insert_transaction_metadata_history()'; + vsql = 'create trigger "insert_transaction_metadata_history_' || ledger.id || '" after insert on "transactions" for each row when (new.ledger = ''' || ledger.name || ''') execute procedure insert_transaction_metadata_history()'; execute vsql; - vsql = 'create index "transactions_sources_' || ledger.id || '" on "{{.Bucket}}".transactions using gin (sources jsonb_path_ops) where ledger = ledger.name'; + vsql = 'create index "transactions_sources_' || ledger.id || '" on transactions using gin (sources jsonb_path_ops) where ledger = ''' || ledger.name || ''''; execute vsql; - vsql = 'create index "transactions_destinations_' || ledger.id || '" on "{{.Bucket}}".transactions using gin (destinations jsonb_path_ops) where ledger = ledger.name'; + vsql = 'create index "transactions_destinations_' || ledger.id || '" on transactions using gin (destinations jsonb_path_ops) where ledger = ''' || ledger.name || ''''; execute vsql; - vsql = 'create trigger "transaction_set_addresses_' || ledger.id || '" before insert on "{{.Bucket}}".transactions for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".set_transaction_addresses()'; + vsql = 'create trigger "transaction_set_addresses_' || ledger.id || '" before insert on transactions for each row when (new.ledger = ''' || ledger.name || ''') execute procedure set_transaction_addresses()'; execute vsql; - vsql = 'create index "accounts_address_array_' || ledger.id || '" on "{{.Bucket}}".accounts using gin (address_array jsonb_ops) where ledger = ledger.name'; + vsql = 'create index "accounts_address_array_' || ledger.id || '" on accounts using gin (address_array jsonb_ops) where ledger = ''' || ledger.name || ''''; execute vsql; - vsql = 'create index "accounts_address_array_length_' || ledger.id || '" on "{{.Bucket}}".accounts (jsonb_array_length(address_array)) where ledger = ledger.name'; + vsql = 'create index "accounts_address_array_length_' || ledger.id || '" on accounts (jsonb_array_length(address_array)) where ledger = ''' || ledger.name || ''''; execute vsql; - vsql = 'create trigger "accounts_set_address_array_' || ledger.id || '" before insert on "{{.Bucket}}".accounts for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".set_address_array_for_account()'; + vsql = 'create trigger "accounts_set_address_array_' || ledger.id || '" before
insert on accounts for each row when (new.ledger = ''' || ledger.name || ''') execute procedure set_address_array_for_account()'; execute vsql; - vsql = 'create index "transactions_sources_arrays_' || ledger.id || '" on "{{.Bucket}}".transactions using gin (sources_arrays jsonb_path_ops) where ledger = ledger.name'; + vsql = 'create index "transactions_sources_arrays_' || ledger.id || '" on transactions using gin (sources_arrays jsonb_path_ops) where ledger = ''' || ledger.name || ''''; execute vsql; - vsql = 'create index "transactions_destinations_arrays_' || ledger.id || '" on "{{.Bucket}}".transactions using gin (destinations_arrays jsonb_path_ops) where ledger = ledger.name'; + vsql = 'create index "transactions_destinations_arrays_' || ledger.id || '" on transactions using gin (destinations_arrays jsonb_path_ops) where ledger = ''' || ledger.name || ''''; execute vsql; - vsql = 'create trigger "transaction_set_addresses_segments_' || ledger.id || '" before insert on "{{.Bucket}}"."transactions" for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".set_transaction_addresses_segments()'; + vsql = 'create trigger "transaction_set_addresses_segments_' || ledger.id || '" before insert on "transactions" for each row when (new.ledger = ''' || ledger.name || ''') execute procedure set_transaction_addresses_segments()'; execute vsql; end loop; END diff --git a/internal/storage/bucket/migrations/4-add-account-first-usage-column.sql b/internal/storage/bucket/migrations/4-add-account-first-usage-column.sql index efd11b24c..34cc49c21 100644 --- a/internal/storage/bucket/migrations/4-add-account-first-usage-column.sql +++ b/internal/storage/bucket/migrations/4-add-account-first-usage-column.sql @@ -1,7 +1,7 @@ -alter table "{{.Bucket}}".accounts +alter table accounts add column first_usage timestamp without time zone; -create or replace function "{{.Bucket}}".insert_move( +create or replace function insert_move( _transactions_seq bigint, _ledger varchar, _insertion_date timestamp without time zone, @@ -16,29 +16,29 @@ create or replace function "{{.Bucket}}".insert_move( as $$ declare - _post_commit_volumes "{{.Bucket}}".volumes = (0, 0)::"{{.Bucket}}".volumes; - _effective_post_commit_volumes "{{.Bucket}}".volumes = (0, 0)::"{{.Bucket}}".volumes; + _post_commit_volumes volumes = (0, 0)::volumes; + _effective_post_commit_volumes volumes = (0, 0)::volumes; _seq bigint; _account_seq bigint; begin - select seq from "{{.Bucket}}".accounts where ledger = _ledger and address = _account_address into _account_seq; + select seq from accounts where ledger = _ledger and address = _account_address into _account_seq; if _account_exists then select (post_commit_volumes).inputs, (post_commit_volumes).outputs into _post_commit_volumes - from "{{.Bucket}}".moves + from moves where accounts_seq = _account_seq and asset = _asset order by seq desc limit 1; if not found then - _post_commit_volumes = (0, 0)::"{{.Bucket}}".volumes; - _effective_post_commit_volumes = (0, 0)::"{{.Bucket}}".volumes; + _post_commit_volumes = (0, 0)::volumes; + _effective_post_commit_volumes = (0, 0)::volumes; else select (post_commit_effective_volumes).inputs, (post_commit_effective_volumes).outputs into _effective_post_commit_volumes - from "{{.Bucket}}".moves + from moves where accounts_seq = _account_seq and asset = _asset and effective_date <= _effective_date @@ -46,7 +46,7 @@ begin limit 1; if not found then - _effective_post_commit_volumes = (0, 0)::"{{.Bucket}}".volumes; + _effective_post_commit_volumes = (0, 0)::volumes; end if; end if; end if; @@ -59,7 +59,7 @@
begin _effective_post_commit_volumes.inputs = _effective_post_commit_volumes.inputs + _amount; end if; - insert into "{{.Bucket}}".moves (ledger, + insert into moves (ledger, insertion_date, effective_date, accounts_seq, @@ -86,7 +86,7 @@ begin returning seq into _seq; if _account_exists then - update "{{.Bucket}}".moves + update moves set post_commit_effective_volumes = ((post_commit_effective_volumes).inputs + case when _is_source then 0 else _amount end, (post_commit_effective_volumes).outputs + case when _is_source then _amount else 0 end @@ -96,15 +96,15 @@ begin and effective_date > _effective_date; end if; end; -$$; +$$ set search_path from current; -create or replace function "{{.Bucket}}".upsert_account(_ledger varchar, _address varchar, _metadata jsonb, _date timestamp, _first_usage timestamp) +create or replace function upsert_account(_ledger varchar, _address varchar, _metadata jsonb, _date timestamp, _first_usage timestamp) returns void language plpgsql as $$ begin - insert into "{{.Bucket}}".accounts(ledger, address, address_array, insertion_date, metadata, updated_at, first_usage) + insert into accounts(ledger, address, address_array, insertion_date, metadata, updated_at, first_usage) values (_ledger, _address, to_json(string_to_array(_address, ':')), _date, coalesce(_metadata, '{}'::jsonb), _date, _first_usage) on conflict (ledger, address) do update set metadata = accounts.metadata || coalesce(_metadata, '{}'::jsonb), @@ -112,9 +112,9 @@ begin first_usage = case when accounts.first_usage < _first_usage then accounts.first_usage else _first_usage end where not accounts.metadata @> coalesce(_metadata, '{}'::jsonb) or accounts.first_usage > _first_usage; end; -$$; +$$ set search_path from current; -create or replace function "{{.Bucket}}".insert_posting(_transaction_seq bigint, _ledger varchar, _insertion_date timestamp without time zone, +create or replace function insert_posting(_transaction_seq bigint, _ledger varchar, _insertion_date timestamp without time zone, _effective_date timestamp without time zone, posting jsonb, _account_metadata jsonb) returns void language plpgsql @@ -125,22 +125,22 @@ declare _destination_exists bool; begin - select true from "{{.Bucket}}".accounts where ledger = _ledger and address = posting ->> 'source' into _source_exists; - perform "{{.Bucket}}".upsert_account(_ledger, posting ->> 'source', _account_metadata -> (posting ->> 'source'), _insertion_date, _effective_date); + select true from accounts where ledger = _ledger and address = posting ->> 'source' into _source_exists; + perform upsert_account(_ledger, posting ->> 'source', _account_metadata -> (posting ->> 'source'), _insertion_date, _effective_date); - select true from "{{.Bucket}}".accounts where ledger = _ledger and address = posting ->> 'destination' into _destination_exists; - perform "{{.Bucket}}".upsert_account(_ledger, posting ->> 'destination', _account_metadata -> (posting ->> 'destination'), _insertion_date, _effective_date); + select true from accounts where ledger = _ledger and address = posting ->> 'destination' into _destination_exists; + perform upsert_account(_ledger, posting ->> 'destination', _account_metadata -> (posting ->> 'destination'), _insertion_date, _effective_date); - perform "{{.Bucket}}".insert_move(_transaction_seq, _ledger, _insertion_date, _effective_date, + perform insert_move(_transaction_seq, _ledger, _insertion_date, _effective_date, posting ->> 'source', posting ->> 'asset', (posting ->> 'amount')::numeric, true, _source_exists); - perform 
"{{.Bucket}}".insert_move(_transaction_seq, _ledger, _insertion_date, _effective_date, + perform insert_move(_transaction_seq, _ledger, _insertion_date, _effective_date, posting ->> 'destination', posting ->> 'asset', (posting ->> 'amount')::numeric, false, _destination_exists); end; -$$; +$$ set search_path from current; -create or replace function "{{.Bucket}}".handle_log() returns trigger +create or replace function handle_log() returns trigger security definer language plpgsql as @@ -150,54 +150,54 @@ declare _value jsonb; begin if new.type = 'NEW_TRANSACTION' then - perform "{{.Bucket}}".insert_transaction(new.ledger, new.data -> 'transaction', new.date, new.data -> 'accountMetadata'); + perform insert_transaction(new.ledger, new.data -> 'transaction', new.date, new.data -> 'accountMetadata'); for _key, _value in (select * from jsonb_each_text(new.data -> 'accountMetadata')) loop - perform "{{.Bucket}}".upsert_account(new.ledger, _key, _value, + perform upsert_account(new.ledger, _key, _value, (new.data -> 'transaction' ->> 'timestamp')::timestamp, (new.data -> 'transaction' ->> 'timestamp')::timestamp); end loop; end if; if new.type = 'REVERTED_TRANSACTION' then - perform "{{.Bucket}}".insert_transaction(new.ledger, new.data -> 'transaction', new.date, '{}'::jsonb); - perform "{{.Bucket}}".revert_transaction(new.ledger, (new.data ->> 'revertedTransactionID')::numeric, + perform insert_transaction(new.ledger, new.data -> 'transaction', new.date, '{}'::jsonb); + perform revert_transaction(new.ledger, (new.data ->> 'revertedTransactionID')::numeric, (new.data -> 'transaction' ->> 'timestamp')::timestamp); end if; if new.type = 'SET_METADATA' then if new.data ->> 'targetType' = 'TRANSACTION' then - perform "{{.Bucket}}".update_transaction_metadata(new.ledger, (new.data ->> 'targetId')::numeric, new.data -> 'metadata', + perform update_transaction_metadata(new.ledger, (new.data ->> 'targetId')::numeric, new.data -> 'metadata', new.date); else - perform "{{.Bucket}}".upsert_account(new.ledger, (new.data ->> 'targetId')::varchar, new.data -> 'metadata', new.date, new.date); + perform upsert_account(new.ledger, (new.data ->> 'targetId')::varchar, new.data -> 'metadata', new.date, new.date); end if; end if; if new.type = 'DELETE_METADATA' then if new.data ->> 'targetType' = 'TRANSACTION' then - perform "{{.Bucket}}".delete_transaction_metadata(new.ledger, (new.data ->> 'targetId')::numeric, new.data ->> 'key', + perform delete_transaction_metadata(new.ledger, (new.data ->> 'targetId')::numeric, new.data ->> 'key', new.date); else - perform "{{.Bucket}}".delete_account_metadata(new.ledger, (new.data ->> 'targetId')::varchar, new.data ->> 'key', + perform delete_account_metadata(new.ledger, (new.data ->> 'targetId')::varchar, new.data ->> 'key', new.date); end if; end if; return new; end; -$$; +$$ set search_path from current; -create or replace function "{{.Bucket}}".get_all_account_volumes(_ledger varchar, _account varchar, _before timestamp default null) - returns setof "{{.Bucket}}".volumes_with_asset +create or replace function get_all_account_volumes(_ledger varchar, _account varchar, _before timestamp default null) + returns setof volumes_with_asset language sql stable as $$ with all_assets as (select v.v as asset - from "{{.Bucket}}".get_all_assets(_ledger) v), + from get_all_assets(_ledger) v), moves as (select m.* from all_assets assets join lateral ( select * - from "{{.Bucket}}".moves s + from moves s where (_before is null or s.insertion_date <= _before) and s.account_address = 
_account and s.asset = assets.asset @@ -207,16 +207,16 @@ with all_assets as (select v.v as asset ) m on true) select moves.asset, moves.post_commit_volumes from moves -$$; +$$ set search_path from current; -drop function "{{.Bucket}}".upsert_account(_ledger varchar, _address varchar, _metadata jsonb, _date timestamp); +drop function upsert_account(_ledger varchar, _address varchar, _metadata jsonb, _date timestamp); -create index accounts_first_usage on "{{.Bucket}}".accounts (first_usage); +create index accounts_first_usage on accounts (first_usage); -update "{{.Bucket}}".accounts +update accounts set first_usage = ( select min(effective_date) - from "{{.Bucket}}".moves m + from moves m where m.accounts_seq = accounts.seq union all select accounts.insertion_date diff --git a/internal/storage/bucket/migrations/5-add-idempotency-key-index.sql b/internal/storage/bucket/migrations/5-add-idempotency-key-index.sql index 1f664513c..b44c5459d 100644 --- a/internal/storage/bucket/migrations/5-add-idempotency-key-index.sql +++ b/internal/storage/bucket/migrations/5-add-idempotency-key-index.sql @@ -1 +1 @@ -create index logs_idempotency_key on "{{.Bucket}}".logs (idempotency_key); \ No newline at end of file +create index logs_idempotency_key on logs (idempotency_key); \ No newline at end of file diff --git a/internal/storage/bucket/migrations/6-add-reference-index.sql b/internal/storage/bucket/migrations/6-add-reference-index.sql index 17c91c79f..89b0ed6f8 100644 --- a/internal/storage/bucket/migrations/6-add-reference-index.sql +++ b/internal/storage/bucket/migrations/6-add-reference-index.sql @@ -1 +1 @@ -create index transactions_reference on "{{.Bucket}}".transactions (reference); \ No newline at end of file +create index transactions_reference on transactions (reference); \ No newline at end of file diff --git a/internal/storage/bucket/migrations/7-add-ik-unique-index.sql b/internal/storage/bucket/migrations/7-add-ik-unique-index.sql index 311aa0e84..92ed59085 100644 --- a/internal/storage/bucket/migrations/7-add-ik-unique-index.sql +++ b/internal/storage/bucket/migrations/7-add-ik-unique-index.sql @@ -1,20 +1,20 @@ -update "{{.Bucket}}".logs +update logs set idempotency_key = null where idempotency_key = ''; -update "{{.Bucket}}".logs +update logs set idempotency_key = null where id in ( select unnest(duplicateLogIds.ids[2:]) as id from ( select array_agg(id order by id) as ids - from "{{.Bucket}}".logs l + from logs l where idempotency_key is not null group by idempotency_key having count(*) > 1 ) duplicateLogIds ); -drop index "{{.Bucket}}".logs_idempotency_key; +drop index logs_idempotency_key; -create unique index logs_idempotency_key on "{{.Bucket}}".logs (idempotency_key); \ No newline at end of file +create unique index logs_idempotency_key on logs (idempotency_key); \ No newline at end of file diff --git a/internal/storage/bucket/migrations/8-ik-ledger-unique-index.sql b/internal/storage/bucket/migrations/8-ik-ledger-unique-index.sql index 012de2fcb..1093bf9c0 100644 --- a/internal/storage/bucket/migrations/8-ik-ledger-unique-index.sql +++ b/internal/storage/bucket/migrations/8-ik-ledger-unique-index.sql @@ -1,3 +1,3 @@ -drop index "{{.Bucket}}".logs_idempotency_key; +drop index logs_idempotency_key; -create unique index logs_idempotency_key on "{{.Bucket}}".logs (ledger, idempotency_key); \ No newline at end of file +create unique index logs_idempotency_key on logs (ledger, idempotency_key); \ No newline at end of file diff --git 
a/internal/storage/bucket/migrations/9-fix-incorrect-volumes-aggregation.sql b/internal/storage/bucket/migrations/9-fix-incorrect-volumes-aggregation.sql index 251bcd820..0731ecaef 100644 --- a/internal/storage/bucket/migrations/9-fix-incorrect-volumes-aggregation.sql +++ b/internal/storage/bucket/migrations/9-fix-incorrect-volumes-aggregation.sql @@ -1,33 +1,33 @@ -create or replace function "{{.Bucket}}".get_aggregated_volumes_for_transaction(_ledger varchar, tx numeric) returns jsonb +create or replace function get_aggregated_volumes_for_transaction(_ledger varchar, tx numeric) returns jsonb stable language sql as $$ -select "{{.Bucket}}".aggregate_objects(jsonb_build_object(data.account_address, data.aggregated)) +select aggregate_objects(jsonb_build_object(data.account_address, data.aggregated)) from ( select distinct on (move.account_address, move.asset) move.account_address, - "{{.Bucket}}".volumes_to_jsonb((move.asset, "{{.Bucket}}".first(move.post_commit_volumes))) as aggregated - from (select * from "{{.Bucket}}".moves order by seq desc) move + volumes_to_jsonb((move.asset, first(move.post_commit_volumes))) as aggregated + from (select * from moves order by seq desc) move where move.transactions_seq = tx and ledger = _ledger group by move.account_address, move.asset ) data -$$; +$$ set search_path from current; -create or replace function "{{.Bucket}}".get_aggregated_effective_volumes_for_transaction(_ledger varchar, tx numeric) returns jsonb +create or replace function get_aggregated_effective_volumes_for_transaction(_ledger varchar, tx numeric) returns jsonb stable language sql as $$ -select "{{.Bucket}}".aggregate_objects(jsonb_build_object(data.account_address, data.aggregated)) +select aggregate_objects(jsonb_build_object(data.account_address, data.aggregated)) from ( select distinct on (move.account_address, move.asset) move.account_address, - "{{.Bucket}}".volumes_to_jsonb((move.asset, "{{.Bucket}}".first(move.post_commit_effective_volumes))) as aggregated - from (select * from "{{.Bucket}}".moves order by seq desc) move + volumes_to_jsonb((move.asset, first(move.post_commit_effective_volumes))) as aggregated + from (select * from moves order by seq desc) move where move.transactions_seq = tx and ledger = _ledger group by move.account_address, move.asset ) data -$$; \ No newline at end of file +$$ set search_path from current; \ No newline at end of file
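
Migrations 22 and 23 above apply the same four-step swap from a surrogate *_seq reference to a natural key: add the new column as nullable, backfill it with a correlated subquery against the referenced table, drop the old column, and only then add the not-null constraint. A minimal sketch of that shape on a hypothetical widgets/widgets_metadata pair (all names here are illustrative, not from this patch):

    -- hypothetical tables; the pattern mirrors migrations 22 and 23
    alter table widgets_metadata
    add column widgets_name varchar;

    update widgets_metadata
    set widgets_name = (
        select name
        from widgets
        where widgets_metadata.widgets_seq = widgets.seq
    );

    alter table widgets_metadata
    drop column widgets_seq;

    alter table widgets_metadata
    alter column widgets_name set not null;

Deferring set not null to the last step turns the backfill into a consistency check: any metadata row without a matching parent is left null, the constraint fails, and the migration aborts instead of silently keeping orphans.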
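
The trailing set search_path from current on every function body is what replaces the removed "{{.Bucket}}" qualification: PostgreSQL snapshots the search_path in effect when the migration runs (the bucket schema, which the where bucket = current_schema loop above also relies on) and reapplies it on every call, so unqualified names like moves or logs keep resolving to the bucket's tables regardless of the caller's search_path. A minimal sketch of the mechanism, with an illustrative schema and function name:

    -- "bucket42" and count_logs are illustrative; "logs" stands in for any bucket table
    set search_path to "bucket42", public;

    create function count_logs() returns bigint
    language sql
    as
    $$
    select count(*) from logs
    $$ set search_path from current;  -- pins search_path = "bucket42", public at creation time

    -- later, under a different search_path, the function still reads "bucket42".logs:
    set search_path to public;
    select "bucket42".count_logs();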
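
Migration 30 moves log hashing into the database: set_log_hash is a before-insert trigger that fetches the previous row's hash for the same ledger (the select ... order by seq desc limit 1 shown in the hunk) and stores a hash chained onto it, keeping each ledger's log stream tamper-evident. The digest and serialization live in the part of the function this hunk leaves unchanged, so the sketch below only illustrates the chaining idea, with pgcrypto's digest and a simplified serialization standing in for the ledger's actual format:

    -- illustrative only: the real serialization/digest details are not part of this hunk
    create extension if not exists pgcrypto;

    create function set_log_hash_sketch()
        returns trigger
        language plpgsql
    as
    $$
    declare
        previousHash bytea;
    begin
        select hash into previousHash
        from logs
        where ledger = new.ledger
        order by seq desc
        limit 1;

        -- chain the new row onto the previous hash; the first row hashes from an empty prefix
        new.hash = digest(coalesce(previousHash, ''::bytea) || convert_to(new.data::text, 'UTF8'), 'sha256');

        return new;
    end;
    $$ set search_path from current;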
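
Migration 34 builds its per-ledger DDL as strings and runs them with execute. Two details of that pattern are easy to trip over: plpgsql variables such as ledger.name are not visible inside the executed string, so the ledger name must be spliced in as a quoted literal, and setval takes the sequence name as a text/regclass value, not a bare identifier. (owned by transactions.id ties each sequence's lifetime to the column, and setval(..., false) makes the next nextval return exactly the given value.) format() with %I and %L does the splicing with less escaping noise than hand-rolled '' doubling; a sketch of one loop iteration under the same _system.ledgers(id, name, bucket) layout the migration already queries:

    do $$
    declare
        ledger record;
    begin
        for ledger in select * from _system.ledgers where bucket = current_schema loop
            -- %I quotes an identifier, %L quotes a literal
            execute format(
                'create index %I on transactions using gin (sources jsonb_path_ops) where ledger = %L',
                'transactions_sources_' || ledger.id,
                ledger.name
            );
        end loop;
    end
    $$;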