Skip to content

Commit

Permalink
feat: make transaction denorm in sql trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 23, 2024
1 parent ab27ac9 commit f4f9b6c
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 54 deletions.
54 changes: 53 additions & 1 deletion internal/storage/bucket/migrations/11-stateless.sql
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ drop function "{{.Bucket}}".get_account_balance(_ledger character varying, _acco
drop function "{{.Bucket}}".get_aggregated_effective_volumes_for_transaction(_ledger character varying, tx numeric);
drop function "{{.Bucket}}".aggregate_ledger_volumes(_ledger character varying, _before timestamp without time zone, _accounts character varying[], _assets character varying[] );
drop function "{{.Bucket}}".get_transaction(_ledger character varying, _id numeric, _before timestamp without time zone);
drop function "{{.Bucket}}".explode_address(_address character varying);
--drop function "{{.Bucket}}".explode_address(_address character varying);
drop function "{{.Bucket}}".revert_transaction(_ledger character varying, _id numeric, _date timestamp without time zone);

drop type "{{.Bucket}}".volumes_with_asset;
Expand Down Expand Up @@ -339,4 +339,56 @@ begin

return new;
end;
$$;

create or replace function "{{.Bucket}}".explode_address(_address varchar)
returns jsonb
language sql
immutable
as
$$
select public.aggregate_objects(jsonb_build_object(data.number - 1, data.value))
from (select row_number() over () as number, v.value
from (select unnest(string_to_array(_address, ':')) as value
union all
select null) v) data
$$;

create or replace function "{{.Bucket}}".set_transaction_addresses() returns trigger
security definer
language plpgsql
as
$$
begin

new.sources = (
select to_jsonb(array_agg(v->>'source')) as value
from jsonb_array_elements(new.postings::jsonb) v
);
new.destinations = (
select to_jsonb(array_agg(v->>'destination')) as value
from jsonb_array_elements(new.postings::jsonb) v
);

return new;
end
$$;

create or replace function "{{.Bucket}}".set_transaction_addresses_segments() returns trigger
security definer
language plpgsql
as
$$
begin
new.sources_arrays = (
select to_jsonb(array_agg("{{.Bucket}}".explode_address(v ->> 'source'))) as value
from jsonb_array_elements(new.postings::jsonb) v
);
new.destinations_arrays = (
select to_jsonb(array_agg("{{.Bucket}}".explode_address(v ->> 'destination'))) as value
from jsonb_array_elements(new.postings::jsonb) v
);

return new;
end
$$;
27 changes: 22 additions & 5 deletions internal/storage/ledger/migrations/0-add-sequences.sql
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ when (
execute procedure "{{.Bucket}}".insert_transaction_metadata_history();
{{ end }}

{{ if .HasFeature "INDEX_TRANSACTION_ACCOUNTS" "ON" }}
create index "transactions_sources_{{.ID}}" on "{{.Bucket}}".transactions using gin (sources jsonb_path_ops) where ledger = '{{.Name}}';
create index "transactions_destinations_{{.ID}}" on "{{.Bucket}}".transactions using gin (destinations jsonb_path_ops) where ledger = '{{.Name}}';
create trigger "transaction_set_addresses_{{.ID}}"
before insert
on "{{.Bucket}}"."transactions"
for each row
when (
new.ledger = '{{.Name}}'
)
execute procedure "{{.Bucket}}".set_transaction_addresses();
{{ end }}

{{ if .HasFeature "INDEX_ADDRESS_SEGMENTS" "ON" }}
create index "moves_accounts_address_array_{{.ID}}" on "{{.Bucket}}".moves using gin (accounts_address_array jsonb_ops) where ledger = '{{.Name}}';
create index "moves_accounts_address_array_length_{{.ID}}" on "{{.Bucket}}".moves (jsonb_array_length(accounts_address_array)) where ledger = '{{.Name}}';
Expand All @@ -104,10 +117,14 @@ create index "accounts_address_array_length_{{.ID}}" on "{{.Bucket}}".accounts (
{{ if .HasFeature "INDEX_TRANSACTION_ACCOUNTS" "ON" }}
create index "transactions_sources_arrays_{{.ID}}" on "{{.Bucket}}".transactions using gin (sources_arrays jsonb_path_ops) where ledger = '{{.Name}}';
create index "transactions_destinations_arrays_{{.ID}}" on "{{.Bucket}}".transactions using gin (destinations_arrays jsonb_path_ops) where ledger = '{{.Name}}';
{{ end }}
{{ end }}

{{ if .HasFeature "INDEX_TRANSACTION_ACCOUNTS" "ON" }}
create index "transactions_sources_{{.ID}}" on "{{.Bucket}}".transactions using gin (sources jsonb_path_ops) where ledger = '{{.Name}}';
create index "transactions_destinations_{{.ID}}" on "{{.Bucket}}".transactions using gin (destinations jsonb_path_ops) where ledger = '{{.Name}}';
create trigger "transaction_set_addresses_segments_{{.ID}}"
before insert
on "{{.Bucket}}"."transactions"
for each row
when (
new.ledger = '{{.Name}}'
)
execute procedure "{{.Bucket}}".set_transaction_addresses_segments();
{{ end }}
{{ end }}
12 changes: 1 addition & 11 deletions internal/storage/ledger/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ type Transaction struct {
Metadata metadata.Metadata `bun:"metadata,type:jsonb,default:'{}'"`
RevertedAt *time.Time `bun:"reverted_at,type:timestamp without time zone"`
InsertedAt time.Time `bun:"inserted_at,type:timestamp without time zone,nullzero"`
Sources []string `bun:"sources,type:jsonb"`
Destinations []string `bun:"destinations,type:jsonb"`
SourcesArray []map[string]any `bun:"sources_arrays,type:jsonb"`
DestinationsArray []map[string]any `bun:"destinations_arrays,type:jsonb"`
PostCommitEffectiveVolumes ledger.PostCommitVolumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"`
PostCommitVolumes ledger.PostCommitVolumes `bun:"post_commit_volumes,type:jsonb"`
}
Expand Down Expand Up @@ -169,7 +165,7 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti
`join (?) pcev on pcev.transactions_id = transactions.id`,
s.db.NewSelect().
Column("transactions_id").
ColumnExpr("jsonb_merge_agg(pcev::jsonb) as post_commit_effective_volumes").
ColumnExpr("aggregate_objects(pcev::jsonb) as post_commit_effective_volumes").
TableExpr(
"(?) data",
s.db.NewSelect().
Expand Down Expand Up @@ -303,19 +299,13 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
return errors.Wrap(err, "failed to update balances")
}

sources := Map(tx.Postings, ledger.Posting.GetSource)
destinations := Map(tx.Postings, ledger.Posting.GetDestination)
mappedTx := &Transaction{
Ledger: s.ledger.Name,
Postings: tx.Postings,
Metadata: tx.Metadata,
Timestamp: tx.Timestamp,
Reference: tx.Reference,
InsertedAt: insertionDate,
Sources: sources,
Destinations: destinations,
SourcesArray: Map(sources, convertAddrToIndexedJSONB),
DestinationsArray: Map(destinations, convertAddrToIndexedJSONB),
PostCommitVolumes: postCommitVolumes,
}

Expand Down
107 changes: 83 additions & 24 deletions internal/storage/ledger/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,32 +498,91 @@ func TestTransactionsInsert(t *testing.T) {
now := time.Now()
ctx := logging.TestingContext()

// create a simple tx
tx1 := Transaction{
Ledger: store.ledger.Name,
Timestamp: now,
Reference: "foo",
}
err := store.insertTransaction(ctx, &tx1)
require.NoError(t, err)
require.NotZero(t, tx1.ID)
t.Run("check reference conflict", func(t *testing.T) {
t.Parallel()

// create another tx with the same reference
tx2 := Transaction{
Ledger: store.ledger.Name,
Timestamp: now,
Reference: "foo",
}
err = store.insertTransaction(ctx, &tx2)
require.Error(t, err)
require.True(t, errors.Is(err, ledgercontroller.ErrReferenceConflict{}))
// create a simple tx
tx1 := Transaction{
Ledger: store.ledger.Name,
Timestamp: now,
Reference: "foo",
Postings: []ledger.Posting{
ledger.NewPosting("world", "bank", "USD/2", big.NewInt(100)),
},
}
err := store.insertTransaction(ctx, &tx1)
require.NoError(t, err)
require.NotZero(t, tx1.ID)

// create another tx with the same reference
tx2 := Transaction{
Ledger: store.ledger.Name,
Timestamp: now,
Reference: "foo",
Postings: []ledger.Posting{
ledger.NewPosting("world", "bank", "USD/2", big.NewInt(100)),
},
}
err = store.insertTransaction(ctx, &tx2)
require.Error(t, err)
require.True(t, errors.Is(err, ledgercontroller.ErrReferenceConflict{}))
})
t.Run("create a tx with no timestamp", func(t *testing.T) {
t.Parallel()

// create a tx with no timestamp
tx3 := Transaction{
Ledger: store.ledger.Name,
}
err = store.insertTransaction(ctx, &tx3)
require.NoError(t, err)
// create a tx with no timestamp
tx1 := Transaction{
Ledger: store.ledger.Name,
Postings: []ledger.Posting{
ledger.NewPosting("world", "bank", "USD/2", big.NewInt(100)),
},
}
err := store.insertTransaction(ctx, &tx1)
require.NoError(t, err)
})
t.Run("check denormalization", func(t *testing.T) {
t.Parallel()

tx1 := Transaction{
Ledger: store.ledger.Name,
Timestamp: now,
InsertedAt: now,
Postings: []ledger.Posting{
ledger.NewPosting("world", "bank", "USD/2", big.NewInt(100)),
},
Metadata: metadata.Metadata{},
}
err := store.insertTransaction(ctx, &tx1)
require.NoError(t, err)

type Model struct {
Transaction
Sources []string `bun:"sources,type:jsonb"`
Destinations []string `bun:"destinations,type:jsonb"`
SourcesArrays []map[string]any `bun:"sources_arrays,type:jsonb"`
DestinationsArrays []map[string]any `bun:"destinations_arrays,type:jsonb"`
}

m := Model{}
err = store.db.NewSelect().
Model(&m).
ModelTableExpr(store.GetPrefixedRelationName("transactions") + " as model").
Scan(ctx)
require.NoError(t, err)
require.Equal(t, Model{
Transaction: tx1,
Sources: []string{"world"},
Destinations: []string{"bank"},
SourcesArrays: []map[string]any{{
"0": "world",
"1": nil,
}},
DestinationsArrays: []map[string]any{{
"0": "bank",
"1": nil,
}},
}, m)
})
}

func TestTransactionsList(t *testing.T) {
Expand Down
11 changes: 0 additions & 11 deletions internal/storage/ledger/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,3 @@ func filterAccountAddress(address, key string) string {

return strings.Join(parts, " and ")
}

func convertAddrToIndexedJSONB(addr string) map[string]any {
ret := map[string]any{}
parts := strings.Split(addr, ":")
for i := range parts {
ret[fmt.Sprint(i)] = parts[i]
}
ret[fmt.Sprint(len(parts))] = nil

return ret
}
4 changes: 2 additions & 2 deletions internal/storage/system/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func getMigrator() *migrations.Migrator {
},
},
migrations.Migration{
Name: "Add jsonb_merge_agg pg aggregator",
Name: "Add aggregate_objects pg aggregator",
UpWithContext: func(ctx context.Context, tx bun.Tx) error {
_, err := tx.ExecContext(ctx, jsonbMerge)
return err
Expand Down Expand Up @@ -217,7 +217,7 @@ create or replace function public.jsonb_concat(a jsonb, b jsonb) returns jsonb
parallel safe
;
create or replace aggregate public.jsonb_merge_agg(jsonb)
create or replace aggregate public.aggregate_objects(jsonb)
(
sfunc = public.jsonb_concat,
stype = jsonb,
Expand Down

0 comments on commit f4f9b6c

Please sign in to comment.