From e54fe29029155ec5af3476149e3973d5e6027191 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Thu, 24 Oct 2024 13:55:24 +0200 Subject: [PATCH] feat: use accounts_volumes table directly to get balances --- .../19-accounts-recreate-unique-index/up.sql | 2 +- .../migrations/20-clean-database/up.sql | 2 +- internal/storage/ledger/balances.go | 67 ++++------------ internal/storage/ledger/balances_test.go | 77 +++++++------------ 4 files changed, 43 insertions(+), 105 deletions(-) diff --git a/internal/storage/bucket/migrations/19-accounts-recreate-unique-index/up.sql b/internal/storage/bucket/migrations/19-accounts-recreate-unique-index/up.sql index 1d8734023..3b8ad12bd 100644 --- a/internal/storage/bucket/migrations/19-accounts-recreate-unique-index/up.sql +++ b/internal/storage/bucket/migrations/19-accounts-recreate-unique-index/up.sql @@ -6,4 +6,4 @@ -- We create this index in a dedicated as, as the doc mentions it (https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-MULTI-STATEMENT) -- multi statements queries are automatically wrapped inside transaction block, and it's forbidden -- to create index concurrently inside a transaction block. -create unique index concurrently accounts_ledger2 on "{{.Bucket}}".accounts (ledger, address) \ No newline at end of file +create unique index concurrently accounts_ledger2 on "{{.Schema}}".accounts (ledger, address) \ No newline at end of file diff --git a/internal/storage/bucket/migrations/20-clean-database/up.sql b/internal/storage/bucket/migrations/20-clean-database/up.sql index 92aa70f53..cd528cb9e 100644 --- a/internal/storage/bucket/migrations/20-clean-database/up.sql +++ b/internal/storage/bucket/migrations/20-clean-database/up.sql @@ -1,4 +1,4 @@ -set search_path = '{{.Bucket}}'; +set search_path = '{{.Schema}}'; -- Clean all useless function/aggregates/indexes inherited from stateful version. drop aggregate aggregate_objects(jsonb); diff --git a/internal/storage/ledger/balances.go b/internal/storage/ledger/balances.go index b79c38080..3037e69e3 100644 --- a/internal/storage/ledger/balances.go +++ b/internal/storage/ledger/balances.go @@ -223,62 +223,25 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ } } - // Try to insert volumes using last move (to keep compat with previous version) or 0 values. - // This way, if the account has a 0 balance at this point, it will be locked as any other accounts. - // If the complete sql transaction fails, the account volumes will not be inserted. - selectMoves := s.db.NewSelect(). - ModelTableExpr(s.GetPrefixedRelationName("moves")). - DistinctOn("accounts_address, asset"). - Column("accounts_address", "asset"). - ColumnExpr("first_value(post_commit_volumes) over (partition by accounts_address, asset order by seq desc) as post_commit_volumes"). - ColumnExpr("first_value(ledger) over (partition by accounts_address, asset order by seq desc) as ledger"). - Where("("+strings.Join(conditions, ") OR (")+")", args...) - - zeroValuesAndMoves := s.db.NewSelect(). - TableExpr("(?) data", selectMoves). - Column("ledger", "accounts_address", "asset"). - ColumnExpr("(post_commit_volumes).inputs as input"). - ColumnExpr("(post_commit_volumes).outputs as output"). - UnionAll( - s.db.NewSelect(). - TableExpr( - "(?) data", - s.db.NewSelect().NewValues(&accountsVolumes), - ). - Column("*"), - ) - - zeroValueOrMoves := s.db.NewSelect(). - TableExpr("(?) data", zeroValuesAndMoves). - Column("ledger", "accounts_address", "asset", "input", "output"). - DistinctOn("ledger, accounts_address, asset") - - insertDefaultValue := s.db.NewInsert(). - TableExpr(s.GetPrefixedRelationName("accounts_volumes")). - TableExpr("(" + zeroValueOrMoves.String() + ") data"). - On("conflict (ledger, accounts_address, asset) do nothing"). - Returning("ledger, accounts_address, asset, input, output") - - selectExistingValues := s.db.NewSelect(). + err := s.db.NewSelect(). + With( + "ins", + // Try to insert volumes with 0 values. + // This way, if the account has a 0 balance at this point, it will be locked as any other accounts. + // It the complete sql transaction fail, the account volumes will not be inserted. + s.db.NewInsert(). + Model(&accountsVolumes). + ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")). + On("conflict do nothing"), + ). + Model(&accountsVolumes). ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")). - Column("ledger", "accounts_address", "asset", "input", "output"). + Column("accounts_address", "asset", "input", "output"). Where("("+strings.Join(conditions, ") OR (")+")", args...). For("update"). // notes(gfyrag): Keep order, it ensures consistent locking order and limit deadlocks - Order("accounts_address", "asset") - - finalQuery := s.db.NewSelect(). - With("inserted", insertDefaultValue). - With("existing", selectExistingValues). - ModelTableExpr( - "(?) accounts_volumes", - s.db.NewSelect(). - ModelTableExpr("inserted"). - UnionAll(s.db.NewSelect().ModelTableExpr("existing")), - ). - Model(&accountsVolumes) - - err := finalQuery.Scan(ctx) + Order("accounts_address", "asset"). + Scan(ctx) if err != nil { return nil, postgres.ResolveError(err) } diff --git a/internal/storage/ledger/balances_test.go b/internal/storage/ledger/balances_test.go index 3096e8952..1d2cddfef 100644 --- a/internal/storage/ledger/balances_test.go +++ b/internal/storage/ledger/balances_test.go @@ -4,7 +4,6 @@ package ledger_test import ( "database/sql" - "github.com/formancehq/go-libs/v2/bun/bunpaginate" "math/big" "testing" @@ -44,6 +43,32 @@ func TestBalancesGet(t *testing.T) { }) require.NoError(t, err) + t.Run("get balances of not existing account should create an empty row", func(t *testing.T) { + t.Parallel() + + balances, err := store.GetBalances(ctx, ledgercontroller.BalanceQuery{ + "orders:1234": []string{"USD"}, + }) + require.NoError(t, err) + require.Len(t, balances, 1) + require.NotNil(t, balances["orders:1234"]) + require.Len(t, balances["orders:1234"], 1) + require.Equal(t, big.NewInt(0), balances["orders:1234"]["USD"]) + + volumes := make([]*ledger.AccountsVolumes, 0) + + err = store.GetDB().NewSelect(). + Model(&volumes). + ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). + Where("accounts_address = ?", "orders:1234"). + Scan(ctx) + require.NoError(t, err) + require.Len(t, volumes, 1) + require.Equal(t, "USD", volumes[0].Asset) + require.Equal(t, big.NewInt(0), volumes[0].Input) + require.Equal(t, big.NewInt(0), volumes[0].Output) + }) + t.Run("check concurrent access on same balance", func(t *testing.T) { t.Parallel() @@ -128,56 +153,6 @@ func TestBalancesGet(t *testing.T) { require.NoError(t, err) require.Equal(t, 2, count) }) - - t.Run("with balance from move", func(t *testing.T) { - t.Parallel() - - tx := ledger.NewTransaction().WithPostings( - ledger.NewPosting("world", "bank", "USD", big.NewInt(100)), - ) - err := store.InsertTransaction(ctx, &tx) - require.NoError(t, err) - - bankAccount := ledger.Account{ - Address: "bank", - FirstUsage: tx.InsertedAt, - InsertionDate: tx.InsertedAt, - UpdatedAt: tx.InsertedAt, - } - _, err = store.UpsertAccount(ctx, &bankAccount) - require.NoError(t, err) - - err = store.InsertMoves(ctx, &ledger.Move{ - TransactionID: tx.ID, - IsSource: false, - Account: "bank", - Amount: (*bunpaginate.BigInt)(big.NewInt(100)), - Asset: "USD", - InsertionDate: tx.InsertedAt, - EffectiveDate: tx.InsertedAt, - PostCommitVolumes: pointer.For(ledger.NewVolumesInt64(100, 0)), - }) - require.NoError(t, err) - - balances, err := store.GetBalances(ctx, ledgercontroller.BalanceQuery{ - "bank": {"USD"}, - }) - require.NoError(t, err) - - require.NotNil(t, balances["bank"]) - RequireEqual(t, big.NewInt(100), balances["bank"]["USD"]) - - // Check a new line has been inserted into accounts_volumes table - volumes := &ledger.AccountsVolumes{} - err = store.GetDB().NewSelect(). - ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). - Where("accounts_address = ?", "bank"). - Scan(ctx, volumes) - require.NoError(t, err) - - RequireEqual(t, big.NewInt(100), volumes.Input) - RequireEqual(t, big.NewInt(0), volumes.Output) - }) } func TestBalancesAggregates(t *testing.T) {