From 4f373537116c6a4627175f8a62302a1da22178a0 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Fri, 27 Sep 2024 13:20:31 +0200 Subject: [PATCH] feat: move model in core --- internal/account.go | 10 +++++ internal/storage/ledger/balances.go | 50 +++++++----------------- internal/storage/ledger/balances_test.go | 3 +- internal/storage/ledger/transactions.go | 9 ++--- internal/storage/ledger/volumes.go | 27 +++++++------ internal/storage/ledger/volumes_test.go | 9 ++--- 6 files changed, 46 insertions(+), 62 deletions(-) diff --git a/internal/account.go b/internal/account.go index 35ae93a13..75d385b13 100644 --- a/internal/account.go +++ b/internal/account.go @@ -4,6 +4,7 @@ import ( "github.com/formancehq/go-libs/metadata" "github.com/formancehq/go-libs/time" "github.com/uptrace/bun" + "math/big" ) const ( @@ -21,3 +22,12 @@ type Account struct { Volumes VolumesByAssets `json:"volumes,omitempty" bun:"pcv,scanonly"` EffectiveVolumes VolumesByAssets `json:"effectiveVolumes,omitempty" bun:"pcev,scanonly"` } + +type AccountsVolumes struct { + bun.BaseModel `bun:"accounts_volumes"` + + Account string `bun:"accounts_address,type:varchar"` + Asset string `bun:"asset,type:varchar"` + Input *big.Int `bun:"input,type:numeric"` + Output *big.Int `bun:"output,type:numeric"` +} diff --git a/internal/storage/ledger/balances.go b/internal/storage/ledger/balances.go index 026c21a2b..f5373e9a6 100644 --- a/internal/storage/ledger/balances.go +++ b/internal/storage/ledger/balances.go @@ -204,15 +204,22 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ } } - accountsVolumes := make([]AccountsVolumes, 0) + type AccountsVolumesWithLedger struct { + ledger.AccountsVolumes `bun:",extend"` + Ledger string `bun:"ledger,type:varchar"` + } + + accountsVolumes := make([]AccountsVolumesWithLedger, 0) for account, assets := range query { for _, asset := range assets { - accountsVolumes = append(accountsVolumes, AccountsVolumes{ - Ledger: s.ledger.Name, - Account: account, - Asset: asset, - Input: new(big.Int), - Output: new(big.Int), + accountsVolumes = append(accountsVolumes, AccountsVolumesWithLedger{ + Ledger: s.ledger.Name, + AccountsVolumes: ledger.AccountsVolumes{ + Account: account, + Asset: asset, + Input: new(big.Int), + Output: new(big.Int), + }, }) } } @@ -262,32 +269,3 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ return ret, nil }) } - -/** -SELECT to_json(array_agg(json_build_object('asset', accounts.asset, 'input', (accounts.volumes->>'input')::numeric, 'output', (accounts.volumes->>'output')::numeric))) AS aggregated -FROM ( - SELECT * - FROM ( - SELECT *, accounts.address_array - FROM ( - SELECT "asset", "accounts_address", post_commit_volumes AS volumes - FROM ( - SELECT DISTINCT ON (accounts_address, asset) - "accounts_address", - "asset", - first_value(post_commit_volumes) OVER (PARTITION BY (accounts_address, asset) ORDER BY seq DESC) AS post_commit_volumes - FROM ( - SELECT * - FROM "87b28082".moves - WHERE (ledger = '87b28082') AND (insertion_date <= '2024-09-26T14:45:10.568382Z') - ORDER BY "seq" DESC - ) moves - WHERE (ledger = '87b28082') AND (insertion_date <= '2024-09-26T14:45:10.568382Z') - ) moves - ) accounts_volumes - JOIN "87b28082".accounts accounts ON accounts.address = accounts_volumes.accounts_address - ) accounts - WHERE (jsonb_array_length(accounts.address_array) = 2 - AND accounts.address_array @@ ('$[0] == "users"')::jsonpath) -) accounts -*/ diff --git a/internal/storage/ledger/balances_test.go b/internal/storage/ledger/balances_test.go index e0f996134..3de02d295 100644 --- a/internal/storage/ledger/balances_test.go +++ b/internal/storage/ledger/balances_test.go @@ -35,8 +35,7 @@ func TestBalancesGet(t *testing.T) { _, err := store.UpsertAccount(ctx, world) require.NoError(t, err) - _, err = store.updateVolumes(ctx, AccountsVolumes{ - Ledger: store.ledger.Name, + _, err = store.updateVolumes(ctx, ledger.AccountsVolumes{ Account: "world", Asset: "USD", Input: new(big.Int), diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index f622c3523..ac76a033f 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -260,7 +260,7 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e } } - postCommitVolumes, err := s.updateVolumes(ctx, volumeUpdates(s.ledger.Name, tx)...) + postCommitVolumes, err := s.updateVolumes(ctx, volumeUpdates(tx)...) if err != nil { return errors.Wrap(err, "failed to update balances") } @@ -560,7 +560,7 @@ func filterAccountAddressOnTransactions(address string, source, destination bool } } -func volumeUpdates(l string, transaction *ledger.Transaction) []AccountsVolumes { +func volumeUpdates(transaction *ledger.Transaction) []ledger.AccountsVolumes { aggregatedVolumes := make(map[string]map[string][]ledger.Posting) for _, posting := range transaction.Postings { if _, ok := aggregatedVolumes[posting.Source]; !ok { @@ -578,7 +578,7 @@ func volumeUpdates(l string, transaction *ledger.Transaction) []AccountsVolumes aggregatedVolumes[posting.Destination][posting.Asset] = append(aggregatedVolumes[posting.Destination][posting.Asset], posting) } - ret := make([]AccountsVolumes, 0) + ret := make([]ledger.AccountsVolumes, 0) for account, movesByAsset := range aggregatedVolumes { for asset, postings := range movesByAsset { volumes := ledger.NewEmptyVolumes() @@ -591,8 +591,7 @@ func volumeUpdates(l string, transaction *ledger.Transaction) []AccountsVolumes } } - ret = append(ret, AccountsVolumes{ - Ledger: l, + ret = append(ret, ledger.AccountsVolumes{ Account: account, Asset: asset, Input: volumes.Input, diff --git a/internal/storage/ledger/volumes.go b/internal/storage/ledger/volumes.go index 79aa7e7d4..1cc37a2e6 100644 --- a/internal/storage/ledger/volumes.go +++ b/internal/storage/ledger/volumes.go @@ -3,8 +3,7 @@ package ledger import ( "context" "fmt" - "math/big" - + "github.com/formancehq/go-libs/collectionutils" "github.com/formancehq/go-libs/platform/postgres" "github.com/formancehq/ledger/internal/tracing" @@ -17,21 +16,23 @@ import ( "github.com/uptrace/bun" ) -type AccountsVolumes struct { - bun.BaseModel `bun:"accounts_volumes"` +func (s *Store) updateVolumes(ctx context.Context, accountVolumes ...ledger.AccountsVolumes) (ledger.PostCommitVolumes, error) { + return tracing.TraceWithLatency(ctx, "UpdateBalances", func(ctx context.Context) (ledger.PostCommitVolumes, error) { - Ledger string `bun:"ledger,type:varchar"` - Account string `bun:"accounts_address,type:varchar"` - Asset string `bun:"asset,type:varchar"` - Input *big.Int `bun:"input,type:numeric"` - Output *big.Int `bun:"output,type:numeric"` -} + type AccountsVolumesWithLedger struct { + ledger.AccountsVolumes `bun:",extend"` + Ledger string `bun:"ledger,type:varchar"` + } -func (s *Store) updateVolumes(ctx context.Context, accountVolumes ...AccountsVolumes) (ledger.PostCommitVolumes, error) { - return tracing.TraceWithLatency(ctx, "UpdateBalances", func(ctx context.Context) (ledger.PostCommitVolumes, error) { + accountsVolumesWithLedger := collectionutils.Map(accountVolumes, func(from ledger.AccountsVolumes) AccountsVolumesWithLedger { + return AccountsVolumesWithLedger{ + AccountsVolumes: from, + Ledger: s.ledger.Name, + } + }) _, err := s.db.NewInsert(). - Model(&accountVolumes). + Model(&accountsVolumesWithLedger). ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")). On("conflict (ledger, accounts_address, asset) do update"). Set("input = accounts_volumes.input + excluded.input"). diff --git a/internal/storage/ledger/volumes_test.go b/internal/storage/ledger/volumes_test.go index ba5203e53..6c1551980 100644 --- a/internal/storage/ledger/volumes_test.go +++ b/internal/storage/ledger/volumes_test.go @@ -695,8 +695,7 @@ func TestUpdateVolumes(t *testing.T) { store := newLedgerStore(t) ctx := logging.TestingContext() - volumes, err := store.updateVolumes(ctx, AccountsVolumes{ - Ledger: store.ledger.Name, + volumes, err := store.updateVolumes(ctx, ledger.AccountsVolumes{ Account: "world", Asset: "USD/2", Input: big.NewInt(0), @@ -709,8 +708,7 @@ func TestUpdateVolumes(t *testing.T) { }, }, volumes) - volumes, err = store.updateVolumes(ctx, AccountsVolumes{ - Ledger: store.ledger.Name, + volumes, err = store.updateVolumes(ctx, ledger.AccountsVolumes{ Account: "world", Asset: "USD/2", Input: big.NewInt(50), @@ -723,8 +721,7 @@ func TestUpdateVolumes(t *testing.T) { }, }, volumes) - volumes, err = store.updateVolumes(ctx, AccountsVolumes{ - Ledger: store.ledger.Name, + volumes, err = store.updateVolumes(ctx, ledger.AccountsVolumes{ Account: "world", Asset: "USD/2", Input: big.NewInt(50),