From 36112d05527c6c26f28b58ea8671b1b6c24f9829 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Thu, 26 Sep 2024 17:35:01 +0200 Subject: [PATCH] chore: remove accounts_address_array from moves --- .../bucket/migrations/11-stateless.sql | 7 ++- internal/storage/ledger/balances.go | 52 ++++++++++++++----- .../ledger/migrations/0-add-sequences.sql | 3 -- internal/storage/ledger/moves.go | 12 ++--- internal/storage/ledger/moves_test.go | 5 -- internal/storage/ledger/transactions.go | 36 ++++++------- internal/storage/ledger/volumes.go | 38 +++++++++++--- 7 files changed, 95 insertions(+), 58 deletions(-) diff --git a/internal/storage/bucket/migrations/11-stateless.sql b/internal/storage/bucket/migrations/11-stateless.sql index 6f2033b20..95f1f9f59 100644 --- a/internal/storage/bucket/migrations/11-stateless.sql +++ b/internal/storage/bucket/migrations/11-stateless.sql @@ -42,6 +42,9 @@ drop column post_commit_volumes; alter table "{{.Bucket}}".moves drop column post_commit_effective_volumes; +alter table "{{.Bucket}}".moves +drop column accounts_address_array; + alter table "{{.Bucket}}".moves rename post_commit_volumes_jsonb to post_commit_volumes; @@ -145,8 +148,8 @@ drop trigger "update_account" on "{{.Bucket}}".accounts; drop trigger "insert_transaction" on "{{.Bucket}}".transactions; drop trigger "update_transaction" on "{{.Bucket}}".transactions; -drop index moves_account_address_array; -drop index moves_account_address_array_length; +--drop index moves_account_address_array; +--drop index moves_account_address_array_length; drop index transactions_sources_arrays; drop index transactions_destinations_arrays; drop index accounts_address_array; diff --git a/internal/storage/ledger/balances.go b/internal/storage/ledger/balances.go index a2fe65cd1..d719c26eb 100644 --- a/internal/storage/ledger/balances.go +++ b/internal/storage/ledger/balances.go @@ -67,7 +67,7 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool, } selectAccountsWithVolumes = s.db.NewSelect(). TableExpr("(?) moves", s.SelectDistinctMovesBySeq(date)). - Column("asset", "accounts_address", "accounts_address_array"). + Column("asset", "accounts_address"). ColumnExpr("post_commit_volumes as volumes") } else { if !s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") { @@ -75,7 +75,7 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool, } selectAccountsWithVolumes = s.db.NewSelect(). TableExpr("(?) moves", s.SelectDistinctMovesByEffectiveDate(date)). - Column("asset", "accounts_address", "accounts_address_array"). + Column("asset", "accounts_address"). ColumnExpr("moves.post_commit_effective_volumes as volumes") } } else { @@ -117,16 +117,13 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool, } } else { if needAddressSegment { - if date == nil || date.IsZero() { // account_address_array already resolved by moves if pit is specified - selectAccountsWithVolumes = s.db.NewSelect(). - TableExpr( - "(?) accounts", - selectAccountsWithVolumes. - Join("join "+s.GetPrefixedRelationName("accounts")+" accounts on accounts.address = accounts_volumes.accounts_address"). - ColumnExpr("accounts.address_array as accounts_address_array"), - ). - ColumnExpr("*") - } + selectAccountsWithVolumes = s.db.NewSelect(). + TableExpr( + "(?) accounts", + selectAccountsWithVolumes. + Join("join "+s.GetPrefixedRelationName("accounts")+" accounts on accounts.address = accounts_volumes.accounts_address"), + ). + ColumnExpr("*") } } @@ -134,7 +131,7 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool, where, args, err := builder.Build(query.ContextFn(func(key, operator string, value any) (string, []any, error) { switch { case key == "address": - return filterAccountAddress(value.(string), "accounts_address"), nil, nil + return filterAccountAddress(value.(string), "accounts.address"), nil, nil case metadataRegex.Match([]byte(key)): match := metadataRegex.FindAllStringSubmatch(key, 3) @@ -246,3 +243,32 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ return ret, nil }) } + +/** +SELECT to_json(array_agg(json_build_object('asset', accounts.asset, 'input', (accounts.volumes->>'input')::numeric, 'output', (accounts.volumes->>'output')::numeric))) AS aggregated +FROM ( + SELECT * + FROM ( + SELECT *, accounts.address_array + FROM ( + SELECT "asset", "accounts_address", post_commit_volumes AS volumes + FROM ( + SELECT DISTINCT ON (accounts_address, asset) + "accounts_address", + "asset", + first_value(post_commit_volumes) OVER (PARTITION BY (accounts_address, asset) ORDER BY seq DESC) AS post_commit_volumes + FROM ( + SELECT * + FROM "87b28082".moves + WHERE (ledger = '87b28082') AND (insertion_date <= '2024-09-26T14:45:10.568382Z') + ORDER BY "seq" DESC + ) moves + WHERE (ledger = '87b28082') AND (insertion_date <= '2024-09-26T14:45:10.568382Z') + ) moves + ) accounts_volumes + JOIN "87b28082".accounts accounts ON accounts.address = accounts_volumes.accounts_address + ) accounts + WHERE (jsonb_array_length(accounts.address_array) = 2 + AND accounts.address_array @@ ('$[0] == "users"')::jsonpath) +) accounts +*/ diff --git a/internal/storage/ledger/migrations/0-add-sequences.sql b/internal/storage/ledger/migrations/0-add-sequences.sql index 24036b890..52164ff43 100644 --- a/internal/storage/ledger/migrations/0-add-sequences.sql +++ b/internal/storage/ledger/migrations/0-add-sequences.sql @@ -108,9 +108,6 @@ execute procedure "{{.Bucket}}".set_transaction_addresses(); {{ end }} {{ if .HasFeature "INDEX_ADDRESS_SEGMENTS" "ON" }} -create index "moves_accounts_address_array_{{.ID}}" on "{{.Bucket}}".moves using gin (accounts_address_array jsonb_ops) where ledger = '{{.Name}}'; -create index "moves_accounts_address_array_length_{{.ID}}" on "{{.Bucket}}".moves (jsonb_array_length(accounts_address_array)) where ledger = '{{.Name}}'; - create index "accounts_address_array_{{.ID}}" on "{{.Bucket}}".accounts using gin (address_array jsonb_ops) where ledger = '{{.Name}}'; create index "accounts_address_array_length_{{.ID}}" on "{{.Bucket}}".accounts (jsonb_array_length(address_array)) where ledger = '{{.Name}}'; diff --git a/internal/storage/ledger/moves.go b/internal/storage/ledger/moves.go index fff1e6398..ffade5e20 100644 --- a/internal/storage/ledger/moves.go +++ b/internal/storage/ledger/moves.go @@ -32,7 +32,6 @@ func (s *Store) SelectDistinctMovesBySeq(date *time.Time) *bun.SelectQuery { TableExpr("(?) moves", s.SortMovesBySeq(date)). DistinctOn("accounts_address, asset"). Column("accounts_address", "asset"). - ColumnExpr("first_value(accounts_address_array) over (partition by (accounts_address, asset) order by seq desc) as accounts_address_array"). ColumnExpr("first_value(post_commit_volumes) over (partition by (accounts_address, asset) order by seq desc) as post_commit_volumes"). Where("ledger = ?", s.ledger.Name) @@ -48,7 +47,6 @@ func (s *Store) SelectDistinctMovesByEffectiveDate(date *time.Time) *bun.SelectQ TableExpr(s.GetPrefixedRelationName("moves")). DistinctOn("accounts_address, asset"). Column("accounts_address", "asset"). - ColumnExpr("first_value(accounts_address_array) over (partition by (accounts_address, asset) order by effective_date desc, seq desc) as accounts_address_array"). ColumnExpr("first_value(post_commit_effective_volumes) over (partition by (accounts_address, asset) order by effective_date desc, seq desc) as post_commit_effective_volumes"). Where("ledger = ?", s.ledger.Name) @@ -77,12 +75,10 @@ func (s *Store) insertMoves(ctx context.Context, moves ...*Move) error { type Move struct { bun.BaseModel `bun:"table:moves"` - Ledger string `bun:"ledger,type:varchar"` - TransactionID int `bun:"transactions_id,type:bigint"` - IsSource bool `bun:"is_source,type:bool"` - Account string `bun:"accounts_address,type:varchar"` - // todo: use accounts table - AccountAddressArray []string `bun:"accounts_address_array,type:jsonb"` + Ledger string `bun:"ledger,type:varchar"` + TransactionID int `bun:"transactions_id,type:bigint"` + IsSource bool `bun:"is_source,type:bool"` + Account string `bun:"accounts_address,type:varchar"` Amount *bunpaginate.BigInt `bun:"amount,type:numeric"` Asset string `bun:"asset,type:varchar"` InsertionDate time.Time `bun:"insertion_date,type:timestamp"` diff --git a/internal/storage/ledger/moves_test.go b/internal/storage/ledger/moves_test.go index e8d80ee58..743918b9c 100644 --- a/internal/storage/ledger/moves_test.go +++ b/internal/storage/ledger/moves_test.go @@ -54,7 +54,6 @@ func TestMovesInsert(t *testing.T) { Ledger: store.ledger.Name, IsSource: true, Account: "world", - AccountAddressArray: []string{"world"}, Amount: (*bunpaginate.BigInt)(big.NewInt(100)), Asset: "USD", InsertionDate: t0, @@ -72,7 +71,6 @@ func TestMovesInsert(t *testing.T) { Ledger: store.ledger.Name, IsSource: false, Account: "world", - AccountAddressArray: []string{"world"}, Amount: (*bunpaginate.BigInt)(big.NewInt(50)), Asset: "USD", InsertionDate: t3, @@ -90,7 +88,6 @@ func TestMovesInsert(t *testing.T) { Ledger: store.ledger.Name, IsSource: true, Account: "world", - AccountAddressArray: []string{"world"}, Amount: (*bunpaginate.BigInt)(big.NewInt(200)), Asset: "USD", InsertionDate: t1, @@ -108,7 +105,6 @@ func TestMovesInsert(t *testing.T) { Ledger: store.ledger.Name, IsSource: false, Account: "world", - AccountAddressArray: []string{"world"}, Amount: (*bunpaginate.BigInt)(big.NewInt(50)), Asset: "USD", InsertionDate: t2, @@ -126,7 +122,6 @@ func TestMovesInsert(t *testing.T) { Ledger: store.ledger.Name, IsSource: false, Account: "world", - AccountAddressArray: []string{"world"}, Amount: (*bunpaginate.BigInt)(big.NewInt(50)), Asset: "USD", InsertionDate: t4, diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index eb6bad12f..91cf07a7a 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -280,29 +280,27 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e for _, posting := range postings { moves = append(moves, &Move{ - Ledger: s.ledger.Name, - Account: posting.Destination, - AccountAddressArray: strings.Split(posting.Destination, ":"), - Amount: (*bunpaginate.BigInt)(posting.Amount), - Asset: posting.Asset, - InsertionDate: tx.InsertedAt, - EffectiveDate: tx.Timestamp, - PostCommitVolumes: pointer.For(postCommitVolumes[posting.Destination][posting.Asset].Copy()), - TransactionID: tx.ID, + Ledger: s.ledger.Name, + Account: posting.Destination, + Amount: (*bunpaginate.BigInt)(posting.Amount), + Asset: posting.Asset, + InsertionDate: tx.InsertedAt, + EffectiveDate: tx.Timestamp, + PostCommitVolumes: pointer.For(postCommitVolumes[posting.Destination][posting.Asset].Copy()), + TransactionID: tx.ID, }) postCommitVolumes.AddInput(posting.Destination, posting.Asset, new(big.Int).Neg(posting.Amount)) moves = append(moves, &Move{ - Ledger: s.ledger.Name, - IsSource: true, - Account: posting.Source, - AccountAddressArray: strings.Split(posting.Source, ":"), - Amount: (*bunpaginate.BigInt)(posting.Amount), - Asset: posting.Asset, - InsertionDate: tx.InsertedAt, - EffectiveDate: tx.Timestamp, - PostCommitVolumes: pointer.For(postCommitVolumes[posting.Source][posting.Asset].Copy()), - TransactionID: tx.ID, + Ledger: s.ledger.Name, + IsSource: true, + Account: posting.Source, + Amount: (*bunpaginate.BigInt)(posting.Amount), + Asset: posting.Asset, + InsertionDate: tx.InsertedAt, + EffectiveDate: tx.Timestamp, + PostCommitVolumes: pointer.For(postCommitVolumes[posting.Source][posting.Asset].Copy()), + TransactionID: tx.ID, }) postCommitVolumes.AddOutput(posting.Source, posting.Asset, new(big.Int).Neg(posting.Amount)) } diff --git a/internal/storage/ledger/volumes.go b/internal/storage/ledger/volumes.go index d5d65dbf6..79aa7e7d4 100644 --- a/internal/storage/ledger/volumes.go +++ b/internal/storage/ledger/volumes.go @@ -64,12 +64,20 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeatureMovesHistory)) } - var useMetadata bool + var ( + useMetadata bool + needSegmentAddress bool + ) if q != nil { err := q.Walk(func(operator, key string, value any) error { switch { case key == "account" || key == "address": - return s.validateAddressFilter(operator, value) + if err := s.validateAddressFilter(operator, value); err != nil { + return err + } + if !needSegmentAddress { + needSegmentAddress = isSegmentedAddress(value.(string)) // safe cast + } case metadataRegex.Match([]byte(key)): if operator != "$match" { return ledgercontroller.NewErrInvalidQuery("'metadata' column can only be used with $match") @@ -92,14 +100,25 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL } ret = ret. - Column("accounts_address_array"). - Column("accounts_address"). + ColumnExpr("accounts_address as address"). Column("asset"). ColumnExpr("sum(case when not is_source then amount else 0 end) as input"). ColumnExpr("sum(case when is_source then amount else 0 end) as output"). ColumnExpr("sum(case when not is_source then amount else -amount end) as balance"). ModelTableExpr(s.GetPrefixedRelationName("moves")) + if needSegmentAddress { + ret = ret. + Join( + "join (?) accounts on accounts.address = moves.accounts_address", + s.db.NewSelect(). + ModelTableExpr(s.GetPrefixedRelationName("accounts")). + Where("ledger = ?", s.ledger.Name). + Column("address_array", "address"), + ). + Column("accounts.address_array") + } + // todo: handle with pit by using accounts_metadata if useMetadata { ret = ret. @@ -126,7 +145,10 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL ret = ret.Where(dateFilterColumn+" >= ?", oot) } - ret = ret.GroupExpr("accounts_address, accounts_address_array, asset") + ret = ret.GroupExpr("accounts_address, asset") + if needSegmentAddress { + ret = ret.GroupExpr("address_array") + } globalQuery := s.db.NewSelect() globalQuery = globalQuery. @@ -135,14 +157,14 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL if groupLevel > 0 { globalQuery = globalQuery. - ColumnExpr(fmt.Sprintf(`(array_to_string((string_to_array(accounts_address, ':'))[1:LEAST(array_length(string_to_array(accounts_address, ':'),1),%d)],':')) as account`, groupLevel)). + ColumnExpr(fmt.Sprintf(`(array_to_string((string_to_array(address, ':'))[1:LEAST(array_length(string_to_array(address, ':'),1),%d)],':')) as account`, groupLevel)). ColumnExpr("asset"). ColumnExpr("sum(input) as input"). ColumnExpr("sum(output) as output"). ColumnExpr("sum(balance) as balance"). GroupExpr("account, asset") } else { - globalQuery = globalQuery.ColumnExpr("accounts_address as account, asset, input, output, balance") + globalQuery = globalQuery.ColumnExpr("address as account, asset, input, output, balance") } if useMetadata { @@ -154,7 +176,7 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL switch { case key == "account" || key == "address": - return filterAccountAddress(value.(string), "accounts_address"), nil, nil + return filterAccountAddress(value.(string), "address"), nil, nil case metadataRegex.Match([]byte(key)): match := metadataRegex.FindAllStringSubmatch(key, 3) return "metadata @> ?", []any{map[string]any{