diff --git a/internal/storage/bucket/migrations/11-stateless.sql b/internal/storage/bucket/migrations/11-stateless.sql index 643505773..6f2033b20 100644 --- a/internal/storage/bucket/migrations/11-stateless.sql +++ b/internal/storage/bucket/migrations/11-stateless.sql @@ -181,7 +181,7 @@ drop function "{{.Bucket}}".get_account_balance(_ledger character varying, _acco drop function "{{.Bucket}}".get_aggregated_effective_volumes_for_transaction(_ledger character varying, tx numeric); drop function "{{.Bucket}}".aggregate_ledger_volumes(_ledger character varying, _before timestamp without time zone, _accounts character varying[], _assets character varying[] ); drop function "{{.Bucket}}".get_transaction(_ledger character varying, _id numeric, _before timestamp without time zone); -drop function "{{.Bucket}}".explode_address(_address character varying); +--drop function "{{.Bucket}}".explode_address(_address character varying); drop function "{{.Bucket}}".revert_transaction(_ledger character varying, _id numeric, _date timestamp without time zone); drop type "{{.Bucket}}".volumes_with_asset; @@ -339,4 +339,56 @@ begin return new; end; +$$; + +create or replace function "{{.Bucket}}".explode_address(_address varchar) + returns jsonb + language sql + immutable +as +$$ +select public.aggregate_objects(jsonb_build_object(data.number - 1, data.value)) +from (select row_number() over () as number, v.value + from (select unnest(string_to_array(_address, ':')) as value + union all + select null) v) data +$$; + +create or replace function "{{.Bucket}}".set_transaction_addresses() returns trigger + security definer + language plpgsql +as +$$ +begin + + new.sources = ( + select to_jsonb(array_agg(v->>'source')) as value + from jsonb_array_elements(new.postings::jsonb) v + ); + new.destinations = ( + select to_jsonb(array_agg(v->>'destination')) as value + from jsonb_array_elements(new.postings::jsonb) v + ); + + return new; +end +$$; + +create or replace function "{{.Bucket}}".set_transaction_addresses_segments() returns trigger + security definer + language plpgsql +as +$$ +begin + new.sources_arrays = ( + select to_jsonb(array_agg("{{.Bucket}}".explode_address(v ->> 'source'))) as value + from jsonb_array_elements(new.postings::jsonb) v + ); + new.destinations_arrays = ( + select to_jsonb(array_agg("{{.Bucket}}".explode_address(v ->> 'destination'))) as value + from jsonb_array_elements(new.postings::jsonb) v + ); + + return new; +end $$; \ No newline at end of file diff --git a/internal/storage/ledger/migrations/0-add-sequences.sql b/internal/storage/ledger/migrations/0-add-sequences.sql index 7a368f549..24036b890 100644 --- a/internal/storage/ledger/migrations/0-add-sequences.sql +++ b/internal/storage/ledger/migrations/0-add-sequences.sql @@ -94,6 +94,19 @@ when ( execute procedure "{{.Bucket}}".insert_transaction_metadata_history(); {{ end }} +{{ if .HasFeature "INDEX_TRANSACTION_ACCOUNTS" "ON" }} +create index "transactions_sources_{{.ID}}" on "{{.Bucket}}".transactions using gin (sources jsonb_path_ops) where ledger = '{{.Name}}'; +create index "transactions_destinations_{{.ID}}" on "{{.Bucket}}".transactions using gin (destinations jsonb_path_ops) where ledger = '{{.Name}}'; +create trigger "transaction_set_addresses_{{.ID}}" + before insert + on "{{.Bucket}}"."transactions" + for each row + when ( + new.ledger = '{{.Name}}' + ) +execute procedure "{{.Bucket}}".set_transaction_addresses(); +{{ end }} + {{ if .HasFeature "INDEX_ADDRESS_SEGMENTS" "ON" }} create index "moves_accounts_address_array_{{.ID}}" on "{{.Bucket}}".moves using gin (accounts_address_array jsonb_ops) where ledger = '{{.Name}}'; create index "moves_accounts_address_array_length_{{.ID}}" on "{{.Bucket}}".moves (jsonb_array_length(accounts_address_array)) where ledger = '{{.Name}}'; @@ -104,10 +117,14 @@ create index "accounts_address_array_length_{{.ID}}" on "{{.Bucket}}".accounts ( {{ if .HasFeature "INDEX_TRANSACTION_ACCOUNTS" "ON" }} create index "transactions_sources_arrays_{{.ID}}" on "{{.Bucket}}".transactions using gin (sources_arrays jsonb_path_ops) where ledger = '{{.Name}}'; create index "transactions_destinations_arrays_{{.ID}}" on "{{.Bucket}}".transactions using gin (destinations_arrays jsonb_path_ops) where ledger = '{{.Name}}'; -{{ end }} -{{ end }} -{{ if .HasFeature "INDEX_TRANSACTION_ACCOUNTS" "ON" }} -create index "transactions_sources_{{.ID}}" on "{{.Bucket}}".transactions using gin (sources jsonb_path_ops) where ledger = '{{.Name}}'; -create index "transactions_destinations_{{.ID}}" on "{{.Bucket}}".transactions using gin (destinations jsonb_path_ops) where ledger = '{{.Name}}'; +create trigger "transaction_set_addresses_segments_{{.ID}}" + before insert + on "{{.Bucket}}"."transactions" + for each row + when ( + new.ledger = '{{.Name}}' + ) +execute procedure "{{.Bucket}}".set_transaction_addresses_segments(); +{{ end }} {{ end }} \ No newline at end of file diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index ba70736b9..aaa2a100f 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -45,10 +45,6 @@ type Transaction struct { Metadata metadata.Metadata `bun:"metadata,type:jsonb,default:'{}'"` RevertedAt *time.Time `bun:"reverted_at,type:timestamp without time zone"` InsertedAt time.Time `bun:"inserted_at,type:timestamp without time zone,nullzero"` - Sources []string `bun:"sources,type:jsonb"` - Destinations []string `bun:"destinations,type:jsonb"` - SourcesArray []map[string]any `bun:"sources_arrays,type:jsonb"` - DestinationsArray []map[string]any `bun:"destinations_arrays,type:jsonb"` PostCommitEffectiveVolumes ledger.PostCommitVolumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"` PostCommitVolumes ledger.PostCommitVolumes `bun:"post_commit_volumes,type:jsonb"` } @@ -169,7 +165,7 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti `join (?) pcev on pcev.transactions_id = transactions.id`, s.db.NewSelect(). Column("transactions_id"). - ColumnExpr("jsonb_merge_agg(pcev::jsonb) as post_commit_effective_volumes"). + ColumnExpr("aggregate_objects(pcev::jsonb) as post_commit_effective_volumes"). TableExpr( "(?) data", s.db.NewSelect(). @@ -303,8 +299,6 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e return errors.Wrap(err, "failed to update balances") } - sources := Map(tx.Postings, ledger.Posting.GetSource) - destinations := Map(tx.Postings, ledger.Posting.GetDestination) mappedTx := &Transaction{ Ledger: s.ledger.Name, Postings: tx.Postings, @@ -312,10 +306,6 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e Timestamp: tx.Timestamp, Reference: tx.Reference, InsertedAt: insertionDate, - Sources: sources, - Destinations: destinations, - SourcesArray: Map(sources, convertAddrToIndexedJSONB), - DestinationsArray: Map(destinations, convertAddrToIndexedJSONB), PostCommitVolumes: postCommitVolumes, } diff --git a/internal/storage/ledger/transactions_test.go b/internal/storage/ledger/transactions_test.go index 648bf7825..04553a608 100644 --- a/internal/storage/ledger/transactions_test.go +++ b/internal/storage/ledger/transactions_test.go @@ -498,32 +498,91 @@ func TestTransactionsInsert(t *testing.T) { now := time.Now() ctx := logging.TestingContext() - // create a simple tx - tx1 := Transaction{ - Ledger: store.ledger.Name, - Timestamp: now, - Reference: "foo", - } - err := store.insertTransaction(ctx, &tx1) - require.NoError(t, err) - require.NotZero(t, tx1.ID) + t.Run("check reference conflict", func(t *testing.T) { + t.Parallel() - // create another tx with the same reference - tx2 := Transaction{ - Ledger: store.ledger.Name, - Timestamp: now, - Reference: "foo", - } - err = store.insertTransaction(ctx, &tx2) - require.Error(t, err) - require.True(t, errors.Is(err, ledgercontroller.ErrReferenceConflict{})) + // create a simple tx + tx1 := Transaction{ + Ledger: store.ledger.Name, + Timestamp: now, + Reference: "foo", + Postings: []ledger.Posting{ + ledger.NewPosting("world", "bank", "USD/2", big.NewInt(100)), + }, + } + err := store.insertTransaction(ctx, &tx1) + require.NoError(t, err) + require.NotZero(t, tx1.ID) + + // create another tx with the same reference + tx2 := Transaction{ + Ledger: store.ledger.Name, + Timestamp: now, + Reference: "foo", + Postings: []ledger.Posting{ + ledger.NewPosting("world", "bank", "USD/2", big.NewInt(100)), + }, + } + err = store.insertTransaction(ctx, &tx2) + require.Error(t, err) + require.True(t, errors.Is(err, ledgercontroller.ErrReferenceConflict{})) + }) + t.Run("create a tx with no timestamp", func(t *testing.T) { + t.Parallel() - // create a tx with no timestamp - tx3 := Transaction{ - Ledger: store.ledger.Name, - } - err = store.insertTransaction(ctx, &tx3) - require.NoError(t, err) + // create a tx with no timestamp + tx1 := Transaction{ + Ledger: store.ledger.Name, + Postings: []ledger.Posting{ + ledger.NewPosting("world", "bank", "USD/2", big.NewInt(100)), + }, + } + err := store.insertTransaction(ctx, &tx1) + require.NoError(t, err) + }) + t.Run("check denormalization", func(t *testing.T) { + t.Parallel() + + tx1 := Transaction{ + Ledger: store.ledger.Name, + Timestamp: now, + InsertedAt: now, + Postings: []ledger.Posting{ + ledger.NewPosting("world", "bank", "USD/2", big.NewInt(100)), + }, + Metadata: metadata.Metadata{}, + } + err := store.insertTransaction(ctx, &tx1) + require.NoError(t, err) + + type Model struct { + Transaction + Sources []string `bun:"sources,type:jsonb"` + Destinations []string `bun:"destinations,type:jsonb"` + SourcesArrays []map[string]any `bun:"sources_arrays,type:jsonb"` + DestinationsArrays []map[string]any `bun:"destinations_arrays,type:jsonb"` + } + + m := Model{} + err = store.db.NewSelect(). + Model(&m). + ModelTableExpr(store.GetPrefixedRelationName("transactions") + " as model"). + Scan(ctx) + require.NoError(t, err) + require.Equal(t, Model{ + Transaction: tx1, + Sources: []string{"world"}, + Destinations: []string{"bank"}, + SourcesArrays: []map[string]any{{ + "0": "world", + "1": nil, + }}, + DestinationsArrays: []map[string]any{{ + "0": "bank", + "1": nil, + }}, + }, m) + }) } func TestTransactionsList(t *testing.T) { diff --git a/internal/storage/ledger/utils.go b/internal/storage/ledger/utils.go index 998ed1e6f..11b64f438 100644 --- a/internal/storage/ledger/utils.go +++ b/internal/storage/ledger/utils.go @@ -46,14 +46,3 @@ func filterAccountAddress(address, key string) string { return strings.Join(parts, " and ") } - -func convertAddrToIndexedJSONB(addr string) map[string]any { - ret := map[string]any{} - parts := strings.Split(addr, ":") - for i := range parts { - ret[fmt.Sprint(i)] = parts[i] - } - ret[fmt.Sprint(len(parts))] = nil - - return ret -} diff --git a/internal/storage/system/migrations.go b/internal/storage/system/migrations.go index c1a3200ad..ef1e27ad3 100644 --- a/internal/storage/system/migrations.go +++ b/internal/storage/system/migrations.go @@ -118,7 +118,7 @@ func getMigrator() *migrations.Migrator { }, }, migrations.Migration{ - Name: "Add jsonb_merge_agg pg aggregator", + Name: "Add aggregate_objects pg aggregator", UpWithContext: func(ctx context.Context, tx bun.Tx) error { _, err := tx.ExecContext(ctx, jsonbMerge) return err @@ -217,7 +217,7 @@ create or replace function public.jsonb_concat(a jsonb, b jsonb) returns jsonb parallel safe ; -create or replace aggregate public.jsonb_merge_agg(jsonb) +create or replace aggregate public.aggregate_objects(jsonb) ( sfunc = public.jsonb_concat, stype = jsonb,