Skip to content

Commit

Permalink
feat: add parallel bulk
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Nov 25, 2024
1 parent 1f431e8 commit 6bf16a6
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 40 deletions.
16 changes: 13 additions & 3 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
AutoUpgradeFlag = "auto-upgrade"
ExperimentalFeaturesFlag = "experimental-features"
BulkMaxSizeFlag = "bulk-max-size"
BulkParallelFlag = "bulk-parallel"
NumscriptInterpreterFlag = "experimental-numscript-interpreter"
)

Expand All @@ -67,6 +68,11 @@ func NewServeCommand() *cobra.Command {
return err
}

bulkParallel, err := cmd.Flags().GetInt(BulkParallelFlag)
if err != nil {
return err
}

options := []fx.Option{
fx.NopLogger,
otlp.FXModuleFromFlags(cmd),
Expand All @@ -90,9 +96,12 @@ func NewServeCommand() *cobra.Command {
bus.NewFxModule(),
ballast.Module(serveConfiguration.ballastSize),
api.Module(api.Config{
Version: Version,
Debug: service.IsDebug(cmd),
BulkMaxSize: bulkMaxSize,
Version: Version,
Debug: service.IsDebug(cmd),
Bulk: api.BulkConfig{
MaxSize: bulkMaxSize,
Parallel: bulkParallel,
},
}),
fx.Decorate(func(
params struct {
Expand Down Expand Up @@ -129,6 +138,7 @@ func NewServeCommand() *cobra.Command {
cmd.Flags().String(BindFlag, "0.0.0.0:3068", "API bind address")
cmd.Flags().Bool(ExperimentalFeaturesFlag, false, "Enable features configurability")
cmd.Flags().Int(BulkMaxSizeFlag, api.DefaultBulkMaxSize, "Bulk max size (default 100)")
cmd.Flags().Int(BulkParallelFlag, 10, "Bulk max parallelism")
cmd.Flags().Bool(NumscriptInterpreterFlag, false, "Enable experimental numscript rewrite")

service.AddFlags(cmd.Flags())
Expand Down
13 changes: 11 additions & 2 deletions internal/api/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@ import (
_ "embed"
"github.com/formancehq/go-libs/v2/auth"
"github.com/formancehq/go-libs/v2/health"
"github.com/formancehq/ledger/internal/controller/ledger"
"github.com/formancehq/ledger/internal/controller/system"
"github.com/go-chi/chi/v5"
"go.opentelemetry.io/otel/trace"
"go.uber.org/fx"
)

// BulkConfig groups the bulk-related settings carried by Config.
type BulkConfig struct {
// MaxSize is the value handed to WithBulkMaxSize when the router is built.
MaxSize int
// Parallel is the value handed to ledger.WithParallelism when the default
// bulker factory is created.
Parallel int
}

type Config struct {
Version string
Debug bool
BulkMaxSize int
Bulk BulkConfig
}

func Module(cfg Config) fx.Option {
Expand All @@ -29,7 +35,10 @@ func Module(cfg Config) fx.Option {
"develop",
cfg.Debug,
WithTracer(tracer.Tracer("api")),
WithBulkMaxSize(cfg.BulkMaxSize),
WithBulkMaxSize(cfg.Bulk.MaxSize),
WithBulkerFactory(ledger.NewDefaultBulkerFactory(
ledger.WithParallelism(cfg.Bulk.Parallel),
)),
)
}),
health.Module(),
Expand Down
13 changes: 11 additions & 2 deletions internal/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"github.com/formancehq/go-libs/v2/api"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"github.com/formancehq/ledger/internal/controller/system"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -66,6 +67,7 @@ func NewRouter(
v2.WithTracer(routerOptions.tracer),
v2.WithMiddlewares(commonMiddlewares...),
v2.WithBulkMaxSize(routerOptions.bulkMaxSize),
v2.WithBulkerFactory(routerOptions.bulkerFactory),
)
mux.Handle("/v2*", http.StripPrefix("/v2", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
chi.RouteContext(r.Context()).Reset()
Expand All @@ -84,8 +86,9 @@ func NewRouter(
}

type routerOptions struct {
tracer trace.Tracer
bulkMaxSize int
tracer trace.Tracer
bulkMaxSize int
bulkerFactory ledgercontroller.BulkerFactory
}

type RouterOption func(ro *routerOptions)
Expand All @@ -102,6 +105,12 @@ func WithBulkMaxSize(bulkMaxSize int) RouterOption {
}
}

// WithBulkerFactory returns a RouterOption that stores bf as the bulker
// factory of the router options. NewRouter forwards that factory to the v2
// router.
func WithBulkerFactory(bf ledgercontroller.BulkerFactory) RouterOption {
	setFactory := func(opts *routerOptions) {
		opts.bulkerFactory = bf
	}

	return setFactory
}

var defaultRouterOptions = []RouterOption{
WithTracer(nooptracer.Tracer{}),
WithBulkMaxSize(DefaultBulkMaxSize),
Expand Down
8 changes: 4 additions & 4 deletions internal/api/v2/controllers_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
)

func bulkHandler(bulkMaxSize int) http.HandlerFunc {
func bulkHandler(bulkerFactory ledgercontroller.BulkerFactory, bulkMaxSize int) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
b := ledgercontroller.Bulk{}
if err := json.NewDecoder(r.Body).Decode(&b); err != nil {
Expand All @@ -28,11 +28,11 @@ func bulkHandler(bulkMaxSize int) http.HandlerFunc {
w.Header().Set("Content-Type", "application/json")

ledgerController := common.LedgerFromContext(r.Context())
bulker := ledgercontroller.NewBulker(ledgerController)
results, err := bulker.Run(r.Context(),
b,

results, err := bulkerFactory.CreateBulker(ledgerController).Run(r.Context(), b,
ledgercontroller.WithContinueOnFailure(api.QueryParamBool(r, "continueOnFailure")),
ledgercontroller.WithAtomic(api.QueryParamBool(r, "atomic")),
ledgercontroller.WithParallel(api.QueryParamBool(r, "parallel")),
)
if err != nil {
api.InternalServerError(w, r, err)
Expand Down
4 changes: 4 additions & 0 deletions internal/api/v2/controllers_bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ func TestBulk(t *testing.T) {
ErrorCode: api.ErrorInternal,
ErrorDescription: "unexpected error",
ResponseType: "ERROR",
}, {
ErrorCode: api.ErrorInternal,
ErrorDescription: "canceled",
ResponseType: "ERROR",
}},
expectError: true,
},
Expand Down
11 changes: 10 additions & 1 deletion internal/api/v2/routes.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v2

import (
"github.com/formancehq/ledger/internal/controller/ledger"
nooptracer "go.opentelemetry.io/otel/trace/noop"
"net/http"

Expand Down Expand Up @@ -52,7 +53,7 @@ func NewRouter(
router.With(common.LedgerMiddleware(systemController, func(r *http.Request) string {
return chi.URLParam(r, "ledger")
}, routerOptions.tracer, "/_info")).Group(func(router chi.Router) {
router.Post("/_bulk", bulkHandler(routerOptions.bulkMaxSize))
router.Post("/_bulk", bulkHandler(routerOptions.bulkerFactory, routerOptions.bulkMaxSize))

// LedgerController
router.Get("/_info", getLedgerInfo)
Expand Down Expand Up @@ -92,6 +93,7 @@ func NewRouter(
// routerOptions holds the settings collected from RouterOption values and
// read by NewRouter.
type routerOptions struct {
// tracer is handed to the ledger middleware.
tracer trace.Tracer
// middlewares is a list of HTTP handler wrappers.
middlewares []func(http.Handler) http.Handler
// bulkerFactory is handed to the /_bulk handler.
bulkerFactory ledger.BulkerFactory
// bulkMaxSize is handed to the /_bulk handler.
bulkMaxSize int
}

Expand All @@ -115,6 +117,13 @@ func WithBulkMaxSize(bulkMaxSize int) RouterOption {
}
}

// WithBulkerFactory returns a RouterOption that stores bulkerFactory in the
// router options, where the /_bulk route picks it up.
func WithBulkerFactory(bulkerFactory ledger.BulkerFactory) RouterOption {
	apply := func(opts *routerOptions) {
		opts.bulkerFactory = bulkerFactory
	}

	return apply
}

// defaultRouterOptions holds a no-op tracer option and a bulker factory
// option built from NewDefaultBulkerFactory without any bulker option.
var defaultRouterOptions = []RouterOption{
WithTracer(nooptracer.Tracer{}),
WithBulkerFactory(ledger.NewDefaultBulkerFactory()),
}
125 changes: 97 additions & 28 deletions internal/controller/ledger/bulker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package ledger
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/alitto/pond"
"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/go-libs/v2/metadata"
"github.com/formancehq/go-libs/v2/pointer"
"github.com/formancehq/go-libs/v2/time"
ledger "github.com/formancehq/ledger/internal"
"sync"
)

const (
Expand Down Expand Up @@ -96,39 +99,65 @@ func (req *TransactionRequest) ToRunScript(allowUnboundedOverdrafts bool) (*RunS
}

type Bulker struct {
ctrl Controller
ctrl Controller
parallelism int
}

func (b *Bulker) run(ctx context.Context, ctrl Controller, bulk Bulk, continueOnFailure bool) (BulkResult, error) {
func (b *Bulker) run(ctx context.Context, ctrl Controller, bulk Bulk, continueOnFailure, parallel bool) (BulkResult, error) {

results := make([]BulkElementResult, 0, len(bulk))
ctx, cancel := context.WithCancel(ctx)
defer cancel()

for _, element := range bulk {
ret, logID, err := b.processElement(ctx, ctrl, element)
if err != nil {
results = append(results, BulkElementResult{
Error: err,
})
parallelism := 1
if parallel && b.parallelism != 0 {
parallelism = b.parallelism
}

wp := pond.New(parallelism, len(bulk), pond.Context(ctx))
results := sync.Map{}

if !continueOnFailure {
return results, nil
for index, element := range bulk {
wp.Submit(func() {
ret, logID, err := b.processElement(ctx, ctrl, element)
if err != nil {
results.Store(index, BulkElementResult{
Error: err,
})

if !continueOnFailure {
cancel()
}

return
}

results.Store(index, BulkElementResult{
Data: ret,
LogID: logID,
})
})
}

wp.StopAndWait()

finalResults := make(BulkResult, 0, len(bulk))
for index := range bulk {
v, ok := results.Load(index)
if ok {
finalResults = append(finalResults, v.(BulkElementResult))
continue
}

results = append(results, BulkElementResult{
Data: ret,
LogID: logID,
finalResults = append(finalResults, BulkElementResult{
Error: errors.New("canceled"),
})
}

return results, nil
return finalResults, nil
}

func (b *Bulker) Run(ctx context.Context, bulk Bulk, providedOptions ... BulkOption) (BulkResult, error) {
func (b *Bulker) Run(ctx context.Context, bulk Bulk, providedOptions ...BulkingOption) (BulkResult, error) {

bulkOptions := BulkOptions{}
bulkOptions := BulkingOptions{}
for _, option := range providedOptions {
option(&bulkOptions)
}
Expand Down Expand Up @@ -167,7 +196,7 @@ func (b *Bulker) Run(ctx context.Context, bulk Bulk, providedOptions ... BulkOpt
}
}

results, err := b.run(ctx, ctrl, bulk, bulkOptions.ContinueOnFailure)
results, err := b.run(ctx, ctrl, bulk, bulkOptions.ContinueOnFailure, bulkOptions.Parallel)
if err != nil {
if bulkOptions.Atomic {
if rollbackErr := ctrl.Rollback(ctx); rollbackErr != nil {
Expand Down Expand Up @@ -333,25 +362,65 @@ func (b *Bulker) processElement(ctx context.Context, ctrl Controller, element Bu
}
}

func NewBulker(ctrl Controller) *Bulker {
return &Bulker{ctrl: ctrl}
// NewBulker builds a Bulker bound to ctrl, then applies every given option to
// it in the order the options were passed.
func NewBulker(ctrl Controller, options ...BulkerOption) *Bulker {
	bulker := &Bulker{ctrl: ctrl}
	for i := range options {
		options[i](bulker)
	}

	return bulker
}

// BulkerOption mutates a Bulker; NewBulker applies each one it receives.
type BulkerOption func(bulker *Bulker)

// WithParallelism returns a BulkerOption that sets the Bulker's parallelism
// to v. A bulk run uses this value as its worker-pool size only when the run
// is requested as parallel and the value is non-zero; otherwise the pool is
// created with a size of 1.
func WithParallelism(v int) BulkerOption {
	return func(b *Bulker) {
		b.parallelism = v
	}
}

type BulkOptions struct {
type BulkingOptions struct {
ContinueOnFailure bool
Atomic bool
Parallel bool
}

type BulkOption func(*BulkOptions)
type BulkingOption func(*BulkingOptions)

func WithContinueOnFailure(v bool) BulkOption {
return func(options *BulkOptions) {
func WithContinueOnFailure(v bool) BulkingOption {
return func(options *BulkingOptions) {
options.ContinueOnFailure = v
}
}

func WithAtomic(v bool) BulkOption {
return func(options *BulkOptions) {
func WithAtomic(v bool) BulkingOption {
return func(options *BulkingOptions) {
options.Atomic = v
}
}
}

// WithParallel returns a BulkingOption that sets the Parallel flag of the
// per-run options to v.
func WithParallel(v bool) BulkingOption {
	setParallel := func(opts *BulkingOptions) {
		opts.Parallel = v
	}

	return setParallel
}

// BulkerFactory creates a Bulker for a given Controller.
type BulkerFactory interface {
// CreateBulker returns a Bulker for ctrl.
CreateBulker(ctrl Controller) *Bulker
}

// DefaultBulkerFactory is a BulkerFactory that builds bulkers with NewBulker.
type DefaultBulkerFactory struct {
// Options are passed to NewBulker for every Bulker the factory creates.
Options []BulkerOption
}

// CreateBulker returns a new Bulker for ctrl, configured with the factory's
// Options.
func (d *DefaultBulkerFactory) CreateBulker(ctrl Controller) *Bulker {
	bulker := NewBulker(ctrl, d.Options...)

	return bulker
}

// NewDefaultBulkerFactory returns a DefaultBulkerFactory that keeps options
// and hands them to every Bulker it creates.
func NewDefaultBulkerFactory(options ...BulkerOption) *DefaultBulkerFactory {
	factory := new(DefaultBulkerFactory)
	factory.Options = options

	return factory
}

// Compile-time check that *DefaultBulkerFactory implements BulkerFactory.
var _ BulkerFactory = (*DefaultBulkerFactory)(nil)

0 comments on commit 6bf16a6

Please sign in to comment.