Skip to content

Commit

Permalink
feat: add bulk limits
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 23, 2024
1 parent 1086ddf commit ec68658
Show file tree
Hide file tree
Showing 16 changed files with 149 additions and 67 deletions.
8 changes: 8 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
APIResponsesTimeoutDelayFlag = "api-responses-timeout-delay"
APIResponsesTimeoutStatusCodeFlag = "api-responses-timeout-status-code"
ExperimentalFeaturesFlag = "experimental-features"
BulkMaxSizeFlag = "bulk-max-size"
)

func NewServeCommand() *cobra.Command {
Expand Down Expand Up @@ -68,6 +69,11 @@ func NewServeCommand() *cobra.Command {
return err
}

bulkMaxSize, err := cmd.Flags().GetInt(BulkMaxSizeFlag)
if err != nil {
return err
}

options := []fx.Option{
fx.NopLogger,
otlp.FXModuleFromFlags(cmd),
Expand Down Expand Up @@ -96,6 +102,7 @@ func NewServeCommand() *cobra.Command {
Timeout: apiResponseTimeoutDelay,
StatusCode: apiResponseTimeoutStatusCode,
},
BulkMaxSize: bulkMaxSize,
}),
fx.Decorate(func(
params struct {
Expand Down Expand Up @@ -131,6 +138,7 @@ func NewServeCommand() *cobra.Command {
cmd.Flags().Bool(ExperimentalFeaturesFlag, false, "Enable features configurability")
cmd.Flags().Duration(APIResponsesTimeoutDelayFlag, 0, "API response timeout delay")
cmd.Flags().Int(APIResponsesTimeoutStatusCodeFlag, http.StatusGatewayTimeout, "API response timeout status code")
cmd.Flags().Int(BulkMaxSizeFlag, api.DefaultBulkMaxSize, "Bulk max size (default 100)")

service.AddFlags(cmd.Flags())
bunconnect.AddFlags(cmd.Flags())
Expand Down
1 change: 1 addition & 0 deletions docs/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3188,6 +3188,7 @@ Authorization ( Scopes: ledger:write )
|*anonymous*|LEDGER_NOT_FOUND|
|*anonymous*|IMPORT|
|*anonymous*|TIMEOUT|
|*anonymous*|BULK_SIZE_EXCEEDED|

<h2 id="tocS_V2LedgerInfoResponse">V2LedgerInfoResponse</h2>
<!-- backwards compatibility -->
Expand Down
8 changes: 5 additions & 3 deletions internal/api/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import (
)

type Config struct {
Version string
Debug bool
Timeout common.TimeoutConfiguration
Version string
Debug bool
Timeout common.TimeoutConfiguration
BulkMaxSize int
}

func Module(cfg Config) fx.Option {
Expand All @@ -34,6 +35,7 @@ func Module(cfg Config) fx.Option {
cfg.Debug,
WithTracer(tracer.Tracer("api")),
WithTimeout(cfg.Timeout),
WithBulkMaxSize(cfg.BulkMaxSize),
)
}),
health.Module(),
Expand Down
11 changes: 11 additions & 0 deletions internal/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func NewRouter(
debug,
v2.WithTracer(routerOptions.tracer),
v2.WithMiddlewares(commonMiddlewares...),
v2.WithBulkMaxSize(routerOptions.bulkMaxSize),
)
mux.Handle("/v2*", http.StripPrefix("/v2", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
chi.RouteContext(r.Context()).Reset()
Expand All @@ -86,6 +87,7 @@ func NewRouter(
type routerOptions struct {
tracer trace.Tracer
timeoutConfiguration common.TimeoutConfiguration
bulkMaxSize int
}

type RouterOption func(ro *routerOptions)
Expand All @@ -102,6 +104,15 @@ func WithTimeout(timeoutConfiguration common.TimeoutConfiguration) RouterOption
}
}

func WithBulkMaxSize(bulkMaxSize int) RouterOption {
return func(ro *routerOptions) {
ro.bulkMaxSize = bulkMaxSize
}
}

var defaultRouterOptions = []RouterOption{
WithTracer(nooptracer.Tracer{}),
WithBulkMaxSize(DefaultBulkMaxSize),
}

const DefaultBulkMaxSize = 100
37 changes: 22 additions & 15 deletions internal/api/v2/controllers_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,31 @@ import (
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
)

func bulkHandler(w http.ResponseWriter, r *http.Request) {
b := Bulk{}
if err := json.NewDecoder(r.Body).Decode(&b); err != nil {
api.BadRequest(w, ErrValidation, err)
return
}
func bulkHandler(bulkMaxSize int) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
b := Bulk{}
if err := json.NewDecoder(r.Body).Decode(&b); err != nil {
api.BadRequest(w, ErrValidation, err)
return
}

w.Header().Set("Content-Type", "application/json")
if bulkMaxSize != 0 && len(b) > bulkMaxSize {
api.WriteErrorResponse(w, http.StatusRequestEntityTooLarge, ErrBulkSizeExceeded, fmt.Errorf("bulk size exceeded, max size is %d", bulkMaxSize))
return
}

ret, errorsInBulk, err := ProcessBulk(r.Context(), common.LedgerFromContext(r.Context()), b, api.QueryParamBool(r, "continueOnFailure"))
if err != nil || errorsInBulk {
w.WriteHeader(http.StatusBadRequest)
}
w.Header().Set("Content-Type", "application/json")

if err := json.NewEncoder(w).Encode(api.BaseResponse[[]Result]{
Data: &ret,
}); err != nil {
panic(err)
ret, errorsInBulk, err := ProcessBulk(r.Context(), common.LedgerFromContext(r.Context()), b, api.QueryParamBool(r, "continueOnFailure"))
if err != nil || errorsInBulk {
w.WriteHeader(http.StatusBadRequest)
}

if err := json.NewEncoder(w).Encode(api.BaseResponse[[]Result]{
Data: &ret,
}); err != nil {
panic(err)
}
}
}

Expand Down
1 change: 1 addition & 0 deletions internal/api/v2/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ const (
ErrNoPostings = "NO_POSTINGS"
ErrCompilationFailed = "COMPILATION_FAILED"
ErrMetadataOverride = "METADATA_OVERRIDE"
ErrBulkSizeExceeded = "BULK_SIZE_EXCEEDED"
)
9 changes: 8 additions & 1 deletion internal/api/v2/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewRouter(
router.With(common.LedgerMiddleware(systemController, func(r *http.Request) string {
return chi.URLParam(r, "ledger")
}, routerOptions.tracer, "/_info")).Group(func(router chi.Router) {
router.Post("/_bulk", bulkHandler)
router.Post("/_bulk", bulkHandler(routerOptions.bulkMaxSize))

// LedgerController
router.Get("/_info", getLedgerInfo)
Expand Down Expand Up @@ -92,6 +92,7 @@ func NewRouter(
type routerOptions struct {
tracer trace.Tracer
middlewares []func(http.Handler) http.Handler
bulkMaxSize int
}

type RouterOption func(ro *routerOptions)
Expand All @@ -108,6 +109,12 @@ func WithMiddlewares(middlewares ...func(http.Handler) http.Handler) RouterOptio
}
}

func WithBulkMaxSize(bulkMaxSize int) RouterOption {
return func(ro *routerOptions) {
ro.bulkMaxSize = bulkMaxSize
}
}

var defaultRouterOptions = []RouterOption{
WithTracer(nooptracer.Tracer{}),
}
1 change: 1 addition & 0 deletions openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3452,6 +3452,7 @@ components:
- LEDGER_NOT_FOUND
- IMPORT
- TIMEOUT
- BULK_SIZE_EXCEEDED
example: VALIDATION
V2LedgerInfoResponse:
type: object
Expand Down
1 change: 1 addition & 0 deletions openapi/v2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,7 @@ components:
- LEDGER_NOT_FOUND
- IMPORT
- TIMEOUT
- BULK_SIZE_EXCEEDED
example: VALIDATION
V2LedgerInfoResponse:
type: object
Expand Down
6 changes: 3 additions & 3 deletions pkg/client/.speakeasy/gen.lock
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
lockVersion: 2.0.0
id: a9ac79e1-e429-4ee3-96c4-ec973f19bec3
management:
docChecksum: 6d88216daa0351f4aa07aaa5646d4d8b
docChecksum: f87e5e30078da93c83bec4115056a7a9
docVersion: v1
speakeasyVersion: 1.351.0
generationVersion: 2.384.1
releaseVersion: 0.4.14
configChecksum: 583ed6e25e9905dc157e817350d207e0
releaseVersion: 0.4.15
configChecksum: 82b92b8a70bc4a520560afe9e887834c
features:
go:
additionalDependencies: 0.1.0
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/.speakeasy/gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ generation:
auth:
oAuth2ClientCredentialsEnabled: true
go:
version: 0.4.14
version: 0.4.15
additionalDependencies: {}
allowUnknownFieldsInWeakUnions: false
clientServerStatusCodesAsErrors: true
Expand Down
3 changes: 2 additions & 1 deletion pkg/client/docs/models/components/v2errorsenum.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@
| `V2ErrorsEnumNoPostings` | NO_POSTINGS |
| `V2ErrorsEnumLedgerNotFound` | LEDGER_NOT_FOUND |
| `V2ErrorsEnumImport` | IMPORT |
| `V2ErrorsEnumTimeout` | TIMEOUT |
| `V2ErrorsEnumTimeout` | TIMEOUT |
| `V2ErrorsEnumBulkSizeExceeded` | BULK_SIZE_EXCEEDED |
4 changes: 2 additions & 2 deletions pkg/client/formance.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ func New(opts ...SDKOption) *Formance {
sdkConfiguration: sdkConfiguration{
Language: "go",
OpenAPIDocVersion: "v1",
SDKVersion: "0.4.14",
SDKVersion: "0.4.15",
GenVersion: "2.384.1",
UserAgent: "speakeasy-sdk/go 0.4.14 2.384.1 v1 github.com/formancehq/stack/ledger/client",
UserAgent: "speakeasy-sdk/go 0.4.15 2.384.1 v1 github.com/formancehq/stack/ledger/client",
Hooks: hooks.New(),
},
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/client/models/components/v2errorsenum.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
V2ErrorsEnumLedgerNotFound V2ErrorsEnum = "LEDGER_NOT_FOUND"
V2ErrorsEnumImport V2ErrorsEnum = "IMPORT"
V2ErrorsEnumTimeout V2ErrorsEnum = "TIMEOUT"
V2ErrorsEnumBulkSizeExceeded V2ErrorsEnum = "BULK_SIZE_EXCEEDED"
)

func (e V2ErrorsEnum) ToPointer() *V2ErrorsEnum {
Expand Down Expand Up @@ -59,6 +60,8 @@ func (e *V2ErrorsEnum) UnmarshalJSON(data []byte) error {
case "IMPORT":
fallthrough
case "TIMEOUT":
fallthrough
case "BULK_SIZE_EXCEEDED":
*e = V2ErrorsEnum(v)
return nil
default:
Expand Down
8 changes: 8 additions & 0 deletions pkg/testserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Configuration struct {
OTLPConfig *OTLPConfig
ExperimentalFeatures bool
APIResponseTimeout time.Duration
BulkMaxSize int
}

type Server struct {
Expand Down Expand Up @@ -81,6 +82,13 @@ func (s *Server) Start() {
s.configuration.APIResponseTimeout.String(),
)
}
if s.configuration.BulkMaxSize != 0 {
args = append(
args,
"--"+cmd.BulkMaxSizeFlag,
fmt.Sprint(s.configuration.BulkMaxSize),
)
}
if s.configuration.PostgresConfiguration.MaxIdleConns != 0 {
args = append(
args,
Expand Down
Loading

0 comments on commit ec68658

Please sign in to comment.