diff --git a/internal/storage/ledger/accounts.go b/internal/storage/ledger/accounts.go index 01d2d844e..6fa8545bd 100644 --- a/internal/storage/ledger/accounts.go +++ b/internal/storage/ledger/accounts.go @@ -42,22 +42,26 @@ func convertOperatorToSQL(operator string) string { panic("unreachable") } -func (s *Store) selectBalance(date *time.Time) *bun.SelectQuery { +func (s *Store) selectBalance(date *time.Time) (*bun.SelectQuery, error) { if date != nil && !date.IsZero() { - sortedMoves := s.SelectDistinctMovesBySeq(date). + selectDistinctMovesBySeq, err := s.SelectDistinctMovesBySeq(date) + if err != nil { + return nil, err + } + sortedMoves := selectDistinctMovesBySeq. ColumnExpr("(post_commit_volumes).inputs - (post_commit_volumes).outputs as balance") return s.db.NewSelect(). ModelTableExpr("(?) moves", sortedMoves). Where("ledger = ?", s.ledger.Name). - ColumnExpr("accounts_address, asset, balance") + ColumnExpr("accounts_address, asset, balance"), nil } return s.db.NewSelect(). ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")). Where("ledger = ?", s.ledger.Name). - ColumnExpr("input - output as balance") + ColumnExpr("input - output as balance"), nil } func (s *Store) selectDistinctAccountMetadataHistories(date *time.Time) *bun.SelectQuery { @@ -75,7 +79,7 @@ func (s *Store) selectDistinctAccountMetadataHistories(date *time.Time) *bun.Sel return ret } -func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVolumes bool, qb query.Builder) *bun.SelectQuery { +func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVolumes bool, qb query.Builder) (*bun.SelectQuery, error) { ret := s.db.NewSelect() @@ -104,7 +108,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo return nil }); err != nil { - return ret.Err(fmt.Errorf("failed to check filters: %w", err)) + return nil, fmt.Errorf("failed to check filters: %w", err) } } @@ -131,16 +135,24 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo } if s.ledger.HasFeature(features.FeatureMovesHistory, "ON") && needVolumes { + selectAccountWithAggregatedVolumes, err := s.selectAccountWithAggregatedVolumes(date, true, "volumes") + if err != nil { + return nil, err + } ret = ret.Join( `left join (?) volumes on volumes.accounts_address = accounts.address`, - s.selectAccountWithAggregatedVolumes(date, true, "volumes"), + selectAccountWithAggregatedVolumes, ).Column("volumes.*") } if s.ledger.HasFeature(features.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") && expandEffectiveVolumes { + selectAccountWithAggregatedVolumes, err := s.selectAccountWithAggregatedVolumes(date, false, "effective_volumes") + if err != nil { + return nil, err + } ret = ret.Join( `left join (?) effective_volumes on effective_volumes.accounts_address = accounts.address`, - s.selectAccountWithAggregatedVolumes(date, false, "effective_volumes"), + selectAccountWithAggregatedVolumes, ).Column("effective_volumes.*") } @@ -156,20 +168,30 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo match := balanceRegex.FindAllStringSubmatch(key, 2) asset := match[0][1] + selectBalance, err := s.selectBalance(date) + if err != nil { + return "", nil, err + } + return s.db.NewSelect(). TableExpr( "(?) balance", - s.selectBalance(date). + selectBalance. Where("asset = ? and accounts_address = accounts.address", asset), ). ColumnExpr(fmt.Sprintf("balance %s ?", convertOperatorToSQL(operator)), value). String(), nil, nil case key == "balance": + selectBalance, err := s.selectBalance(date) + if err != nil { + return "", nil, err + } + return s.db.NewSelect(). TableExpr( "(?) balance", - s.selectBalance(date). + selectBalance. Where("accounts_address = accounts.address"), ). ColumnExpr(fmt.Sprintf("balance %s ?", convertOperatorToSQL(operator)), value). @@ -198,7 +220,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo panic("unreachable") })) if err != nil { - return ret.Err(fmt.Errorf("evaluating filters: %w", err)) + return nil, fmt.Errorf("evaluating filters: %w", err) } if len(args) > 0 { ret = ret.Where(where, args...) @@ -207,10 +229,19 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo } } - return ret + return ret, nil } func (s *Store) ListAccounts(ctx context.Context, q ledgercontroller.ListAccountsQuery) (*Cursor[ledger.Account], error) { + selectAccounts, err := s.selectAccounts( + q.Options.Options.PIT, + q.Options.Options.ExpandVolumes, + q.Options.Options.ExpandEffectiveVolumes, + q.Options.QueryBuilder, + ) + if err != nil { + return nil, err + } return tracing.TraceWithMetric( ctx, "ListAccounts", @@ -219,12 +250,7 @@ func (s *Store) ListAccounts(ctx context.Context, q ledgercontroller.ListAccount func(ctx context.Context) (*Cursor[ledger.Account], error) { ret, err := UsingOffset[ledgercontroller.PaginatedQueryOptions[ledgercontroller.PITFilterWithVolumes], ledger.Account]( ctx, - s.selectAccounts( - q.Options.Options.PIT, - q.Options.Options.ExpandVolumes, - q.Options.Options.ExpandEffectiveVolumes, - q.Options.QueryBuilder, - ), + selectAccounts, OffsetPaginatedQuery[ledgercontroller.PaginatedQueryOptions[ledgercontroller.PITFilterWithVolumes]](q), ) @@ -245,7 +271,11 @@ func (s *Store) GetAccount(ctx context.Context, q ledgercontroller.GetAccountQue s.getAccountHistogram, func(ctx context.Context) (*ledger.Account, error) { ret := &ledger.Account{} - if err := s.selectAccounts(q.PIT, q.ExpandVolumes, q.ExpandEffectiveVolumes, nil). + selectAccounts, err := s.selectAccounts(q.PIT, q.ExpandVolumes, q.ExpandEffectiveVolumes, nil) + if err != nil { + return nil, err + } + if err := selectAccounts. Model(ret). Where("accounts.address = ?", q.Addr). Limit(1). @@ -265,13 +295,17 @@ func (s *Store) CountAccounts(ctx context.Context, q ledgercontroller.ListAccoun s.tracer, s.countAccountsHistogram, func(ctx context.Context) (int, error) { + selectAccounts, err := s.selectAccounts( + q.Options.Options.PIT, + q.Options.Options.ExpandVolumes, + q.Options.Options.ExpandEffectiveVolumes, + q.Options.QueryBuilder, + ) + if err != nil { + return 0, err + } return s.db.NewSelect(). - TableExpr("(?) data", s.selectAccounts( - q.Options.Options.PIT, - q.Options.Options.ExpandVolumes, - q.Options.Options.ExpandEffectiveVolumes, - q.Options.QueryBuilder, - )). + TableExpr("(?) data", selectAccounts). Count(ctx) }, ) diff --git a/internal/storage/ledger/balances.go b/internal/storage/ledger/balances.go index 1ddbf6eee..7bf95e7d8 100644 --- a/internal/storage/ledger/balances.go +++ b/internal/storage/ledger/balances.go @@ -18,9 +18,8 @@ import ( "github.com/uptrace/bun" ) -func (s *Store) selectAccountWithAssetAndVolumes(date *time.Time, useInsertionDate bool, builder query.Builder) *bun.SelectQuery { +func (s *Store) selectAccountWithAssetAndVolumes(date *time.Time, useInsertionDate bool, builder query.Builder) (*bun.SelectQuery, error) { - ret := s.db.NewSelect() var ( needMetadata bool needAddressSegment bool @@ -53,27 +52,31 @@ func (s *Store) selectAccountWithAssetAndVolumes(date *time.Time, useInsertionDa } return nil }); err != nil { - return ret.Err(err) + return nil, err } } if needAddressSegment && !s.ledger.HasFeature(features.FeatureIndexAddressSegments, "ON") { - return ret.Err(ledgercontroller.NewErrMissingFeature(features.FeatureIndexAddressSegments)) + return nil, ledgercontroller.NewErrMissingFeature(features.FeatureIndexAddressSegments) } var selectAccountsWithVolumes *bun.SelectQuery if date != nil && !date.IsZero() { if useInsertionDate { if !s.ledger.HasFeature(features.FeatureMovesHistory, "ON") { - return ret.Err(ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistory)) + return nil, ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistory) + } + selectDistinctMovesBySeq, err := s.SelectDistinctMovesBySeq(date) + if err != nil { + return nil, err } selectAccountsWithVolumes = s.db.NewSelect(). - TableExpr("(?) moves", s.SelectDistinctMovesBySeq(date)). + TableExpr("(?) moves", selectDistinctMovesBySeq). Column("asset", "accounts_address"). ColumnExpr("post_commit_volumes as volumes") } else { if !s.ledger.HasFeature(features.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") { - return ret.Err(ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistoryPostCommitEffectiveVolumes)) + return nil, ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistoryPostCommitEffectiveVolumes) } selectAccountsWithVolumes = s.db.NewSelect(). TableExpr("(?) moves", s.SelectDistinctMovesByEffectiveDate(date)). @@ -143,26 +146,32 @@ func (s *Store) selectAccountWithAssetAndVolumes(date *time.Time, useInsertionDa } })) if err != nil { - return ret.Err(fmt.Errorf("building where clause: %w", err)) + return nil, fmt.Errorf("building where clause: %w", err) } finalQuery = finalQuery.Where(where, args...) } - return finalQuery + return finalQuery, nil } -func (s *Store) selectAccountWithAggregatedVolumes(date *time.Time, useInsertionDate bool, alias string) *bun.SelectQuery { - selectAccountWithAssetAndVolumes := s.selectAccountWithAssetAndVolumes(date, useInsertionDate, nil) +func (s *Store) selectAccountWithAggregatedVolumes(date *time.Time, useInsertionDate bool, alias string) (*bun.SelectQuery, error) { + selectAccountWithAssetAndVolumes, err := s.selectAccountWithAssetAndVolumes(date, useInsertionDate, nil) + if err != nil { + return nil, err + } return s.db.NewSelect(). TableExpr("(?) values", selectAccountWithAssetAndVolumes). Group("accounts_address"). Column("accounts_address"). - ColumnExpr("public.aggregate_objects(json_build_object(asset, json_build_object('input', (volumes).inputs, 'output', (volumes).outputs))::jsonb) as " + alias) + ColumnExpr("public.aggregate_objects(json_build_object(asset, json_build_object('input', (volumes).inputs, 'output', (volumes).outputs))::jsonb) as " + alias), nil } -func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool, builder query.Builder) *bun.SelectQuery { +func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool, builder query.Builder) (*bun.SelectQuery, error) { - selectAccountsWithVolumes := s.selectAccountWithAssetAndVolumes(date, useInsertionDate, builder) + selectAccountsWithVolumes, err := s.selectAccountWithAssetAndVolumes(date, useInsertionDate, builder) + if err != nil { + return nil, err + } sumVolumesForAsset := s.db.NewSelect(). TableExpr("(?) values", selectAccountsWithVolumes). Group("asset"). @@ -171,7 +180,7 @@ func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool, return s.db.NewSelect(). TableExpr("(?) values", sumVolumesForAsset). - ColumnExpr("aggregate_objects(json_build_object(asset, volumes)::jsonb) as aggregated") + ColumnExpr("aggregate_objects(json_build_object(asset, volumes)::jsonb) as aggregated"), nil } func (s *Store) GetAggregatedBalances(ctx context.Context, q ledgercontroller.GetAggregatedBalanceQuery) (ledger.BalancesByAssets, error) { @@ -179,9 +188,14 @@ func (s *Store) GetAggregatedBalances(ctx context.Context, q ledgercontroller.Ge Aggregated ledger.VolumesByAssets `bun:"aggregated,type:jsonb"` } + selectAggregatedBalances, err := s.SelectAggregatedBalances(q.PIT, q.UseInsertionDate, q.QueryBuilder) + if err != nil { + return nil, err + } + aggregatedVolumes := AggregatedVolumes{} if err := s.db.NewSelect(). - ModelTableExpr("(?) aggregated_volumes", s.SelectAggregatedBalances(q.PIT, q.UseInsertionDate, q.QueryBuilder)). + ModelTableExpr("(?) aggregated_volumes", selectAggregatedBalances). Model(&aggregatedVolumes). Scan(ctx); err != nil { return nil, err diff --git a/internal/storage/ledger/moves.go b/internal/storage/ledger/moves.go index 2c32228c2..6f5e55343 100644 --- a/internal/storage/ledger/moves.go +++ b/internal/storage/ledger/moves.go @@ -12,11 +12,11 @@ import ( "github.com/uptrace/bun" ) -func (s *Store) SortMovesBySeq(date *time.Time) *bun.SelectQuery { +func (s *Store) SortMovesBySeq(date *time.Time) (*bun.SelectQuery, error) { ret := s.db.NewSelect() if !s.ledger.HasFeature(features.FeatureMovesHistory, "ON") { - return ret.Err(ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistory)) + return nil, ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistory) } ret = ret. @@ -28,12 +28,16 @@ func (s *Store) SortMovesBySeq(date *time.Time) *bun.SelectQuery { ret = ret.Where("insertion_date <= ?", date) } - return ret + return ret, nil } -func (s *Store) SelectDistinctMovesBySeq(date *time.Time) *bun.SelectQuery { +func (s *Store) SelectDistinctMovesBySeq(date *time.Time) (*bun.SelectQuery, error) { + sortMovesBySeq, err := s.SortMovesBySeq(date) + if err != nil { + return nil, err + } ret := s.db.NewSelect(). - TableExpr("(?) moves", s.SortMovesBySeq(date)). + TableExpr("(?) moves", sortMovesBySeq). DistinctOn("accounts_address, asset"). Column("accounts_address", "asset"). ColumnExpr("first_value(post_commit_volumes) over (partition by (accounts_address, asset) order by seq desc) as post_commit_volumes"). @@ -43,7 +47,7 @@ func (s *Store) SelectDistinctMovesBySeq(date *time.Time) *bun.SelectQuery { ret = ret.Where("insertion_date <= ?", date) } - return ret + return ret, nil } func (s *Store) SelectDistinctMovesByEffectiveDate(date *time.Time) *bun.SelectQuery { diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index e0fb7b612..9de7d93fd 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -50,11 +50,11 @@ func (s *Store) selectDistinctTransactionMetadataHistories(date *time.Time) *bun return ret } -func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffectiveVolumes bool, q query.Builder) *bun.SelectQuery { +func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffectiveVolumes bool, q query.Builder) (*bun.SelectQuery, error) { ret := s.db.NewSelect() if expandEffectiveVolumes && !s.ledger.HasFeature(features.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") { - return ret.Err(ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistoryPostCommitEffectiveVolumes)) + return nil, ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistoryPostCommitEffectiveVolumes) } if q != nil { @@ -91,7 +91,7 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti return nil }); err != nil { - return ret.Err(err) + return nil, err } } @@ -229,7 +229,7 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti } })) if err != nil { - return ret.Err(err) + return nil, err } if len(args) > 0 { @@ -239,7 +239,7 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti } } - return ret + return ret, nil } func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) error { @@ -317,14 +317,18 @@ func (s *Store) ListTransactions(ctx context.Context, q ledgercontroller.ListTra s.tracer, s.listTransactionsHistogram, func(ctx context.Context) (*bunpaginate.Cursor[ledger.Transaction], error) { + selectTransactions, err := s.selectTransactions( + q.Options.Options.PIT, + q.Options.Options.ExpandVolumes, + q.Options.Options.ExpandEffectiveVolumes, + q.Options.QueryBuilder, + ) + if err != nil { + return nil, err + } cursor, err := bunpaginate.UsingColumn[ledgercontroller.PaginatedQueryOptions[ledgercontroller.PITFilterWithVolumes], ledger.Transaction]( ctx, - s.selectTransactions( - q.Options.Options.PIT, - q.Options.Options.ExpandVolumes, - q.Options.Options.ExpandEffectiveVolumes, - q.Options.QueryBuilder, - ), + selectTransactions, bunpaginate.ColumnPaginatedQuery[ledgercontroller.PaginatedQueryOptions[ledgercontroller.PITFilterWithVolumes]](q), ) if err != nil { @@ -343,13 +347,17 @@ func (s *Store) CountTransactions(ctx context.Context, q ledgercontroller.ListTr s.tracer, s.countTransactionsHistogram, func(ctx context.Context) (int, error) { + selectTransactions, err := s.selectTransactions( + q.Options.Options.PIT, + q.Options.Options.ExpandVolumes, + q.Options.Options.ExpandEffectiveVolumes, + q.Options.QueryBuilder, + ) + if err != nil { + return 0, err + } return s.db.NewSelect(). - TableExpr("(?) data", s.selectTransactions( - q.Options.Options.PIT, - q.Options.Options.ExpandVolumes, - q.Options.Options.ExpandEffectiveVolumes, - q.Options.QueryBuilder, - )). + TableExpr("(?) data", selectTransactions). Count(ctx) }, ) @@ -364,12 +372,16 @@ func (s *Store) GetTransaction(ctx context.Context, filter ledgercontroller.GetT func(ctx context.Context) (*ledger.Transaction, error) { ret := &ledger.Transaction{} - if err := s.selectTransactions( + selectTransactions, err := s.selectTransactions( filter.PIT, filter.ExpandVolumes, filter.ExpandEffectiveVolumes, nil, - ). + ) + if err != nil { + return nil, err + } + if err := selectTransactions. Where("transactions.id = ?", filter.ID). Limit(1). Model(ret). diff --git a/internal/storage/ledger/volumes.go b/internal/storage/ledger/volumes.go index 8b7c26722..48f207e21 100644 --- a/internal/storage/ledger/volumes.go +++ b/internal/storage/ledger/volumes.go @@ -64,7 +64,7 @@ func (s *Store) UpdateVolumes(ctx context.Context, accountVolumes ...ledger.Acco ) } -func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupLevel int, q lquery.Builder) *bun.SelectQuery { +func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupLevel int, q lquery.Builder) (*bun.SelectQuery, error) { ret := s.db.NewSelect() var ( @@ -98,7 +98,7 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL return nil }) if err != nil { - return ret.Err(err) + return nil, err } } @@ -115,7 +115,7 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL Order("accounts_address", "asset") } else { if !s.ledger.HasFeature(features.FeatureMovesHistory, "ON") { - return ret.Err(ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistory)) + return nil, ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistory) } dateFilterColumn := "effective_date" @@ -210,7 +210,7 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL } })) if err != nil { - return ret.Err(err) + return nil, err } ret = ret.Where(where, args...) } @@ -232,7 +232,7 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL globalQuery = globalQuery.ColumnExpr("address as account, asset, input, output, balance") } - return globalQuery + return globalQuery, nil } func (s *Store) GetVolumesWithBalances(ctx context.Context, q ledgercontroller.GetVolumesWithBalancesQuery) (*bunpaginate.Cursor[ledger.VolumesWithBalanceByAssetByAccount], error) { @@ -242,15 +242,19 @@ func (s *Store) GetVolumesWithBalances(ctx context.Context, q ledgercontroller.G s.tracer, s.getVolumesWithBalancesHistogram, func(ctx context.Context) (*bunpaginate.Cursor[ledger.VolumesWithBalanceByAssetByAccount], error) { + selectVolumes, err := s.selectVolumes( + q.Options.Options.OOT, + q.Options.Options.PIT, + q.Options.Options.UseInsertionDate, + q.Options.Options.GroupLvl, + q.Options.QueryBuilder, + ) + if err != nil { + return nil, err + } return bunpaginate.UsingOffset[ledgercontroller.PaginatedQueryOptions[ledgercontroller.FiltersForVolumes], ledger.VolumesWithBalanceByAssetByAccount]( ctx, - s.selectVolumes( - q.Options.Options.OOT, - q.Options.Options.PIT, - q.Options.Options.UseInsertionDate, - q.Options.Options.GroupLvl, - q.Options.QueryBuilder, - ), + selectVolumes, bunpaginate.OffsetPaginatedQuery[ledgercontroller.PaginatedQueryOptions[ledgercontroller.FiltersForVolumes]](q), ) },