From 6bf16a60eae57867a27de39f9707cd3e65dba2ba Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Sat, 23 Nov 2024 15:52:45 +0100 Subject: [PATCH] feat: add parallel bulk --- cmd/serve.go | 16 ++- internal/api/module.go | 13 ++- internal/api/router.go | 13 ++- internal/api/v2/controllers_bulk.go | 8 +- internal/api/v2/controllers_bulk_test.go | 4 + internal/api/v2/routes.go | 11 +- internal/controller/ledger/bulker.go | 125 ++++++++++++++++++----- 7 files changed, 150 insertions(+), 40 deletions(-) diff --git a/cmd/serve.go b/cmd/serve.go index 1489d3a2c..c4a050763 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -41,6 +41,7 @@ const ( AutoUpgradeFlag = "auto-upgrade" ExperimentalFeaturesFlag = "experimental-features" BulkMaxSizeFlag = "bulk-max-size" + BulkParallelFlag = "bulk-parallel" NumscriptInterpreterFlag = "experimental-numscript-interpreter" ) @@ -67,6 +68,11 @@ func NewServeCommand() *cobra.Command { return err } + bulkParallel, err := cmd.Flags().GetInt(BulkParallelFlag) + if err != nil { + return err + } + options := []fx.Option{ fx.NopLogger, otlp.FXModuleFromFlags(cmd), @@ -90,9 +96,12 @@ func NewServeCommand() *cobra.Command { bus.NewFxModule(), ballast.Module(serveConfiguration.ballastSize), api.Module(api.Config{ - Version: Version, - Debug: service.IsDebug(cmd), - BulkMaxSize: bulkMaxSize, + Version: Version, + Debug: service.IsDebug(cmd), + Bulk: api.BulkConfig{ + MaxSize: bulkMaxSize, + Parallel: bulkParallel, + }, }), fx.Decorate(func( params struct { @@ -129,6 +138,7 @@ func NewServeCommand() *cobra.Command { cmd.Flags().String(BindFlag, "0.0.0.0:3068", "API bind address") cmd.Flags().Bool(ExperimentalFeaturesFlag, false, "Enable features configurability") cmd.Flags().Int(BulkMaxSizeFlag, api.DefaultBulkMaxSize, "Bulk max size (default 100)") + cmd.Flags().Int(BulkParallelFlag, 10, "Bulk max parallelism") cmd.Flags().Bool(NumscriptInterpreterFlag, false, "Enable experimental numscript rewrite") service.AddFlags(cmd.Flags()) diff --git a/internal/api/module.go b/internal/api/module.go index 45a144452..d890e634f 100644 --- a/internal/api/module.go +++ b/internal/api/module.go @@ -4,16 +4,22 @@ import ( _ "embed" "github.com/formancehq/go-libs/v2/auth" "github.com/formancehq/go-libs/v2/health" + "github.com/formancehq/ledger/internal/controller/ledger" "github.com/formancehq/ledger/internal/controller/system" "github.com/go-chi/chi/v5" "go.opentelemetry.io/otel/trace" "go.uber.org/fx" ) +type BulkConfig struct { + MaxSize int + Parallel int +} + type Config struct { Version string Debug bool - BulkMaxSize int + Bulk BulkConfig } func Module(cfg Config) fx.Option { @@ -29,7 +35,10 @@ func Module(cfg Config) fx.Option { "develop", cfg.Debug, WithTracer(tracer.Tracer("api")), - WithBulkMaxSize(cfg.BulkMaxSize), + WithBulkMaxSize(cfg.Bulk.MaxSize), + WithBulkerFactory(ledger.NewDefaultBulkerFactory( + ledger.WithParallelism(cfg.Bulk.Parallel), + )), ) }), health.Module(), diff --git a/internal/api/router.go b/internal/api/router.go index a8407ac79..7c66e84a4 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -2,6 +2,7 @@ package api import ( "github.com/formancehq/go-libs/v2/api" + ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" "github.com/formancehq/ledger/internal/controller/system" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -66,6 +67,7 @@ func NewRouter( v2.WithTracer(routerOptions.tracer), v2.WithMiddlewares(commonMiddlewares...), v2.WithBulkMaxSize(routerOptions.bulkMaxSize), + v2.WithBulkerFactory(routerOptions.bulkerFactory), ) mux.Handle("/v2*", http.StripPrefix("/v2", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { chi.RouteContext(r.Context()).Reset() @@ -84,8 +86,9 @@ func NewRouter( } type routerOptions struct { - tracer trace.Tracer - bulkMaxSize int + tracer trace.Tracer + bulkMaxSize int + bulkerFactory ledgercontroller.BulkerFactory } type RouterOption func(ro *routerOptions) @@ -102,6 +105,12 @@ func WithBulkMaxSize(bulkMaxSize int) RouterOption { } } +func WithBulkerFactory(bf ledgercontroller.BulkerFactory) RouterOption { + return func(ro *routerOptions) { + ro.bulkerFactory = bf + } +} + var defaultRouterOptions = []RouterOption{ WithTracer(nooptracer.Tracer{}), WithBulkMaxSize(DefaultBulkMaxSize), diff --git a/internal/api/v2/controllers_bulk.go b/internal/api/v2/controllers_bulk.go index b246ec82c..2fc248e94 100644 --- a/internal/api/v2/controllers_bulk.go +++ b/internal/api/v2/controllers_bulk.go @@ -12,7 +12,7 @@ import ( ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" ) -func bulkHandler(bulkMaxSize int) http.HandlerFunc { +func bulkHandler(bulkerFactory ledgercontroller.BulkerFactory, bulkMaxSize int) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { b := ledgercontroller.Bulk{} if err := json.NewDecoder(r.Body).Decode(&b); err != nil { @@ -28,11 +28,11 @@ func bulkHandler(bulkMaxSize int) http.HandlerFunc { w.Header().Set("Content-Type", "application/json") ledgerController := common.LedgerFromContext(r.Context()) - bulker := ledgercontroller.NewBulker(ledgerController) - results, err := bulker.Run(r.Context(), - b, + + results, err := bulkerFactory.CreateBulker(ledgerController).Run(r.Context(), b, ledgercontroller.WithContinueOnFailure(api.QueryParamBool(r, "continueOnFailure")), ledgercontroller.WithAtomic(api.QueryParamBool(r, "atomic")), + ledgercontroller.WithParallel(api.QueryParamBool(r, "parallel")), ) if err != nil { api.InternalServerError(w, r, err) diff --git a/internal/api/v2/controllers_bulk_test.go b/internal/api/v2/controllers_bulk_test.go index b94eae76f..cea9bb27a 100644 --- a/internal/api/v2/controllers_bulk_test.go +++ b/internal/api/v2/controllers_bulk_test.go @@ -265,6 +265,10 @@ func TestBulk(t *testing.T) { ErrorCode: api.ErrorInternal, ErrorDescription: "unexpected error", ResponseType: "ERROR", + }, { + ErrorCode: api.ErrorInternal, + ErrorDescription: "canceled", + ResponseType: "ERROR", }}, expectError: true, }, diff --git a/internal/api/v2/routes.go b/internal/api/v2/routes.go index 0bc4394b7..eea3ef656 100644 --- a/internal/api/v2/routes.go +++ b/internal/api/v2/routes.go @@ -1,6 +1,7 @@ package v2 import ( + "github.com/formancehq/ledger/internal/controller/ledger" nooptracer "go.opentelemetry.io/otel/trace/noop" "net/http" @@ -52,7 +53,7 @@ func NewRouter( router.With(common.LedgerMiddleware(systemController, func(r *http.Request) string { return chi.URLParam(r, "ledger") }, routerOptions.tracer, "/_info")).Group(func(router chi.Router) { - router.Post("/_bulk", bulkHandler(routerOptions.bulkMaxSize)) + router.Post("/_bulk", bulkHandler(routerOptions.bulkerFactory, routerOptions.bulkMaxSize)) // LedgerController router.Get("/_info", getLedgerInfo) @@ -92,6 +93,7 @@ func NewRouter( type routerOptions struct { tracer trace.Tracer middlewares []func(http.Handler) http.Handler + bulkerFactory ledger.BulkerFactory bulkMaxSize int } @@ -115,6 +117,13 @@ func WithBulkMaxSize(bulkMaxSize int) RouterOption { } } +func WithBulkerFactory(bulkerFactory ledger.BulkerFactory) RouterOption { + return func(ro *routerOptions) { + ro.bulkerFactory = bulkerFactory + } +} + var defaultRouterOptions = []RouterOption{ WithTracer(nooptracer.Tracer{}), + WithBulkerFactory(ledger.NewDefaultBulkerFactory()), } diff --git a/internal/controller/ledger/bulker.go b/internal/controller/ledger/bulker.go index bd0078908..49e620fce 100644 --- a/internal/controller/ledger/bulker.go +++ b/internal/controller/ledger/bulker.go @@ -3,12 +3,15 @@ package ledger import ( "context" "encoding/json" + "errors" "fmt" + "github.com/alitto/pond" "github.com/formancehq/go-libs/v2/logging" "github.com/formancehq/go-libs/v2/metadata" "github.com/formancehq/go-libs/v2/pointer" "github.com/formancehq/go-libs/v2/time" ledger "github.com/formancehq/ledger/internal" + "sync" ) const ( @@ -96,39 +99,65 @@ func (req *TransactionRequest) ToRunScript(allowUnboundedOverdrafts bool) (*RunS } type Bulker struct { - ctrl Controller + ctrl Controller + parallelism int } -func (b *Bulker) run(ctx context.Context, ctrl Controller, bulk Bulk, continueOnFailure bool) (BulkResult, error) { +func (b *Bulker) run(ctx context.Context, ctrl Controller, bulk Bulk, continueOnFailure, parallel bool) (BulkResult, error) { - results := make([]BulkElementResult, 0, len(bulk)) + ctx, cancel := context.WithCancel(ctx) + defer cancel() - for _, element := range bulk { - ret, logID, err := b.processElement(ctx, ctrl, element) - if err != nil { - results = append(results, BulkElementResult{ - Error: err, - }) + parallelism := 1 + if parallel && b.parallelism != 0 { + parallelism = b.parallelism + } + + wp := pond.New(parallelism, len(bulk), pond.Context(ctx)) + results := sync.Map{} - if !continueOnFailure { - return results, nil + for index, element := range bulk { + wp.Submit(func() { + ret, logID, err := b.processElement(ctx, ctrl, element) + if err != nil { + results.Store(index, BulkElementResult{ + Error: err, + }) + + if !continueOnFailure { + cancel() + } + + return } + results.Store(index, BulkElementResult{ + Data: ret, + LogID: logID, + }) + }) + } + + wp.StopAndWait() + + finalResults := make(BulkResult, 0, len(bulk)) + for index := range bulk { + v, ok := results.Load(index) + if ok { + finalResults = append(finalResults, v.(BulkElementResult)) continue } - - results = append(results, BulkElementResult{ - Data: ret, - LogID: logID, + finalResults = append(finalResults, BulkElementResult{ + Error: errors.New("canceled"), }) } - return results, nil + return finalResults, nil } -func (b *Bulker) Run(ctx context.Context, bulk Bulk, providedOptions ... BulkOption) (BulkResult, error) { +func (b *Bulker) Run(ctx context.Context, bulk Bulk, providedOptions ...BulkingOption) (BulkResult, error) { - bulkOptions := BulkOptions{} + bulkOptions := BulkingOptions{} for _, option := range providedOptions { option(&bulkOptions) } @@ -167,7 +196,7 @@ func (b *Bulker) Run(ctx context.Context, bulk Bulk, providedOptions ... BulkOpt } } - results, err := b.run(ctx, ctrl, bulk, bulkOptions.ContinueOnFailure) + results, err := b.run(ctx, ctrl, bulk, bulkOptions.ContinueOnFailure, bulkOptions.Parallel) if err != nil { if bulkOptions.Atomic { if rollbackErr := ctrl.Rollback(ctx); rollbackErr != nil { @@ -333,25 +362,65 @@ func (b *Bulker) processElement(ctx context.Context, ctrl Controller, element Bu } } -func NewBulker(ctrl Controller) *Bulker { - return &Bulker{ctrl: ctrl} +func NewBulker(ctrl Controller, options ...BulkerOption) *Bulker { + ret := &Bulker{ctrl: ctrl} + for _, option := range options { + option(ret) + } + + return ret +} + +type BulkerOption func(bulker *Bulker) + +func WithParallelism(v int) BulkerOption { + return func(options *Bulker) { + options.parallelism = v + } } -type BulkOptions struct { +type BulkingOptions struct { ContinueOnFailure bool Atomic bool + Parallel bool } -type BulkOption func(*BulkOptions) +type BulkingOption func(*BulkingOptions) -func WithContinueOnFailure(v bool) BulkOption { - return func(options *BulkOptions) { +func WithContinueOnFailure(v bool) BulkingOption { + return func(options *BulkingOptions) { options.ContinueOnFailure = v } } -func WithAtomic(v bool) BulkOption { - return func(options *BulkOptions) { +func WithAtomic(v bool) BulkingOption { + return func(options *BulkingOptions) { options.Atomic = v } -} \ No newline at end of file +} + +func WithParallel(v bool) BulkingOption { + return func(options *BulkingOptions) { + options.Parallel = v + } +} + +type BulkerFactory interface { + CreateBulker(ctrl Controller) *Bulker +} + +type DefaultBulkerFactory struct { + Options []BulkerOption +} + +func (d *DefaultBulkerFactory) CreateBulker(ctrl Controller) *Bulker { + return NewBulker(ctrl, d.Options...) +} + +func NewDefaultBulkerFactory(options ...BulkerOption) *DefaultBulkerFactory { + return &DefaultBulkerFactory{ + Options: options, + } +} + +var _ BulkerFactory = (*DefaultBulkerFactory)(nil)