chore: remove accounts_address_array from moves
gfyrag committed Oct 16, 2024
1 parent 7fe89c9 commit 36112d0
Showing 7 changed files with 95 additions and 58 deletions.
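
This commit drops the accounts_address_array column from the moves table; queries that filter on address segments now recover the segment array by joining the accounts table, which keeps its address_array column. A minimal SQL sketch of the new resolution path ("<bucket>" and '<ledger>' are illustrative placeholders; the actual queries are generated by the store, as shown in the diffs below):

-- sketch: address segments come from accounts, no longer from moves
SELECT moves.accounts_address, moves.asset, accounts.address_array
FROM "<bucket>".moves moves
JOIN "<bucket>".accounts accounts ON accounts.address = moves.accounts_address
WHERE moves.ledger = '<ledger>';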
7 changes: 5 additions & 2 deletions internal/storage/bucket/migrations/11-stateless.sql
@@ -42,6 +42,9 @@ drop column post_commit_volumes;
alter table "{{.Bucket}}".moves
drop column post_commit_effective_volumes;

alter table "{{.Bucket}}".moves
drop column accounts_address_array;

alter table "{{.Bucket}}".moves
rename post_commit_volumes_jsonb to post_commit_volumes;

@@ -145,8 +148,8 @@ drop trigger "update_account" on "{{.Bucket}}".accounts;
drop trigger "insert_transaction" on "{{.Bucket}}".transactions;
drop trigger "update_transaction" on "{{.Bucket}}".transactions;

drop index moves_account_address_array;
drop index moves_account_address_array_length;
--drop index moves_account_address_array;
--drop index moves_account_address_array_length;
drop index transactions_sources_arrays;
drop index transactions_destinations_arrays;
drop index accounts_address_array;
52 changes: 39 additions & 13 deletions internal/storage/ledger/balances.go
@@ -67,15 +67,15 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool,
}
selectAccountsWithVolumes = s.db.NewSelect().
TableExpr("(?) moves", s.SelectDistinctMovesBySeq(date)).
Column("asset", "accounts_address", "accounts_address_array").
Column("asset", "accounts_address").
ColumnExpr("post_commit_volumes as volumes")
} else {
if !s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") {
return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes))
}
selectAccountsWithVolumes = s.db.NewSelect().
TableExpr("(?) moves", s.SelectDistinctMovesByEffectiveDate(date)).
Column("asset", "accounts_address", "accounts_address_array").
Column("asset", "accounts_address").
ColumnExpr("moves.post_commit_effective_volumes as volumes")
}
} else {
@@ -117,24 +117,21 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool,
}
} else {
if needAddressSegment {
if date == nil || date.IsZero() { // account_address_array already resolved by moves if pit is specified
selectAccountsWithVolumes = s.db.NewSelect().
TableExpr(
"(?) accounts",
selectAccountsWithVolumes.
Join("join "+s.GetPrefixedRelationName("accounts")+" accounts on accounts.address = accounts_volumes.accounts_address").
ColumnExpr("accounts.address_array as accounts_address_array"),
).
ColumnExpr("*")
}
selectAccountsWithVolumes = s.db.NewSelect().
TableExpr(
"(?) accounts",
selectAccountsWithVolumes.
Join("join "+s.GetPrefixedRelationName("accounts")+" accounts on accounts.address = accounts_volumes.accounts_address"),
).
ColumnExpr("*")
}
}

if builder != nil {
where, args, err := builder.Build(query.ContextFn(func(key, operator string, value any) (string, []any, error) {
switch {
case key == "address":
return filterAccountAddress(value.(string), "accounts_address"), nil, nil
return filterAccountAddress(value.(string), "accounts.address"), nil, nil
case metadataRegex.Match([]byte(key)):
match := metadataRegex.FindAllStringSubmatch(key, 3)

@@ -246,3 +243,32 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ
return ret, nil
})
}

/**
SELECT to_json(array_agg(json_build_object('asset', accounts.asset, 'input', (accounts.volumes->>'input')::numeric, 'output', (accounts.volumes->>'output')::numeric))) AS aggregated
FROM (
SELECT *
FROM (
SELECT *, accounts.address_array
FROM (
SELECT "asset", "accounts_address", post_commit_volumes AS volumes
FROM (
SELECT DISTINCT ON (accounts_address, asset)
"accounts_address",
"asset",
first_value(post_commit_volumes) OVER (PARTITION BY (accounts_address, asset) ORDER BY seq DESC) AS post_commit_volumes
FROM (
SELECT *
FROM "87b28082".moves
WHERE (ledger = '87b28082') AND (insertion_date <= '2024-09-26T14:45:10.568382Z')
ORDER BY "seq" DESC
) moves
WHERE (ledger = '87b28082') AND (insertion_date <= '2024-09-26T14:45:10.568382Z')
) moves
) accounts_volumes
JOIN "87b28082".accounts accounts ON accounts.address = accounts_volumes.accounts_address
) accounts
WHERE (jsonb_array_length(accounts.address_array) = 2
AND accounts.address_array @@ ('$[0] == "users"')::jsonpath)
) accounts
*/
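
The commented query above is the generated balance aggregation after the change: address_array is now selected from the joined accounts table. For a segmented address filter such as users: (two segments, the first equal to "users"), the filter clause keeps the same shape as before but targets accounts.address_array; a minimal sketch, assuming the jsonpath convention shown in the query above:

-- sketch: segment filter applied to accounts.address_array
WHERE jsonb_array_length(accounts.address_array) = 2
  AND accounts.address_array @@ ('$[0] == "users"')::jsonpath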
3 changes: 0 additions & 3 deletions internal/storage/ledger/migrations/0-add-sequences.sql
@@ -108,9 +108,6 @@ execute procedure "{{.Bucket}}".set_transaction_addresses();
{{ end }}

{{ if .HasFeature "INDEX_ADDRESS_SEGMENTS" "ON" }}
create index "moves_accounts_address_array_{{.ID}}" on "{{.Bucket}}".moves using gin (accounts_address_array jsonb_ops) where ledger = '{{.Name}}';
create index "moves_accounts_address_array_length_{{.ID}}" on "{{.Bucket}}".moves (jsonb_array_length(accounts_address_array)) where ledger = '{{.Name}}';

create index "accounts_address_array_{{.ID}}" on "{{.Bucket}}".accounts using gin (address_array jsonb_ops) where ledger = '{{.Name}}';
create index "accounts_address_array_length_{{.ID}}" on "{{.Bucket}}".accounts (jsonb_array_length(address_array)) where ledger = '{{.Name}}';

12 changes: 4 additions & 8 deletions internal/storage/ledger/moves.go
@@ -32,7 +32,6 @@ func (s *Store) SelectDistinctMovesBySeq(date *time.Time) *bun.SelectQuery {
TableExpr("(?) moves", s.SortMovesBySeq(date)).
DistinctOn("accounts_address, asset").
Column("accounts_address", "asset").
ColumnExpr("first_value(accounts_address_array) over (partition by (accounts_address, asset) order by seq desc) as accounts_address_array").
ColumnExpr("first_value(post_commit_volumes) over (partition by (accounts_address, asset) order by seq desc) as post_commit_volumes").
Where("ledger = ?", s.ledger.Name)

@@ -48,7 +47,6 @@ func (s *Store) SelectDistinctMovesByEffectiveDate(date *time.Time) *bun.SelectQ
TableExpr(s.GetPrefixedRelationName("moves")).
DistinctOn("accounts_address, asset").
Column("accounts_address", "asset").
ColumnExpr("first_value(accounts_address_array) over (partition by (accounts_address, asset) order by effective_date desc, seq desc) as accounts_address_array").
ColumnExpr("first_value(post_commit_effective_volumes) over (partition by (accounts_address, asset) order by effective_date desc, seq desc) as post_commit_effective_volumes").
Where("ledger = ?", s.ledger.Name)

@@ -77,12 +75,10 @@ func (s *Store) insertMoves(ctx context.Context, moves ...*Move) error {
type Move struct {
bun.BaseModel `bun:"table:moves"`

Ledger string `bun:"ledger,type:varchar"`
TransactionID int `bun:"transactions_id,type:bigint"`
IsSource bool `bun:"is_source,type:bool"`
Account string `bun:"accounts_address,type:varchar"`
// todo: use accounts table
AccountAddressArray []string `bun:"accounts_address_array,type:jsonb"`
Ledger string `bun:"ledger,type:varchar"`
TransactionID int `bun:"transactions_id,type:bigint"`
IsSource bool `bun:"is_source,type:bool"`
Account string `bun:"accounts_address,type:varchar"`
Amount *bunpaginate.BigInt `bun:"amount,type:numeric"`
Asset string `bun:"asset,type:varchar"`
InsertionDate time.Time `bun:"insertion_date,type:timestamp"`
5 changes: 0 additions & 5 deletions internal/storage/ledger/moves_test.go
@@ -54,7 +54,6 @@ func TestMovesInsert(t *testing.T) {
Ledger: store.ledger.Name,
IsSource: true,
Account: "world",
AccountAddressArray: []string{"world"},
Amount: (*bunpaginate.BigInt)(big.NewInt(100)),
Asset: "USD",
InsertionDate: t0,
@@ -72,7 +71,6 @@ func TestMovesInsert(t *testing.T) {
Ledger: store.ledger.Name,
IsSource: false,
Account: "world",
AccountAddressArray: []string{"world"},
Amount: (*bunpaginate.BigInt)(big.NewInt(50)),
Asset: "USD",
InsertionDate: t3,
@@ -90,7 +88,6 @@ func TestMovesInsert(t *testing.T) {
Ledger: store.ledger.Name,
IsSource: true,
Account: "world",
AccountAddressArray: []string{"world"},
Amount: (*bunpaginate.BigInt)(big.NewInt(200)),
Asset: "USD",
InsertionDate: t1,
@@ -108,7 +105,6 @@ func TestMovesInsert(t *testing.T) {
Ledger: store.ledger.Name,
IsSource: false,
Account: "world",
AccountAddressArray: []string{"world"},
Amount: (*bunpaginate.BigInt)(big.NewInt(50)),
Asset: "USD",
InsertionDate: t2,
@@ -126,7 +122,6 @@ func TestMovesInsert(t *testing.T) {
Ledger: store.ledger.Name,
IsSource: false,
Account: "world",
AccountAddressArray: []string{"world"},
Amount: (*bunpaginate.BigInt)(big.NewInt(50)),
Asset: "USD",
InsertionDate: t4,
36 changes: 17 additions & 19 deletions internal/storage/ledger/transactions.go
@@ -280,29 +280,27 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e

for _, posting := range postings {
moves = append(moves, &Move{
Ledger: s.ledger.Name,
Account: posting.Destination,
AccountAddressArray: strings.Split(posting.Destination, ":"),
Amount: (*bunpaginate.BigInt)(posting.Amount),
Asset: posting.Asset,
InsertionDate: tx.InsertedAt,
EffectiveDate: tx.Timestamp,
PostCommitVolumes: pointer.For(postCommitVolumes[posting.Destination][posting.Asset].Copy()),
TransactionID: tx.ID,
Ledger: s.ledger.Name,
Account: posting.Destination,
Amount: (*bunpaginate.BigInt)(posting.Amount),
Asset: posting.Asset,
InsertionDate: tx.InsertedAt,
EffectiveDate: tx.Timestamp,
PostCommitVolumes: pointer.For(postCommitVolumes[posting.Destination][posting.Asset].Copy()),
TransactionID: tx.ID,
})
postCommitVolumes.AddInput(posting.Destination, posting.Asset, new(big.Int).Neg(posting.Amount))

moves = append(moves, &Move{
Ledger: s.ledger.Name,
IsSource: true,
Account: posting.Source,
AccountAddressArray: strings.Split(posting.Source, ":"),
Amount: (*bunpaginate.BigInt)(posting.Amount),
Asset: posting.Asset,
InsertionDate: tx.InsertedAt,
EffectiveDate: tx.Timestamp,
PostCommitVolumes: pointer.For(postCommitVolumes[posting.Source][posting.Asset].Copy()),
TransactionID: tx.ID,
Ledger: s.ledger.Name,
IsSource: true,
Account: posting.Source,
Amount: (*bunpaginate.BigInt)(posting.Amount),
Asset: posting.Asset,
InsertionDate: tx.InsertedAt,
EffectiveDate: tx.Timestamp,
PostCommitVolumes: pointer.For(postCommitVolumes[posting.Source][posting.Asset].Copy()),
TransactionID: tx.ID,
})
postCommitVolumes.AddOutput(posting.Source, posting.Asset, new(big.Int).Neg(posting.Amount))
}
38 changes: 30 additions & 8 deletions internal/storage/ledger/volumes.go
@@ -64,12 +64,20 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL
return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeatureMovesHistory))
}

var useMetadata bool
var (
useMetadata bool
needSegmentAddress bool
)
if q != nil {
err := q.Walk(func(operator, key string, value any) error {
switch {
case key == "account" || key == "address":
return s.validateAddressFilter(operator, value)
if err := s.validateAddressFilter(operator, value); err != nil {
return err
}
if !needSegmentAddress {
needSegmentAddress = isSegmentedAddress(value.(string)) // safe cast
}
case metadataRegex.Match([]byte(key)):
if operator != "$match" {
return ledgercontroller.NewErrInvalidQuery("'metadata' column can only be used with $match")
@@ -92,14 +100,25 @@
}

ret = ret.
Column("accounts_address_array").
Column("accounts_address").
ColumnExpr("accounts_address as address").
Column("asset").
ColumnExpr("sum(case when not is_source then amount else 0 end) as input").
ColumnExpr("sum(case when is_source then amount else 0 end) as output").
ColumnExpr("sum(case when not is_source then amount else -amount end) as balance").
ModelTableExpr(s.GetPrefixedRelationName("moves"))

if needSegmentAddress {
ret = ret.
Join(
"join (?) accounts on accounts.address = moves.accounts_address",
s.db.NewSelect().
ModelTableExpr(s.GetPrefixedRelationName("accounts")).
Where("ledger = ?", s.ledger.Name).
Column("address_array", "address"),
).
Column("accounts.address_array")
}

// todo: handle with pit by using accounts_metadata
if useMetadata {
ret = ret.
@@ -126,7 +145,10 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL
ret = ret.Where(dateFilterColumn+" >= ?", oot)
}

ret = ret.GroupExpr("accounts_address, accounts_address_array, asset")
ret = ret.GroupExpr("accounts_address, asset")
if needSegmentAddress {
ret = ret.GroupExpr("address_array")
}

globalQuery := s.db.NewSelect()
globalQuery = globalQuery.
@@ -135,14 +157,14 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL

if groupLevel > 0 {
globalQuery = globalQuery.
ColumnExpr(fmt.Sprintf(`(array_to_string((string_to_array(accounts_address, ':'))[1:LEAST(array_length(string_to_array(accounts_address, ':'),1),%d)],':')) as account`, groupLevel)).
ColumnExpr(fmt.Sprintf(`(array_to_string((string_to_array(address, ':'))[1:LEAST(array_length(string_to_array(address, ':'),1),%d)],':')) as account`, groupLevel)).
ColumnExpr("asset").
ColumnExpr("sum(input) as input").
ColumnExpr("sum(output) as output").
ColumnExpr("sum(balance) as balance").
GroupExpr("account, asset")
} else {
globalQuery = globalQuery.ColumnExpr("accounts_address as account, asset, input, output, balance")
globalQuery = globalQuery.ColumnExpr("address as account, asset, input, output, balance")
}

if useMetadata {
@@ -154,7 +176,7 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL

switch {
case key == "account" || key == "address":
return filterAccountAddress(value.(string), "accounts_address"), nil, nil
return filterAccountAddress(value.(string), "address"), nil, nil
case metadataRegex.Match([]byte(key)):
match := metadataRegex.FindAllStringSubmatch(key, 3)
return "metadata @> ?", []any{map[string]any{
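
In volumes.go the join to accounts is now added only when the query actually filters on address segments (needSegmentAddress). A rough sketch of the generated volumes query in that case, with illustrative "<bucket>"/'<ledger>' placeholders and the date predicates omitted:

-- sketch: ledger/date predicates simplified; address_array only joined in when needed
SELECT moves.accounts_address AS address,
       moves.asset,
       sum(CASE WHEN NOT is_source THEN amount ELSE 0 END) AS input,
       sum(CASE WHEN is_source THEN amount ELSE 0 END) AS output,
       sum(CASE WHEN NOT is_source THEN amount ELSE -amount END) AS balance,
       accounts.address_array
FROM "<bucket>".moves moves
JOIN (
    SELECT address, address_array
    FROM "<bucket>".accounts
    WHERE ledger = '<ledger>'
) accounts ON accounts.address = moves.accounts_address
WHERE moves.ledger = '<ledger>'
GROUP BY moves.accounts_address, moves.asset, accounts.address_array;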
