Skip to content

Commit

Permalink
feat: use accounts_volumes table directly to get balances
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Nov 4, 2024
1 parent 36d0ea2 commit e54fe29
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
-- We create this index in a dedicated as, as the doc mentions it (https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-MULTI-STATEMENT)
-- multi statements queries are automatically wrapped inside transaction block, and it's forbidden
-- to create index concurrently inside a transaction block.
create unique index concurrently accounts_ledger2 on "{{.Bucket}}".accounts (ledger, address)
create unique index concurrently accounts_ledger2 on "{{.Schema}}".accounts (ledger, address)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
set search_path = '{{.Bucket}}';
set search_path = '{{.Schema}}';

-- Clean all useless function/aggregates/indexes inherited from stateful version.
drop aggregate aggregate_objects(jsonb);
Expand Down
67 changes: 15 additions & 52 deletions internal/storage/ledger/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,62 +223,25 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ
}
}

// Try to insert volumes using last move (to keep compat with previous version) or 0 values.
// This way, if the account has a 0 balance at this point, it will be locked as any other accounts.
// If the complete sql transaction fails, the account volumes will not be inserted.
selectMoves := s.db.NewSelect().
ModelTableExpr(s.GetPrefixedRelationName("moves")).
DistinctOn("accounts_address, asset").
Column("accounts_address", "asset").
ColumnExpr("first_value(post_commit_volumes) over (partition by accounts_address, asset order by seq desc) as post_commit_volumes").
ColumnExpr("first_value(ledger) over (partition by accounts_address, asset order by seq desc) as ledger").
Where("("+strings.Join(conditions, ") OR (")+")", args...)

zeroValuesAndMoves := s.db.NewSelect().
TableExpr("(?) data", selectMoves).
Column("ledger", "accounts_address", "asset").
ColumnExpr("(post_commit_volumes).inputs as input").
ColumnExpr("(post_commit_volumes).outputs as output").
UnionAll(
s.db.NewSelect().
TableExpr(
"(?) data",
s.db.NewSelect().NewValues(&accountsVolumes),
).
Column("*"),
)

zeroValueOrMoves := s.db.NewSelect().
TableExpr("(?) data", zeroValuesAndMoves).
Column("ledger", "accounts_address", "asset", "input", "output").
DistinctOn("ledger, accounts_address, asset")

insertDefaultValue := s.db.NewInsert().
TableExpr(s.GetPrefixedRelationName("accounts_volumes")).
TableExpr("(" + zeroValueOrMoves.String() + ") data").
On("conflict (ledger, accounts_address, asset) do nothing").
Returning("ledger, accounts_address, asset, input, output")

selectExistingValues := s.db.NewSelect().
err := s.db.NewSelect().
With(
"ins",
// Try to insert volumes with 0 values.
// This way, if the account has a 0 balance at this point, it will be locked as any other accounts.
// It the complete sql transaction fail, the account volumes will not be inserted.
s.db.NewInsert().
Model(&accountsVolumes).
ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")).
On("conflict do nothing"),
).
Model(&accountsVolumes).
ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")).
Column("ledger", "accounts_address", "asset", "input", "output").
Column("accounts_address", "asset", "input", "output").
Where("("+strings.Join(conditions, ") OR (")+")", args...).
For("update").
// notes(gfyrag): Keep order, it ensures consistent locking order and limit deadlocks
Order("accounts_address", "asset")

finalQuery := s.db.NewSelect().
With("inserted", insertDefaultValue).
With("existing", selectExistingValues).
ModelTableExpr(
"(?) accounts_volumes",
s.db.NewSelect().
ModelTableExpr("inserted").
UnionAll(s.db.NewSelect().ModelTableExpr("existing")),
).
Model(&accountsVolumes)

err := finalQuery.Scan(ctx)
Order("accounts_address", "asset").
Scan(ctx)
if err != nil {
return nil, postgres.ResolveError(err)
}
Expand Down
77 changes: 26 additions & 51 deletions internal/storage/ledger/balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package ledger_test

import (
"database/sql"
"github.com/formancehq/go-libs/v2/bun/bunpaginate"
"math/big"
"testing"

Expand Down Expand Up @@ -44,6 +43,32 @@ func TestBalancesGet(t *testing.T) {
})
require.NoError(t, err)

t.Run("get balances of not existing account should create an empty row", func(t *testing.T) {
t.Parallel()

balances, err := store.GetBalances(ctx, ledgercontroller.BalanceQuery{
"orders:1234": []string{"USD"},
})
require.NoError(t, err)
require.Len(t, balances, 1)
require.NotNil(t, balances["orders:1234"])
require.Len(t, balances["orders:1234"], 1)
require.Equal(t, big.NewInt(0), balances["orders:1234"]["USD"])

volumes := make([]*ledger.AccountsVolumes, 0)

err = store.GetDB().NewSelect().
Model(&volumes).
ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")).
Where("accounts_address = ?", "orders:1234").
Scan(ctx)
require.NoError(t, err)
require.Len(t, volumes, 1)
require.Equal(t, "USD", volumes[0].Asset)
require.Equal(t, big.NewInt(0), volumes[0].Input)
require.Equal(t, big.NewInt(0), volumes[0].Output)
})

t.Run("check concurrent access on same balance", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -128,56 +153,6 @@ func TestBalancesGet(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 2, count)
})

t.Run("with balance from move", func(t *testing.T) {
t.Parallel()

tx := ledger.NewTransaction().WithPostings(
ledger.NewPosting("world", "bank", "USD", big.NewInt(100)),
)
err := store.InsertTransaction(ctx, &tx)
require.NoError(t, err)

bankAccount := ledger.Account{
Address: "bank",
FirstUsage: tx.InsertedAt,
InsertionDate: tx.InsertedAt,
UpdatedAt: tx.InsertedAt,
}
_, err = store.UpsertAccount(ctx, &bankAccount)
require.NoError(t, err)

err = store.InsertMoves(ctx, &ledger.Move{
TransactionID: tx.ID,
IsSource: false,
Account: "bank",
Amount: (*bunpaginate.BigInt)(big.NewInt(100)),
Asset: "USD",
InsertionDate: tx.InsertedAt,
EffectiveDate: tx.InsertedAt,
PostCommitVolumes: pointer.For(ledger.NewVolumesInt64(100, 0)),
})
require.NoError(t, err)

balances, err := store.GetBalances(ctx, ledgercontroller.BalanceQuery{
"bank": {"USD"},
})
require.NoError(t, err)

require.NotNil(t, balances["bank"])
RequireEqual(t, big.NewInt(100), balances["bank"]["USD"])

// Check a new line has been inserted into accounts_volumes table
volumes := &ledger.AccountsVolumes{}
err = store.GetDB().NewSelect().
ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")).
Where("accounts_address = ?", "bank").
Scan(ctx, volumes)
require.NoError(t, err)

RequireEqual(t, big.NewInt(100), volumes.Input)
RequireEqual(t, big.NewInt(0), volumes.Output)
})
}

func TestBalancesAggregates(t *testing.T) {
Expand Down

0 comments on commit e54fe29

Please sign in to comment.