Skip to content

Commit

Permalink
chore: handle a todo and add some test
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 23, 2024
1 parent a40f7e3 commit 58d6392
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 144 deletions.
112 changes: 62 additions & 50 deletions internal/storage/ledger/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,76 +100,71 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL
}
}

ret = ret.
selectVolumes := s.db.NewSelect().
ColumnExpr("accounts_address as address").
Column("asset").
ColumnExpr("sum(case when not is_source then amount else 0 end) as input").
ColumnExpr("sum(case when is_source then amount else 0 end) as output").
ColumnExpr("sum(case when not is_source then amount else -amount end) as balance").
ModelTableExpr(s.GetPrefixedRelationName("moves"))

if needSegmentAddress {
ret = ret.
Join(
"join (?) accounts on accounts.address = moves.accounts_address",
s.db.NewSelect().
ModelTableExpr(s.GetPrefixedRelationName("accounts")).
Where("ledger = ?", s.ledger.Name).
Column("address_array", "address"),
).
Column("accounts.address_array")
}

// todo: handle with pit by using accounts_metadata
if useMetadata {
ret = ret.
Join(
"join lateral (?) accounts on true",
s.db.NewSelect().
Column("metadata").
ModelTableExpr(s.GetPrefixedRelationName("accounts")).
Where("accounts.address = moves.accounts_address"),
).
ColumnExpr("accounts.metadata as metadata").
Group("accounts.metadata")
}
ModelTableExpr(s.GetPrefixedRelationName("moves")).
GroupExpr("accounts_address, asset")

dateFilterColumn := "effective_date"
if useInsertionDate {
dateFilterColumn = "insertion_date"
}

if pit != nil && !pit.IsZero() {
ret = ret.Where(dateFilterColumn+" <= ?", pit)
selectVolumes = selectVolumes.Where(dateFilterColumn+" <= ?", pit)
}
if oot != nil && !oot.IsZero() {
ret = ret.Where(dateFilterColumn+" >= ?", oot)
selectVolumes = selectVolumes.Where(dateFilterColumn+" >= ?", oot)
}

ret = ret.GroupExpr("accounts_address, asset")
if needSegmentAddress {
ret = ret.GroupExpr("address_array")
}
ret = ret.
ModelTableExpr("(?) volumes", selectVolumes).
Column("address", "asset", "input", "output", "balance")

globalQuery := s.db.NewSelect()
globalQuery = globalQuery.
With("query", ret).
ModelTableExpr("query")
if needSegmentAddress {
selectAccount := s.db.NewSelect().
ModelTableExpr(s.GetPrefixedRelationName("accounts")).
Where("ledger = ? and address = volumes.address", s.ledger.Name).
Column("address_array")
if useMetadata && (pit == nil || pit.IsZero()) {
selectAccount = selectAccount.Column("metadata")
}

if groupLevel > 0 {
globalQuery = globalQuery.
ColumnExpr(fmt.Sprintf(`(array_to_string((string_to_array(address, ':'))[1:LEAST(array_length(string_to_array(address, ':'),1),%d)],':')) as account`, groupLevel)).
ColumnExpr("asset").
ColumnExpr("sum(input) as input").
ColumnExpr("sum(output) as output").
ColumnExpr("sum(balance) as balance").
GroupExpr("account, asset")
} else {
globalQuery = globalQuery.ColumnExpr("address as account, asset, input, output, balance")
ret = ret.
Join("join lateral (?) accounts on true", selectAccount).
Column("accounts.address_array")
if useMetadata && (pit == nil || pit.IsZero()) {
ret = ret.Column("accounts.metadata")
}
}

if useMetadata {
globalQuery = globalQuery.Column("metadata")
switch {
case needSegmentAddress && (pit == nil || pit.IsZero()):
// nothing to do, already handled earlier
case !needSegmentAddress && (pit == nil || pit.IsZero()):
selectAccount := s.db.NewSelect().
ModelTableExpr(s.GetPrefixedRelationName("accounts")).
Where("ledger = ? and address = volumes.address", s.ledger.Name).
Column("metadata")

ret = ret.
Join("join lateral (?) accounts on true", selectAccount).
Column("accounts.metadata")
case pit != nil && !pit.IsZero():
selectAccountMetadata := s.db.NewSelect().
Column("metadata").
ModelTableExpr(s.GetPrefixedRelationName("accounts_metadata")).
Where("ledger = ? and accounts_address = volumes.address and date <= ?", s.ledger.Name, pit)

ret = ret.
Join("join lateral (?) accounts_metadata on true", selectAccountMetadata).
Column("accounts_metadata.metadata")
}
}

if q != nil {
Expand All @@ -195,7 +190,24 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL
if err != nil {
return ret.Err(err)
}
globalQuery = globalQuery.Where(where, args...)
ret = ret.Where(where, args...)
}

globalQuery := s.db.NewSelect()
globalQuery = globalQuery.
With("query", ret).
ModelTableExpr("query")

if groupLevel > 0 {
globalQuery = globalQuery.
ColumnExpr(fmt.Sprintf(`(array_to_string((string_to_array(address, ':'))[1:LEAST(array_length(string_to_array(address, ':'),1),%d)],':')) as account`, groupLevel)).
ColumnExpr("asset").
ColumnExpr("sum(input) as input").
ColumnExpr("sum(output) as output").
ColumnExpr("sum(balance) as balance").
GroupExpr("account, asset")
} else {
globalQuery = globalQuery.ColumnExpr("address as account, asset, input, output, balance")
}

return globalQuery
Expand Down
Loading

0 comments on commit 58d6392

Please sign in to comment.