From 8e28e3ea3c238fae4d5488e03798799e3dd9be14 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 2 Oct 2024 11:41:33 +0200 Subject: [PATCH] test: add some migrations tests --- internal/storage/bucket/bucket.go | 2 +- internal/storage/bucket/migrations.go | 4 +- .../bucket/migrations/11-drop-triggers.sql | 34 ++ .../bucket/migrations/11-stateless.sql | 367 ------------------ .../12-moves-add-transaction-id.sql | 15 + ...-set-transaction-timestamp-default-utc.sql | 3 + .../14-rename-address-moves-column.sql | 5 + .../15-moves-remove-accounts-seq.sql | 2 + .../16-moves-change-pvc-column-type.sql | 25 ++ .../17-transactions-fix-reference.sql | 2 + .../migrations/18-transactions-add-pvc.sql | 23 ++ .../19-logs-add-idempotency-hash.sql | 3 + .../20-moves-drop-accounts-address-array.sql | 3 + .../21-add-accounts-volumes-table.sql | 25 ++ ...ansactions-metadata-add-transaction-id.sql | 16 + .../23-accounts-metadata-add-address.sql | 15 + .../24-transactions-clean-table.sql | 6 + .../25-accounts-set-array-not-null.sql | 2 + .../migrations/26-logs-set-hash-nullable.sql | 3 + .../bucket/migrations/27-clean-index.sql | 12 + .../migrations/28-add-features-functions.sql | 178 +++++++++ internal/storage/bucket/migrations_test.go | 261 ++++++++++++- internal/storage/driver/migrations.go | 4 +- internal/storage/ledger/transactions_test.go | 33 +- 24 files changed, 637 insertions(+), 406 deletions(-) create mode 100644 internal/storage/bucket/migrations/11-drop-triggers.sql delete mode 100644 internal/storage/bucket/migrations/11-stateless.sql create mode 100644 internal/storage/bucket/migrations/12-moves-add-transaction-id.sql create mode 100644 internal/storage/bucket/migrations/13-set-transaction-timestamp-default-utc.sql create mode 100644 internal/storage/bucket/migrations/14-rename-address-moves-column.sql create mode 100644 internal/storage/bucket/migrations/15-moves-remove-accounts-seq.sql create mode 100644 internal/storage/bucket/migrations/16-moves-change-pvc-column-type.sql create mode 100644 internal/storage/bucket/migrations/17-transactions-fix-reference.sql create mode 100644 internal/storage/bucket/migrations/18-transactions-add-pvc.sql create mode 100644 internal/storage/bucket/migrations/19-logs-add-idempotency-hash.sql create mode 100644 internal/storage/bucket/migrations/20-moves-drop-accounts-address-array.sql create mode 100644 internal/storage/bucket/migrations/21-add-accounts-volumes-table.sql create mode 100644 internal/storage/bucket/migrations/22-transactions-metadata-add-transaction-id.sql create mode 100644 internal/storage/bucket/migrations/23-accounts-metadata-add-address.sql create mode 100644 internal/storage/bucket/migrations/24-transactions-clean-table.sql create mode 100644 internal/storage/bucket/migrations/25-accounts-set-array-not-null.sql create mode 100644 internal/storage/bucket/migrations/26-logs-set-hash-nullable.sql create mode 100644 internal/storage/bucket/migrations/27-clean-index.sql create mode 100644 internal/storage/bucket/migrations/28-add-features-functions.sql diff --git a/internal/storage/bucket/bucket.go b/internal/storage/bucket/bucket.go index d18a3261d..c0d2eb229 100644 --- a/internal/storage/bucket/bucket.go +++ b/internal/storage/bucket/bucket.go @@ -22,7 +22,7 @@ func (b *Bucket) Migrate(ctx context.Context) error { } func (b *Bucket) IsUpToDate(ctx context.Context) (bool, error) { - ret, err := getMigrator(b.name).IsUpToDate(ctx, b.db) + ret, err := GetMigrator(b.name).IsUpToDate(ctx, b.db) if err != nil && errors.Is(err, 
migrations.ErrMissingVersionTable) { return false, nil } diff --git a/internal/storage/bucket/migrations.go b/internal/storage/bucket/migrations.go index c41c41159..d1bbf65ad 100644 --- a/internal/storage/bucket/migrations.go +++ b/internal/storage/bucket/migrations.go @@ -16,7 +16,7 @@ import ( //go:embed migrations var migrationsDir embed.FS -func getMigrator(name string) *migrations.Migrator { +func GetMigrator(name string) *migrations.Migrator { migrator := migrations.NewMigrator(migrations.WithSchema(name, true)) migrator.RegisterMigrationsFromFileSystem(migrationsDir, "migrations", func(s string) string { @@ -39,5 +39,5 @@ func Migrate(ctx context.Context, db bun.IDB, name string) error { ctx, span := tracing.Start(ctx, "Migrate bucket") defer span.End() - return getMigrator(name).Up(ctx, db) + return GetMigrator(name).Up(ctx, db) } diff --git a/internal/storage/bucket/migrations/11-drop-triggers.sql b/internal/storage/bucket/migrations/11-drop-triggers.sql new file mode 100644 index 000000000..f176aea95 --- /dev/null +++ b/internal/storage/bucket/migrations/11-drop-triggers.sql @@ -0,0 +1,34 @@ +drop trigger "insert_account" on "{{.Bucket}}".accounts; +drop trigger "update_account" on "{{.Bucket}}".accounts; +drop trigger "insert_transaction" on "{{.Bucket}}".transactions; +drop trigger "update_transaction" on "{{.Bucket}}".transactions; +drop trigger "insert_log" on "{{.Bucket}}".logs; + +drop aggregate "{{.Bucket}}".aggregate_objects(jsonb); +drop aggregate "{{.Bucket}}".first(anyelement); + +drop function "{{.Bucket}}".array_distinct(anyarray); +drop function "{{.Bucket}}".insert_posting(_transaction_seq bigint, _ledger character varying, _insertion_date timestamp without time zone, _effective_date timestamp without time zone, posting jsonb, _account_metadata jsonb); +drop function "{{.Bucket}}".upsert_account(_ledger character varying, _address character varying, _metadata jsonb, _date timestamp without time zone, _first_usage timestamp without time zone); +drop function "{{.Bucket}}".get_latest_move_for_account_and_asset(_ledger character varying, _account_address character varying, _asset character varying, _before timestamp without time zone); +drop function "{{.Bucket}}".update_transaction_metadata(_ledger character varying, _id numeric, _metadata jsonb, _date timestamp without time zone); +drop function "{{.Bucket}}".delete_account_metadata(_ledger character varying, _address character varying, _key character varying, _date timestamp without time zone); +drop function "{{.Bucket}}".delete_transaction_metadata(_ledger character varying, _id numeric, _key character varying, _date timestamp without time zone); +drop function "{{.Bucket}}".balance_from_volumes(v "{{.Bucket}}".volumes); +drop function "{{.Bucket}}".get_all_account_volumes(_ledger character varying, _account character varying, _before timestamp without time zone); +drop function "{{.Bucket}}".first_agg(anyelement, anyelement); +drop function "{{.Bucket}}".volumes_to_jsonb(v "{{.Bucket}}".volumes_with_asset); +drop function "{{.Bucket}}".get_account_aggregated_effective_volumes(_ledger character varying, _account_address character varying, _before timestamp without time zone); +drop function "{{.Bucket}}".handle_log(); +drop function "{{.Bucket}}".get_account_aggregated_volumes(_ledger character varying, _account_address character varying, _before timestamp without time zone); +drop function "{{.Bucket}}".get_aggregated_volumes_for_transaction(_ledger character varying, tx numeric); +drop function 
"{{.Bucket}}".insert_move(_transactions_seq bigint, _ledger character varying, _insertion_date timestamp without time zone, _effective_date timestamp without time zone, _account_address character varying, _asset character varying, _amount numeric, _is_source boolean, _account_exists boolean); +drop function "{{.Bucket}}".get_all_assets(_ledger character varying); +drop function "{{.Bucket}}".insert_transaction(_ledger character varying, data jsonb, _date timestamp without time zone, _account_metadata jsonb); +drop function "{{.Bucket}}".get_all_account_effective_volumes(_ledger character varying, _account character varying, _before timestamp without time zone); +drop function "{{.Bucket}}".get_account_balance(_ledger character varying, _account character varying, _asset character varying, _before timestamp without time zone); +drop function "{{.Bucket}}".get_aggregated_effective_volumes_for_transaction(_ledger character varying, tx numeric); +drop function "{{.Bucket}}".aggregate_ledger_volumes(_ledger character varying, _before timestamp without time zone, _accounts character varying[], _assets character varying[] ); +drop function "{{.Bucket}}".get_transaction(_ledger character varying, _id numeric, _before timestamp without time zone); +--drop function "{{.Bucket}}".explode_address(_address character varying); +drop function "{{.Bucket}}".revert_transaction(_ledger character varying, _id numeric, _date timestamp without time zone); \ No newline at end of file diff --git a/internal/storage/bucket/migrations/11-stateless.sql b/internal/storage/bucket/migrations/11-stateless.sql deleted file mode 100644 index 16ce55876..000000000 --- a/internal/storage/bucket/migrations/11-stateless.sql +++ /dev/null @@ -1,367 +0,0 @@ -drop trigger "insert_log" on "{{.Bucket}}".logs; - -drop index transactions_reference; -create unique index transactions_reference on "{{.Bucket}}".transactions (ledger, reference); - -alter table "{{.Bucket}}".transactions -add column inserted_at timestamp without time zone -default (now() at time zone 'utc'); - -alter table "{{.Bucket}}".transactions -alter column timestamp -set default (now() at time zone 'utc'); - -alter table "{{.Bucket}}".transactions -add column post_commit_volumes jsonb not null; - -alter table "{{.Bucket}}".moves -add column post_commit_volumes_jsonb jsonb; - -alter table "{{.Bucket}}".moves -add column post_commit_effective_volumes_jsonb jsonb; - -alter table "{{.Bucket}}".moves -drop column transactions_seq; - -alter table "{{.Bucket}}".moves -drop column accounts_seq; - -alter table "{{.Bucket}}".moves -add column transactions_id bigint not null ; - -alter table "{{.Bucket}}".moves -rename column account_address to accounts_address; - -alter table "{{.Bucket}}".moves -rename column account_address_array to accounts_address_array; - -alter table "{{.Bucket}}".moves -drop column post_commit_volumes; - -alter table "{{.Bucket}}".moves -drop column post_commit_effective_volumes; - -alter table "{{.Bucket}}".moves -drop column accounts_address_array; - -alter table "{{.Bucket}}".moves -rename post_commit_volumes_jsonb to post_commit_volumes; - -alter table "{{.Bucket}}".moves -rename post_commit_effective_volumes_jsonb to post_commit_effective_volumes; - -alter table "{{.Bucket}}".moves -alter column post_commit_volumes -drop not null, -alter column post_commit_effective_volumes -drop not null; - --- todo: need migrate -alter table "{{.Bucket}}".transactions_metadata -add column transactions_id bigint not null; - -update 
"{{.Bucket}}".transactions_metadata -set transactions_id = ( - select transactions_id - from "{{.Bucket}}".transactions - where ledger = transactions_metadata.ledger and seq = transactions_metadata.transactions_seq -); - -alter table "{{.Bucket}}".transactions_metadata -drop column transactions_seq; - --- todo: need migrate -alter table "{{.Bucket}}".accounts_metadata -drop column accounts_seq; - -alter table "{{.Bucket}}".accounts_metadata -add column accounts_address varchar not null; - -alter table "{{.Bucket}}".transactions -alter column id -type bigint; - -alter table "{{.Bucket}}".transactions -drop column seq; - -alter table "{{.Bucket}}".accounts -alter column address_array drop not null; - -alter table "{{.Bucket}}".logs -alter column hash -drop not null; - --- Change from jsonb to json to keep keys order and ensure consistent hashing -alter table "{{.Bucket}}".logs -alter column data -type json; - -alter table "{{.Bucket}}".logs -add column idempotency_hash bytea; - -create unique index accounts_metadata_ledger on "{{.Bucket}}".accounts_metadata (ledger, accounts_address, revision); -create index accounts_metadata_revisions on "{{.Bucket}}".accounts_metadata(accounts_address asc, revision desc) include (metadata, date); - -create unique index transactions_metadata_ledger on "{{.Bucket}}".transactions_metadata (ledger, transactions_id, revision); -create index transactions_metadata_revisions on "{{.Bucket}}".transactions_metadata(transactions_id asc, revision desc) include (metadata, date); - --- todo: add migration --- update "{{.Bucket}}".moves --- set post_commit_volumes_jsonb = json_build_object( --- 'input', ((moves.post_commit_volumes).inputs), --- 'output', ((moves.post_commit_volumes).outputs) --- ); --- --- update "{{.Bucket}}".moves --- set post_commit_effective_volumes_jsonb = json_build_object( --- 'input', ((moves.post_commit_effective_volumes).inputs), --- 'output', ((moves.post_commit_effective_volumes).outputs) --- ); - -create table "{{.Bucket}}".accounts_volumes ( - ledger varchar not null, - accounts_address varchar not null, - asset varchar not null, - input numeric not null, - output numeric not null, - - primary key (ledger, accounts_address, asset) -); - -create view "{{.Bucket}}".balances as -select ledger, accounts_address, asset, input - output as balance -from "{{.Bucket}}".accounts_volumes; - -insert into "{{.Bucket}}".accounts_volumes (ledger, accounts_address, asset, input, output) -select distinct on (ledger, accounts_address, asset) - ledger, - accounts_address, - asset, - (moves.post_commit_volumes->>'input')::numeric as input, - (moves.post_commit_volumes->>'output')::numeric as output -from ( - select * - from "{{.Bucket}}".moves - order by seq desc -) moves; - -drop trigger "insert_account" on "{{.Bucket}}".accounts; -drop trigger "update_account" on "{{.Bucket}}".accounts; -drop trigger "insert_transaction" on "{{.Bucket}}".transactions; -drop trigger "update_transaction" on "{{.Bucket}}".transactions; - -drop index transactions_sources_arrays; -drop index transactions_destinations_arrays; -drop index accounts_address_array; -drop index accounts_address_array_length; -drop index transactions_sources; -drop index transactions_destinations; - -drop aggregate "{{.Bucket}}".aggregate_objects(jsonb); -drop aggregate "{{.Bucket}}".first(anyelement); - -drop function "{{.Bucket}}".array_distinct(anyarray); -drop function "{{.Bucket}}".insert_posting(_transaction_seq bigint, _ledger character varying, _insertion_date timestamp without time zone, 
_effective_date timestamp without time zone, posting jsonb, _account_metadata jsonb); -drop function "{{.Bucket}}".upsert_account(_ledger character varying, _address character varying, _metadata jsonb, _date timestamp without time zone, _first_usage timestamp without time zone); -drop function "{{.Bucket}}".get_latest_move_for_account_and_asset(_ledger character varying, _account_address character varying, _asset character varying, _before timestamp without time zone); -drop function "{{.Bucket}}".update_transaction_metadata(_ledger character varying, _id numeric, _metadata jsonb, _date timestamp without time zone); -drop function "{{.Bucket}}".delete_account_metadata(_ledger character varying, _address character varying, _key character varying, _date timestamp without time zone); -drop function "{{.Bucket}}".delete_transaction_metadata(_ledger character varying, _id numeric, _key character varying, _date timestamp without time zone); -drop function "{{.Bucket}}".balance_from_volumes(v "{{.Bucket}}".volumes); -drop function "{{.Bucket}}".get_all_account_volumes(_ledger character varying, _account character varying, _before timestamp without time zone); -drop function "{{.Bucket}}".first_agg(anyelement, anyelement); -drop function "{{.Bucket}}".volumes_to_jsonb(v "{{.Bucket}}".volumes_with_asset); -drop function "{{.Bucket}}".get_account_aggregated_effective_volumes(_ledger character varying, _account_address character varying, _before timestamp without time zone); -drop function "{{.Bucket}}".handle_log(); -drop function "{{.Bucket}}".get_account_aggregated_volumes(_ledger character varying, _account_address character varying, _before timestamp without time zone); -drop function "{{.Bucket}}".get_aggregated_volumes_for_transaction(_ledger character varying, tx numeric); -drop function "{{.Bucket}}".insert_move(_transactions_seq bigint, _ledger character varying, _insertion_date timestamp without time zone, _effective_date timestamp without time zone, _account_address character varying, _asset character varying, _amount numeric, _is_source boolean, _account_exists boolean); -drop function "{{.Bucket}}".get_all_assets(_ledger character varying); -drop function "{{.Bucket}}".insert_transaction(_ledger character varying, data jsonb, _date timestamp without time zone, _account_metadata jsonb); -drop function "{{.Bucket}}".get_all_account_effective_volumes(_ledger character varying, _account character varying, _before timestamp without time zone); -drop function "{{.Bucket}}".get_account_balance(_ledger character varying, _account character varying, _asset character varying, _before timestamp without time zone); -drop function "{{.Bucket}}".get_aggregated_effective_volumes_for_transaction(_ledger character varying, tx numeric); -drop function "{{.Bucket}}".aggregate_ledger_volumes(_ledger character varying, _before timestamp without time zone, _accounts character varying[], _assets character varying[] ); -drop function "{{.Bucket}}".get_transaction(_ledger character varying, _id numeric, _before timestamp without time zone); ---drop function "{{.Bucket}}".explode_address(_address character varying); -drop function "{{.Bucket}}".revert_transaction(_ledger character varying, _id numeric, _date timestamp without time zone); - -drop type "{{.Bucket}}".volumes_with_asset; -drop type "{{.Bucket}}".volumes; - -create function "{{.Bucket}}".set_effective_volumes() - returns trigger - security definer - language plpgsql -as -$$ -begin - new.post_commit_effective_volumes = coalesce(( - select 
json_build_object( - 'input', (post_commit_effective_volumes->>'input')::numeric + case when new.is_source then 0 else new.amount end, - 'output', (post_commit_effective_volumes->>'output')::numeric + case when new.is_source then new.amount else 0 end - ) - from "{{.Bucket}}".moves - where accounts_address = new.accounts_address - and asset = new.asset - and ledger = new.ledger - and (effective_date < new.effective_date or (effective_date = new.effective_date and seq < new.seq)) - order by effective_date desc, seq desc - limit 1 - ), json_build_object( - 'input', case when new.is_source then 0 else new.amount end, - 'output', case when new.is_source then new.amount else 0 end - )); - - return new; -end; -$$; - -create function "{{.Bucket}}".update_effective_volumes() - returns trigger - security definer - language plpgsql -as -$$ -begin - update "{{.Bucket}}".moves - set post_commit_effective_volumes = json_build_object( - 'input', (post_commit_effective_volumes->>'input')::numeric + case when new.is_source then 0 else new.amount end, - 'output', (post_commit_effective_volumes->>'output')::numeric + case when new.is_source then new.amount else 0 end - ) - where accounts_address = new.accounts_address - and asset = new.asset - and effective_date > new.effective_date - and ledger = new.ledger; - - return new; -end; -$$; - -create or replace function "{{.Bucket}}".update_transaction_metadata_history() returns trigger - security definer - language plpgsql -as -$$ -begin - insert into "{{.Bucket}}".transactions_metadata (ledger, transactions_id, revision, date, metadata) - values (new.ledger, new.id, ( - select revision + 1 - from "{{.Bucket}}".transactions_metadata - where transactions_metadata.transactions_id = new.id and transactions_metadata.ledger = new.ledger - order by revision desc - limit 1 - ), new.updated_at, new.metadata); - - return new; -end; -$$; - -create or replace function "{{.Bucket}}".insert_transaction_metadata_history() returns trigger - security definer - language plpgsql -as -$$ -begin - insert into "{{.Bucket}}".transactions_metadata (ledger, transactions_id, revision, date, metadata) - values (new.ledger, new.id, 1, new.timestamp, new.metadata); - - return new; -end; -$$; - -create or replace function "{{.Bucket}}".update_account_metadata_history() returns trigger - security definer - language plpgsql -as -$$ -begin - insert into "{{.Bucket}}".accounts_metadata (ledger, accounts_address, revision, date, metadata) - values (new.ledger, new.address, ( - select revision + 1 - from "{{.Bucket}}".accounts_metadata - where accounts_metadata.accounts_address = new.address - order by revision desc - limit 1 - ), new.updated_at, new.metadata); - - return new; -end; -$$; - -create or replace function "{{.Bucket}}".insert_account_metadata_history() returns trigger - security definer - language plpgsql -as -$$ -begin - insert into "{{.Bucket}}".accounts_metadata (ledger, accounts_address, revision, date, metadata) - values (new.ledger, new.address, 1, new.insertion_date, new.metadata); - - return new; -end; -$$; - -create or replace function "{{.Bucket}}".explode_address(_address varchar) - returns jsonb - language sql - immutable -as -$$ -select public.aggregate_objects(jsonb_build_object(data.number - 1, data.value)) -from (select row_number() over () as number, v.value - from (select unnest(string_to_array(_address, ':')) as value - union all - select null) v) data -$$; - -create or replace function "{{.Bucket}}".set_transaction_addresses() returns trigger - security definer - 
language plpgsql -as -$$ -begin - - new.sources = ( - select to_jsonb(array_agg(v->>'source')) as value - from jsonb_array_elements(new.postings::jsonb) v - ); - new.destinations = ( - select to_jsonb(array_agg(v->>'destination')) as value - from jsonb_array_elements(new.postings::jsonb) v - ); - - return new; -end -$$; - -create or replace function "{{.Bucket}}".set_transaction_addresses_segments() returns trigger - security definer - language plpgsql -as -$$ -begin - new.sources_arrays = ( - select to_jsonb(array_agg("{{.Bucket}}".explode_address(v ->> 'source'))) as value - from jsonb_array_elements(new.postings::jsonb) v - ); - new.destinations_arrays = ( - select to_jsonb(array_agg("{{.Bucket}}".explode_address(v ->> 'destination'))) as value - from jsonb_array_elements(new.postings::jsonb) v - ); - - return new; -end -$$; - -create or replace function "{{.Bucket}}".set_address_array_for_account() returns trigger - security definer - language plpgsql -as -$$ -begin - new.address_array = to_json(string_to_array(new.address, ':')); - - return new; -end -$$; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/12-moves-add-transaction-id.sql b/internal/storage/bucket/migrations/12-moves-add-transaction-id.sql new file mode 100644 index 000000000..2bd6209f8 --- /dev/null +++ b/internal/storage/bucket/migrations/12-moves-add-transaction-id.sql @@ -0,0 +1,15 @@ +alter table "{{.Bucket}}".moves +add column transactions_id bigint; + +update "{{.Bucket}}".moves +set transactions_id = ( + select id + from "{{.Bucket}}".transactions + where seq = transactions_seq +); + +alter table "{{.Bucket}}".moves +alter column transactions_id set not null; + +alter table "{{.Bucket}}".moves +drop column transactions_seq; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/13-set-transaction-timestamp-default-utc.sql b/internal/storage/bucket/migrations/13-set-transaction-timestamp-default-utc.sql new file mode 100644 index 000000000..c30c79a30 --- /dev/null +++ b/internal/storage/bucket/migrations/13-set-transaction-timestamp-default-utc.sql @@ -0,0 +1,3 @@ +alter table "{{.Bucket}}".transactions +add column inserted_at timestamp without time zone +default (now() at time zone 'utc'); \ No newline at end of file diff --git a/internal/storage/bucket/migrations/14-rename-address-moves-column.sql b/internal/storage/bucket/migrations/14-rename-address-moves-column.sql new file mode 100644 index 000000000..6b3626d7b --- /dev/null +++ b/internal/storage/bucket/migrations/14-rename-address-moves-column.sql @@ -0,0 +1,5 @@ +alter table "{{.Bucket}}".moves +rename column account_address to accounts_address; + +alter table "{{.Bucket}}".moves +rename column account_address_array to accounts_address_array; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/15-moves-remove-accounts-seq.sql b/internal/storage/bucket/migrations/15-moves-remove-accounts-seq.sql new file mode 100644 index 000000000..a684830d7 --- /dev/null +++ b/internal/storage/bucket/migrations/15-moves-remove-accounts-seq.sql @@ -0,0 +1,2 @@ +alter table "{{.Bucket}}".moves +drop column accounts_seq; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/16-moves-change-pvc-column-type.sql b/internal/storage/bucket/migrations/16-moves-change-pvc-column-type.sql new file mode 100644 index 000000000..8be078add --- /dev/null +++ b/internal/storage/bucket/migrations/16-moves-change-pvc-column-type.sql @@ -0,0 +1,25 @@ +-- update post_commit_volumes of table moves to 
jsonb
+alter table "{{.Bucket}}".moves
+add column post_commit_volumes_jsonb jsonb;
+
+update "{{.Bucket}}".moves
+set post_commit_volumes_jsonb = json_build_object('input', (post_commit_volumes).inputs, 'output', (post_commit_volumes).outputs);
+
+alter table "{{.Bucket}}".moves
+drop column post_commit_volumes;
+
+alter table "{{.Bucket}}".moves
+rename post_commit_volumes_jsonb to post_commit_volumes;
+
+-- update post_commit_effective_volumes of table moves to jsonb
+alter table "{{.Bucket}}".moves
+add column post_commit_effective_volumes_jsonb jsonb;
+
+update "{{.Bucket}}".moves
+set post_commit_effective_volumes_jsonb = json_build_object('input', (post_commit_effective_volumes).inputs, 'output', (post_commit_effective_volumes).outputs);
+
+alter table "{{.Bucket}}".moves
+drop column post_commit_effective_volumes;
+
+alter table "{{.Bucket}}".moves
+rename post_commit_effective_volumes_jsonb to post_commit_effective_volumes;
\ No newline at end of file
diff --git a/internal/storage/bucket/migrations/17-transactions-fix-reference.sql b/internal/storage/bucket/migrations/17-transactions-fix-reference.sql
new file mode 100644
index 000000000..f6064a6cd
--- /dev/null
+++ b/internal/storage/bucket/migrations/17-transactions-fix-reference.sql
@@ -0,0 +1,2 @@
+drop index transactions_reference;
+create unique index transactions_reference on "{{.Bucket}}".transactions (ledger, reference);
\ No newline at end of file
diff --git a/internal/storage/bucket/migrations/18-transactions-add-pvc.sql b/internal/storage/bucket/migrations/18-transactions-add-pvc.sql
new file mode 100644
index 000000000..abd437598
--- /dev/null
+++ b/internal/storage/bucket/migrations/18-transactions-add-pvc.sql
@@ -0,0 +1,23 @@
+-- add post_commit_volumes column on transactions table
+alter table "{{.Bucket}}".transactions
+add column post_commit_volumes jsonb;
+
+update "{{.Bucket}}".transactions
+set
+    post_commit_volumes = (
+        select public.aggregate_objects(post_commit_volumes::jsonb) as post_commit_volumes
+        from (
+            select accounts_address, json_build_object(accounts_address, post_commit_volumes) post_commit_volumes
+            from (
+                select accounts_address, json_build_object(asset, post_commit_volumes) as post_commit_volumes
+                from (
+                    select distinct on (accounts_address, asset) accounts_address, asset, first_value(post_commit_volumes) over (partition by accounts_address, asset order by seq desc) as post_commit_volumes
+                    from "{{.Bucket}}".moves
+                ) moves
+            ) values
+        ) values
+        where transactions.sources ? accounts_address or transactions.destinations ? accounts_address
+    );
+
+alter table "{{.Bucket}}".transactions
+alter column post_commit_volumes set not null;
\ No newline at end of file
diff --git a/internal/storage/bucket/migrations/19-logs-add-idempotency-hash.sql b/internal/storage/bucket/migrations/19-logs-add-idempotency-hash.sql
new file mode 100644
index 000000000..f1eedaa79
--- /dev/null
+++ b/internal/storage/bucket/migrations/19-logs-add-idempotency-hash.sql
@@ -0,0 +1,3 @@
+-- todo: add special treatment in code when the value is empty
+alter table "{{.Bucket}}".logs
+add column idempotency_hash bytea;
\ No newline at end of file
diff --git a/internal/storage/bucket/migrations/20-moves-drop-accounts-address-array.sql b/internal/storage/bucket/migrations/20-moves-drop-accounts-address-array.sql
new file mode 100644
index 000000000..ac71ff830
--- /dev/null
+++ b/internal/storage/bucket/migrations/20-moves-drop-accounts-address-array.sql
@@ -0,0 +1,3 @@
+-- drop accounts_address_array from moves
+alter table "{{.Bucket}}".moves
+drop column accounts_address_array;
\ No newline at end of file
diff --git a/internal/storage/bucket/migrations/21-add-accounts-volumes-table.sql b/internal/storage/bucket/migrations/21-add-accounts-volumes-table.sql
new file mode 100644
index 000000000..7d33713e5
--- /dev/null
+++ b/internal/storage/bucket/migrations/21-add-accounts-volumes-table.sql
@@ -0,0 +1,25 @@
+create table "{{.Bucket}}".accounts_volumes (
+    ledger varchar not null,
+    accounts_address varchar not null,
+    asset varchar not null,
+    input numeric not null,
+    output numeric not null,
+
+    primary key (ledger, accounts_address, asset)
+);
+
+insert into "{{.Bucket}}".accounts_volumes (ledger, accounts_address, asset, input, output)
+select distinct on (ledger, accounts_address, asset)
+    ledger,
+    accounts_address,
+    asset,
+    (moves.post_commit_volumes->>'input')::numeric as input,
+    (moves.post_commit_volumes->>'output')::numeric as output
+from (
+    select distinct on (ledger, accounts_address, asset)
+        ledger,
+        accounts_address,
+        asset,
+        first_value(post_commit_volumes) over (partition by (accounts_address, asset) order by seq desc) as post_commit_volumes
+    from "{{.Bucket}}".moves
+) moves;
\ No newline at end of file
diff --git a/internal/storage/bucket/migrations/22-transactions-metadata-add-transaction-id.sql b/internal/storage/bucket/migrations/22-transactions-metadata-add-transaction-id.sql
new file mode 100644
index 000000000..1106541a1
--- /dev/null
+++ b/internal/storage/bucket/migrations/22-transactions-metadata-add-transaction-id.sql
@@ -0,0 +1,16 @@
+alter table "{{.Bucket}}".transactions_metadata
+add column transactions_id bigint;
+
+update "{{.Bucket}}".transactions_metadata
+set transactions_id = (
+    select id
+    from "{{.Bucket}}".transactions
+    where transactions_metadata.transactions_seq = transactions.seq
+);
+
+alter table "{{.Bucket}}".transactions_metadata
+drop column transactions_seq;
+
+alter table "{{.Bucket}}".transactions_metadata
+alter column transactions_id
+set not null;
\ No newline at end of file
diff --git a/internal/storage/bucket/migrations/23-accounts-metadata-add-address.sql b/internal/storage/bucket/migrations/23-accounts-metadata-add-address.sql
new file mode 100644
index 000000000..2cfd22bf9
--- /dev/null
+++ b/internal/storage/bucket/migrations/23-accounts-metadata-add-address.sql
@@ -0,0 +1,15 @@
+alter table "{{.Bucket}}".accounts_metadata
+add column accounts_address varchar;
+
+update "{{.Bucket}}".accounts_metadata
+set accounts_address = (
+    select address
+    from "{{.Bucket}}".accounts
+
where accounts_metadata.accounts_seq = seq +); + +alter table "{{.Bucket}}".accounts_metadata +drop column accounts_seq; + +alter table "{{.Bucket}}".accounts_metadata +alter column accounts_address set not null; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/24-transactions-clean-table.sql b/internal/storage/bucket/migrations/24-transactions-clean-table.sql new file mode 100644 index 000000000..3d333b11c --- /dev/null +++ b/internal/storage/bucket/migrations/24-transactions-clean-table.sql @@ -0,0 +1,6 @@ +alter table "{{.Bucket}}".transactions +alter column id +type bigint; + +alter table "{{.Bucket}}".transactions +drop column seq; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/25-accounts-set-array-not-null.sql b/internal/storage/bucket/migrations/25-accounts-set-array-not-null.sql new file mode 100644 index 000000000..dbfee3ec1 --- /dev/null +++ b/internal/storage/bucket/migrations/25-accounts-set-array-not-null.sql @@ -0,0 +1,2 @@ +alter table "{{.Bucket}}".accounts +alter column address_array drop not null; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/26-logs-set-hash-nullable.sql b/internal/storage/bucket/migrations/26-logs-set-hash-nullable.sql new file mode 100644 index 000000000..38c4b57d9 --- /dev/null +++ b/internal/storage/bucket/migrations/26-logs-set-hash-nullable.sql @@ -0,0 +1,3 @@ +alter table "{{.Bucket}}".logs +alter column hash +drop not null; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/27-clean-index.sql b/internal/storage/bucket/migrations/27-clean-index.sql new file mode 100644 index 000000000..b1a8fc133 --- /dev/null +++ b/internal/storage/bucket/migrations/27-clean-index.sql @@ -0,0 +1,12 @@ +create unique index accounts_metadata_ledger on "{{.Bucket}}".accounts_metadata (ledger, accounts_address, revision); +create index accounts_metadata_revisions on "{{.Bucket}}".accounts_metadata(accounts_address asc, revision desc) include (metadata, date); + +create unique index transactions_metadata_ledger on "{{.Bucket}}".transactions_metadata (ledger, transactions_id, revision); +create index transactions_metadata_revisions on "{{.Bucket}}".transactions_metadata(transactions_id asc, revision desc) include (metadata, date); + +drop index transactions_sources_arrays; +drop index transactions_destinations_arrays; +drop index accounts_address_array; +drop index accounts_address_array_length; +drop index transactions_sources; +drop index transactions_destinations; diff --git a/internal/storage/bucket/migrations/28-add-features-functions.sql b/internal/storage/bucket/migrations/28-add-features-functions.sql new file mode 100644 index 000000000..7c1a095be --- /dev/null +++ b/internal/storage/bucket/migrations/28-add-features-functions.sql @@ -0,0 +1,178 @@ +create function "{{.Bucket}}".set_effective_volumes() + returns trigger + security definer + language plpgsql +as +$$ +begin + new.post_commit_effective_volumes = coalesce(( + select json_build_object( + 'input', (post_commit_effective_volumes->>'input')::numeric + case when new.is_source then 0 else new.amount end, + 'output', (post_commit_effective_volumes->>'output')::numeric + case when new.is_source then new.amount else 0 end + ) + from "{{.Bucket}}".moves + where accounts_address = new.accounts_address + and asset = new.asset + and ledger = new.ledger + and (effective_date < new.effective_date or (effective_date = new.effective_date and seq < new.seq)) + order by effective_date desc, seq desc + limit 
1 + ), json_build_object( + 'input', case when new.is_source then 0 else new.amount end, + 'output', case when new.is_source then new.amount else 0 end + )); + + return new; +end; +$$; + +create function "{{.Bucket}}".update_effective_volumes() + returns trigger + security definer + language plpgsql +as +$$ +begin + update "{{.Bucket}}".moves + set post_commit_effective_volumes = json_build_object( + 'input', (post_commit_effective_volumes->>'input')::numeric + case when new.is_source then 0 else new.amount end, + 'output', (post_commit_effective_volumes->>'output')::numeric + case when new.is_source then new.amount else 0 end + ) + where accounts_address = new.accounts_address + and asset = new.asset + and effective_date > new.effective_date + and ledger = new.ledger; + + return new; +end; +$$; + +create or replace function "{{.Bucket}}".update_transaction_metadata_history() returns trigger + security definer + language plpgsql +as +$$ +begin + insert into "{{.Bucket}}".transactions_metadata (ledger, transactions_id, revision, date, metadata) + values (new.ledger, new.id, ( + select revision + 1 + from "{{.Bucket}}".transactions_metadata + where transactions_metadata.transactions_id = new.id and transactions_metadata.ledger = new.ledger + order by revision desc + limit 1 + ), new.updated_at, new.metadata); + + return new; +end; +$$; + +create or replace function "{{.Bucket}}".insert_transaction_metadata_history() returns trigger + security definer + language plpgsql +as +$$ +begin + insert into "{{.Bucket}}".transactions_metadata (ledger, transactions_id, revision, date, metadata) + values (new.ledger, new.id, 1, new.timestamp, new.metadata); + + return new; +end; +$$; + +create or replace function "{{.Bucket}}".update_account_metadata_history() returns trigger + security definer + language plpgsql +as +$$ +begin + insert into "{{.Bucket}}".accounts_metadata (ledger, accounts_address, revision, date, metadata) + values (new.ledger, new.address, ( + select revision + 1 + from "{{.Bucket}}".accounts_metadata + where accounts_metadata.accounts_address = new.address + order by revision desc + limit 1 + ), new.updated_at, new.metadata); + + return new; +end; +$$; + +create or replace function "{{.Bucket}}".insert_account_metadata_history() returns trigger + security definer + language plpgsql +as +$$ +begin + insert into "{{.Bucket}}".accounts_metadata (ledger, accounts_address, revision, date, metadata) + values (new.ledger, new.address, 1, new.insertion_date, new.metadata); + + return new; +end; +$$; + +create or replace function "{{.Bucket}}".explode_address(_address varchar) + returns jsonb + language sql + immutable +as +$$ +select public.aggregate_objects(jsonb_build_object(data.number - 1, data.value)) +from (select row_number() over () as number, v.value + from (select unnest(string_to_array(_address, ':')) as value + union all + select null) v) data +$$; + +create or replace function "{{.Bucket}}".set_transaction_addresses() returns trigger + security definer + language plpgsql +as +$$ +begin + + new.sources = ( + select to_jsonb(array_agg(v->>'source')) as value + from jsonb_array_elements(new.postings::jsonb) v + ); + new.destinations = ( + select to_jsonb(array_agg(v->>'destination')) as value + from jsonb_array_elements(new.postings::jsonb) v + ); + + return new; +end +$$; + +create or replace function "{{.Bucket}}".set_transaction_addresses_segments() returns trigger + security definer + language plpgsql +as +$$ +begin + new.sources_arrays = ( + select 
to_jsonb(array_agg("{{.Bucket}}".explode_address(v ->> 'source'))) as value + from jsonb_array_elements(new.postings::jsonb) v + ); + new.destinations_arrays = ( + select to_jsonb(array_agg("{{.Bucket}}".explode_address(v ->> 'destination'))) as value + from jsonb_array_elements(new.postings::jsonb) v + ); + + return new; +end +$$; + +create or replace function "{{.Bucket}}".set_address_array_for_account() returns trigger + security definer + language plpgsql +as +$$ +begin + new.address_array = to_json(string_to_array(new.address, ':')); + + return new; +end +$$; +-- +-- --todo: add test for changing logs format of created transaction and reverted transaction \ No newline at end of file diff --git a/internal/storage/bucket/migrations_test.go b/internal/storage/bucket/migrations_test.go index eb95cf54c..54782457b 100644 --- a/internal/storage/bucket/migrations_test.go +++ b/internal/storage/bucket/migrations_test.go @@ -5,7 +5,11 @@ package bucket_test import ( "context" "github.com/formancehq/go-libs/testing/migrations" + "github.com/formancehq/go-libs/time" + ledger "github.com/formancehq/ledger/internal" + "github.com/formancehq/ledger/internal/storage/bucket" "github.com/formancehq/ledger/internal/storage/driver" + "math/big" "testing" "github.com/formancehq/go-libs/bun/bunconnect" @@ -32,16 +36,257 @@ func TestMigrations(t *testing.T) { require.NoError(t, db.Close()) }) - test := migrations.NewMigrationTest(t, driver.GetMigrator(), db) - test.Append(8, removeSequenceOnTransactionTable) + require.NoError(t, driver.Migrate(ctx, db)) + + test := migrations.NewMigrationTest(t, bucket.GetMigrator(ledger.DefaultBucket), db) + test.Append(12, removeSequenceOnMovesTable) + test.Append(16, changePVCColumnTypeOfMoves) + test.Append(18, addTransactionsPVC) + test.Append(21, addAccountsVolumesTable) + test.Append(22, addTransactionIDOnTransactionsMetadataTable) + test.Append(23, addAccountAddressOnAccountsMetadataTable) test.Run() } -var removeSequenceOnTransactionTable = migrations.Hook{ - Before: func(ctx context.Context, t *testing.T, db bun.IDB) { +var ( + now = time.Now() + removeSequenceOnMovesTable = migrations.Hook{ + Before: func(ctx context.Context, t *testing.T, db bun.IDB) { + // insert some accounts + _, err := db.NewInsert(). + Model(&map[string]any{ + "ledger": "foo", + "address": "world", + "address_array": []string{"world"}, + "seq": 1, + "insertion_date": now, + "updated_at": now, + }). + TableExpr(ledger.DefaultBucket + ".accounts"). + Exec(ctx) + require.NoError(t, err) - }, - After: func(ctx context.Context, t *testing.T, db bun.IDB) { + _, err = db.NewInsert(). + Model(&map[string]any{ + "ledger": "foo", + "address": "bank", + "address_array": []string{"bank"}, + "seq": 2, + "insertion_date": now, + "updated_at": now, + }). + TableExpr(ledger.DefaultBucket + ".accounts"). + Exec(ctx) + require.NoError(t, err) - }, -} + // insert a transaction + _, err = db.NewInsert(). + Model(&map[string]any{ + "ledger": "foo", + "id": 1, + "seq": 1, + "timestamp": time.Now(), + "postings": []any{}, + "sources": []string{"world"}, + "destinations": []string{"bank"}, + }). + TableExpr(ledger.DefaultBucket + ".transactions"). + Exec(ctx) + require.NoError(t, err) + + // insert moves + _, err = db.NewInsert(). 
+ Model(&map[string]any{ + "ledger": "foo", + "seq": 1, + "asset": "USD", + "amount": big.NewInt(100), + "transactions_seq": 1, + "accounts_seq": 1, + "account_address": "world", + "account_address_array": []string{"world"}, + "post_commit_volumes": "(0, 100)", + "post_commit_effective_volumes": "(0, 100)", + "insertion_date": now, + "effective_date": now, + "is_source": true, + }). + TableExpr(ledger.DefaultBucket + ".moves"). + Exec(ctx) + require.NoError(t, err) + + _, err = db.NewInsert(). + Model(&map[string]any{ + "ledger": "foo", + "seq": 3, + "asset": "USD", + "amount": big.NewInt(100), + "transactions_seq": 1, + "accounts_seq": 2, + "account_address": "bank", + "account_address_array": []string{"bank"}, + "post_commit_volumes": "(100, 0)", + "post_commit_effective_volumes": "(100, 0)", + "insertion_date": now, + "effective_date": now, + "is_source": false, + }). + TableExpr(ledger.DefaultBucket + ".moves"). + Exec(ctx) + require.NoError(t, err) + }, + After: func(ctx context.Context, t *testing.T, db bun.IDB) { + ret := make([]map[string]any, 0) + err := db.NewSelect(). + ModelTableExpr(ledger.DefaultBucket + ".moves"). + Model(&ret). + Scan(ctx) + require.NoError(t, err) + require.Len(t, ret, 2) + require.Equal(t, int64(1), ret[0]["transactions_id"]) + require.Equal(t, int64(1), ret[1]["transactions_id"]) + }, + } + changePVCColumnTypeOfMoves = migrations.Hook{ + After: func(ctx context.Context, t *testing.T, db bun.IDB) { + type model struct { + bun.BaseModel `bun:"alias:moves"` + + Volumes ledger.Volumes `bun:"post_commit_volumes"` + EffectiveVolumes ledger.Volumes `bun:"post_commit_effective_volumes"` + } + ret := make([]model, 0) + err := db.NewSelect(). + Model(&ret). + ModelTableExpr(ledger.DefaultBucket + ".moves"). + Order("seq"). + Scan(ctx) + require.NoError(t, err) + + require.Len(t, ret, 2) + require.Equal(t, ledger.NewVolumesInt64(0, 100), ret[0].Volumes) + require.Equal(t, ledger.NewVolumesInt64(100, 0), ret[1].Volumes) + require.Equal(t, ledger.NewVolumesInt64(0, 100), ret[0].EffectiveVolumes) + require.Equal(t, ledger.NewVolumesInt64(100, 0), ret[1].EffectiveVolumes) + }, + } + addTransactionsPVC = migrations.Hook{ + After: func(ctx context.Context, t *testing.T, db bun.IDB) { + type model struct { + bun.BaseModel `bun:"alias:transactions"` + + PostCommitVolumes ledger.PostCommitVolumes `bun:"post_commit_volumes"` + } + ret := make([]model, 0) + err := db.NewSelect(). + Model(&ret). + ModelTableExpr(ledger.DefaultBucket + ".transactions"). + Order("seq"). + Scan(ctx) + require.NoError(t, err) + + require.Len(t, ret, 1) + require.Equal(t, ledger.PostCommitVolumes{ + "world": { + "USD": ledger.NewVolumesInt64(0, 100), + }, + "bank": { + "USD": ledger.NewVolumesInt64(100, 0), + }, + }, ret[0].PostCommitVolumes) + }, + } + addAccountsVolumesTable = migrations.Hook{ + After: func(ctx context.Context, t *testing.T, db bun.IDB) { + type model struct { + bun.BaseModel `bun:"alias:accounts_volumes"` + + Address string `bun:"accounts_address"` + Asset string `bun:"asset"` + Input *big.Int `bun:"input"` + Output *big.Int `bun:"output"` + } + ret := make([]model, 0) + err := db.NewSelect(). + Model(&ret). + ModelTableExpr(ledger.DefaultBucket + ".accounts_volumes"). + Order("accounts_address"). 
+ Scan(ctx) + require.NoError(t, err) + + require.Len(t, ret, 2) + require.Equal(t, model{ + Address: "bank", + Asset: "USD", + Input: big.NewInt(100), + Output: big.NewInt(0), + }, ret[0]) + require.Equal(t, model{ + Address: "world", + Asset: "USD", + Input: big.NewInt(0), + Output: big.NewInt(100), + }, ret[1]) + }, + } + addTransactionIDOnTransactionsMetadataTable = migrations.Hook{ + Before: func(ctx context.Context, t *testing.T, db bun.IDB) { + _, err := db.NewInsert(). + Model(&map[string]any{ + "ledger": "foo", + "transactions_seq": 1, + "revision": 1, + "date": now, + "metadata": map[string]string{"foo": "bar"}, + }). + TableExpr(ledger.DefaultBucket + ".transactions_metadata"). + Exec(ctx) + require.NoError(t, err) + }, + After: func(ctx context.Context, t *testing.T, db bun.IDB) { + type model struct { + bun.BaseModel `bun:"alias:transactions_metadata"` + + TransactionID int `bun:"transactions_id"` + } + ret := make([]model, 0) + err := db.NewSelect(). + Model(&ret). + ModelTableExpr(ledger.DefaultBucket + ".transactions_metadata"). + Scan(ctx) + require.NoError(t, err) + require.Len(t, ret, 1) + require.Equal(t, 1, ret[0].TransactionID) + }, + } + addAccountAddressOnAccountsMetadataTable = migrations.Hook{ + Before: func(ctx context.Context, t *testing.T, db bun.IDB) { + _, err := db.NewInsert(). + Model(&map[string]any{ + "ledger": "foo", + "accounts_seq": 1, + "revision": 1, + "date": now, + "metadata": map[string]string{"foo": "bar"}, + }). + TableExpr(ledger.DefaultBucket + ".accounts_metadata"). + Exec(ctx) + require.NoError(t, err) + }, + After: func(ctx context.Context, t *testing.T, db bun.IDB) { + type model struct { + bun.BaseModel `bun:"alias:accounts_metadata"` + + Address string `bun:"accounts_address"` + } + ret := make([]model, 0) + err := db.NewSelect(). + Model(&ret). + ModelTableExpr(ledger.DefaultBucket + ".accounts_metadata"). 
+ Scan(ctx) + require.NoError(t, err) + require.Len(t, ret, 1) + require.Equal(t, "world", ret[0].Address) + }, + } +) diff --git a/internal/storage/driver/migrations.go b/internal/storage/driver/migrations.go index 957c175ef..231acb4b1 100644 --- a/internal/storage/driver/migrations.go +++ b/internal/storage/driver/migrations.go @@ -124,7 +124,7 @@ func GetMigrator() *migrations.Migrator { migrations.Migration{ Name: "Add aggregate_objects pg aggregator", UpWithContext: func(ctx context.Context, tx bun.Tx) error { - _, err := tx.ExecContext(ctx, jsonbMerge) + _, err := tx.ExecContext(ctx, aggregateObjects) return err }, }, @@ -156,7 +156,7 @@ func Migrate(ctx context.Context, db bun.IDB) error { return GetMigrator().Up(ctx, db) } -const jsonbMerge = ` +const aggregateObjects = ` create or replace function public.jsonb_concat(a jsonb, b jsonb) returns jsonb as 'select $1 || $2' language sql diff --git a/internal/storage/ledger/transactions_test.go b/internal/storage/ledger/transactions_test.go index 8536ca971..76a93002c 100644 --- a/internal/storage/ledger/transactions_test.go +++ b/internal/storage/ledger/transactions_test.go @@ -517,37 +517,18 @@ func TestTransactionsInsert(t *testing.T) { require.Error(t, err) require.True(t, errors.Is(err, ledgercontroller.ErrTransactionReferenceConflict{})) }) - t.Run("create a tx with no timestamp", func(t *testing.T) { - t.Parallel() - - store := newLedgerStore(t) - - // Create a tx with no timestamp - tx1 := ledger.Transaction{ - TransactionData: ledger.TransactionData{ - Postings: []ledger.Posting{ - ledger.NewPosting("world", "bank", "USD/2", big.NewInt(100)), - }, - }, - } - err := store.InsertTransaction(ctx, &tx1) - require.NoError(t, err) - }) t.Run("check denormalization", func(t *testing.T) { t.Parallel() store := newLedgerStore(t) - tx1 := ledger.Transaction{ - TransactionData: ledger.TransactionData{ - Timestamp: now, - InsertedAt: now, - Postings: []ledger.Posting{ - ledger.NewPosting("world", "bank", "USD/2", big.NewInt(100)), - }, - Metadata: metadata.Metadata{}, - }, - } + tx1 := ledger.NewTransaction(). + WithPostings( + ledger.NewPosting("world", "bank", "USD/2", big.NewInt(100)), + ). + WithInsertedAt(now). + WithTimestamp(now) + err := store.InsertTransaction(ctx, &tx1) require.NoError(t, err)
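
The hook pattern in migrations_test.go generalizes to the remaining migrations in this series. As a rough sketch, a hypothetical test for migration 19 (the new nullable idempotency_hash column on logs) could look like the following; the hook name and assertions are illustrative, while migrations.Hook, bun.IDB, and the query helpers are the ones already used in this patch:

    // addIdempotencyHashOnLogsTable verifies migration 19: after the migration,
    // the logs table exposes an idempotency_hash column that is nil for
    // pre-existing rows (the column is added nullable, with no default).
    var addIdempotencyHashOnLogsTable = migrations.Hook{
        After: func(ctx context.Context, t *testing.T, db bun.IDB) {
            type model struct {
                bun.BaseModel `bun:"alias:logs"`

                IdempotencyHash []byte `bun:"idempotency_hash"`
            }
            ret := make([]model, 0)
            err := db.NewSelect().
                Model(&ret).
                ModelTableExpr(ledger.DefaultBucket + ".logs").
                Scan(ctx)
            require.NoError(t, err)

            // rows inserted before the migration carry no hash yet
            for _, row := range ret {
                require.Nil(t, row.IdempotencyHash)
            }
        },
    }

It would be registered like the other hooks, keyed by the index of the migration it verifies:

    test.Append(19, addIdempotencyHashOnLogsTable)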