Skip to content

Commit

Permalink
Merge pull request #587 from formancehq/feat/parallel-bulk
Browse files Browse the repository at this point in the history
feat(bulk): add parallel processing
  • Loading branch information
gfyrag authored Nov 25, 2024
2 parents 26eaec1 + fb4c804 commit 9d948b7
Show file tree
Hide file tree
Showing 25 changed files with 440 additions and 245 deletions.
16 changes: 13 additions & 3 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
AutoUpgradeFlag = "auto-upgrade"
ExperimentalFeaturesFlag = "experimental-features"
BulkMaxSizeFlag = "bulk-max-size"
BulkParallelFlag = "bulk-parallel"
NumscriptInterpreterFlag = "experimental-numscript-interpreter"
)

Expand All @@ -67,6 +68,11 @@ func NewServeCommand() *cobra.Command {
return err
}

bulkParallel, err := cmd.Flags().GetInt(BulkParallelFlag)
if err != nil {
return err
}

options := []fx.Option{
fx.NopLogger,
otlp.FXModuleFromFlags(cmd),
Expand All @@ -90,9 +96,12 @@ func NewServeCommand() *cobra.Command {
bus.NewFxModule(),
ballast.Module(serveConfiguration.ballastSize),
api.Module(api.Config{
Version: Version,
Debug: service.IsDebug(cmd),
BulkMaxSize: bulkMaxSize,
Version: Version,
Debug: service.IsDebug(cmd),
Bulk: api.BulkConfig{
MaxSize: bulkMaxSize,
Parallel: bulkParallel,
},
}),
fx.Decorate(func(
params struct {
Expand Down Expand Up @@ -129,6 +138,7 @@ func NewServeCommand() *cobra.Command {
cmd.Flags().String(BindFlag, "0.0.0.0:3068", "API bind address")
cmd.Flags().Bool(ExperimentalFeaturesFlag, false, "Enable features configurability")
cmd.Flags().Int(BulkMaxSizeFlag, api.DefaultBulkMaxSize, "Bulk max size (default 100)")
cmd.Flags().Int(BulkParallelFlag, 10, "Bulk max parallelism")
cmd.Flags().Bool(NumscriptInterpreterFlag, false, "Enable experimental numscript rewrite")

service.AddFlags(cmd.Flags())
Expand Down
1 change: 1 addition & 0 deletions docs/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ Accept: application/json
|ledger|path|string|true|Name of the ledger.|
|continueOnFailure|query|boolean|false|Continue on failure|
|atomic|query|boolean|false|Make bulk atomic|
|parallel|query|boolean|false|Process bulk elements in parallel|
|body|body|[V2Bulk](#schemav2bulk)|false|none|

> Example responses
Expand Down
13 changes: 11 additions & 2 deletions internal/api/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@ import (
_ "embed"
"github.com/formancehq/go-libs/v2/auth"
"github.com/formancehq/go-libs/v2/health"
"github.com/formancehq/ledger/internal/controller/ledger"
"github.com/formancehq/ledger/internal/controller/system"
"github.com/go-chi/chi/v5"
"go.opentelemetry.io/otel/trace"
"go.uber.org/fx"
)

type BulkConfig struct {
MaxSize int
Parallel int
}

type Config struct {
Version string
Debug bool
BulkMaxSize int
Bulk BulkConfig
}

func Module(cfg Config) fx.Option {
Expand All @@ -29,7 +35,10 @@ func Module(cfg Config) fx.Option {
"develop",
cfg.Debug,
WithTracer(tracer.Tracer("api")),
WithBulkMaxSize(cfg.BulkMaxSize),
WithBulkMaxSize(cfg.Bulk.MaxSize),
WithBulkerFactory(ledger.NewDefaultBulkerFactory(
ledger.WithParallelism(cfg.Bulk.Parallel),
)),
)
}),
health.Module(),
Expand Down
13 changes: 11 additions & 2 deletions internal/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"github.com/formancehq/go-libs/v2/api"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"github.com/formancehq/ledger/internal/controller/system"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -66,6 +67,7 @@ func NewRouter(
v2.WithTracer(routerOptions.tracer),
v2.WithMiddlewares(commonMiddlewares...),
v2.WithBulkMaxSize(routerOptions.bulkMaxSize),
v2.WithBulkerFactory(routerOptions.bulkerFactory),
)
mux.Handle("/v2*", http.StripPrefix("/v2", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
chi.RouteContext(r.Context()).Reset()
Expand All @@ -84,8 +86,9 @@ func NewRouter(
}

type routerOptions struct {
tracer trace.Tracer
bulkMaxSize int
tracer trace.Tracer
bulkMaxSize int
bulkerFactory ledgercontroller.BulkerFactory
}

type RouterOption func(ro *routerOptions)
Expand All @@ -102,6 +105,12 @@ func WithBulkMaxSize(bulkMaxSize int) RouterOption {
}
}

func WithBulkerFactory(bf ledgercontroller.BulkerFactory) RouterOption {
return func(ro *routerOptions) {
ro.bulkerFactory = bf
}
}

var defaultRouterOptions = []RouterOption{
WithTracer(nooptracer.Tracer{}),
WithBulkMaxSize(DefaultBulkMaxSize),
Expand Down
12 changes: 6 additions & 6 deletions internal/api/v2/controllers_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
)

func bulkHandler(bulkMaxSize int) http.HandlerFunc {
func bulkHandler(bulkerFactory ledgercontroller.BulkerFactory, bulkMaxSize int) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
b := ledgercontroller.Bulk{}
if err := json.NewDecoder(r.Body).Decode(&b); err != nil {
Expand All @@ -28,11 +28,11 @@ func bulkHandler(bulkMaxSize int) http.HandlerFunc {
w.Header().Set("Content-Type", "application/json")

ledgerController := common.LedgerFromContext(r.Context())
bulker := ledgercontroller.NewBulker(ledgerController)
results, err := bulker.Run(r.Context(),
b,
api.QueryParamBool(r, "continueOnFailure"),
api.QueryParamBool(r, "atomic"),

results, err := bulkerFactory.CreateBulker(ledgerController).Run(r.Context(), b,
ledgercontroller.WithContinueOnFailure(api.QueryParamBool(r, "continueOnFailure")),
ledgercontroller.WithAtomic(api.QueryParamBool(r, "atomic")),
ledgercontroller.WithParallel(api.QueryParamBool(r, "parallel")),
)
if err != nil {
api.InternalServerError(w, r, err)
Expand Down
4 changes: 4 additions & 0 deletions internal/api/v2/controllers_bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ func TestBulk(t *testing.T) {
ErrorCode: api.ErrorInternal,
ErrorDescription: "unexpected error",
ResponseType: "ERROR",
}, {
ErrorCode: api.ErrorInternal,
ErrorDescription: "canceled",
ResponseType: "ERROR",
}},
expectError: true,
},
Expand Down
11 changes: 10 additions & 1 deletion internal/api/v2/routes.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v2

import (
"github.com/formancehq/ledger/internal/controller/ledger"
nooptracer "go.opentelemetry.io/otel/trace/noop"
"net/http"

Expand Down Expand Up @@ -52,7 +53,7 @@ func NewRouter(
router.With(common.LedgerMiddleware(systemController, func(r *http.Request) string {
return chi.URLParam(r, "ledger")
}, routerOptions.tracer, "/_info")).Group(func(router chi.Router) {
router.Post("/_bulk", bulkHandler(routerOptions.bulkMaxSize))
router.Post("/_bulk", bulkHandler(routerOptions.bulkerFactory, routerOptions.bulkMaxSize))

// LedgerController
router.Get("/_info", getLedgerInfo)
Expand Down Expand Up @@ -92,6 +93,7 @@ func NewRouter(
type routerOptions struct {
tracer trace.Tracer
middlewares []func(http.Handler) http.Handler
bulkerFactory ledger.BulkerFactory
bulkMaxSize int
}

Expand All @@ -115,6 +117,13 @@ func WithBulkMaxSize(bulkMaxSize int) RouterOption {
}
}

func WithBulkerFactory(bulkerFactory ledger.BulkerFactory) RouterOption {
return func(ro *routerOptions) {
ro.bulkerFactory = bulkerFactory
}
}

var defaultRouterOptions = []RouterOption{
WithTracer(nooptracer.Tracer{}),
WithBulkerFactory(ledger.NewDefaultBulkerFactory()),
}
Loading

0 comments on commit 9d948b7

Please sign in to comment.