Skip to content

Commit

Permalink
feat: add bulk options
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Nov 25, 2024
1 parent 26eaec1 commit 1f431e8
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 7 deletions.
4 changes: 2 additions & 2 deletions internal/api/v2/controllers_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func bulkHandler(bulkMaxSize int) http.HandlerFunc {
bulker := ledgercontroller.NewBulker(ledgerController)
results, err := bulker.Run(r.Context(),
b,
api.QueryParamBool(r, "continueOnFailure"),
api.QueryParamBool(r, "atomic"),
ledgercontroller.WithContinueOnFailure(api.QueryParamBool(r, "continueOnFailure")),
ledgercontroller.WithAtomic(api.QueryParamBool(r, "atomic")),
)
if err != nil {
api.InternalServerError(w, r, err)
Expand Down
34 changes: 29 additions & 5 deletions internal/controller/ledger/bulker.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,12 @@ func (b *Bulker) run(ctx context.Context, ctrl Controller, bulk Bulk, continueOn
return results, nil
}

func (b *Bulker) Run(ctx context.Context, bulk Bulk, continueOnFailure, atomic bool) (BulkResult, error) {
func (b *Bulker) Run(ctx context.Context, bulk Bulk, providedOptions ... BulkOption) (BulkResult, error) {

bulkOptions := BulkOptions{}
for _, option := range providedOptions {
option(&bulkOptions)
}

for i, element := range bulk {
switch element.Action {
Expand Down Expand Up @@ -154,17 +159,17 @@ func (b *Bulker) Run(ctx context.Context, bulk Bulk, continueOnFailure, atomic b
}

ctrl := b.ctrl
if atomic {
if bulkOptions.Atomic {
var err error
ctrl, err = ctrl.BeginTX(ctx, nil)
if err != nil {
return nil, fmt.Errorf("error starting transaction: %s", err)
}
}

results, err := b.run(ctx, ctrl, bulk, continueOnFailure)
results, err := b.run(ctx, ctrl, bulk, bulkOptions.ContinueOnFailure)
if err != nil {
if atomic {
if bulkOptions.Atomic {
if rollbackErr := ctrl.Rollback(ctx); rollbackErr != nil {
logging.FromContext(ctx).Errorf("failed to rollback transaction: %v", rollbackErr)
}
Expand All @@ -173,7 +178,7 @@ func (b *Bulker) Run(ctx context.Context, bulk Bulk, continueOnFailure, atomic b
return nil, fmt.Errorf("error running bulk: %s", err)
}

if atomic {
if bulkOptions.Atomic {
if results.HasErrors() {
if rollbackErr := ctrl.Rollback(ctx); rollbackErr != nil {
logging.FromContext(ctx).Errorf("failed to rollback transaction: %v", rollbackErr)
Expand Down Expand Up @@ -331,3 +336,22 @@ func (b *Bulker) processElement(ctx context.Context, ctrl Controller, element Bu
func NewBulker(ctrl Controller) *Bulker {
return &Bulker{ctrl: ctrl}
}

type BulkOptions struct {
ContinueOnFailure bool
Atomic bool
}

type BulkOption func(*BulkOptions)

func WithContinueOnFailure(v bool) BulkOption {
return func(options *BulkOptions) {
options.ContinueOnFailure = v
}
}

func WithAtomic(v bool) BulkOption {
return func(options *BulkOptions) {
options.Atomic = v
}
}

0 comments on commit 1f431e8

Please sign in to comment.