Skip to content

Commit

Permalink
feat: make balance aggregation use accounts_volumes table if pit is n…
Browse files Browse the repository at this point in the history
…ot defined
  • Loading branch information
gfyrag committed Oct 16, 2024
1 parent 8ab28a1 commit d821df5
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 106 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/onsi/gomega v1.34.2
github.com/pborman/uuid v1.2.1
github.com/pkg/errors v0.9.1
github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -134,6 +135,7 @@ require (
github.com/rs/cors v1.11.1 // indirect
github.com/shirou/gopsutil/v4 v4.24.8 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/shomali11/util v0.0.0-20180607005212-e0f70fd665ff // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/tklauser/go-sysconf v0.3.14 // indirect
github.com/tklauser/numcpus v0.8.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
github.com/shomali11/util v0.0.0-20180607005212-e0f70fd665ff h1:A47HTOEURe8GFXu/9ztnUzVgBBo0NlWoKmVPmfJ4LR8=
github.com/shomali11/util v0.0.0-20180607005212-e0f70fd665ff/go.mod h1:WWE2GJM9B5UpdOiwH2val10w/pvJ2cUUQOOA/4LgOng=
github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df h1:SVCDTuzM3KEk8WBwSSw7RTPLw9ajzBaXDg39Bo6xIeU=
github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df/go.mod h1:K8jR5lDI2MGs9Ky+X2jIF4MwIslI0L8o8ijIlEq7/Vw=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
Expand Down
164 changes: 120 additions & 44 deletions internal/storage/ledger/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@ package ledger

import (
"context"
"fmt"
"github.com/pkg/errors"
"math/big"
"strings"

"github.com/formancehq/ledger/internal/tracing"

"github.com/formancehq/go-libs/time"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"github.com/pkg/errors"

"github.com/formancehq/go-libs/query"
"github.com/formancehq/go-libs/time"
ledger "github.com/formancehq/ledger/internal"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"github.com/uptrace/bun"
)

Expand All @@ -26,16 +24,25 @@ type Balances struct {
Balance *big.Int `bun:"balance,type:numeric"`
}

func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool, builder query.Builder) *bun.SelectQuery {
func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool, builder query.Builder) *bun.SelectQuery {

ret := s.db.NewSelect()
var needMetadata bool
var (
needMetadata bool
needAddressSegment bool
)

if builder != nil {
if err := builder.Walk(func(operator string, key string, value any) error {
switch {
case key == "address":
return s.validateAddressFilter(operator, value)
if err := s.validateAddressFilter(operator, value); err != nil {
return err
}
if !needAddressSegment {
needAddressSegment = isSegmentedAddress(value.(string)) // cast is safe, the type has been validated by validatedAddressFilter
}

case key == "metadata":
needMetadata = true
if operator != "$exists" {
Expand All @@ -55,39 +62,79 @@ func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool,
}
}

var selectMoves *bun.SelectQuery
if useInsertionDate {
if !s.ledger.HasFeature(ledger.FeaturePostCommitVolumes, "SYNC") {
return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitVolumes))
if needAddressSegment && !s.ledger.HasFeature(ledger.FeatureIndexAddressSegments, "ON") {
return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeatureIndexAddressSegments))
}

var selectAccountsWithVolumes *bun.SelectQuery
if date != nil && !date.IsZero() {
if useInsertionDate {
if !s.ledger.HasFeature(ledger.FeaturePostCommitVolumes, "SYNC") {
return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitVolumes))
}
selectAccountsWithVolumes = s.db.NewSelect().
TableExpr("(?) moves", s.SelectDistinctMovesBySeq(date)).
Column("asset", "accounts_seq", "account_address", "account_address_array").
ColumnExpr("post_commit_volumes as volumes")
} else {
if !s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") {
return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitEffectiveVolumes))
}
selectAccountsWithVolumes = s.db.NewSelect().
TableExpr("(?) moves", s.SelectDistinctMovesByEffectiveDate(date)).
Column("asset", "accounts_seq", "account_address", "account_address_array").
ColumnExpr("moves.post_commit_effective_volumes as volumes")
}
selectMoves = s.db.NewSelect().
TableExpr("(?) moves", s.SelectDistinctMovesBySeq(date)).
Column("asset", "account_address", "account_address_array").
ColumnExpr("post_commit_volumes as volumes")
} else {
if !s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") {
return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitEffectiveVolumes))
}
selectMoves = s.db.NewSelect().
TableExpr("(?) moves", s.SelectDistinctMovesByEffectiveDate(date)).
ColumnExpr("moves.post_commit_effective_volumes as volumes").
Column("asset", "account_address", "account_address_array")
selectAccountsWithVolumes = s.db.NewSelect().
ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")).
Column("asset", "accounts_seq").
ColumnExpr("account as account_address").
ColumnExpr("(inputs, outputs)::"+s.GetPrefixedRelationName("volumes")+" as volumes").
Where("ledger = ?", s.ledger.Name)
}

selectAccountsWithVolumes = s.db.NewSelect().
ColumnExpr("*").
TableExpr("(?) accounts_volumes", selectAccountsWithVolumes)

if needMetadata {
if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistories, "SYNC") && date != nil && !date.IsZero() {
selectMoves = selectMoves.
selectAccountsWithVolumes = selectAccountsWithVolumes.
Join(
`left join (?) accounts_metadata on accounts_metadata.accounts_seq = moves.accounts_seq`,
`left join (?) accounts_metadata on accounts_metadata.accounts_seq = accounts_volumes.accounts_seq`,
s.selectDistinctAccountMetadataHistories(date),
).
ColumnExpr("coalesce(accounts_metadata.metadata, '{}'::jsonb) as metadata")

if needAddressSegment {
selectAccountsWithVolumes = selectAccountsWithVolumes.
Join("join " + s.GetPrefixedRelationName("accounts") + " on accounts.seq = accounts_volumes.accounts_seq").
Column("accounts.address_array")
}
} else {
selectMoves = selectMoves.
selectAccountsWithVolumes = selectAccountsWithVolumes.
Join(
`join (?) accounts on accounts.seq = moves.accounts_seq`,
`join (?) accounts on accounts.seq = accounts_volumes.accounts_seq`,
s.db.NewSelect().ModelTableExpr(s.GetPrefixedRelationName("accounts")),
)

if needAddressSegment {
selectAccountsWithVolumes = selectAccountsWithVolumes.Column("accounts.address_array")
}
}
} else {
if needAddressSegment {
if date == nil || date.IsZero() { // account_address_array already resolved by moves if pit is specified
selectAccountsWithVolumes = s.db.NewSelect().
TableExpr(
"(?) accounts",
selectAccountsWithVolumes.
Join("join "+s.GetPrefixedRelationName("accounts")+" accounts on accounts.seq = accounts_volumes.accounts_seq").
ColumnExpr("accounts.address_array as account_address_array"),
).
ColumnExpr("*")
}
}
}

Expand All @@ -99,35 +146,29 @@ func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool,
case metadataRegex.Match([]byte(key)):
match := metadataRegex.FindAllStringSubmatch(key, 3)

key := "accounts.metadata"
if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistories, "SYNC") && date != nil && !date.IsZero() {
key = "accounts_metadata.metadata"
}

return key + " @> ?", []any{map[string]any{
return "metadata @> ?", []any{map[string]any{
match[0][1]: value,
}}, nil

case key == "metadata":
key := "accounts.metadata"
if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistories, "SYNC") && date != nil && !date.IsZero() {
key = "am.metadata"
}

return fmt.Sprintf("%s -> ? IS NOT NULL", key), []any{value}, nil
return "metadata -> ? is not null", []any{value}, nil
default:
return "", nil, ledgercontroller.NewErrInvalidQuery("unknown key '%s' when building query", key)
}
}))
if err != nil {
return ret.Err(errors.Wrap(err, "building where clause"))
}
selectMoves = selectMoves.Where(where, args...)
selectAccountsWithVolumes = selectAccountsWithVolumes.Where(where, args...)
}

return selectAccountsWithVolumes
}

func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool, builder query.Builder) *bun.SelectQuery {
return s.db.NewSelect().
ModelTableExpr("(?) moves", selectMoves).
ColumnExpr(`to_json(array_agg(json_build_object('asset', moves.asset, 'inputs', (moves.volumes).inputs, 'outputs', (moves.volumes).outputs))) as aggregated`)
ModelTableExpr("(?) accounts", s.selectAccountWithVolumes(date, useInsertionDate, builder)).
ColumnExpr(`to_json(array_agg(json_build_object('asset', accounts.asset, 'inputs', (accounts.volumes).inputs, 'outputs', (accounts.volumes).outputs))) as aggregated`)
}

func (s *Store) GetAggregatedBalances(ctx context.Context, q ledgercontroller.GetAggregatedBalanceQuery) (ledger.BalancesByAssets, error) {
Expand Down Expand Up @@ -158,9 +199,11 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ
balances := make([]Balances, 0)
err := s.db.NewSelect().
Model(&balances).
ModelTableExpr(s.GetPrefixedRelationName("balances")).
ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")).
ColumnExpr("account, asset").
ColumnExpr("inputs - outputs as balance").
Where("("+strings.Join(conditions, ") OR (")+")", args...).
For("UPDATE").
For("update").
// notes(gfyrag): keep order, it ensures consistent locking order and limit deadlocks
Order("account", "asset").
Scan(ctx)
Expand Down Expand Up @@ -191,3 +234,36 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ
return ret, nil
})
}

/**
SELECT *
FROM (
SELECT *, accounts.address_array AS account_address_array
FROM (
SELECT
"asset",
"accounts_seq",
"account_address",
"account_address_array",
post_commit_volumes AS volumes
FROM (
SELECT DISTINCT ON (accounts_seq, account_address, asset)
"accounts_seq",
"account_address",
"asset",
first_value(account_address_array) OVER (PARTITION BY (accounts_seq, account_address, asset) ORDER BY seq DESC) AS account_address_array,
first_value(post_commit_volumes) OVER (PARTITION BY (accounts_seq, account_address, asset) ORDER BY seq DESC) AS post_commit_volumes
FROM (
SELECT *
FROM "7c44551f".moves
WHERE (ledger = '7c44551f') AND (insertion_date <= '2024-09-25T12:01:13.895812Z')
ORDER BY "seq" DESC
) moves
WHERE (ledger = '7c44551f') AND (insertion_date <= '2024-09-25T12:01:13.895812Z')) moves
) accounts_volumes
JOIN "7c44551f".accounts accounts ON accounts.seq = accounts_volumes.accounts_seq
) accounts
WHERE (jsonb_array_length(account_address_array) = 2 AND account_address_array @@ ('$[0] == "users"')::jsonpath)
*/
68 changes: 34 additions & 34 deletions internal/storage/ledger/balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,28 @@ import (
libtime "time"
)

func TestGetBalances(t *testing.T) {
func TestBalancesGet(t *testing.T) {
t.Parallel()
store := newLedgerStore(t)
ctx := logging.TestingContext()

world := &Account{
Ledger: store.ledger.Name,
Address: "world",
AddressArray: []string{"world"},
InsertionDate: time.Now(),
UpdatedAt: time.Now(),
FirstUsage: time.Now(),
Ledger: store.ledger.Name,
Address: "world",
AddressArray: []string{"world"},
InsertionDate: time.Now(),
UpdatedAt: time.Now(),
FirstUsage: time.Now(),
}
_, err := store.upsertAccount(ctx, world)
require.NoError(t, err)

_, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD",
Inputs: new(big.Int),
Outputs: big.NewInt(100),
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD",
Inputs: new(big.Int),
Outputs: big.NewInt(100),
AccountsSeq: world.Seq,
})
require.NoError(t, err)
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestGetBalances(t *testing.T) {
})
}

func TestGetBalancesAggregated(t *testing.T) {
func TestBalancesAggregates(t *testing.T) {
t.Parallel()

store := newLedgerStore(t)
Expand Down Expand Up @@ -262,22 +262,22 @@ func TestUpdateBalances(t *testing.T) {
ctx := logging.TestingContext()

world := &Account{
Ledger: store.ledger.Name,
Address: "world",
AddressArray: []string{"world"},
InsertionDate: time.Now(),
UpdatedAt: time.Now(),
FirstUsage: time.Now(),
Ledger: store.ledger.Name,
Address: "world",
AddressArray: []string{"world"},
InsertionDate: time.Now(),
UpdatedAt: time.Now(),
FirstUsage: time.Now(),
}
_, err := store.upsertAccount(ctx, world)
require.NoError(t, err)

volumes, err := store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Inputs: big.NewInt(0),
Outputs: big.NewInt(100),
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Inputs: big.NewInt(0),
Outputs: big.NewInt(100),
AccountsSeq: world.Seq,
})
require.NoError(t, err)
Expand All @@ -288,11 +288,11 @@ func TestUpdateBalances(t *testing.T) {
}, volumes)

volumes, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Inputs: big.NewInt(50),
Outputs: big.NewInt(0),
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Inputs: big.NewInt(50),
Outputs: big.NewInt(0),
})
require.NoError(t, err)
require.Equal(t, map[string]map[string]ledger.Volumes{
Expand All @@ -302,11 +302,11 @@ func TestUpdateBalances(t *testing.T) {
}, volumes)

volumes, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Inputs: big.NewInt(50),
Outputs: big.NewInt(50),
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Inputs: big.NewInt(50),
Outputs: big.NewInt(50),
AccountsSeq: world.Seq,
})
require.NoError(t, err)
Expand Down
Loading

0 comments on commit d821df5

Please sign in to comment.