Skip to content

Commit

Permalink
Merge pull request #584 from formancehq/fix/bulk-defer-event-sending-…
Browse files Browse the repository at this point in the history
…at-commit

fix(bulk): defer events sending at commit
  • Loading branch information
gfyrag authored Nov 22, 2024
2 parents 8c53395 + 1e2acc4 commit ebb8982
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 86 deletions.
52 changes: 27 additions & 25 deletions internal/controller/ledger/bulker.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,30 +100,6 @@ type Bulker struct {
}

func (b *Bulker) run(ctx context.Context, ctrl Controller, bulk Bulk, continueOnFailure bool) (BulkResult, error) {
for i, element := range bulk {
switch element.Action {
case ActionCreateTransaction:
req := &TransactionRequest{}
if err := json.Unmarshal(element.Data, req); err != nil {
return nil, fmt.Errorf("error parsing element %d: %s", i, err)
}
case ActionAddMetadata:
req := &AddMetadataRequest{}
if err := json.Unmarshal(element.Data, req); err != nil {
return nil, fmt.Errorf("error parsing element %d: %s", i, err)
}
case ActionRevertTransaction:
req := &RevertTransactionRequest{}
if err := json.Unmarshal(element.Data, req); err != nil {
return nil, fmt.Errorf("error parsing element %d: %s", i, err)
}
case ActionDeleteMetadata:
req := &DeleteMetadataRequest{}
if err := json.Unmarshal(element.Data, req); err != nil {
return nil, fmt.Errorf("error parsing element %d: %s", i, err)
}
}
}

results := make([]BulkElementResult, 0, len(bulk))

Expand Down Expand Up @@ -151,10 +127,36 @@ func (b *Bulker) run(ctx context.Context, ctrl Controller, bulk Bulk, continueOn
}

func (b *Bulker) Run(ctx context.Context, bulk Bulk, continueOnFailure, atomic bool) (BulkResult, error) {

for i, element := range bulk {
switch element.Action {
case ActionCreateTransaction:
req := &TransactionRequest{}
if err := json.Unmarshal(element.Data, req); err != nil {
return nil, fmt.Errorf("error parsing element %d: %s", i, err)
}
case ActionAddMetadata:
req := &AddMetadataRequest{}
if err := json.Unmarshal(element.Data, req); err != nil {
return nil, fmt.Errorf("error parsing element %d: %s", i, err)
}
case ActionRevertTransaction:
req := &RevertTransactionRequest{}
if err := json.Unmarshal(element.Data, req); err != nil {
return nil, fmt.Errorf("error parsing element %d: %s", i, err)
}
case ActionDeleteMetadata:
req := &DeleteMetadataRequest{}
if err := json.Unmarshal(element.Data, req); err != nil {
return nil, fmt.Errorf("error parsing element %d: %s", i, err)
}
}
}

ctrl := b.ctrl
if atomic {
var err error
ctrl, err = b.ctrl.BeginTX(ctx, nil)
ctrl, err = ctrl.BeginTX(ctx, nil)
if err != nil {
return nil, fmt.Errorf("error starting transaction: %s", err)
}
Expand Down
146 changes: 98 additions & 48 deletions internal/controller/ledger/controller_with_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ type ControllerWithEvents struct {
Controller
ledger ledger.Ledger
listener Listener
atCommit []func()
parent *ControllerWithEvents
hasTx bool
}

func NewControllerWithEvents(ledger ledger.Ledger, underlying Controller, listener Listener) *ControllerWithEvents {
Expand All @@ -20,102 +23,128 @@ func NewControllerWithEvents(ledger ledger.Ledger, underlying Controller, listen
listener: listener,
}
}
func (ctrl *ControllerWithEvents) CreateTransaction(ctx context.Context, parameters Parameters[RunScript]) (*ledger.Log, *ledger.CreatedTransaction, error) {
log, ret, err := ctrl.Controller.CreateTransaction(ctx, parameters)

func (c *ControllerWithEvents) handleEvent(ctx context.Context, fn func()) {
if !c.hasTx {
fn()
return
}
if c.parent != nil && c.parent.hasTx {
c.parent.handleEvent(ctx, fn)
return
}

c.atCommit = append(c.atCommit, fn)
}

func (c *ControllerWithEvents) CreateTransaction(ctx context.Context, parameters Parameters[RunScript]) (*ledger.Log, *ledger.CreatedTransaction, error) {
log, ret, err := c.Controller.CreateTransaction(ctx, parameters)
if err != nil {
return nil, nil, err
}
if !parameters.DryRun {
ctrl.listener.CommittedTransactions(ctx, ctrl.ledger.Name, ret.Transaction, ret.AccountMetadata)
c.handleEvent(ctx, func() {
c.listener.CommittedTransactions(ctx, c.ledger.Name, ret.Transaction, ret.AccountMetadata)
})
}

return log, ret, nil
}

func (ctrl *ControllerWithEvents) RevertTransaction(ctx context.Context, parameters Parameters[RevertTransaction]) (*ledger.Log, *ledger.RevertedTransaction, error) {
log, ret, err := ctrl.Controller.RevertTransaction(ctx, parameters)
func (c *ControllerWithEvents) RevertTransaction(ctx context.Context, parameters Parameters[RevertTransaction]) (*ledger.Log, *ledger.RevertedTransaction, error) {
log, ret, err := c.Controller.RevertTransaction(ctx, parameters)
if err != nil {
return nil, nil, err
}
if !parameters.DryRun {
ctrl.listener.RevertedTransaction(
ctx,
ctrl.ledger.Name,
ret.RevertedTransaction,
ret.RevertedTransaction,
)
c.handleEvent(ctx, func() {
c.listener.RevertedTransaction(
ctx,
c.ledger.Name,
ret.RevertedTransaction,
ret.RevertedTransaction,
)
})
}

return log, ret, nil
}

func (ctrl *ControllerWithEvents) SaveTransactionMetadata(ctx context.Context, parameters Parameters[SaveTransactionMetadata]) (*ledger.Log, error) {
log, err := ctrl.Controller.SaveTransactionMetadata(ctx, parameters)
func (c *ControllerWithEvents) SaveTransactionMetadata(ctx context.Context, parameters Parameters[SaveTransactionMetadata]) (*ledger.Log, error) {
log, err := c.Controller.SaveTransactionMetadata(ctx, parameters)
if err != nil {
return nil, err
}
if !parameters.DryRun {
ctrl.listener.SavedMetadata(
ctx,
ctrl.ledger.Name,
ledger.MetaTargetTypeTransaction,
fmt.Sprint(parameters.Input.TransactionID),
parameters.Input.Metadata,
)
c.handleEvent(ctx, func() {
c.listener.SavedMetadata(
ctx,
c.ledger.Name,
ledger.MetaTargetTypeTransaction,
fmt.Sprint(parameters.Input.TransactionID),
parameters.Input.Metadata,
)
})
}

return log, nil
}

func (ctrl *ControllerWithEvents) SaveAccountMetadata(ctx context.Context, parameters Parameters[SaveAccountMetadata]) (*ledger.Log, error) {
log, err := ctrl.Controller.SaveAccountMetadata(ctx, parameters)
func (c *ControllerWithEvents) SaveAccountMetadata(ctx context.Context, parameters Parameters[SaveAccountMetadata]) (*ledger.Log, error) {
log, err := c.Controller.SaveAccountMetadata(ctx, parameters)
if err != nil {
return nil, err
}
if !parameters.DryRun {
ctrl.listener.SavedMetadata(
ctx,
ctrl.ledger.Name,
ledger.MetaTargetTypeAccount,
parameters.Input.Address,
parameters.Input.Metadata,
)
c.handleEvent(ctx, func() {
c.listener.SavedMetadata(
ctx,
c.ledger.Name,
ledger.MetaTargetTypeAccount,
parameters.Input.Address,
parameters.Input.Metadata,
)
})
}

return log, nil
}

func (ctrl *ControllerWithEvents) DeleteTransactionMetadata(ctx context.Context, parameters Parameters[DeleteTransactionMetadata]) (*ledger.Log, error) {
log, err := ctrl.Controller.DeleteTransactionMetadata(ctx, parameters)
func (c *ControllerWithEvents) DeleteTransactionMetadata(ctx context.Context, parameters Parameters[DeleteTransactionMetadata]) (*ledger.Log, error) {
log, err := c.Controller.DeleteTransactionMetadata(ctx, parameters)
if err != nil {
return nil, err
}
if !parameters.DryRun {
ctrl.listener.DeletedMetadata(
ctx,
ctrl.ledger.Name,
ledger.MetaTargetTypeTransaction,
fmt.Sprint(parameters.Input.TransactionID),
parameters.Input.Key,
)
c.handleEvent(ctx, func() {
c.listener.DeletedMetadata(
ctx,
c.ledger.Name,
ledger.MetaTargetTypeTransaction,
fmt.Sprint(parameters.Input.TransactionID),
parameters.Input.Key,
)
})
}

return log, nil
}

func (ctrl *ControllerWithEvents) DeleteAccountMetadata(ctx context.Context, parameters Parameters[DeleteAccountMetadata]) (*ledger.Log, error) {
log, err := ctrl.Controller.DeleteAccountMetadata(ctx, parameters)
func (c *ControllerWithEvents) DeleteAccountMetadata(ctx context.Context, parameters Parameters[DeleteAccountMetadata]) (*ledger.Log, error) {
log, err := c.Controller.DeleteAccountMetadata(ctx, parameters)
if err != nil {
return nil, err
}
if !parameters.DryRun {
ctrl.listener.DeletedMetadata(
ctx,
ctrl.ledger.Name,
ledger.MetaTargetTypeAccount,
parameters.Input.Address,
parameters.Input.Key,
)
c.handleEvent(ctx, func() {
c.listener.DeletedMetadata(
ctx,
c.ledger.Name,
ledger.MetaTargetTypeAccount,
parameters.Input.Address,
parameters.Input.Key,
)
})
}

return log, nil
Expand All @@ -130,8 +159,29 @@ func (c *ControllerWithEvents) BeginTX(ctx context.Context, options *sql.TxOptio
return &ControllerWithEvents{
ledger: c.ledger,
Controller: ctrl,
listener: c.listener,
listener: c.listener,
parent: c,
hasTx: true,
}, nil
}

func (c *ControllerWithEvents) Commit(ctx context.Context) error {
err := c.Controller.Commit(ctx)
if err != nil {
return err
}

for _, f := range c.atCommit {
f()
}

return nil
}

func (c *ControllerWithEvents) Rollback(ctx context.Context) error {
c.atCommit = nil

return c.Controller.Rollback(ctx)
}

var _ Controller = (*ControllerWithEvents)(nil)
Loading

0 comments on commit ebb8982

Please sign in to comment.