Skip to content

Commit

Permalink
feat: introduce acounts_volumes table
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 23, 2024
1 parent a856fc4 commit 793e791
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 89 deletions.
26 changes: 17 additions & 9 deletions internal/storage/bucket/migrations/11-stateless.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,32 @@ alter table "{{.Bucket}}".logs
alter column data
type json;

create table "{{.Bucket}}".balances (
ledger varchar,
account varchar,
asset varchar,
balance numeric,
create table "{{.Bucket}}".accounts_volumes (
ledger varchar not null,
accounts_seq int not null,
account varchar not null,
asset varchar not null,
inputs numeric not null,
outputs numeric not null,

primary key (ledger, account, asset)
);

insert into "{{.Bucket}}".balances
select distinct on (ledger, account_address, asset)
create view "{{.Bucket}}".balances as
select ledger, accounts_seq, account, asset, inputs - outputs as balance
from "{{.Bucket}}".accounts_volumes;

insert into "{{.Bucket}}".accounts_volumes (ledger, accounts_seq, account, asset, inputs, outputs)
select distinct on (ledger, accounts_seq, account_address, asset)
ledger,
accounts_seq,
account_address as account,
asset,
(moves.post_commit_volumes).inputs - (moves.post_commit_volumes).outputs as balance
(moves.post_commit_volumes).inputs as inputs,
(moves.post_commit_volumes).outputs as outputs
from (
select *
from moves
from "{{.Bucket}}".moves
order by seq desc
) moves;

Expand Down
41 changes: 1 addition & 40 deletions internal/storage/ledger/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/formancehq/ledger/internal/tracing"

"github.com/formancehq/go-libs/platform/postgres"
"github.com/formancehq/go-libs/time"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"github.com/pkg/errors"
Expand All @@ -19,7 +18,7 @@ import (
)

type Balances struct {
bun.BaseModel `bun:"balances"`
bun.BaseModel `bun:"accounts_volumes"`

Ledger string `bun:"ledger,type:varchar"`
Account string `bun:"account,type:varchar"`
Expand Down Expand Up @@ -192,41 +191,3 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ
return ret, nil
})
}

func (s *Store) updateBalances(ctx context.Context, diff map[string]map[string]*big.Int) (map[string]map[string]*big.Int, error) {
return tracing.TraceWithLatency(ctx, "UpdateBalances", func(ctx context.Context) (map[string]map[string]*big.Int, error) {

balances := make([]Balances, 0)
for account, forAccount := range diff {
for asset, amount := range forAccount {
balances = append(balances, Balances{
Ledger: s.ledger.Name,
Account: account,
Asset: asset,
Balance: amount,
})
}
}

_, err := s.db.NewInsert().
Model(&balances).
ModelTableExpr(s.GetPrefixedRelationName("balances")).
On("conflict (ledger, account, asset) do update").
Set("balance = balances.balance + excluded.balance").
Returning("balance").
Exec(ctx)
if err != nil {
return nil, postgres.ResolveError(err)
}

ret := make(map[string]map[string]*big.Int)
for _, balance := range balances {
if _, ok := ret[balance.Account]; !ok {
ret[balance.Account] = map[string]*big.Int{}
}
ret[balance.Account][balance.Asset] = balance.Balance
}

return ret, err
})
}
81 changes: 63 additions & 18 deletions internal/storage/ledger/balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,24 @@ func TestGetBalances(t *testing.T) {
store := newLedgerStore(t)
ctx := logging.TestingContext()

_, err := store.updateBalances(ctx, map[string]map[string]*big.Int{
"world": {
"USD": big.NewInt(100),
},
world := &Account{
Ledger: store.ledger.Name,
Address: "world",
AddressArray: []string{"world"},
InsertionDate: time.Now(),
UpdatedAt: time.Now(),
FirstUsage: time.Now(),
}
_, err := store.upsertAccount(ctx, world)
require.NoError(t, err)

_, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD",
Inputs: new(big.Int),
Outputs: big.NewInt(100),
AccountsSeq: world.Seq,
})
require.NoError(t, err)

Expand Down Expand Up @@ -91,7 +105,7 @@ func TestGetBalances(t *testing.T) {
require.NotNil(t, balances["world"])
require.NotNil(t, balances["not-existing"])

require.Equal(t, big.NewInt(100), balances["world"]["USD"])
require.Equal(t, big.NewInt(-100), balances["world"]["USD"])
require.Equal(t, big.NewInt(0), balances["not-existing"]["USD"])
})
}
Expand Down Expand Up @@ -247,27 +261,58 @@ func TestUpdateBalances(t *testing.T) {
store := newLedgerStore(t)
ctx := logging.TestingContext()

balances, err := store.updateBalances(ctx, map[string]map[string]*big.Int{
"world": {
"USD/2": big.NewInt(-100),
},
world := &Account{
Ledger: store.ledger.Name,
Address: "world",
AddressArray: []string{"world"},
InsertionDate: time.Now(),
UpdatedAt: time.Now(),
FirstUsage: time.Now(),
}
_, err := store.upsertAccount(ctx, world)
require.NoError(t, err)

volumes, err := store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Inputs: big.NewInt(0),
Outputs: big.NewInt(100),
AccountsSeq: world.Seq,
})
require.NoError(t, err)
require.Equal(t, map[string]map[string]*big.Int{
require.Equal(t, map[string]map[string]ledger.Volumes{
"world": {
"USD/2": big.NewInt(-100),
"USD/2": ledger.NewVolumesInt64(0, 100),
},
}, balances)

balances, err = store.updateBalances(ctx, map[string]map[string]*big.Int{
}, volumes)

volumes, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Inputs: big.NewInt(50),
Outputs: big.NewInt(0),
})
require.NoError(t, err)
require.Equal(t, map[string]map[string]ledger.Volumes{
"world": {
"USD/2": big.NewInt(50),
"USD/2": ledger.NewVolumesInt64(50, 100),
},
}, volumes)

volumes, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Inputs: big.NewInt(50),
Outputs: big.NewInt(50),
AccountsSeq: world.Seq,
})
require.NoError(t, err)
require.Equal(t, map[string]map[string]*big.Int{
require.Equal(t, map[string]map[string]ledger.Volumes{
"world": {
"USD/2": big.NewInt(-50),
"USD/2": ledger.NewVolumesInt64(100, 150),
},
}, balances)
}, volumes)
}
39 changes: 29 additions & 10 deletions internal/storage/ledger/moves.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ledger

import (
"context"
ledger "github.com/formancehq/ledger/internal"
"math/big"

"github.com/formancehq/go-libs/bun/bunpaginate"
Expand Down Expand Up @@ -90,20 +91,38 @@ type Move struct {

type Moves []*Move

func (m Moves) BalanceUpdates() map[string]map[string]*big.Int {
ret := make(map[string]map[string]*big.Int)
func (m Moves) volumeUpdates() []AccountsVolumes {

aggregatedVolumes := make(map[string]map[string][]*Move)
for _, move := range m {
if _, ok := ret[move.Account]; !ok {
ret[move.Account] = make(map[string]*big.Int)
if _, ok := aggregatedVolumes[move.Account]; !ok {
aggregatedVolumes[move.Account] = make(map[string][]*Move)
}
if _, ok := ret[move.Account][move.Asset]; !ok {
ret[move.Account][move.Asset] = big.NewInt(0)
if _, ok := aggregatedVolumes[move.Account][move.Asset]; !ok {
aggregatedVolumes[move.Account][move.Asset] = append(aggregatedVolumes[move.Account][move.Asset], move)
}
amount := big.NewInt(0).Set((*big.Int)(move.Amount))
if move.IsSource {
amount = big.NewInt(0).Neg(amount)
}

ret := make([]AccountsVolumes, 0)
for account, movesByAsset := range aggregatedVolumes {
for asset, moves := range movesByAsset {
volumes := ledger.NewEmptyVolumes()
for _, move := range moves {
if move.IsSource {
volumes.Outputs.Add(volumes.Outputs, (*big.Int)(move.Amount))
} else {
volumes.Inputs.Add(volumes.Inputs, (*big.Int)(move.Amount))
}
}
ret = append(ret, AccountsVolumes{
Ledger: moves[0].Ledger,
Account: account,
Asset: asset,
Inputs: volumes.Inputs,
Outputs: volumes.Outputs,
AccountsSeq: moves[0].AccountSeq,
})
}
ret[move.Account][move.Asset] = ret[move.Account][move.Asset].Add(ret[move.Account][move.Asset], amount)
}

return ret
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/ledger/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
return errors.Wrap(err, "failed to insert moves")
}

_, err = s.updateBalances(ctx, moves.BalanceUpdates())
_, err = s.updateVolumes(ctx, moves.volumeUpdates()...)
if err != nil {
return errors.Wrap(err, "failed to update balances")
}
Expand Down
19 changes: 10 additions & 9 deletions internal/storage/ledger/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,18 +252,19 @@ func TestTransactionsCommit(t *testing.T) {
})
require.NoError(t, err)

_, err = store.upsertAccount(ctx, &Account{
account2 := &Account{
Address: "account:2",
})
}
_, err = store.upsertAccount(ctx, account2)
require.NoError(t, err)

_, err = store.updateBalances(ctx, map[string]map[string]*big.Int{
"account:1": {
"USD": big.NewInt(100),
},
"account:2": {
"USD": big.NewInt(100),
},
_, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "account:1",
Asset: "USD",
Inputs: big.NewInt(100),
Outputs: big.NewInt(0),
AccountsSeq: account2.Seq,
})
require.NoError(t, err)

Expand Down
43 changes: 43 additions & 0 deletions internal/storage/ledger/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package ledger
import (
"context"
"fmt"
"github.com/formancehq/go-libs/platform/postgres"
"math/big"

"github.com/formancehq/ledger/internal/tracing"

Expand All @@ -14,6 +16,47 @@ import (
"github.com/uptrace/bun"
)

type AccountsVolumes struct {
bun.BaseModel `bun:"accounts_volumes"`

Ledger string `bun:"ledger,type:varchar"`
Account string `bun:"account,type:varchar"`
Asset string `bun:"asset,type:varchar"`
Inputs *big.Int `bun:"inputs,type:numeric"`
Outputs *big.Int `bun:"outputs,type:numeric"`
AccountsSeq int `bun:"accounts_seq,type:int"`
}

func (s *Store) updateVolumes(ctx context.Context, accountVolumes ...AccountsVolumes) (map[string]map[string]ledger.Volumes, error) {
return tracing.TraceWithLatency(ctx, "UpdateBalances", func(ctx context.Context) (map[string]map[string]ledger.Volumes, error) {

_, err := s.db.NewInsert().
Model(&accountVolumes).
ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")).
On("conflict (ledger, account, asset) do update").
Set("inputs = accounts_volumes.inputs + excluded.inputs").
Set("outputs = accounts_volumes.outputs + excluded.outputs").
Returning("inputs, outputs").
Exec(ctx)
if err != nil {
return nil, postgres.ResolveError(err)
}

ret := make(map[string]map[string]ledger.Volumes)
for _, volumes := range accountVolumes {
if _, ok := ret[volumes.Account]; !ok {
ret[volumes.Account] = map[string]ledger.Volumes{}
}
ret[volumes.Account][volumes.Asset] = ledger.Volumes{
Inputs: volumes.Inputs,
Outputs: volumes.Outputs,
}
}

return ret, err
})
}

func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupLevel int, q lquery.Builder) *bun.SelectQuery {

ret := s.db.NewSelect().
Expand Down
4 changes: 2 additions & 2 deletions internal/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
)

type Volumes struct {
Inputs *big.Int `json:"input"`
Outputs *big.Int `json:"output"`
Inputs *big.Int `bun:"inputs" json:"input"`
Outputs *big.Int `bun:"outputs" json:"output"`
}

func (v Volumes) CopyWithZerosIfNeeded() Volumes {
Expand Down

0 comments on commit 793e791

Please sign in to comment.