Skip to content

Commit

Permalink
fix: error handling at storage level (#609)
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored Dec 9, 2024
1 parent 471df93 commit ed8b08d
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 78 deletions.
84 changes: 59 additions & 25 deletions internal/storage/ledger/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,26 @@ func convertOperatorToSQL(operator string) string {
panic("unreachable")
}

func (s *Store) selectBalance(date *time.Time) *bun.SelectQuery {
func (s *Store) selectBalance(date *time.Time) (*bun.SelectQuery, error) {

if date != nil && !date.IsZero() {
sortedMoves := s.SelectDistinctMovesBySeq(date).
selectDistinctMovesBySeq, err := s.SelectDistinctMovesBySeq(date)
if err != nil {
return nil, err
}
sortedMoves := selectDistinctMovesBySeq.
ColumnExpr("(post_commit_volumes).inputs - (post_commit_volumes).outputs as balance")

return s.db.NewSelect().
ModelTableExpr("(?) moves", sortedMoves).
Where("ledger = ?", s.ledger.Name).
ColumnExpr("accounts_address, asset, balance")
ColumnExpr("accounts_address, asset, balance"), nil
}

return s.db.NewSelect().
ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")).
Where("ledger = ?", s.ledger.Name).
ColumnExpr("input - output as balance")
ColumnExpr("input - output as balance"), nil
}

func (s *Store) selectDistinctAccountMetadataHistories(date *time.Time) *bun.SelectQuery {
Expand All @@ -75,7 +79,7 @@ func (s *Store) selectDistinctAccountMetadataHistories(date *time.Time) *bun.Sel
return ret
}

func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVolumes bool, qb query.Builder) *bun.SelectQuery {
func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVolumes bool, qb query.Builder) (*bun.SelectQuery, error) {

ret := s.db.NewSelect()

Expand Down Expand Up @@ -104,7 +108,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo

return nil
}); err != nil {
return ret.Err(fmt.Errorf("failed to check filters: %w", err))
return nil, fmt.Errorf("failed to check filters: %w", err)
}
}

Expand All @@ -131,16 +135,24 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
}

if s.ledger.HasFeature(features.FeatureMovesHistory, "ON") && needVolumes {
selectAccountWithAggregatedVolumes, err := s.selectAccountWithAggregatedVolumes(date, true, "volumes")
if err != nil {
return nil, err
}
ret = ret.Join(
`left join (?) volumes on volumes.accounts_address = accounts.address`,
s.selectAccountWithAggregatedVolumes(date, true, "volumes"),
selectAccountWithAggregatedVolumes,
).Column("volumes.*")
}

if s.ledger.HasFeature(features.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") && expandEffectiveVolumes {
selectAccountWithAggregatedVolumes, err := s.selectAccountWithAggregatedVolumes(date, false, "effective_volumes")
if err != nil {
return nil, err
}
ret = ret.Join(
`left join (?) effective_volumes on effective_volumes.accounts_address = accounts.address`,
s.selectAccountWithAggregatedVolumes(date, false, "effective_volumes"),
selectAccountWithAggregatedVolumes,
).Column("effective_volumes.*")
}

Expand All @@ -156,20 +168,30 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
match := balanceRegex.FindAllStringSubmatch(key, 2)
asset := match[0][1]

selectBalance, err := s.selectBalance(date)
if err != nil {
return "", nil, err
}

return s.db.NewSelect().
TableExpr(
"(?) balance",
s.selectBalance(date).
selectBalance.
Where("asset = ? and accounts_address = accounts.address", asset),
).
ColumnExpr(fmt.Sprintf("balance %s ?", convertOperatorToSQL(operator)), value).
String(), nil, nil

case key == "balance":
selectBalance, err := s.selectBalance(date)
if err != nil {
return "", nil, err
}

return s.db.NewSelect().
TableExpr(
"(?) balance",
s.selectBalance(date).
selectBalance.
Where("accounts_address = accounts.address"),
).
ColumnExpr(fmt.Sprintf("balance %s ?", convertOperatorToSQL(operator)), value).
Expand Down Expand Up @@ -198,7 +220,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
panic("unreachable")
}))
if err != nil {
return ret.Err(fmt.Errorf("evaluating filters: %w", err))
return nil, fmt.Errorf("evaluating filters: %w", err)
}
if len(args) > 0 {
ret = ret.Where(where, args...)
Expand All @@ -207,10 +229,19 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
}
}

return ret
return ret, nil
}

func (s *Store) ListAccounts(ctx context.Context, q ledgercontroller.ListAccountsQuery) (*Cursor[ledger.Account], error) {
selectAccounts, err := s.selectAccounts(
q.Options.Options.PIT,
q.Options.Options.ExpandVolumes,
q.Options.Options.ExpandEffectiveVolumes,
q.Options.QueryBuilder,
)
if err != nil {
return nil, err
}
return tracing.TraceWithMetric(
ctx,
"ListAccounts",
Expand All @@ -219,12 +250,7 @@ func (s *Store) ListAccounts(ctx context.Context, q ledgercontroller.ListAccount
func(ctx context.Context) (*Cursor[ledger.Account], error) {
ret, err := UsingOffset[ledgercontroller.PaginatedQueryOptions[ledgercontroller.PITFilterWithVolumes], ledger.Account](
ctx,
s.selectAccounts(
q.Options.Options.PIT,
q.Options.Options.ExpandVolumes,
q.Options.Options.ExpandEffectiveVolumes,
q.Options.QueryBuilder,
),
selectAccounts,
OffsetPaginatedQuery[ledgercontroller.PaginatedQueryOptions[ledgercontroller.PITFilterWithVolumes]](q),
)

Expand All @@ -245,7 +271,11 @@ func (s *Store) GetAccount(ctx context.Context, q ledgercontroller.GetAccountQue
s.getAccountHistogram,
func(ctx context.Context) (*ledger.Account, error) {
ret := &ledger.Account{}
if err := s.selectAccounts(q.PIT, q.ExpandVolumes, q.ExpandEffectiveVolumes, nil).
selectAccounts, err := s.selectAccounts(q.PIT, q.ExpandVolumes, q.ExpandEffectiveVolumes, nil)
if err != nil {
return nil, err
}
if err := selectAccounts.
Model(ret).
Where("accounts.address = ?", q.Addr).
Limit(1).
Expand All @@ -265,13 +295,17 @@ func (s *Store) CountAccounts(ctx context.Context, q ledgercontroller.ListAccoun
s.tracer,
s.countAccountsHistogram,
func(ctx context.Context) (int, error) {
selectAccounts, err := s.selectAccounts(
q.Options.Options.PIT,
q.Options.Options.ExpandVolumes,
q.Options.Options.ExpandEffectiveVolumes,
q.Options.QueryBuilder,
)
if err != nil {
return 0, err
}
return s.db.NewSelect().
TableExpr("(?) data", s.selectAccounts(
q.Options.Options.PIT,
q.Options.Options.ExpandVolumes,
q.Options.Options.ExpandEffectiveVolumes,
q.Options.QueryBuilder,
)).
TableExpr("(?) data", selectAccounts).
Count(ctx)
},
)
Expand Down
46 changes: 30 additions & 16 deletions internal/storage/ledger/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ import (
"github.com/uptrace/bun"
)

func (s *Store) selectAccountWithAssetAndVolumes(date *time.Time, useInsertionDate bool, builder query.Builder) *bun.SelectQuery {
func (s *Store) selectAccountWithAssetAndVolumes(date *time.Time, useInsertionDate bool, builder query.Builder) (*bun.SelectQuery, error) {

ret := s.db.NewSelect()
var (
needMetadata bool
needAddressSegment bool
Expand Down Expand Up @@ -53,27 +52,31 @@ func (s *Store) selectAccountWithAssetAndVolumes(date *time.Time, useInsertionDa
}
return nil
}); err != nil {
return ret.Err(err)
return nil, err
}
}

if needAddressSegment && !s.ledger.HasFeature(features.FeatureIndexAddressSegments, "ON") {
return ret.Err(ledgercontroller.NewErrMissingFeature(features.FeatureIndexAddressSegments))
return nil, ledgercontroller.NewErrMissingFeature(features.FeatureIndexAddressSegments)
}

var selectAccountsWithVolumes *bun.SelectQuery
if date != nil && !date.IsZero() {
if useInsertionDate {
if !s.ledger.HasFeature(features.FeatureMovesHistory, "ON") {
return ret.Err(ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistory))
return nil, ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistory)
}
selectDistinctMovesBySeq, err := s.SelectDistinctMovesBySeq(date)
if err != nil {
return nil, err
}
selectAccountsWithVolumes = s.db.NewSelect().
TableExpr("(?) moves", s.SelectDistinctMovesBySeq(date)).
TableExpr("(?) moves", selectDistinctMovesBySeq).
Column("asset", "accounts_address").
ColumnExpr("post_commit_volumes as volumes")
} else {
if !s.ledger.HasFeature(features.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") {
return ret.Err(ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistoryPostCommitEffectiveVolumes))
return nil, ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistoryPostCommitEffectiveVolumes)
}
selectAccountsWithVolumes = s.db.NewSelect().
TableExpr("(?) moves", s.SelectDistinctMovesByEffectiveDate(date)).
Expand Down Expand Up @@ -143,26 +146,32 @@ func (s *Store) selectAccountWithAssetAndVolumes(date *time.Time, useInsertionDa
}
}))
if err != nil {
return ret.Err(fmt.Errorf("building where clause: %w", err))
return nil, fmt.Errorf("building where clause: %w", err)
}
finalQuery = finalQuery.Where(where, args...)
}

return finalQuery
return finalQuery, nil
}

func (s *Store) selectAccountWithAggregatedVolumes(date *time.Time, useInsertionDate bool, alias string) *bun.SelectQuery {
selectAccountWithAssetAndVolumes := s.selectAccountWithAssetAndVolumes(date, useInsertionDate, nil)
func (s *Store) selectAccountWithAggregatedVolumes(date *time.Time, useInsertionDate bool, alias string) (*bun.SelectQuery, error) {
selectAccountWithAssetAndVolumes, err := s.selectAccountWithAssetAndVolumes(date, useInsertionDate, nil)
if err != nil {
return nil, err
}
return s.db.NewSelect().
TableExpr("(?) values", selectAccountWithAssetAndVolumes).
Group("accounts_address").
Column("accounts_address").
ColumnExpr("public.aggregate_objects(json_build_object(asset, json_build_object('input', (volumes).inputs, 'output', (volumes).outputs))::jsonb) as " + alias)
ColumnExpr("public.aggregate_objects(json_build_object(asset, json_build_object('input', (volumes).inputs, 'output', (volumes).outputs))::jsonb) as " + alias), nil
}

func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool, builder query.Builder) *bun.SelectQuery {
func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool, builder query.Builder) (*bun.SelectQuery, error) {

selectAccountsWithVolumes := s.selectAccountWithAssetAndVolumes(date, useInsertionDate, builder)
selectAccountsWithVolumes, err := s.selectAccountWithAssetAndVolumes(date, useInsertionDate, builder)
if err != nil {
return nil, err
}
sumVolumesForAsset := s.db.NewSelect().
TableExpr("(?) values", selectAccountsWithVolumes).
Group("asset").
Expand All @@ -171,17 +180,22 @@ func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool,

return s.db.NewSelect().
TableExpr("(?) values", sumVolumesForAsset).
ColumnExpr("aggregate_objects(json_build_object(asset, volumes)::jsonb) as aggregated")
ColumnExpr("aggregate_objects(json_build_object(asset, volumes)::jsonb) as aggregated"), nil
}

func (s *Store) GetAggregatedBalances(ctx context.Context, q ledgercontroller.GetAggregatedBalanceQuery) (ledger.BalancesByAssets, error) {
type AggregatedVolumes struct {
Aggregated ledger.VolumesByAssets `bun:"aggregated,type:jsonb"`
}

selectAggregatedBalances, err := s.SelectAggregatedBalances(q.PIT, q.UseInsertionDate, q.QueryBuilder)
if err != nil {
return nil, err
}

aggregatedVolumes := AggregatedVolumes{}
if err := s.db.NewSelect().
ModelTableExpr("(?) aggregated_volumes", s.SelectAggregatedBalances(q.PIT, q.UseInsertionDate, q.QueryBuilder)).
ModelTableExpr("(?) aggregated_volumes", selectAggregatedBalances).
Model(&aggregatedVolumes).
Scan(ctx); err != nil {
return nil, err
Expand Down
16 changes: 10 additions & 6 deletions internal/storage/ledger/moves.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
"github.com/uptrace/bun"
)

func (s *Store) SortMovesBySeq(date *time.Time) *bun.SelectQuery {
func (s *Store) SortMovesBySeq(date *time.Time) (*bun.SelectQuery, error) {

ret := s.db.NewSelect()
if !s.ledger.HasFeature(features.FeatureMovesHistory, "ON") {
return ret.Err(ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistory))
return nil, ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistory)
}

ret = ret.
Expand All @@ -28,12 +28,16 @@ func (s *Store) SortMovesBySeq(date *time.Time) *bun.SelectQuery {
ret = ret.Where("insertion_date <= ?", date)
}

return ret
return ret, nil
}

func (s *Store) SelectDistinctMovesBySeq(date *time.Time) *bun.SelectQuery {
func (s *Store) SelectDistinctMovesBySeq(date *time.Time) (*bun.SelectQuery, error) {
sortMovesBySeq, err := s.SortMovesBySeq(date)
if err != nil {
return nil, err
}
ret := s.db.NewSelect().
TableExpr("(?) moves", s.SortMovesBySeq(date)).
TableExpr("(?) moves", sortMovesBySeq).
DistinctOn("accounts_address, asset").
Column("accounts_address", "asset").
ColumnExpr("first_value(post_commit_volumes) over (partition by (accounts_address, asset) order by seq desc) as post_commit_volumes").
Expand All @@ -43,7 +47,7 @@ func (s *Store) SelectDistinctMovesBySeq(date *time.Time) *bun.SelectQuery {
ret = ret.Where("insertion_date <= ?", date)
}

return ret
return ret, nil
}

func (s *Store) SelectDistinctMovesByEffectiveDate(date *time.Time) *bun.SelectQuery {
Expand Down
Loading

0 comments on commit ed8b08d

Please sign in to comment.