Skip to content

Commit

Permalink
fix: first
Browse files Browse the repository at this point in the history
  • Loading branch information
Antoine Gelloz authored and flemzord committed Feb 9, 2023
1 parent ae2ec31 commit 0c60871
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 42 deletions.
178 changes: 178 additions & 0 deletions pkg/api/controllers/pagination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package controllers_test

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"net/url"
"testing"
"time"

sharedapi "github.com/formancehq/go-libs/api"
"github.com/numary/ledger/pkg/api"
Expand Down Expand Up @@ -436,3 +439,178 @@ func testGetPagination(t *testing.T, api *api.API, txsPages, additionalTxs int)
return nil
}
}

func TestCursor(t *testing.T) {
internal.RunTest(t, fx.Invoke(func(lc fx.Lifecycle, api *api.API) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
timestamp, err := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z")
require.NoError(t, err)
for i := 0; i < 30; i++ {
rsp := internal.PostTransaction(t, api, controllers.PostTransaction{
Postings: core.Postings{
{
Source: "world",
Destination: fmt.Sprintf("accounts:%02d", i),
Amount: core.NewMonetaryInt(1),
Asset: "USD",
},
},
Reference: fmt.Sprintf("ref:%02d", i),
Metadata: core.Metadata{"ref": "abc"},
Timestamp: timestamp.Add(time.Duration(i) * time.Second),
}, false)
require.Equal(t, http.StatusOK, rsp.Result().StatusCode)
rsp = internal.PostAccountMetadata(t, api, fmt.Sprintf("accounts:%02d", i),
core.Metadata{
"foo": json.RawMessage(`"bar"`),
})
require.Equal(t, http.StatusNoContent, rsp.Result().StatusCode)
}

t.Run("GetAccounts", func(t *testing.T) {
httpResponse := internal.GetAccounts(api, url.Values{
"after": []string{"accounts:15"},
"address": []string{"acc.*"},
"metadata[foo]": []string{"bar"},
"balance": []string{"1"},
controllers.QueryKeyBalanceOperator: []string{"gte"},
controllers.QueryKeyPageSize: []string{"3"},
})
assert.Equal(t, http.StatusOK, httpResponse.Result().StatusCode, httpResponse.Body.String())

cursor := internal.DecodeCursorResponse[core.Account](t, httpResponse.Body)
res, err := base64.RawURLEncoding.DecodeString(cursor.Next)
require.NoError(t, err)
require.Equal(t,
`{"pageSize":3,"offset":3,"after":"accounts:15","address":"acc.*","metadata":{"foo":"bar"},"balance":"1","balanceOperator":"gte"}`,
string(res))

httpResponse = internal.GetAccounts(api, url.Values{
controllers.QueryKeyCursor: []string{cursor.Next},
})
assert.Equal(t, http.StatusOK, httpResponse.Result().StatusCode, httpResponse.Body.String())

cursor = internal.DecodeCursorResponse[core.Account](t, httpResponse.Body)
res, err = base64.RawURLEncoding.DecodeString(cursor.Previous)
require.NoError(t, err)
require.Equal(t,
`{"pageSize":3,"offset":0,"after":"accounts:15","address":"acc.*","metadata":{"foo":"bar"},"balance":"1","balanceOperator":"gte"}`,
string(res))
res, err = base64.RawURLEncoding.DecodeString(cursor.Next)
require.NoError(t, err)
require.Equal(t,
`{"pageSize":3,"offset":6,"after":"accounts:15","address":"acc.*","metadata":{"foo":"bar"},"balance":"1","balanceOperator":"gte"}`,
string(res))
})

t.Run("GetTransactions", func(t *testing.T) {
httpResponse := internal.GetTransactions(api, url.Values{
"after": []string{"15"},
"account": []string{"acc.*"},
"source": []string{"world"},
"destination": []string{"acc.*"},
controllers.QueryKeyStartTime: []string{timestamp.Add(5 * time.Second).Format(time.RFC3339)},
controllers.QueryKeyEndTime: []string{timestamp.Add(25 * time.Second).Format(time.RFC3339)},
"metadata[ref]": []string{"abc"},
controllers.QueryKeyPageSize: []string{"3"},
})
assert.Equal(t, http.StatusOK, httpResponse.Result().StatusCode, httpResponse.Body.String())

cursor := internal.DecodeCursorResponse[core.Transaction](t, httpResponse.Body)
res, err := base64.RawURLEncoding.DecodeString(cursor.Next)
require.NoError(t, err)
require.Equal(t,
`{"after":12,"account":"acc.*","source":"world","destination":"acc.*","startTime":"2023-01-01T00:00:05Z","endTime":"2023-01-01T00:00:25Z","metadata":{"ref":"abc"},"pageSize":3}`,
string(res))

httpResponse = internal.GetTransactions(api, url.Values{
controllers.QueryKeyCursor: []string{cursor.Next},
})
assert.Equal(t, http.StatusOK, httpResponse.Result().StatusCode, httpResponse.Body.String())

cursor = internal.DecodeCursorResponse[core.Transaction](t, httpResponse.Body)
res, err = base64.RawURLEncoding.DecodeString(cursor.Previous)
require.NoError(t, err)
require.Equal(t,
`{"after":15,"account":"acc.*","source":"world","destination":"acc.*","startTime":"2023-01-01T00:00:05Z","endTime":"2023-01-01T00:00:25Z","metadata":{"ref":"abc"},"pageSize":3}`,
string(res))
res, err = base64.RawURLEncoding.DecodeString(cursor.Next)
require.NoError(t, err)
require.Equal(t,
`{"after":9,"account":"acc.*","source":"world","destination":"acc.*","startTime":"2023-01-01T00:00:05Z","endTime":"2023-01-01T00:00:25Z","metadata":{"ref":"abc"},"pageSize":3}`,
string(res))
})

t.Run("GetBalances", func(t *testing.T) {
httpResponse := internal.GetBalances(api, url.Values{
"after": []string{"accounts:15"},
"address": []string{"acc.*"},
controllers.QueryKeyPageSize: []string{"3"},
})
assert.Equal(t, http.StatusOK, httpResponse.Result().StatusCode, httpResponse.Body.String())

cursor := internal.DecodeCursorResponse[core.AccountsBalances](t, httpResponse.Body)
res, err := base64.RawURLEncoding.DecodeString(cursor.Next)
require.NoError(t, err)
require.Equal(t,
`{"pageSize":3,"offset":3,"after":"accounts:15","address":"acc.*"}`,
string(res))

httpResponse = internal.GetBalances(api, url.Values{
controllers.QueryKeyCursor: []string{cursor.Next},
})
assert.Equal(t, http.StatusOK, httpResponse.Result().StatusCode, httpResponse.Body.String())

cursor = internal.DecodeCursorResponse[core.AccountsBalances](t, httpResponse.Body)
res, err = base64.RawURLEncoding.DecodeString(cursor.Previous)
require.NoError(t, err)
require.Equal(t,
`{"pageSize":3,"offset":0,"after":"accounts:15","address":"acc.*"}`,
string(res))
res, err = base64.RawURLEncoding.DecodeString(cursor.Next)
require.NoError(t, err)
require.Equal(t,
`{"pageSize":3,"offset":6,"after":"accounts:15","address":"acc.*"}`,
string(res))
})

t.Run("GetLogs", func(t *testing.T) {
httpResponse := internal.GetLedgerLogs(api, url.Values{
"after": []string{"30"},
controllers.QueryKeyStartTime: []string{timestamp.Add(5 * time.Second).Format(time.RFC3339)},
controllers.QueryKeyEndTime: []string{timestamp.Add(25 * time.Second).Format(time.RFC3339)},
controllers.QueryKeyPageSize: []string{"2"},
})
assert.Equal(t, http.StatusOK, httpResponse.Result().StatusCode, httpResponse.Body.String())

cursor := internal.DecodeCursorResponse[core.Log](t, httpResponse.Body)
res, err := base64.RawURLEncoding.DecodeString(cursor.Next)
require.NoError(t, err)
require.Equal(t,
`{"after":26,"pageSize":2,"startTime":"2023-01-01T00:00:05Z","endTime":"2023-01-01T00:00:25Z"}`,
string(res))

httpResponse = internal.GetLedgerLogs(api, url.Values{
controllers.QueryKeyCursor: []string{cursor.Next},
})
assert.Equal(t, http.StatusOK, httpResponse.Result().StatusCode, httpResponse.Body.String())

cursor = internal.DecodeCursorResponse[core.Log](t, httpResponse.Body)
res, err = base64.RawURLEncoding.DecodeString(cursor.Previous)
require.NoError(t, err)
require.Equal(t,
`{"after":28,"pageSize":2,"startTime":"2023-01-01T00:00:05Z","endTime":"2023-01-01T00:00:25Z"}`,
string(res))
res, err = base64.RawURLEncoding.DecodeString(cursor.Next)
require.NoError(t, err)
require.Equal(t,
`{"after":22,"pageSize":2,"startTime":"2023-01-01T00:00:05Z","endTime":"2023-01-01T00:00:25Z"}`,
string(res))
})

return nil
},
})
}))
}
28 changes: 12 additions & 16 deletions pkg/storage/sqlstorage/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,17 @@ func (s *Store) buildAccountsQuery(p ledger.AccountsQuery) (*sqlbuilder.SelectBu
t.AddressRegexpFilter = address
}

if len(metadata) > 0 {
for key, value := range metadata {
arg := sb.Args.Add(value)
// TODO: Need to find another way to specify the prefix since Table() methods does not make sense for functions and procedures
sb.Where(s.schema.Table(
fmt.Sprintf("%s(metadata, %s, '%s')",
SQLCustomFuncMetaCompare, arg, strings.ReplaceAll(key, ".", "', '")),
))
}
t.MetadataFilter = metadata
for key, value := range metadata {
arg := sb.Args.Add(value)
// TODO: Need to find another way to specify the prefix since Table() methods does not make sense for functions and procedures
sb.Where(s.schema.Table(
fmt.Sprintf("%s(metadata, %s, '%s')",
SQLCustomFuncMetaCompare, arg, strings.ReplaceAll(key, ".", "', '")),
))
}
t.MetadataFilter = metadata

if balance != "" {

sb.Join(s.schema.Table("volumes"), "accounts.address = volumes.account")
balanceOperation := "volumes.input - volumes.output"

Expand Down Expand Up @@ -84,6 +81,9 @@ func (s *Store) buildAccountsQuery(p ledger.AccountsQuery) (*sqlbuilder.SelectBu
} else { // if no operator is given, default to gte
sb.Where(sb.GreaterEqualThan(balanceOperation, balanceValue))
}

t.BalanceFilter = balance
t.BalanceOperatorFilter = balanceOperator
}

return sb, t
Expand Down Expand Up @@ -120,11 +120,7 @@ func (s *Store) GetAccounts(ctx context.Context, q ledger.AccountsQuery) (api.Cu
if err != nil {
return api.Cursor[core.Account]{}, s.error(err)
}
defer func(rows *sql.Rows) {
if err := rows.Close(); err != nil {
panic(err)
}
}(rows)
defer rows.Close()

for rows.Next() {
account := core.Account{
Expand Down
7 changes: 1 addition & 6 deletions pkg/storage/sqlstorage/aggregations.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,7 @@ func (s *Store) GetAssetsVolumes(ctx context.Context, accountAddress string) (co
if err != nil {
return nil, s.error(err)
}
defer func(rows *sql.Rows) {
err := rows.Close()
if err != nil {
panic(err)
}
}(rows)
defer rows.Close()

volumes := core.AssetsVolumes{}
for rows.Next() {
Expand Down
14 changes: 2 additions & 12 deletions pkg/storage/sqlstorage/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package sqlstorage

import (
"context"
"database/sql"
"encoding/base64"
"encoding/json"
"strconv"
Expand Down Expand Up @@ -41,12 +40,7 @@ func (s *Store) GetBalancesAggregated(ctx context.Context, q ledger.BalancesQuer
if err != nil {
return nil, s.error(err)
}

defer func(rows *sql.Rows) {
if err := rows.Close(); err != nil {
panic(err)
}
}(rows)
defer rows.Close()

aggregatedBalances := core.AssetsBalances{}

Expand Down Expand Up @@ -121,11 +115,7 @@ func (s *Store) GetBalances(ctx context.Context, q ledger.BalancesQuery) (api.Cu
if err != nil {
return api.Cursor[core.AccountsBalances]{}, s.error(err)
}
defer func(rows *sql.Rows) {
if err := rows.Close(); err != nil {
panic(err)
}
}(rows)
defer rows.Close()

accounts := make([]core.AccountsBalances, 0)

Expand Down
6 changes: 1 addition & 5 deletions pkg/storage/sqlstorage/store_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ func (s SystemStore) ListLedgers(ctx context.Context) ([]string, error) {
if err != nil {
return nil, err
}
defer func(rows *sql.Rows) {
if err := rows.Close(); err != nil {
panic(err)
}
}(rows)
defer rows.Close()

res := make([]string, 0)
for rows.Next() {
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/sqlstorage/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func (s *Store) buildTransactionsQuery(flavor Flavor, p ledger.TransactionsQuery
// deprecated regex handling
arg := sb.Args.Add(source)
sb.Where(s.schema.Table("use_account_as_source") + "(postings, " + arg + ")")
t.SourceFilter = source
} else {
// new wildcard handling
src := strings.Split(source, ":")
Expand All @@ -67,13 +66,13 @@ func (s *Store) buildTransactionsQuery(flavor Flavor, p ledger.TransactionsQuery
sb.Where(fmt.Sprintf("postings.source @@ ('$[%d] == \"' || %s::text || '\"')::jsonpath", i, arg))
}
}
t.SourceFilter = source
}
if destination != "" {
if !addressQueryRegexp.MatchString(destination) || flavor == SQLite {
// deprecated regex handling
arg := sb.Args.Add(destination)
sb.Where(s.schema.Table("use_account_as_destination") + "(postings, " + arg + ")")
t.DestinationFilter = destination
} else {
// new wildcard handling
dst := strings.Split(destination, ":")
Expand All @@ -87,13 +86,13 @@ func (s *Store) buildTransactionsQuery(flavor Flavor, p ledger.TransactionsQuery
sb.Where(fmt.Sprintf("postings.destination @@ ('$[%d] == \"' || %s::text || '\"')::jsonpath", i, arg))
}
}
t.DestinationFilter = destination
}
if account != "" {
if !addressQueryRegexp.MatchString(account) || flavor == SQLite {
// deprecated regex handling
arg := sb.Args.Add(account)
sb.Where(s.schema.Table("use_account") + "(postings, " + arg + ")")
t.AccountFilter = account
} else {
// new wildcard handling
dst := strings.Split(account, ":")
Expand All @@ -107,6 +106,7 @@ func (s *Store) buildTransactionsQuery(flavor Flavor, p ledger.TransactionsQuery
sb.Where(fmt.Sprintf("(postings.source @@ ('$[%d] == \"' || %s::text || '\"')::jsonpath OR postings.destination @@ ('$[%d] == \"' || %s::text || '\"')::jsonpath)", i, arg, i, arg))
}
}
t.AccountFilter = account
}
if reference != "" {
sb.Where(sb.E("reference", reference))
Expand Down

0 comments on commit 0c60871

Please sign in to comment.