From ec68658bc635e7b5d17c0473c9e71b20a3f10017 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Tue, 22 Oct 2024 15:35:45 +0200 Subject: [PATCH] feat: add bulk limits --- cmd/serve.go | 8 ++ docs/api/README.md | 1 + internal/api/module.go | 8 +- internal/api/router.go | 11 ++ internal/api/v2/controllers_bulk.go | 37 +++--- internal/api/v2/errors.go | 1 + internal/api/v2/routes.go | 9 +- openapi.yaml | 1 + openapi/v2.yaml | 1 + pkg/client/.speakeasy/gen.lock | 6 +- pkg/client/.speakeasy/gen.yaml | 2 +- .../docs/models/components/v2errorsenum.md | 3 +- pkg/client/formance.go | 4 +- pkg/client/models/components/v2errorsenum.go | 3 + pkg/testserver/server.go | 8 ++ test/e2e/api_bulk.go | 113 +++++++++++------- 16 files changed, 149 insertions(+), 67 deletions(-) diff --git a/cmd/serve.go b/cmd/serve.go index ed51d4bcb..bc3dbf1f9 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -39,6 +39,7 @@ const ( APIResponsesTimeoutDelayFlag = "api-responses-timeout-delay" APIResponsesTimeoutStatusCodeFlag = "api-responses-timeout-status-code" ExperimentalFeaturesFlag = "experimental-features" + BulkMaxSizeFlag = "bulk-max-size" ) func NewServeCommand() *cobra.Command { @@ -68,6 +69,11 @@ func NewServeCommand() *cobra.Command { return err } + bulkMaxSize, err := cmd.Flags().GetInt(BulkMaxSizeFlag) + if err != nil { + return err + } + options := []fx.Option{ fx.NopLogger, otlp.FXModuleFromFlags(cmd), @@ -96,6 +102,7 @@ func NewServeCommand() *cobra.Command { Timeout: apiResponseTimeoutDelay, StatusCode: apiResponseTimeoutStatusCode, }, + BulkMaxSize: bulkMaxSize, }), fx.Decorate(func( params struct { @@ -131,6 +138,7 @@ func NewServeCommand() *cobra.Command { cmd.Flags().Bool(ExperimentalFeaturesFlag, false, "Enable features configurability") cmd.Flags().Duration(APIResponsesTimeoutDelayFlag, 0, "API response timeout delay") cmd.Flags().Int(APIResponsesTimeoutStatusCodeFlag, http.StatusGatewayTimeout, "API response timeout status code") + cmd.Flags().Int(BulkMaxSizeFlag, api.DefaultBulkMaxSize, "Bulk max size (default 100)") service.AddFlags(cmd.Flags()) bunconnect.AddFlags(cmd.Flags()) diff --git a/docs/api/README.md b/docs/api/README.md index bb75c084d..7cd5c29c8 100644 --- a/docs/api/README.md +++ b/docs/api/README.md @@ -3188,6 +3188,7 @@ Authorization ( Scopes: ledger:write ) |*anonymous*|LEDGER_NOT_FOUND| |*anonymous*|IMPORT| |*anonymous*|TIMEOUT| +|*anonymous*|BULK_SIZE_EXCEEDED|

V2LedgerInfoResponse

diff --git a/internal/api/module.go b/internal/api/module.go index 511877da7..980cb9175 100644 --- a/internal/api/module.go +++ b/internal/api/module.go @@ -13,9 +13,10 @@ import ( ) type Config struct { - Version string - Debug bool - Timeout common.TimeoutConfiguration + Version string + Debug bool + Timeout common.TimeoutConfiguration + BulkMaxSize int } func Module(cfg Config) fx.Option { @@ -34,6 +35,7 @@ func Module(cfg Config) fx.Option { cfg.Debug, WithTracer(tracer.Tracer("api")), WithTimeout(cfg.Timeout), + WithBulkMaxSize(cfg.BulkMaxSize), ) }), health.Module(), diff --git a/internal/api/router.go b/internal/api/router.go index e24c65505..600ff3750 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -66,6 +66,7 @@ func NewRouter( debug, v2.WithTracer(routerOptions.tracer), v2.WithMiddlewares(commonMiddlewares...), + v2.WithBulkMaxSize(routerOptions.bulkMaxSize), ) mux.Handle("/v2*", http.StripPrefix("/v2", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { chi.RouteContext(r.Context()).Reset() @@ -86,6 +87,7 @@ func NewRouter( type routerOptions struct { tracer trace.Tracer timeoutConfiguration common.TimeoutConfiguration + bulkMaxSize int } type RouterOption func(ro *routerOptions) @@ -102,6 +104,15 @@ func WithTimeout(timeoutConfiguration common.TimeoutConfiguration) RouterOption } } +func WithBulkMaxSize(bulkMaxSize int) RouterOption { + return func(ro *routerOptions) { + ro.bulkMaxSize = bulkMaxSize + } +} + var defaultRouterOptions = []RouterOption{ WithTracer(nooptracer.Tracer{}), + WithBulkMaxSize(DefaultBulkMaxSize), } + +const DefaultBulkMaxSize = 100 diff --git a/internal/api/v2/controllers_bulk.go b/internal/api/v2/controllers_bulk.go index 8d561e793..b3b70e8dd 100644 --- a/internal/api/v2/controllers_bulk.go +++ b/internal/api/v2/controllers_bulk.go @@ -14,24 +14,31 @@ import ( ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" ) -func bulkHandler(w http.ResponseWriter, r *http.Request) { - b := Bulk{} - if err := json.NewDecoder(r.Body).Decode(&b); err != nil { - api.BadRequest(w, ErrValidation, err) - return - } +func bulkHandler(bulkMaxSize int) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + b := Bulk{} + if err := json.NewDecoder(r.Body).Decode(&b); err != nil { + api.BadRequest(w, ErrValidation, err) + return + } - w.Header().Set("Content-Type", "application/json") + if bulkMaxSize != 0 && len(b) > bulkMaxSize { + api.WriteErrorResponse(w, http.StatusRequestEntityTooLarge, ErrBulkSizeExceeded, fmt.Errorf("bulk size exceeded, max size is %d", bulkMaxSize)) + return + } - ret, errorsInBulk, err := ProcessBulk(r.Context(), common.LedgerFromContext(r.Context()), b, api.QueryParamBool(r, "continueOnFailure")) - if err != nil || errorsInBulk { - w.WriteHeader(http.StatusBadRequest) - } + w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(api.BaseResponse[[]Result]{ - Data: &ret, - }); err != nil { - panic(err) + ret, errorsInBulk, err := ProcessBulk(r.Context(), common.LedgerFromContext(r.Context()), b, api.QueryParamBool(r, "continueOnFailure")) + if err != nil || errorsInBulk { + w.WriteHeader(http.StatusBadRequest) + } + + if err := json.NewEncoder(w).Encode(api.BaseResponse[[]Result]{ + Data: &ret, + }); err != nil { + panic(err) + } } } diff --git a/internal/api/v2/errors.go b/internal/api/v2/errors.go index b511a422b..0e88416c6 100644 --- a/internal/api/v2/errors.go +++ b/internal/api/v2/errors.go @@ -8,4 +8,5 @@ const ( ErrNoPostings = "NO_POSTINGS" ErrCompilationFailed = "COMPILATION_FAILED" ErrMetadataOverride = "METADATA_OVERRIDE" + ErrBulkSizeExceeded = "BULK_SIZE_EXCEEDED" ) diff --git a/internal/api/v2/routes.go b/internal/api/v2/routes.go index e6b14ef0a..0bc4394b7 100644 --- a/internal/api/v2/routes.go +++ b/internal/api/v2/routes.go @@ -52,7 +52,7 @@ func NewRouter( router.With(common.LedgerMiddleware(systemController, func(r *http.Request) string { return chi.URLParam(r, "ledger") }, routerOptions.tracer, "/_info")).Group(func(router chi.Router) { - router.Post("/_bulk", bulkHandler) + router.Post("/_bulk", bulkHandler(routerOptions.bulkMaxSize)) // LedgerController router.Get("/_info", getLedgerInfo) @@ -92,6 +92,7 @@ func NewRouter( type routerOptions struct { tracer trace.Tracer middlewares []func(http.Handler) http.Handler + bulkMaxSize int } type RouterOption func(ro *routerOptions) @@ -108,6 +109,12 @@ func WithMiddlewares(middlewares ...func(http.Handler) http.Handler) RouterOptio } } +func WithBulkMaxSize(bulkMaxSize int) RouterOption { + return func(ro *routerOptions) { + ro.bulkMaxSize = bulkMaxSize + } +} + var defaultRouterOptions = []RouterOption{ WithTracer(nooptracer.Tracer{}), } diff --git a/openapi.yaml b/openapi.yaml index 8458fa785..e0c60b639 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -3452,6 +3452,7 @@ components: - LEDGER_NOT_FOUND - IMPORT - TIMEOUT + - BULK_SIZE_EXCEEDED example: VALIDATION V2LedgerInfoResponse: type: object diff --git a/openapi/v2.yaml b/openapi/v2.yaml index e432d96b1..a06345033 100644 --- a/openapi/v2.yaml +++ b/openapi/v2.yaml @@ -1723,6 +1723,7 @@ components: - LEDGER_NOT_FOUND - IMPORT - TIMEOUT + - BULK_SIZE_EXCEEDED example: VALIDATION V2LedgerInfoResponse: type: object diff --git a/pkg/client/.speakeasy/gen.lock b/pkg/client/.speakeasy/gen.lock index ee3413a75..7db1bbd33 100644 --- a/pkg/client/.speakeasy/gen.lock +++ b/pkg/client/.speakeasy/gen.lock @@ -1,12 +1,12 @@ lockVersion: 2.0.0 id: a9ac79e1-e429-4ee3-96c4-ec973f19bec3 management: - docChecksum: 6d88216daa0351f4aa07aaa5646d4d8b + docChecksum: f87e5e30078da93c83bec4115056a7a9 docVersion: v1 speakeasyVersion: 1.351.0 generationVersion: 2.384.1 - releaseVersion: 0.4.14 - configChecksum: 583ed6e25e9905dc157e817350d207e0 + releaseVersion: 0.4.15 + configChecksum: 82b92b8a70bc4a520560afe9e887834c features: go: additionalDependencies: 0.1.0 diff --git a/pkg/client/.speakeasy/gen.yaml b/pkg/client/.speakeasy/gen.yaml index f52c892f8..ad5852e79 100644 --- a/pkg/client/.speakeasy/gen.yaml +++ b/pkg/client/.speakeasy/gen.yaml @@ -15,7 +15,7 @@ generation: auth: oAuth2ClientCredentialsEnabled: true go: - version: 0.4.14 + version: 0.4.15 additionalDependencies: {} allowUnknownFieldsInWeakUnions: false clientServerStatusCodesAsErrors: true diff --git a/pkg/client/docs/models/components/v2errorsenum.md b/pkg/client/docs/models/components/v2errorsenum.md index 0396d0ec8..a292606a7 100644 --- a/pkg/client/docs/models/components/v2errorsenum.md +++ b/pkg/client/docs/models/components/v2errorsenum.md @@ -17,4 +17,5 @@ | `V2ErrorsEnumNoPostings` | NO_POSTINGS | | `V2ErrorsEnumLedgerNotFound` | LEDGER_NOT_FOUND | | `V2ErrorsEnumImport` | IMPORT | -| `V2ErrorsEnumTimeout` | TIMEOUT | \ No newline at end of file +| `V2ErrorsEnumTimeout` | TIMEOUT | +| `V2ErrorsEnumBulkSizeExceeded` | BULK_SIZE_EXCEEDED | \ No newline at end of file diff --git a/pkg/client/formance.go b/pkg/client/formance.go index 5c5c55fb8..e1a4b7c0c 100644 --- a/pkg/client/formance.go +++ b/pkg/client/formance.go @@ -143,9 +143,9 @@ func New(opts ...SDKOption) *Formance { sdkConfiguration: sdkConfiguration{ Language: "go", OpenAPIDocVersion: "v1", - SDKVersion: "0.4.14", + SDKVersion: "0.4.15", GenVersion: "2.384.1", - UserAgent: "speakeasy-sdk/go 0.4.14 2.384.1 v1 github.com/formancehq/stack/ledger/client", + UserAgent: "speakeasy-sdk/go 0.4.15 2.384.1 v1 github.com/formancehq/stack/ledger/client", Hooks: hooks.New(), }, } diff --git a/pkg/client/models/components/v2errorsenum.go b/pkg/client/models/components/v2errorsenum.go index de3c9dfac..598f3248c 100644 --- a/pkg/client/models/components/v2errorsenum.go +++ b/pkg/client/models/components/v2errorsenum.go @@ -23,6 +23,7 @@ const ( V2ErrorsEnumLedgerNotFound V2ErrorsEnum = "LEDGER_NOT_FOUND" V2ErrorsEnumImport V2ErrorsEnum = "IMPORT" V2ErrorsEnumTimeout V2ErrorsEnum = "TIMEOUT" + V2ErrorsEnumBulkSizeExceeded V2ErrorsEnum = "BULK_SIZE_EXCEEDED" ) func (e V2ErrorsEnum) ToPointer() *V2ErrorsEnum { @@ -59,6 +60,8 @@ func (e *V2ErrorsEnum) UnmarshalJSON(data []byte) error { case "IMPORT": fallthrough case "TIMEOUT": + fallthrough + case "BULK_SIZE_EXCEEDED": *e = V2ErrorsEnum(v) return nil default: diff --git a/pkg/testserver/server.go b/pkg/testserver/server.go index 896e7e213..f57e2eeae 100644 --- a/pkg/testserver/server.go +++ b/pkg/testserver/server.go @@ -44,6 +44,7 @@ type Configuration struct { OTLPConfig *OTLPConfig ExperimentalFeatures bool APIResponseTimeout time.Duration + BulkMaxSize int } type Server struct { @@ -81,6 +82,13 @@ func (s *Server) Start() { s.configuration.APIResponseTimeout.String(), ) } + if s.configuration.BulkMaxSize != 0 { + args = append( + args, + "--"+cmd.BulkMaxSizeFlag, + fmt.Sprint(s.configuration.BulkMaxSize), + ) + } if s.configuration.PostgresConfiguration.MaxIdleConns != 0 { args = append( args, diff --git a/test/e2e/api_bulk.go b/test/e2e/api_bulk.go index 943f8112a..76e144558 100644 --- a/test/e2e/api_bulk.go +++ b/test/e2e/api_bulk.go @@ -4,6 +4,7 @@ package test_suite import ( "github.com/formancehq/go-libs/v2/logging" + . "github.com/formancehq/go-libs/v2/testing/api" "github.com/formancehq/go-libs/v2/testing/platform/pgtesting" . "github.com/formancehq/ledger/pkg/testserver" "github.com/formancehq/stack/ledger/client/models/components" @@ -18,8 +19,9 @@ import ( var _ = Context("Ledger engine tests", func() { var ( - db = pgtesting.UsePostgresDatabase(pgServer) - ctx = logging.TestingContext() + db = pgtesting.UsePostgresDatabase(pgServer) + ctx = logging.TestingContext() + bulkMaxSize = 5 ) testServer := NewTestServer(func() Configuration { @@ -28,6 +30,7 @@ var _ = Context("Ledger engine tests", func() { Output: GinkgoWriter, Debug: debug, NatsURL: natsServer.GetValue().ClientURL(), + BulkMaxSize: bulkMaxSize, } }) BeforeEach(func() { @@ -38,51 +41,57 @@ var _ = Context("Ledger engine tests", func() { }) When("creating a bulk on a ledger", func() { var ( - now = time.Now().Round(time.Microsecond).UTC() + now = time.Now().Round(time.Microsecond).UTC() + items []components.V2BulkElement + err error ) BeforeEach(func() { - _, err := CreateBulk(ctx, testServer.GetValue(), operations.V2CreateBulkRequest{ - RequestBody: []components.V2BulkElement{ - components.CreateV2BulkElementCreateTransaction(components.V2BulkElementCreateTransaction{ - Data: &components.V2PostTransaction{ - Metadata: map[string]string{}, - Postings: []components.V2Posting{{ - Amount: big.NewInt(100), - Asset: "USD/2", - Destination: "bank", - Source: "world", - }}, - Timestamp: &now, - }, - }), - components.CreateV2BulkElementAddMetadata(components.V2BulkElementAddMetadata{ - Data: &components.Data{ - Metadata: metadata.Metadata{ - "foo": "bar", - "role": "admin", - }, - TargetID: components.CreateV2TargetIDBigint(big.NewInt(1)), - TargetType: components.V2TargetTypeTransaction, - }, - }), - components.CreateV2BulkElementDeleteMetadata(components.V2BulkElementDeleteMetadata{ - Data: &components.V2BulkElementDeleteMetadataData{ - Key: "foo", - TargetID: components.CreateV2TargetIDBigint(big.NewInt(1)), - TargetType: components.V2TargetTypeTransaction, + items = []components.V2BulkElement{ + components.CreateV2BulkElementCreateTransaction(components.V2BulkElementCreateTransaction{ + Data: &components.V2PostTransaction{ + Metadata: map[string]string{}, + Postings: []components.V2Posting{{ + Amount: big.NewInt(100), + Asset: "USD/2", + Destination: "bank", + Source: "world", + }}, + Timestamp: &now, + }, + }), + components.CreateV2BulkElementAddMetadata(components.V2BulkElementAddMetadata{ + Data: &components.Data{ + Metadata: metadata.Metadata{ + "foo": "bar", + "role": "admin", }, - }), - components.CreateV2BulkElementRevertTransaction(components.V2BulkElementRevertTransaction{ - Data: &components.V2BulkElementRevertTransactionData{ - ID: big.NewInt(1), - }, - }), - }, - Ledger: "default", + TargetID: components.CreateV2TargetIDBigint(big.NewInt(1)), + TargetType: components.V2TargetTypeTransaction, + }, + }), + components.CreateV2BulkElementDeleteMetadata(components.V2BulkElementDeleteMetadata{ + Data: &components.V2BulkElementDeleteMetadataData{ + Key: "foo", + TargetID: components.CreateV2TargetIDBigint(big.NewInt(1)), + TargetType: components.V2TargetTypeTransaction, + }, + }), + components.CreateV2BulkElementRevertTransaction(components.V2BulkElementRevertTransaction{ + Data: &components.V2BulkElementRevertTransactionData{ + ID: big.NewInt(1), + }, + }), + } + }) + JustBeforeEach(func() { + _, err = CreateBulk(ctx, testServer.GetValue(), operations.V2CreateBulkRequest{ + RequestBody: items, + Ledger: "default", }) - Expect(err).To(Succeed()) }) It("should be ok", func() { + Expect(err).To(Succeed()) + tx, err := GetTransaction(ctx, testServer.GetValue(), operations.V2GetTransactionRequest{ ID: big.NewInt(1), Ledger: "default", @@ -112,6 +121,28 @@ var _ = Context("Ledger engine tests", func() { InsertedAt: tx.InsertedAt, })) }) + Context("with exceeded batch size", func() { + BeforeEach(func() { + items = make([]components.V2BulkElement, 0) + for i := 0; i < bulkMaxSize+1; i++ { + items = append(items, components.CreateV2BulkElementCreateTransaction(components.V2BulkElementCreateTransaction{ + Data: &components.V2PostTransaction{ + Metadata: map[string]string{}, + Postings: []components.V2Posting{{ + Amount: big.NewInt(100), + Asset: "USD/2", + Destination: "bank", + Source: "world", + }}, + Timestamp: &now, + }, + })) + } + }) + It("should respond with an error", func() { + Expect(err).To(HaveErrorCode(string(components.V2ErrorsEnumBulkSizeExceeded))) + }) + }) }) When("creating a bulk with an error on a ledger", func() { var (