Skip to content

Commit

Permalink
chore: simplify storage
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 16, 2024
1 parent d6539e4 commit d2d9cd6
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 38 deletions.
28 changes: 14 additions & 14 deletions internal/storage/ledger/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Account struct {
UpdatedAt time.Time `bun:"updated_at"`
FirstUsage time.Time `bun:"first_usage"`

PostCommitVolumes AggregatedAccountVolumes `bun:"pcv,scanonly"`
PostCommitVolumes ledger.VolumesByAssets `bun:"pcv,scanonly"`
PostCommitEffectiveVolumes AggregatedAccountVolumes `bun:"pcev,scanonly"`
}

Expand All @@ -76,7 +76,7 @@ func (account Account) toCore() ledger.Account {
FirstUsage: account.FirstUsage,
InsertionDate: account.InsertionDate,
UpdatedAt: account.UpdatedAt,
Volumes: account.PostCommitVolumes.toCore(),
Volumes: account.PostCommitVolumes,
EffectiveVolumes: account.PostCommitEffectiveVolumes.toCore(),
}
}
Expand Down Expand Up @@ -126,6 +126,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo

ret := s.db.NewSelect()

// todo: rename to volumes, pcv is ok in transactions context
needPCV := expandVolumes
if qb != nil {
// analyze filters to check for errors and find potentially additional table to load
Expand Down Expand Up @@ -176,19 +177,17 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
ret = ret.ColumnExpr("accounts.metadata")
}

// todo: should join on histories only if pit is specified
// otherwise the accounts_volumes table is enough
if s.ledger.HasFeature(ledger.FeatureMovesHistory, "ON") && needPCV {
ret = ret.
Join(
`left join (?) pcv on pcv.accounts_address = accounts.address`,
s.db.NewSelect().
TableExpr("(?) v", s.SelectDistinctMovesBySeq(date)).
Column("accounts_address").
ColumnExpr(`to_json(array_agg(json_build_object('asset', v.asset, 'input', (v.post_commit_volumes->>'input')::numeric, 'output', (v.post_commit_volumes->>'output')::numeric))) as pcv`).
Group("accounts_address"),
).
ColumnExpr("pcv.*")
selectAccountWithAssetAndVolumes := s.selectAccountWithAssetAndVolumes(date, true, nil)
selectAccountWithVolumes := s.db.NewSelect().
TableExpr("(?) values", selectAccountWithAssetAndVolumes).
Group("accounts_address").
Column("accounts_address").
ColumnExpr("aggregate_objects(json_build_object(asset, volumes)::jsonb) as pcv")
ret = ret.Join(
`left join (?) pcv on pcv.accounts_address = accounts.address`,
selectAccountWithVolumes,
).Column("pcv.*")
}

if s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") && expandEffectiveVolumes {
Expand Down Expand Up @@ -217,6 +216,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo

// todo: use moves only if feature is enabled
return s.db.NewSelect().
// todo: use already loaded pcv
TableExpr(
"(?) balance",
s.selectBalance(date).
Expand Down
8 changes: 2 additions & 6 deletions internal/storage/ledger/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ func TestGetAccount(t *testing.T) {
},
FirstUsage: now.Add(-time.Minute),
EffectiveVolumes: map[string]ledger.Volumes{},
Volumes: map[string]ledger.Volumes{},
}, *account)

account, err = store.GetAccount(ctx, ledgercontroller.NewGetAccountQuery("world"))
Expand All @@ -280,8 +279,7 @@ func TestGetAccount(t *testing.T) {
Address: "world",
Metadata: metadata.Metadata{},
FirstUsage: now.Add(-time.Minute),
EffectiveVolumes: map[string]ledger.Volumes{},
Volumes: map[string]ledger.Volumes{},
EffectiveVolumes: ledger.VolumesByAssets{},
}, *account)
})

Expand All @@ -294,8 +292,7 @@ func TestGetAccount(t *testing.T) {
Address: "multi",
Metadata: metadata.Metadata{},
FirstUsage: now.Add(-time.Minute),
Volumes: map[string]ledger.Volumes{},
EffectiveVolumes: map[string]ledger.Volumes{},
EffectiveVolumes: ledger.VolumesByAssets{},
}, *account)
})

Expand Down Expand Up @@ -331,7 +328,6 @@ func TestGetAccount(t *testing.T) {
EffectiveVolumes: ledger.VolumesByAssets{
"USD/2": ledger.NewVolumesInt64(100, 0),
},
Volumes: map[string]ledger.Volumes{},
}, *account)
})

Expand Down
4 changes: 2 additions & 2 deletions internal/storage/ledger/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/uptrace/bun"
)

func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool, builder query.Builder) *bun.SelectQuery {
func (s *Store) selectAccountWithAssetAndVolumes(date *time.Time, useInsertionDate bool, builder query.Builder) *bun.SelectQuery {

ret := s.db.NewSelect()
var (
Expand Down Expand Up @@ -156,7 +156,7 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool,

func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool, builder query.Builder) *bun.SelectQuery {

selectAccountsWithVolumes := s.selectAccountWithVolumes(date, useInsertionDate, builder)
selectAccountsWithVolumes := s.selectAccountWithAssetAndVolumes(date, useInsertionDate, builder)
sumVolumesForAsset := s.db.NewSelect().
TableExpr("(?) values", selectAccountsWithVolumes).
Group("asset").
Expand Down
3 changes: 2 additions & 1 deletion internal/storage/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s *Store) validateAddressFilter(operator string, value any) error {
}

// dev util
func (s *Store) dumpTables(ctx context.Context, tables ... string) {
func (s *Store) dumpTables(ctx context.Context, tables ...string) {
for _, table := range tables {
s.dumpQuery(
ctx,
Expand All @@ -99,6 +99,7 @@ func (s *Store) dumpTables(ctx context.Context, tables ... string) {
}

func (s *Store) dumpQuery(ctx context.Context, query *bun.SelectQuery) {
fmt.Println(query)
rows, err := query.Rows(ctx)
if err != nil {
panic(err)
Expand Down
16 changes: 1 addition & 15 deletions internal/storage/ledger/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ import (
"testing"

"github.com/formancehq/go-libs/platform/postgres"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"github.com/shomali11/xsql"

"github.com/formancehq/go-libs/time"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"

"github.com/pkg/errors"

Expand Down Expand Up @@ -147,18 +145,6 @@ func TestTransactionUpdateMetadata(t *testing.T) {
_, _, err = store.UpdateTransactionMetadata(ctx, tx2.ID, metadata.Metadata{"foo2": "bar2"})
require.NoError(t, err)

rows, err := store.GetDB().NewSelect().ModelTableExpr(store.GetPrefixedRelationName("transactions")).Rows(ctx)
require.NoError(t, err)

data, _ := xsql.Pretty(rows)
fmt.Println(data)

rows, err = store.GetDB().NewSelect().ModelTableExpr(store.GetPrefixedRelationName("moves")).Rows(ctx)
require.NoError(t, err)

data, _ = xsql.Pretty(rows)
fmt.Println(data)

// check that the database returns metadata
tx, err := store.GetTransaction(ctx, ledgercontroller.NewGetTransactionQuery(tx1.ID).WithExpandVolumes().WithExpandEffectiveVolumes())
require.NoError(t, err, "getting transaction should not fail")
Expand Down

0 comments on commit d2d9cd6

Please sign in to comment.