From 8ab28a1fde5ee6bd649bec5a20e7921b6c5ee6c2 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 25 Sep 2024 12:08:24 +0200 Subject: [PATCH] feat: introduce acounts_volumes table --- .../bucket/migrations/11-stateless.sql | 26 +++--- internal/storage/ledger/balances.go | 41 +--------- internal/storage/ledger/balances_test.go | 81 ++++++++++++++----- internal/storage/ledger/moves.go | 39 ++++++--- internal/storage/ledger/transactions.go | 2 +- internal/storage/ledger/transactions_test.go | 19 ++--- internal/storage/ledger/volumes.go | 43 ++++++++++ internal/volumes.go | 4 +- 8 files changed, 166 insertions(+), 89 deletions(-) diff --git a/internal/storage/bucket/migrations/11-stateless.sql b/internal/storage/bucket/migrations/11-stateless.sql index bbc20b395..8c61b10cd 100644 --- a/internal/storage/bucket/migrations/11-stateless.sql +++ b/internal/storage/bucket/migrations/11-stateless.sql @@ -30,24 +30,32 @@ alter table "{{.Bucket}}".logs alter column data type json; -create table "{{.Bucket}}".balances ( - ledger varchar, - account varchar, - asset varchar, - balance numeric, +create table "{{.Bucket}}".accounts_volumes ( + ledger varchar not null, + accounts_seq int not null, + account varchar not null, + asset varchar not null, + inputs numeric not null, + outputs numeric not null, primary key (ledger, account, asset) ); -insert into "{{.Bucket}}".balances -select distinct on (ledger, account_address, asset) +create view "{{.Bucket}}".balances as +select ledger, accounts_seq, account, asset, inputs - outputs as balance +from "{{.Bucket}}".accounts_volumes; + +insert into "{{.Bucket}}".accounts_volumes (ledger, accounts_seq, account, asset, inputs, outputs) +select distinct on (ledger, accounts_seq, account_address, asset) ledger, + accounts_seq, account_address as account, asset, - (moves.post_commit_volumes).inputs - (moves.post_commit_volumes).outputs as balance + (moves.post_commit_volumes).inputs as inputs, + (moves.post_commit_volumes).outputs as outputs from ( select * - from moves + from "{{.Bucket}}".moves order by seq desc ) moves; diff --git a/internal/storage/ledger/balances.go b/internal/storage/ledger/balances.go index 152f43120..393eee94f 100644 --- a/internal/storage/ledger/balances.go +++ b/internal/storage/ledger/balances.go @@ -8,7 +8,6 @@ import ( "github.com/formancehq/ledger/internal/tracing" - "github.com/formancehq/go-libs/platform/postgres" "github.com/formancehq/go-libs/time" ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" "github.com/pkg/errors" @@ -19,7 +18,7 @@ import ( ) type Balances struct { - bun.BaseModel `bun:"balances"` + bun.BaseModel `bun:"accounts_volumes"` Ledger string `bun:"ledger,type:varchar"` Account string `bun:"account,type:varchar"` @@ -192,41 +191,3 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ return ret, nil }) } - -func (s *Store) updateBalances(ctx context.Context, diff map[string]map[string]*big.Int) (map[string]map[string]*big.Int, error) { - return tracing.TraceWithLatency(ctx, "UpdateBalances", func(ctx context.Context) (map[string]map[string]*big.Int, error) { - - balances := make([]Balances, 0) - for account, forAccount := range diff { - for asset, amount := range forAccount { - balances = append(balances, Balances{ - Ledger: s.ledger.Name, - Account: account, - Asset: asset, - Balance: amount, - }) - } - } - - _, err := s.db.NewInsert(). - Model(&balances). - ModelTableExpr(s.GetPrefixedRelationName("balances")). - On("conflict (ledger, account, asset) do update"). - Set("balance = balances.balance + excluded.balance"). - Returning("balance"). - Exec(ctx) - if err != nil { - return nil, postgres.ResolveError(err) - } - - ret := make(map[string]map[string]*big.Int) - for _, balance := range balances { - if _, ok := ret[balance.Account]; !ok { - ret[balance.Account] = map[string]*big.Int{} - } - ret[balance.Account][balance.Asset] = balance.Balance - } - - return ret, err - }) -} diff --git a/internal/storage/ledger/balances_test.go b/internal/storage/ledger/balances_test.go index 505a1ef2d..f013227a0 100644 --- a/internal/storage/ledger/balances_test.go +++ b/internal/storage/ledger/balances_test.go @@ -24,10 +24,24 @@ func TestGetBalances(t *testing.T) { store := newLedgerStore(t) ctx := logging.TestingContext() - _, err := store.updateBalances(ctx, map[string]map[string]*big.Int{ - "world": { - "USD": big.NewInt(100), - }, + world := &Account{ + Ledger: store.ledger.Name, + Address: "world", + AddressArray: []string{"world"}, + InsertionDate: time.Now(), + UpdatedAt: time.Now(), + FirstUsage: time.Now(), + } + _, err := store.upsertAccount(ctx, world) + require.NoError(t, err) + + _, err = store.updateVolumes(ctx, AccountsVolumes{ + Ledger: store.ledger.Name, + Account: "world", + Asset: "USD", + Inputs: new(big.Int), + Outputs: big.NewInt(100), + AccountsSeq: world.Seq, }) require.NoError(t, err) @@ -91,7 +105,7 @@ func TestGetBalances(t *testing.T) { require.NotNil(t, balances["world"]) require.NotNil(t, balances["not-existing"]) - require.Equal(t, big.NewInt(100), balances["world"]["USD"]) + require.Equal(t, big.NewInt(-100), balances["world"]["USD"]) require.Equal(t, big.NewInt(0), balances["not-existing"]["USD"]) }) } @@ -247,27 +261,58 @@ func TestUpdateBalances(t *testing.T) { store := newLedgerStore(t) ctx := logging.TestingContext() - balances, err := store.updateBalances(ctx, map[string]map[string]*big.Int{ - "world": { - "USD/2": big.NewInt(-100), - }, + world := &Account{ + Ledger: store.ledger.Name, + Address: "world", + AddressArray: []string{"world"}, + InsertionDate: time.Now(), + UpdatedAt: time.Now(), + FirstUsage: time.Now(), + } + _, err := store.upsertAccount(ctx, world) + require.NoError(t, err) + + volumes, err := store.updateVolumes(ctx, AccountsVolumes{ + Ledger: store.ledger.Name, + Account: "world", + Asset: "USD/2", + Inputs: big.NewInt(0), + Outputs: big.NewInt(100), + AccountsSeq: world.Seq, }) require.NoError(t, err) - require.Equal(t, map[string]map[string]*big.Int{ + require.Equal(t, map[string]map[string]ledger.Volumes{ "world": { - "USD/2": big.NewInt(-100), + "USD/2": ledger.NewVolumesInt64(0, 100), }, - }, balances) - - balances, err = store.updateBalances(ctx, map[string]map[string]*big.Int{ + }, volumes) + + volumes, err = store.updateVolumes(ctx, AccountsVolumes{ + Ledger: store.ledger.Name, + Account: "world", + Asset: "USD/2", + Inputs: big.NewInt(50), + Outputs: big.NewInt(0), + }) + require.NoError(t, err) + require.Equal(t, map[string]map[string]ledger.Volumes{ "world": { - "USD/2": big.NewInt(50), + "USD/2": ledger.NewVolumesInt64(50, 100), }, + }, volumes) + + volumes, err = store.updateVolumes(ctx, AccountsVolumes{ + Ledger: store.ledger.Name, + Account: "world", + Asset: "USD/2", + Inputs: big.NewInt(50), + Outputs: big.NewInt(50), + AccountsSeq: world.Seq, }) require.NoError(t, err) - require.Equal(t, map[string]map[string]*big.Int{ + require.Equal(t, map[string]map[string]ledger.Volumes{ "world": { - "USD/2": big.NewInt(-50), + "USD/2": ledger.NewVolumesInt64(100, 150), }, - }, balances) + }, volumes) } diff --git a/internal/storage/ledger/moves.go b/internal/storage/ledger/moves.go index 67fdc7dd6..d5e16ba4f 100644 --- a/internal/storage/ledger/moves.go +++ b/internal/storage/ledger/moves.go @@ -2,6 +2,7 @@ package ledger import ( "context" + ledger "github.com/formancehq/ledger/internal" "math/big" "github.com/formancehq/go-libs/bun/bunpaginate" @@ -90,20 +91,38 @@ type Move struct { type Moves []*Move -func (m Moves) BalanceUpdates() map[string]map[string]*big.Int { - ret := make(map[string]map[string]*big.Int) +func (m Moves) volumeUpdates() []AccountsVolumes { + + aggregatedVolumes := make(map[string]map[string][]*Move) for _, move := range m { - if _, ok := ret[move.Account]; !ok { - ret[move.Account] = make(map[string]*big.Int) + if _, ok := aggregatedVolumes[move.Account]; !ok { + aggregatedVolumes[move.Account] = make(map[string][]*Move) } - if _, ok := ret[move.Account][move.Asset]; !ok { - ret[move.Account][move.Asset] = big.NewInt(0) + if _, ok := aggregatedVolumes[move.Account][move.Asset]; !ok { + aggregatedVolumes[move.Account][move.Asset] = append(aggregatedVolumes[move.Account][move.Asset], move) } - amount := big.NewInt(0).Set((*big.Int)(move.Amount)) - if move.IsSource { - amount = big.NewInt(0).Neg(amount) + } + + ret := make([]AccountsVolumes, 0) + for account, movesByAsset := range aggregatedVolumes { + for asset, moves := range movesByAsset { + volumes := ledger.NewEmptyVolumes() + for _, move := range moves { + if move.IsSource { + volumes.Outputs.Add(volumes.Outputs, (*big.Int)(move.Amount)) + } else { + volumes.Inputs.Add(volumes.Inputs, (*big.Int)(move.Amount)) + } + } + ret = append(ret, AccountsVolumes{ + Ledger: moves[0].Ledger, + Account: account, + Asset: asset, + Inputs: volumes.Inputs, + Outputs: volumes.Outputs, + AccountsSeq: moves[0].AccountSeq, + }) } - ret[move.Account][move.Asset] = ret[move.Account][move.Asset].Add(ret[move.Account][move.Asset], amount) } return ret diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index fad0d2600..c1b9c6b1b 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -393,7 +393,7 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e return errors.Wrap(err, "failed to insert moves") } - _, err = s.updateBalances(ctx, moves.BalanceUpdates()) + _, err = s.updateVolumes(ctx, moves.volumeUpdates()...) if err != nil { return errors.Wrap(err, "failed to update balances") } diff --git a/internal/storage/ledger/transactions_test.go b/internal/storage/ledger/transactions_test.go index eef00269c..9b8b61eab 100644 --- a/internal/storage/ledger/transactions_test.go +++ b/internal/storage/ledger/transactions_test.go @@ -252,18 +252,19 @@ func TestTransactionsCommit(t *testing.T) { }) require.NoError(t, err) - _, err = store.upsertAccount(ctx, &Account{ + account2 := &Account{ Address: "account:2", - }) + } + _, err = store.upsertAccount(ctx, account2) require.NoError(t, err) - _, err = store.updateBalances(ctx, map[string]map[string]*big.Int{ - "account:1": { - "USD": big.NewInt(100), - }, - "account:2": { - "USD": big.NewInt(100), - }, + _, err = store.updateVolumes(ctx, AccountsVolumes{ + Ledger: store.ledger.Name, + Account: "account:1", + Asset: "USD", + Inputs: big.NewInt(100), + Outputs: big.NewInt(0), + AccountsSeq: account2.Seq, }) require.NoError(t, err) diff --git a/internal/storage/ledger/volumes.go b/internal/storage/ledger/volumes.go index 3e0078356..4dbdb2b96 100644 --- a/internal/storage/ledger/volumes.go +++ b/internal/storage/ledger/volumes.go @@ -3,6 +3,8 @@ package ledger import ( "context" "fmt" + "github.com/formancehq/go-libs/platform/postgres" + "math/big" "github.com/formancehq/ledger/internal/tracing" @@ -14,6 +16,47 @@ import ( "github.com/uptrace/bun" ) +type AccountsVolumes struct { + bun.BaseModel `bun:"accounts_volumes"` + + Ledger string `bun:"ledger,type:varchar"` + Account string `bun:"account,type:varchar"` + Asset string `bun:"asset,type:varchar"` + Inputs *big.Int `bun:"inputs,type:numeric"` + Outputs *big.Int `bun:"outputs,type:numeric"` + AccountsSeq int `bun:"accounts_seq,type:int"` +} + +func (s *Store) updateVolumes(ctx context.Context, accountVolumes ...AccountsVolumes) (map[string]map[string]ledger.Volumes, error) { + return tracing.TraceWithLatency(ctx, "UpdateBalances", func(ctx context.Context) (map[string]map[string]ledger.Volumes, error) { + + _, err := s.db.NewInsert(). + Model(&accountVolumes). + ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")). + On("conflict (ledger, account, asset) do update"). + Set("inputs = accounts_volumes.inputs + excluded.inputs"). + Set("outputs = accounts_volumes.outputs + excluded.outputs"). + Returning("inputs, outputs"). + Exec(ctx) + if err != nil { + return nil, postgres.ResolveError(err) + } + + ret := make(map[string]map[string]ledger.Volumes) + for _, volumes := range accountVolumes { + if _, ok := ret[volumes.Account]; !ok { + ret[volumes.Account] = map[string]ledger.Volumes{} + } + ret[volumes.Account][volumes.Asset] = ledger.Volumes{ + Inputs: volumes.Inputs, + Outputs: volumes.Outputs, + } + } + + return ret, err + }) +} + func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupLevel int, q lquery.Builder) *bun.SelectQuery { ret := s.db.NewSelect(). diff --git a/internal/volumes.go b/internal/volumes.go index fd2229a2a..dce83acf7 100644 --- a/internal/volumes.go +++ b/internal/volumes.go @@ -6,8 +6,8 @@ import ( ) type Volumes struct { - Inputs *big.Int `json:"input"` - Outputs *big.Int `json:"output"` + Inputs *big.Int `bun:"inputs" json:"input"` + Outputs *big.Int `bun:"outputs" json:"output"` } func (v Volumes) CopyWithZerosIfNeeded() Volumes {