diff --git a/pkg/api/controllers/pagination_test.go b/pkg/api/controllers/pagination_test.go index 3b167106d..9721f2f08 100644 --- a/pkg/api/controllers/pagination_test.go +++ b/pkg/api/controllers/pagination_test.go @@ -2,10 +2,13 @@ package controllers_test import ( "context" + "encoding/base64" + "encoding/json" "fmt" "net/http" "net/url" "testing" + "time" sharedapi "github.com/formancehq/go-libs/api" "github.com/numary/ledger/pkg/api" @@ -436,3 +439,178 @@ func testGetPagination(t *testing.T, api *api.API, txsPages, additionalTxs int) return nil } } + +func TestCursor(t *testing.T) { + internal.RunTest(t, fx.Invoke(func(lc fx.Lifecycle, api *api.API) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + timestamp, err := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") + require.NoError(t, err) + for i := 0; i < 30; i++ { + rsp := internal.PostTransaction(t, api, controllers.PostTransaction{ + Postings: core.Postings{ + { + Source: "world", + Destination: fmt.Sprintf("accounts:%02d", i), + Amount: core.NewMonetaryInt(1), + Asset: "USD", + }, + }, + Reference: fmt.Sprintf("ref:%02d", i), + Metadata: core.Metadata{"ref": "abc"}, + Timestamp: timestamp.Add(time.Duration(i) * time.Second), + }, false) + require.Equal(t, http.StatusOK, rsp.Result().StatusCode) + rsp = internal.PostAccountMetadata(t, api, fmt.Sprintf("accounts:%02d", i), + core.Metadata{ + "foo": json.RawMessage(`"bar"`), + }) + require.Equal(t, http.StatusNoContent, rsp.Result().StatusCode) + } + + t.Run("GetAccounts", func(t *testing.T) { + httpResponse := internal.GetAccounts(api, url.Values{ + "after": []string{"accounts:15"}, + "address": []string{"acc.*"}, + "metadata[foo]": []string{"bar"}, + "balance": []string{"1"}, + controllers.QueryKeyBalanceOperator: []string{"gte"}, + controllers.QueryKeyPageSize: []string{"3"}, + }) + assert.Equal(t, http.StatusOK, httpResponse.Result().StatusCode, httpResponse.Body.String()) + + cursor := internal.DecodeCursorResponse[core.Account](t, httpResponse.Body) + res, err := base64.RawURLEncoding.DecodeString(cursor.Next) + require.NoError(t, err) + require.Equal(t, + `{"pageSize":3,"offset":3,"after":"accounts:15","address":"acc.*","metadata":{"foo":"bar"},"balance":"1","balanceOperator":"gte"}`, + string(res)) + + httpResponse = internal.GetAccounts(api, url.Values{ + controllers.QueryKeyCursor: []string{cursor.Next}, + }) + assert.Equal(t, http.StatusOK, httpResponse.Result().StatusCode, httpResponse.Body.String()) + + cursor = internal.DecodeCursorResponse[core.Account](t, httpResponse.Body) + res, err = base64.RawURLEncoding.DecodeString(cursor.Previous) + require.NoError(t, err) + require.Equal(t, + `{"pageSize":3,"offset":0,"after":"accounts:15","address":"acc.*","metadata":{"foo":"bar"},"balance":"1","balanceOperator":"gte"}`, + string(res)) + res, err = base64.RawURLEncoding.DecodeString(cursor.Next) + require.NoError(t, err) + require.Equal(t, + `{"pageSize":3,"offset":6,"after":"accounts:15","address":"acc.*","metadata":{"foo":"bar"},"balance":"1","balanceOperator":"gte"}`, + string(res)) + }) + + t.Run("GetTransactions", func(t *testing.T) { + httpResponse := internal.GetTransactions(api, url.Values{ + "after": []string{"15"}, + "account": []string{"acc.*"}, + "source": []string{"world"}, + "destination": []string{"acc.*"}, + controllers.QueryKeyStartTime: []string{timestamp.Add(5 * time.Second).Format(time.RFC3339)}, + controllers.QueryKeyEndTime: []string{timestamp.Add(25 * time.Second).Format(time.RFC3339)}, + "metadata[ref]": []string{"abc"}, + controllers.QueryKeyPageSize: []string{"3"}, + }) + assert.Equal(t, http.StatusOK, httpResponse.Result().StatusCode, httpResponse.Body.String()) + + cursor := internal.DecodeCursorResponse[core.Transaction](t, httpResponse.Body) + res, err := base64.RawURLEncoding.DecodeString(cursor.Next) + require.NoError(t, err) + require.Equal(t, + `{"after":12,"account":"acc.*","source":"world","destination":"acc.*","startTime":"2023-01-01T00:00:05Z","endTime":"2023-01-01T00:00:25Z","metadata":{"ref":"abc"},"pageSize":3}`, + string(res)) + + httpResponse = internal.GetTransactions(api, url.Values{ + controllers.QueryKeyCursor: []string{cursor.Next}, + }) + assert.Equal(t, http.StatusOK, httpResponse.Result().StatusCode, httpResponse.Body.String()) + + cursor = internal.DecodeCursorResponse[core.Transaction](t, httpResponse.Body) + res, err = base64.RawURLEncoding.DecodeString(cursor.Previous) + require.NoError(t, err) + require.Equal(t, + `{"after":15,"account":"acc.*","source":"world","destination":"acc.*","startTime":"2023-01-01T00:00:05Z","endTime":"2023-01-01T00:00:25Z","metadata":{"ref":"abc"},"pageSize":3}`, + string(res)) + res, err = base64.RawURLEncoding.DecodeString(cursor.Next) + require.NoError(t, err) + require.Equal(t, + `{"after":9,"account":"acc.*","source":"world","destination":"acc.*","startTime":"2023-01-01T00:00:05Z","endTime":"2023-01-01T00:00:25Z","metadata":{"ref":"abc"},"pageSize":3}`, + string(res)) + }) + + t.Run("GetBalances", func(t *testing.T) { + httpResponse := internal.GetBalances(api, url.Values{ + "after": []string{"accounts:15"}, + "address": []string{"acc.*"}, + controllers.QueryKeyPageSize: []string{"3"}, + }) + assert.Equal(t, http.StatusOK, httpResponse.Result().StatusCode, httpResponse.Body.String()) + + cursor := internal.DecodeCursorResponse[core.AccountsBalances](t, httpResponse.Body) + res, err := base64.RawURLEncoding.DecodeString(cursor.Next) + require.NoError(t, err) + require.Equal(t, + `{"pageSize":3,"offset":3,"after":"accounts:15","address":"acc.*"}`, + string(res)) + + httpResponse = internal.GetBalances(api, url.Values{ + controllers.QueryKeyCursor: []string{cursor.Next}, + }) + assert.Equal(t, http.StatusOK, httpResponse.Result().StatusCode, httpResponse.Body.String()) + + cursor = internal.DecodeCursorResponse[core.AccountsBalances](t, httpResponse.Body) + res, err = base64.RawURLEncoding.DecodeString(cursor.Previous) + require.NoError(t, err) + require.Equal(t, + `{"pageSize":3,"offset":0,"after":"accounts:15","address":"acc.*"}`, + string(res)) + res, err = base64.RawURLEncoding.DecodeString(cursor.Next) + require.NoError(t, err) + require.Equal(t, + `{"pageSize":3,"offset":6,"after":"accounts:15","address":"acc.*"}`, + string(res)) + }) + + t.Run("GetLogs", func(t *testing.T) { + httpResponse := internal.GetLedgerLogs(api, url.Values{ + "after": []string{"30"}, + controllers.QueryKeyStartTime: []string{timestamp.Add(5 * time.Second).Format(time.RFC3339)}, + controllers.QueryKeyEndTime: []string{timestamp.Add(25 * time.Second).Format(time.RFC3339)}, + controllers.QueryKeyPageSize: []string{"2"}, + }) + assert.Equal(t, http.StatusOK, httpResponse.Result().StatusCode, httpResponse.Body.String()) + + cursor := internal.DecodeCursorResponse[core.Log](t, httpResponse.Body) + res, err := base64.RawURLEncoding.DecodeString(cursor.Next) + require.NoError(t, err) + require.Equal(t, + `{"after":26,"pageSize":2,"startTime":"2023-01-01T00:00:05Z","endTime":"2023-01-01T00:00:25Z"}`, + string(res)) + + httpResponse = internal.GetLedgerLogs(api, url.Values{ + controllers.QueryKeyCursor: []string{cursor.Next}, + }) + assert.Equal(t, http.StatusOK, httpResponse.Result().StatusCode, httpResponse.Body.String()) + + cursor = internal.DecodeCursorResponse[core.Log](t, httpResponse.Body) + res, err = base64.RawURLEncoding.DecodeString(cursor.Previous) + require.NoError(t, err) + require.Equal(t, + `{"after":28,"pageSize":2,"startTime":"2023-01-01T00:00:05Z","endTime":"2023-01-01T00:00:25Z"}`, + string(res)) + res, err = base64.RawURLEncoding.DecodeString(cursor.Next) + require.NoError(t, err) + require.Equal(t, + `{"after":22,"pageSize":2,"startTime":"2023-01-01T00:00:05Z","endTime":"2023-01-01T00:00:25Z"}`, + string(res)) + }) + + return nil + }, + }) + })) +} diff --git a/pkg/storage/sqlstorage/accounts.go b/pkg/storage/sqlstorage/accounts.go index 46f5ab845..52fd81561 100644 --- a/pkg/storage/sqlstorage/accounts.go +++ b/pkg/storage/sqlstorage/accounts.go @@ -40,20 +40,17 @@ func (s *Store) buildAccountsQuery(p ledger.AccountsQuery) (*sqlbuilder.SelectBu t.AddressRegexpFilter = address } - if len(metadata) > 0 { - for key, value := range metadata { - arg := sb.Args.Add(value) - // TODO: Need to find another way to specify the prefix since Table() methods does not make sense for functions and procedures - sb.Where(s.schema.Table( - fmt.Sprintf("%s(metadata, %s, '%s')", - SQLCustomFuncMetaCompare, arg, strings.ReplaceAll(key, ".", "', '")), - )) - } - t.MetadataFilter = metadata + for key, value := range metadata { + arg := sb.Args.Add(value) + // TODO: Need to find another way to specify the prefix since Table() methods does not make sense for functions and procedures + sb.Where(s.schema.Table( + fmt.Sprintf("%s(metadata, %s, '%s')", + SQLCustomFuncMetaCompare, arg, strings.ReplaceAll(key, ".", "', '")), + )) } + t.MetadataFilter = metadata if balance != "" { - sb.Join(s.schema.Table("volumes"), "accounts.address = volumes.account") balanceOperation := "volumes.input - volumes.output" @@ -84,6 +81,9 @@ func (s *Store) buildAccountsQuery(p ledger.AccountsQuery) (*sqlbuilder.SelectBu } else { // if no operator is given, default to gte sb.Where(sb.GreaterEqualThan(balanceOperation, balanceValue)) } + + t.BalanceFilter = balance + t.BalanceOperatorFilter = balanceOperator } return sb, t @@ -120,11 +120,7 @@ func (s *Store) GetAccounts(ctx context.Context, q ledger.AccountsQuery) (api.Cu if err != nil { return api.Cursor[core.Account]{}, s.error(err) } - defer func(rows *sql.Rows) { - if err := rows.Close(); err != nil { - panic(err) - } - }(rows) + defer rows.Close() for rows.Next() { account := core.Account{ diff --git a/pkg/storage/sqlstorage/aggregations.go b/pkg/storage/sqlstorage/aggregations.go index ef72355fe..3d66fa53d 100644 --- a/pkg/storage/sqlstorage/aggregations.go +++ b/pkg/storage/sqlstorage/aggregations.go @@ -132,12 +132,7 @@ func (s *Store) GetAssetsVolumes(ctx context.Context, accountAddress string) (co if err != nil { return nil, s.error(err) } - defer func(rows *sql.Rows) { - err := rows.Close() - if err != nil { - panic(err) - } - }(rows) + defer rows.Close() volumes := core.AssetsVolumes{} for rows.Next() { diff --git a/pkg/storage/sqlstorage/balances.go b/pkg/storage/sqlstorage/balances.go index 1522a33a4..f4cfcd367 100644 --- a/pkg/storage/sqlstorage/balances.go +++ b/pkg/storage/sqlstorage/balances.go @@ -2,7 +2,6 @@ package sqlstorage import ( "context" - "database/sql" "encoding/base64" "encoding/json" "strconv" @@ -41,12 +40,7 @@ func (s *Store) GetBalancesAggregated(ctx context.Context, q ledger.BalancesQuer if err != nil { return nil, s.error(err) } - - defer func(rows *sql.Rows) { - if err := rows.Close(); err != nil { - panic(err) - } - }(rows) + defer rows.Close() aggregatedBalances := core.AssetsBalances{} @@ -121,11 +115,7 @@ func (s *Store) GetBalances(ctx context.Context, q ledger.BalancesQuery) (api.Cu if err != nil { return api.Cursor[core.AccountsBalances]{}, s.error(err) } - defer func(rows *sql.Rows) { - if err := rows.Close(); err != nil { - panic(err) - } - }(rows) + defer rows.Close() accounts := make([]core.AccountsBalances, 0) diff --git a/pkg/storage/sqlstorage/store_system.go b/pkg/storage/sqlstorage/store_system.go index 2af286ef4..9ae82f52a 100644 --- a/pkg/storage/sqlstorage/store_system.go +++ b/pkg/storage/sqlstorage/store_system.go @@ -59,11 +59,7 @@ func (s SystemStore) ListLedgers(ctx context.Context) ([]string, error) { if err != nil { return nil, err } - defer func(rows *sql.Rows) { - if err := rows.Close(); err != nil { - panic(err) - } - }(rows) + defer rows.Close() res := make([]string, 0) for rows.Next() { diff --git a/pkg/storage/sqlstorage/transactions.go b/pkg/storage/sqlstorage/transactions.go index ae56e0d53..1a07ae457 100644 --- a/pkg/storage/sqlstorage/transactions.go +++ b/pkg/storage/sqlstorage/transactions.go @@ -52,7 +52,6 @@ func (s *Store) buildTransactionsQuery(flavor Flavor, p ledger.TransactionsQuery // deprecated regex handling arg := sb.Args.Add(source) sb.Where(s.schema.Table("use_account_as_source") + "(postings, " + arg + ")") - t.SourceFilter = source } else { // new wildcard handling src := strings.Split(source, ":") @@ -67,13 +66,13 @@ func (s *Store) buildTransactionsQuery(flavor Flavor, p ledger.TransactionsQuery sb.Where(fmt.Sprintf("postings.source @@ ('$[%d] == \"' || %s::text || '\"')::jsonpath", i, arg)) } } + t.SourceFilter = source } if destination != "" { if !addressQueryRegexp.MatchString(destination) || flavor == SQLite { // deprecated regex handling arg := sb.Args.Add(destination) sb.Where(s.schema.Table("use_account_as_destination") + "(postings, " + arg + ")") - t.DestinationFilter = destination } else { // new wildcard handling dst := strings.Split(destination, ":") @@ -87,13 +86,13 @@ func (s *Store) buildTransactionsQuery(flavor Flavor, p ledger.TransactionsQuery sb.Where(fmt.Sprintf("postings.destination @@ ('$[%d] == \"' || %s::text || '\"')::jsonpath", i, arg)) } } + t.DestinationFilter = destination } if account != "" { if !addressQueryRegexp.MatchString(account) || flavor == SQLite { // deprecated regex handling arg := sb.Args.Add(account) sb.Where(s.schema.Table("use_account") + "(postings, " + arg + ")") - t.AccountFilter = account } else { // new wildcard handling dst := strings.Split(account, ":") @@ -107,6 +106,7 @@ func (s *Store) buildTransactionsQuery(flavor Flavor, p ledger.TransactionsQuery sb.Where(fmt.Sprintf("(postings.source @@ ('$[%d] == \"' || %s::text || '\"')::jsonpath OR postings.destination @@ ('$[%d] == \"' || %s::text || '\"')::jsonpath)", i, arg, i, arg)) } } + t.AccountFilter = account } if reference != "" { sb.Where(sb.E("reference", reference))