Skip to content

Commit

Permalink
chore: backport formancehq/stack#1730
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 16, 2024
1 parent 60045a5 commit 053e1ab
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 26 deletions.
55 changes: 34 additions & 21 deletions internal/storage/ledgerstore/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,39 +99,60 @@ func (store *Store) buildVolumesWithBalancesQuery(query *bun.SelectQuery, q GetV
dateFilterColumn = "insertion_date"
}

query = query.
selectAccounts := store.GetDB().NewSelect().
Column("account_address_array").
Column("account_address").
Column("accounts_seq").
Column("asset").
Column("ledger").
ColumnExpr("sum(case when not is_source then amount else 0 end) as input").
ColumnExpr("sum(case when is_source then amount else 0 end) as output").
ColumnExpr("sum(case when not is_source then amount else -amount end) as balance").
ModelTableExpr("moves")
Table("moves").
Group("ledger", "accounts_seq", "account_address", "account_address_array", "asset").
Apply(filterPIT(filtersForVolumes.PIT, dateFilterColumn)).
Apply(filterOOT(filtersForVolumes.OOT, dateFilterColumn))

query = query.
TableExpr("(?) accountsWithVolumes", selectAccounts).
Column(
"account_address",
"account_address_array",
"accounts_seq",
"ledger",
"asset",
"input",
"output",
"balance",
)

if useMetadata {
query = query.ColumnExpr("accounts.metadata as metadata").
query = query.
ColumnExpr("accounts_metadata.metadata as metadata").
Join(`join lateral (
select metadata
from accounts a
where a.seq = moves.accounts_seq
) accounts on true`).Group("metadata")
select metadata
from accounts a
where a.seq = accountsWithVolumes.accounts_seq
) accounts_metadata on true`,
)
}

query = query.
Where("ledger = ?", store.name).
Apply(filterPIT(filtersForVolumes.PIT, dateFilterColumn)).
Apply(filterOOT(filtersForVolumes.OOT, dateFilterColumn)).
GroupExpr("account_address, account_address_array, asset")
Where("ledger = ?", store.name)

globalQuery := query.NewSelect()
globalQuery = globalQuery.
With("query", query).
ModelTableExpr("query")
TableExpr("query")

if where != "" {
globalQuery.Where(where, args...)
}

if filtersForVolumes.GroupLvl > 0 {
globalQuery = globalQuery.
ColumnExpr(fmt.Sprintf(`(array_to_string((string_to_array(account_address, ':'))[1:LEAST(array_length(string_to_array(account_address, ':'),1),%d)],':')) as account`, filtersForVolumes.GroupLvl)).
ColumnExpr("asset").
Column("asset").
ColumnExpr("sum(input) as input").
ColumnExpr("sum(output) as output").
ColumnExpr("sum(balance) as balance").
Expand All @@ -140,14 +161,6 @@ func (store *Store) buildVolumesWithBalancesQuery(query *bun.SelectQuery, q GetV
globalQuery = globalQuery.ColumnExpr("account_address as account, asset, input, output, balance")
}

if useMetadata {
globalQuery = globalQuery.Column("metadata")
}

if where != "" {
globalQuery.Where(where, args...)
}

return globalQuery
}

Expand Down
26 changes: 21 additions & 5 deletions internal/storage/ledgerstore/volumes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,7 @@ func TestGetVolumesWithBalances(t *testing.T) {

require.NoError(t, err)
require.Len(t, volumes.Data, 1)

})

}

func TestAggGetVolumesWithBalances(t *testing.T) {
Expand All @@ -408,11 +406,8 @@ func TestAggGetVolumesWithBalances(t *testing.T) {
now := time.Now()
ctx := logging.TestingContext()

// previousPIT := now.Add(-2 * time.Minute)
futurPIT := now.Add(2 * time.Minute)

previousOOT := now.Add(-2 * time.Minute)
// futurOOT := now.Add(2 * time.Minute)

require.NoError(t, store.InsertLogs(ctx,
ledger.ChainLogs(
Expand Down Expand Up @@ -470,6 +465,10 @@ func TestAggGetVolumesWithBalances(t *testing.T) {
WithIDUint64(7),
map[string]metadata.Metadata{},
).WithDate(now),

ledger.NewSetMetadataOnAccountLog(time.Now(), "account:1:1", metadata.Metadata{
"foo": "bar",
}),
)...,
))

Expand Down Expand Up @@ -631,4 +630,21 @@ func TestAggGetVolumesWithBalances(t *testing.T) {
})
})

t.Run("filter using account matching, metadata, and group", func(t *testing.T) {
t.Parallel()

volumes, err := store.GetVolumesWithBalances(ctx,
NewGetVolumesWithBalancesQuery(
NewPaginatedQueryOptions(
FiltersForVolumes{
GroupLvl: 1,
}).WithQueryBuilder(query.And(
query.Match("account", "account::"),
query.Match("metadata[foo]", "bar"),
))),
)

require.NoError(t, err)
require.Len(t, volumes.Data, 1)
})
}

0 comments on commit 053e1ab

Please sign in to comment.