From d821df52a4ebf12d5266928bc8536d75ade829ea Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 25 Sep 2024 14:54:47 +0200 Subject: [PATCH] feat: make balance aggregation use accounts_volumes table if pit is not defined --- go.mod | 2 + go.sum | 4 + internal/storage/ledger/balances.go | 164 ++++++++++++++----- internal/storage/ledger/balances_test.go | 68 ++++---- internal/storage/ledger/moves.go | 14 +- internal/storage/ledger/transactions.go | 2 +- internal/storage/ledger/transactions_test.go | 16 +- internal/storage/ledger/volumes.go | 34 ++-- 8 files changed, 198 insertions(+), 106 deletions(-) diff --git a/go.mod b/go.mod index cc799c57f..e6d6b0d0f 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/onsi/gomega v1.34.2 github.com/pborman/uuid v1.2.1 github.com/pkg/errors v0.9.1 + github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 @@ -134,6 +135,7 @@ require ( github.com/rs/cors v1.11.1 // indirect github.com/shirou/gopsutil/v4 v4.24.8 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect + github.com/shomali11/util v0.0.0-20180607005212-e0f70fd665ff // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/tklauser/go-sysconf v0.3.14 // indirect github.com/tklauser/numcpus v0.8.0 // indirect diff --git a/go.sum b/go.sum index 74e03e712..50d8d5cbc 100644 --- a/go.sum +++ b/go.sum @@ -270,6 +270,10 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= +github.com/shomali11/util v0.0.0-20180607005212-e0f70fd665ff h1:A47HTOEURe8GFXu/9ztnUzVgBBo0NlWoKmVPmfJ4LR8= +github.com/shomali11/util v0.0.0-20180607005212-e0f70fd665ff/go.mod h1:WWE2GJM9B5UpdOiwH2val10w/pvJ2cUUQOOA/4LgOng= +github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df h1:SVCDTuzM3KEk8WBwSSw7RTPLw9ajzBaXDg39Bo6xIeU= +github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df/go.mod h1:K8jR5lDI2MGs9Ky+X2jIF4MwIslI0L8o8ijIlEq7/Vw= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= diff --git a/internal/storage/ledger/balances.go b/internal/storage/ledger/balances.go index 393eee94f..95f07d6d0 100644 --- a/internal/storage/ledger/balances.go +++ b/internal/storage/ledger/balances.go @@ -2,18 +2,16 @@ package ledger import ( "context" - "fmt" + "github.com/pkg/errors" "math/big" "strings" "github.com/formancehq/ledger/internal/tracing" - "github.com/formancehq/go-libs/time" - ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" - "github.com/pkg/errors" - "github.com/formancehq/go-libs/query" + "github.com/formancehq/go-libs/time" ledger "github.com/formancehq/ledger/internal" + ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" "github.com/uptrace/bun" ) @@ -26,16 +24,25 @@ type Balances struct { Balance *big.Int `bun:"balance,type:numeric"` } -func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool, builder query.Builder) *bun.SelectQuery { +func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool, builder query.Builder) *bun.SelectQuery { ret := s.db.NewSelect() - var needMetadata bool + var ( + needMetadata bool + needAddressSegment bool + ) if builder != nil { if err := builder.Walk(func(operator string, key string, value any) error { switch { case key == "address": - return s.validateAddressFilter(operator, value) + if err := s.validateAddressFilter(operator, value); err != nil { + return err + } + if !needAddressSegment { + needAddressSegment = isSegmentedAddress(value.(string)) // cast is safe, the type has been validated by validatedAddressFilter + } + case key == "metadata": needMetadata = true if operator != "$exists" { @@ -55,39 +62,79 @@ func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool, } } - var selectMoves *bun.SelectQuery - if useInsertionDate { - if !s.ledger.HasFeature(ledger.FeaturePostCommitVolumes, "SYNC") { - return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitVolumes)) + if needAddressSegment && !s.ledger.HasFeature(ledger.FeatureIndexAddressSegments, "ON") { + return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeatureIndexAddressSegments)) + } + + var selectAccountsWithVolumes *bun.SelectQuery + if date != nil && !date.IsZero() { + if useInsertionDate { + if !s.ledger.HasFeature(ledger.FeaturePostCommitVolumes, "SYNC") { + return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitVolumes)) + } + selectAccountsWithVolumes = s.db.NewSelect(). + TableExpr("(?) moves", s.SelectDistinctMovesBySeq(date)). + Column("asset", "accounts_seq", "account_address", "account_address_array"). + ColumnExpr("post_commit_volumes as volumes") + } else { + if !s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") { + return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitEffectiveVolumes)) + } + selectAccountsWithVolumes = s.db.NewSelect(). + TableExpr("(?) moves", s.SelectDistinctMovesByEffectiveDate(date)). + Column("asset", "accounts_seq", "account_address", "account_address_array"). + ColumnExpr("moves.post_commit_effective_volumes as volumes") } - selectMoves = s.db.NewSelect(). - TableExpr("(?) moves", s.SelectDistinctMovesBySeq(date)). - Column("asset", "account_address", "account_address_array"). - ColumnExpr("post_commit_volumes as volumes") } else { - if !s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") { - return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitEffectiveVolumes)) - } - selectMoves = s.db.NewSelect(). - TableExpr("(?) moves", s.SelectDistinctMovesByEffectiveDate(date)). - ColumnExpr("moves.post_commit_effective_volumes as volumes"). - Column("asset", "account_address", "account_address_array") + selectAccountsWithVolumes = s.db.NewSelect(). + ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")). + Column("asset", "accounts_seq"). + ColumnExpr("account as account_address"). + ColumnExpr("(inputs, outputs)::"+s.GetPrefixedRelationName("volumes")+" as volumes"). + Where("ledger = ?", s.ledger.Name) } + selectAccountsWithVolumes = s.db.NewSelect(). + ColumnExpr("*"). + TableExpr("(?) accounts_volumes", selectAccountsWithVolumes) + if needMetadata { if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistories, "SYNC") && date != nil && !date.IsZero() { - selectMoves = selectMoves. + selectAccountsWithVolumes = selectAccountsWithVolumes. Join( - `left join (?) accounts_metadata on accounts_metadata.accounts_seq = moves.accounts_seq`, + `left join (?) accounts_metadata on accounts_metadata.accounts_seq = accounts_volumes.accounts_seq`, s.selectDistinctAccountMetadataHistories(date), ). ColumnExpr("coalesce(accounts_metadata.metadata, '{}'::jsonb) as metadata") + + if needAddressSegment { + selectAccountsWithVolumes = selectAccountsWithVolumes. + Join("join " + s.GetPrefixedRelationName("accounts") + " on accounts.seq = accounts_volumes.accounts_seq"). + Column("accounts.address_array") + } } else { - selectMoves = selectMoves. + selectAccountsWithVolumes = selectAccountsWithVolumes. Join( - `join (?) accounts on accounts.seq = moves.accounts_seq`, + `join (?) accounts on accounts.seq = accounts_volumes.accounts_seq`, s.db.NewSelect().ModelTableExpr(s.GetPrefixedRelationName("accounts")), ) + + if needAddressSegment { + selectAccountsWithVolumes = selectAccountsWithVolumes.Column("accounts.address_array") + } + } + } else { + if needAddressSegment { + if date == nil || date.IsZero() { // account_address_array already resolved by moves if pit is specified + selectAccountsWithVolumes = s.db.NewSelect(). + TableExpr( + "(?) accounts", + selectAccountsWithVolumes. + Join("join "+s.GetPrefixedRelationName("accounts")+" accounts on accounts.seq = accounts_volumes.accounts_seq"). + ColumnExpr("accounts.address_array as account_address_array"), + ). + ColumnExpr("*") + } } } @@ -99,22 +146,12 @@ func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool, case metadataRegex.Match([]byte(key)): match := metadataRegex.FindAllStringSubmatch(key, 3) - key := "accounts.metadata" - if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistories, "SYNC") && date != nil && !date.IsZero() { - key = "accounts_metadata.metadata" - } - - return key + " @> ?", []any{map[string]any{ + return "metadata @> ?", []any{map[string]any{ match[0][1]: value, }}, nil case key == "metadata": - key := "accounts.metadata" - if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistories, "SYNC") && date != nil && !date.IsZero() { - key = "am.metadata" - } - - return fmt.Sprintf("%s -> ? IS NOT NULL", key), []any{value}, nil + return "metadata -> ? is not null", []any{value}, nil default: return "", nil, ledgercontroller.NewErrInvalidQuery("unknown key '%s' when building query", key) } @@ -122,12 +159,16 @@ func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool, if err != nil { return ret.Err(errors.Wrap(err, "building where clause")) } - selectMoves = selectMoves.Where(where, args...) + selectAccountsWithVolumes = selectAccountsWithVolumes.Where(where, args...) } + return selectAccountsWithVolumes +} + +func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool, builder query.Builder) *bun.SelectQuery { return s.db.NewSelect(). - ModelTableExpr("(?) moves", selectMoves). - ColumnExpr(`to_json(array_agg(json_build_object('asset', moves.asset, 'inputs', (moves.volumes).inputs, 'outputs', (moves.volumes).outputs))) as aggregated`) + ModelTableExpr("(?) accounts", s.selectAccountWithVolumes(date, useInsertionDate, builder)). + ColumnExpr(`to_json(array_agg(json_build_object('asset', accounts.asset, 'inputs', (accounts.volumes).inputs, 'outputs', (accounts.volumes).outputs))) as aggregated`) } func (s *Store) GetAggregatedBalances(ctx context.Context, q ledgercontroller.GetAggregatedBalanceQuery) (ledger.BalancesByAssets, error) { @@ -158,9 +199,11 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ balances := make([]Balances, 0) err := s.db.NewSelect(). Model(&balances). - ModelTableExpr(s.GetPrefixedRelationName("balances")). + ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")). + ColumnExpr("account, asset"). + ColumnExpr("inputs - outputs as balance"). Where("("+strings.Join(conditions, ") OR (")+")", args...). - For("UPDATE"). + For("update"). // notes(gfyrag): keep order, it ensures consistent locking order and limit deadlocks Order("account", "asset"). Scan(ctx) @@ -191,3 +234,36 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ return ret, nil }) } + +/** + +SELECT * +FROM ( + SELECT *, accounts.address_array AS account_address_array + FROM ( + SELECT + "asset", + "accounts_seq", + "account_address", + "account_address_array", + post_commit_volumes AS volumes + FROM ( + SELECT DISTINCT ON (accounts_seq, account_address, asset) + "accounts_seq", + "account_address", + "asset", + first_value(account_address_array) OVER (PARTITION BY (accounts_seq, account_address, asset) ORDER BY seq DESC) AS account_address_array, + first_value(post_commit_volumes) OVER (PARTITION BY (accounts_seq, account_address, asset) ORDER BY seq DESC) AS post_commit_volumes + FROM ( + SELECT * + FROM "7c44551f".moves + WHERE (ledger = '7c44551f') AND (insertion_date <= '2024-09-25T12:01:13.895812Z') + ORDER BY "seq" DESC + ) moves + WHERE (ledger = '7c44551f') AND (insertion_date <= '2024-09-25T12:01:13.895812Z')) moves + ) accounts_volumes + JOIN "7c44551f".accounts accounts ON accounts.seq = accounts_volumes.accounts_seq +) accounts +WHERE (jsonb_array_length(account_address_array) = 2 AND account_address_array @@ ('$[0] == "users"')::jsonpath) + +*/ diff --git a/internal/storage/ledger/balances_test.go b/internal/storage/ledger/balances_test.go index f013227a0..c99c26fc6 100644 --- a/internal/storage/ledger/balances_test.go +++ b/internal/storage/ledger/balances_test.go @@ -19,28 +19,28 @@ import ( libtime "time" ) -func TestGetBalances(t *testing.T) { +func TestBalancesGet(t *testing.T) { t.Parallel() store := newLedgerStore(t) ctx := logging.TestingContext() world := &Account{ - Ledger: store.ledger.Name, - Address: "world", - AddressArray: []string{"world"}, - InsertionDate: time.Now(), - UpdatedAt: time.Now(), - FirstUsage: time.Now(), + Ledger: store.ledger.Name, + Address: "world", + AddressArray: []string{"world"}, + InsertionDate: time.Now(), + UpdatedAt: time.Now(), + FirstUsage: time.Now(), } _, err := store.upsertAccount(ctx, world) require.NoError(t, err) _, err = store.updateVolumes(ctx, AccountsVolumes{ - Ledger: store.ledger.Name, - Account: "world", - Asset: "USD", - Inputs: new(big.Int), - Outputs: big.NewInt(100), + Ledger: store.ledger.Name, + Account: "world", + Asset: "USD", + Inputs: new(big.Int), + Outputs: big.NewInt(100), AccountsSeq: world.Seq, }) require.NoError(t, err) @@ -110,7 +110,7 @@ func TestGetBalances(t *testing.T) { }) } -func TestGetBalancesAggregated(t *testing.T) { +func TestBalancesAggregates(t *testing.T) { t.Parallel() store := newLedgerStore(t) @@ -262,22 +262,22 @@ func TestUpdateBalances(t *testing.T) { ctx := logging.TestingContext() world := &Account{ - Ledger: store.ledger.Name, - Address: "world", - AddressArray: []string{"world"}, - InsertionDate: time.Now(), - UpdatedAt: time.Now(), - FirstUsage: time.Now(), + Ledger: store.ledger.Name, + Address: "world", + AddressArray: []string{"world"}, + InsertionDate: time.Now(), + UpdatedAt: time.Now(), + FirstUsage: time.Now(), } _, err := store.upsertAccount(ctx, world) require.NoError(t, err) volumes, err := store.updateVolumes(ctx, AccountsVolumes{ - Ledger: store.ledger.Name, - Account: "world", - Asset: "USD/2", - Inputs: big.NewInt(0), - Outputs: big.NewInt(100), + Ledger: store.ledger.Name, + Account: "world", + Asset: "USD/2", + Inputs: big.NewInt(0), + Outputs: big.NewInt(100), AccountsSeq: world.Seq, }) require.NoError(t, err) @@ -288,11 +288,11 @@ func TestUpdateBalances(t *testing.T) { }, volumes) volumes, err = store.updateVolumes(ctx, AccountsVolumes{ - Ledger: store.ledger.Name, - Account: "world", - Asset: "USD/2", - Inputs: big.NewInt(50), - Outputs: big.NewInt(0), + Ledger: store.ledger.Name, + Account: "world", + Asset: "USD/2", + Inputs: big.NewInt(50), + Outputs: big.NewInt(0), }) require.NoError(t, err) require.Equal(t, map[string]map[string]ledger.Volumes{ @@ -302,11 +302,11 @@ func TestUpdateBalances(t *testing.T) { }, volumes) volumes, err = store.updateVolumes(ctx, AccountsVolumes{ - Ledger: store.ledger.Name, - Account: "world", - Asset: "USD/2", - Inputs: big.NewInt(50), - Outputs: big.NewInt(50), + Ledger: store.ledger.Name, + Account: "world", + Asset: "USD/2", + Inputs: big.NewInt(50), + Outputs: big.NewInt(50), AccountsSeq: world.Seq, }) require.NoError(t, err) diff --git a/internal/storage/ledger/moves.go b/internal/storage/ledger/moves.go index d5e16ba4f..e900f77d5 100644 --- a/internal/storage/ledger/moves.go +++ b/internal/storage/ledger/moves.go @@ -98,9 +98,7 @@ func (m Moves) volumeUpdates() []AccountsVolumes { if _, ok := aggregatedVolumes[move.Account]; !ok { aggregatedVolumes[move.Account] = make(map[string][]*Move) } - if _, ok := aggregatedVolumes[move.Account][move.Asset]; !ok { - aggregatedVolumes[move.Account][move.Asset] = append(aggregatedVolumes[move.Account][move.Asset], move) - } + aggregatedVolumes[move.Account][move.Asset] = append(aggregatedVolumes[move.Account][move.Asset], move) } ret := make([]AccountsVolumes, 0) @@ -115,11 +113,11 @@ func (m Moves) volumeUpdates() []AccountsVolumes { } } ret = append(ret, AccountsVolumes{ - Ledger: moves[0].Ledger, - Account: account, - Asset: asset, - Inputs: volumes.Inputs, - Outputs: volumes.Outputs, + Ledger: moves[0].Ledger, + Account: account, + Asset: asset, + Inputs: volumes.Inputs, + Outputs: volumes.Outputs, AccountsSeq: moves[0].AccountSeq, }) } diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index c1b9c6b1b..d3e2c5227 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -325,7 +325,7 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e } sqlQueries := Map(tx.InvolvedAccounts(), func(from string) string { - return fmt.Sprintf("select pg_advisory_xact_lock(hashtext('%s'))", fmt.Sprintf("%s%s", s.ledger.Name, from)) + return fmt.Sprintf("select pg_advisory_xact_lock(hashtext('%s'))", fmt.Sprintf("%s_%s", s.ledger.Name, from)) }) _, err := s.db.NewRaw(strings.Join(sqlQueries, ";")).Exec(ctx) diff --git a/internal/storage/ledger/transactions_test.go b/internal/storage/ledger/transactions_test.go index 9b8b61eab..8eacae195 100644 --- a/internal/storage/ledger/transactions_test.go +++ b/internal/storage/ledger/transactions_test.go @@ -247,9 +247,10 @@ func TestTransactionsCommit(t *testing.T) { store := newLedgerStore(t) - _, err := store.upsertAccount(ctx, &Account{ + account1 := &Account{ Address: "account:1", - }) + } + _, err := store.upsertAccount(ctx, account1) require.NoError(t, err) account2 := &Account{ @@ -258,12 +259,23 @@ func TestTransactionsCommit(t *testing.T) { _, err = store.upsertAccount(ctx, account2) require.NoError(t, err) + // todo: we should not need to update volumes to have a lock _, err = store.updateVolumes(ctx, AccountsVolumes{ Ledger: store.ledger.Name, Account: "account:1", Asset: "USD", Inputs: big.NewInt(100), Outputs: big.NewInt(0), + AccountsSeq: account1.Seq, + }) + require.NoError(t, err) + + _, err = store.updateVolumes(ctx, AccountsVolumes{ + Ledger: store.ledger.Name, + Account: "account:2", + Asset: "USD", + Inputs: big.NewInt(100), + Outputs: big.NewInt(0), AccountsSeq: account2.Seq, }) require.NoError(t, err) diff --git a/internal/storage/ledger/volumes.go b/internal/storage/ledger/volumes.go index 4dbdb2b96..16277bfad 100644 --- a/internal/storage/ledger/volumes.go +++ b/internal/storage/ledger/volumes.go @@ -19,12 +19,12 @@ import ( type AccountsVolumes struct { bun.BaseModel `bun:"accounts_volumes"` - Ledger string `bun:"ledger,type:varchar"` - Account string `bun:"account,type:varchar"` - Asset string `bun:"asset,type:varchar"` - Inputs *big.Int `bun:"inputs,type:numeric"` - Outputs *big.Int `bun:"outputs,type:numeric"` - AccountsSeq int `bun:"accounts_seq,type:int"` + Ledger string `bun:"ledger,type:varchar"` + Account string `bun:"account,type:varchar"` + Asset string `bun:"asset,type:varchar"` + Inputs *big.Int `bun:"inputs,type:numeric"` + Outputs *big.Int `bun:"outputs,type:numeric"` + AccountsSeq int `bun:"accounts_seq,type:int"` } func (s *Store) updateVolumes(ctx context.Context, accountVolumes ...AccountsVolumes) (map[string]map[string]ledger.Volumes, error) { @@ -48,7 +48,7 @@ func (s *Store) updateVolumes(ctx context.Context, accountVolumes ...AccountsVol ret[volumes.Account] = map[string]ledger.Volumes{} } ret[volumes.Account][volumes.Asset] = ledger.Volumes{ - Inputs: volumes.Inputs, + Inputs: volumes.Inputs, Outputs: volumes.Outputs, } } @@ -58,18 +58,9 @@ func (s *Store) updateVolumes(ctx context.Context, accountVolumes ...AccountsVol } func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupLevel int, q lquery.Builder) *bun.SelectQuery { - - ret := s.db.NewSelect(). - Column("account_address_array"). - Column("account_address"). - Column("asset"). - ColumnExpr("sum(case when not is_source then amount else 0 end) as input"). - ColumnExpr("sum(case when is_source then amount else 0 end) as output"). - ColumnExpr("sum(case when not is_source then amount else -amount end) as balance"). - ModelTableExpr(s.GetPrefixedRelationName("moves")) + ret := s.db.NewSelect() var useMetadata bool - if q != nil { err := q.Walk(func(operator, key string, value any) error { switch { @@ -96,6 +87,15 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL } } + ret = ret. + Column("account_address_array"). + Column("account_address"). + Column("asset"). + ColumnExpr("sum(case when not is_source then amount else 0 end) as input"). + ColumnExpr("sum(case when is_source then amount else 0 end) as output"). + ColumnExpr("sum(case when not is_source then amount else -amount end) as balance"). + ModelTableExpr(s.GetPrefixedRelationName("moves")) + // todo: handle with pit by using accounts_metadata if useMetadata { ret = ret.