From 1e2acc4f3b3b1ccbe1f2dd871a6e8796af1be084 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Fri, 22 Nov 2024 14:58:54 +0100 Subject: [PATCH] fix(bulk): defer events sending at commit --- internal/controller/ledger/bulker.go | 52 ++++--- .../ledger/controller_with_events.go | 146 ++++++++++++------ test/e2e/api_bulk_test.go | 60 +++++-- 3 files changed, 172 insertions(+), 86 deletions(-) diff --git a/internal/controller/ledger/bulker.go b/internal/controller/ledger/bulker.go index eccb169db..a1d79b2ac 100644 --- a/internal/controller/ledger/bulker.go +++ b/internal/controller/ledger/bulker.go @@ -100,30 +100,6 @@ type Bulker struct { } func (b *Bulker) run(ctx context.Context, ctrl Controller, bulk Bulk, continueOnFailure bool) (BulkResult, error) { - for i, element := range bulk { - switch element.Action { - case ActionCreateTransaction: - req := &TransactionRequest{} - if err := json.Unmarshal(element.Data, req); err != nil { - return nil, fmt.Errorf("error parsing element %d: %s", i, err) - } - case ActionAddMetadata: - req := &AddMetadataRequest{} - if err := json.Unmarshal(element.Data, req); err != nil { - return nil, fmt.Errorf("error parsing element %d: %s", i, err) - } - case ActionRevertTransaction: - req := &RevertTransactionRequest{} - if err := json.Unmarshal(element.Data, req); err != nil { - return nil, fmt.Errorf("error parsing element %d: %s", i, err) - } - case ActionDeleteMetadata: - req := &DeleteMetadataRequest{} - if err := json.Unmarshal(element.Data, req); err != nil { - return nil, fmt.Errorf("error parsing element %d: %s", i, err) - } - } - } results := make([]BulkElementResult, 0, len(bulk)) @@ -151,10 +127,36 @@ func (b *Bulker) run(ctx context.Context, ctrl Controller, bulk Bulk, continueOn } func (b *Bulker) Run(ctx context.Context, bulk Bulk, continueOnFailure, atomic bool) (BulkResult, error) { + + for i, element := range bulk { + switch element.Action { + case ActionCreateTransaction: + req := &TransactionRequest{} + if err := json.Unmarshal(element.Data, req); err != nil { + return nil, fmt.Errorf("error parsing element %d: %s", i, err) + } + case ActionAddMetadata: + req := &AddMetadataRequest{} + if err := json.Unmarshal(element.Data, req); err != nil { + return nil, fmt.Errorf("error parsing element %d: %s", i, err) + } + case ActionRevertTransaction: + req := &RevertTransactionRequest{} + if err := json.Unmarshal(element.Data, req); err != nil { + return nil, fmt.Errorf("error parsing element %d: %s", i, err) + } + case ActionDeleteMetadata: + req := &DeleteMetadataRequest{} + if err := json.Unmarshal(element.Data, req); err != nil { + return nil, fmt.Errorf("error parsing element %d: %s", i, err) + } + } + } + ctrl := b.ctrl if atomic { var err error - ctrl, err = b.ctrl.BeginTX(ctx, nil) + ctrl, err = ctrl.BeginTX(ctx, nil) if err != nil { return nil, fmt.Errorf("error starting transaction: %s", err) } diff --git a/internal/controller/ledger/controller_with_events.go b/internal/controller/ledger/controller_with_events.go index dafbe5255..af97bfad5 100644 --- a/internal/controller/ledger/controller_with_events.go +++ b/internal/controller/ledger/controller_with_events.go @@ -11,6 +11,9 @@ type ControllerWithEvents struct { Controller ledger ledger.Ledger listener Listener + atCommit []func() + parent *ControllerWithEvents + hasTx bool } func NewControllerWithEvents(ledger ledger.Ledger, underlying Controller, listener Listener) *ControllerWithEvents { @@ -20,102 +23,128 @@ func NewControllerWithEvents(ledger ledger.Ledger, underlying Controller, listen listener: listener, } } -func (ctrl *ControllerWithEvents) CreateTransaction(ctx context.Context, parameters Parameters[RunScript]) (*ledger.Log, *ledger.CreatedTransaction, error) { - log, ret, err := ctrl.Controller.CreateTransaction(ctx, parameters) + +func (c *ControllerWithEvents) handleEvent(ctx context.Context, fn func()) { + if !c.hasTx { + fn() + return + } + if c.parent != nil && c.parent.hasTx { + c.parent.handleEvent(ctx, fn) + return + } + + c.atCommit = append(c.atCommit, fn) +} + +func (c *ControllerWithEvents) CreateTransaction(ctx context.Context, parameters Parameters[RunScript]) (*ledger.Log, *ledger.CreatedTransaction, error) { + log, ret, err := c.Controller.CreateTransaction(ctx, parameters) if err != nil { return nil, nil, err } if !parameters.DryRun { - ctrl.listener.CommittedTransactions(ctx, ctrl.ledger.Name, ret.Transaction, ret.AccountMetadata) + c.handleEvent(ctx, func() { + c.listener.CommittedTransactions(ctx, c.ledger.Name, ret.Transaction, ret.AccountMetadata) + }) } return log, ret, nil } -func (ctrl *ControllerWithEvents) RevertTransaction(ctx context.Context, parameters Parameters[RevertTransaction]) (*ledger.Log, *ledger.RevertedTransaction, error) { - log, ret, err := ctrl.Controller.RevertTransaction(ctx, parameters) +func (c *ControllerWithEvents) RevertTransaction(ctx context.Context, parameters Parameters[RevertTransaction]) (*ledger.Log, *ledger.RevertedTransaction, error) { + log, ret, err := c.Controller.RevertTransaction(ctx, parameters) if err != nil { return nil, nil, err } if !parameters.DryRun { - ctrl.listener.RevertedTransaction( - ctx, - ctrl.ledger.Name, - ret.RevertedTransaction, - ret.RevertedTransaction, - ) + c.handleEvent(ctx, func() { + c.listener.RevertedTransaction( + ctx, + c.ledger.Name, + ret.RevertedTransaction, + ret.RevertedTransaction, + ) + }) } return log, ret, nil } -func (ctrl *ControllerWithEvents) SaveTransactionMetadata(ctx context.Context, parameters Parameters[SaveTransactionMetadata]) (*ledger.Log, error) { - log, err := ctrl.Controller.SaveTransactionMetadata(ctx, parameters) +func (c *ControllerWithEvents) SaveTransactionMetadata(ctx context.Context, parameters Parameters[SaveTransactionMetadata]) (*ledger.Log, error) { + log, err := c.Controller.SaveTransactionMetadata(ctx, parameters) if err != nil { return nil, err } if !parameters.DryRun { - ctrl.listener.SavedMetadata( - ctx, - ctrl.ledger.Name, - ledger.MetaTargetTypeTransaction, - fmt.Sprint(parameters.Input.TransactionID), - parameters.Input.Metadata, - ) + c.handleEvent(ctx, func() { + c.listener.SavedMetadata( + ctx, + c.ledger.Name, + ledger.MetaTargetTypeTransaction, + fmt.Sprint(parameters.Input.TransactionID), + parameters.Input.Metadata, + ) + }) } return log, nil } -func (ctrl *ControllerWithEvents) SaveAccountMetadata(ctx context.Context, parameters Parameters[SaveAccountMetadata]) (*ledger.Log, error) { - log, err := ctrl.Controller.SaveAccountMetadata(ctx, parameters) +func (c *ControllerWithEvents) SaveAccountMetadata(ctx context.Context, parameters Parameters[SaveAccountMetadata]) (*ledger.Log, error) { + log, err := c.Controller.SaveAccountMetadata(ctx, parameters) if err != nil { return nil, err } if !parameters.DryRun { - ctrl.listener.SavedMetadata( - ctx, - ctrl.ledger.Name, - ledger.MetaTargetTypeAccount, - parameters.Input.Address, - parameters.Input.Metadata, - ) + c.handleEvent(ctx, func() { + c.listener.SavedMetadata( + ctx, + c.ledger.Name, + ledger.MetaTargetTypeAccount, + parameters.Input.Address, + parameters.Input.Metadata, + ) + }) } return log, nil } -func (ctrl *ControllerWithEvents) DeleteTransactionMetadata(ctx context.Context, parameters Parameters[DeleteTransactionMetadata]) (*ledger.Log, error) { - log, err := ctrl.Controller.DeleteTransactionMetadata(ctx, parameters) +func (c *ControllerWithEvents) DeleteTransactionMetadata(ctx context.Context, parameters Parameters[DeleteTransactionMetadata]) (*ledger.Log, error) { + log, err := c.Controller.DeleteTransactionMetadata(ctx, parameters) if err != nil { return nil, err } if !parameters.DryRun { - ctrl.listener.DeletedMetadata( - ctx, - ctrl.ledger.Name, - ledger.MetaTargetTypeTransaction, - fmt.Sprint(parameters.Input.TransactionID), - parameters.Input.Key, - ) + c.handleEvent(ctx, func() { + c.listener.DeletedMetadata( + ctx, + c.ledger.Name, + ledger.MetaTargetTypeTransaction, + fmt.Sprint(parameters.Input.TransactionID), + parameters.Input.Key, + ) + }) } return log, nil } -func (ctrl *ControllerWithEvents) DeleteAccountMetadata(ctx context.Context, parameters Parameters[DeleteAccountMetadata]) (*ledger.Log, error) { - log, err := ctrl.Controller.DeleteAccountMetadata(ctx, parameters) +func (c *ControllerWithEvents) DeleteAccountMetadata(ctx context.Context, parameters Parameters[DeleteAccountMetadata]) (*ledger.Log, error) { + log, err := c.Controller.DeleteAccountMetadata(ctx, parameters) if err != nil { return nil, err } if !parameters.DryRun { - ctrl.listener.DeletedMetadata( - ctx, - ctrl.ledger.Name, - ledger.MetaTargetTypeAccount, - parameters.Input.Address, - parameters.Input.Key, - ) + c.handleEvent(ctx, func() { + c.listener.DeletedMetadata( + ctx, + c.ledger.Name, + ledger.MetaTargetTypeAccount, + parameters.Input.Address, + parameters.Input.Key, + ) + }) } return log, nil @@ -130,8 +159,29 @@ func (c *ControllerWithEvents) BeginTX(ctx context.Context, options *sql.TxOptio return &ControllerWithEvents{ ledger: c.ledger, Controller: ctrl, - listener: c.listener, + listener: c.listener, + parent: c, + hasTx: true, }, nil } +func (c *ControllerWithEvents) Commit(ctx context.Context) error { + err := c.Controller.Commit(ctx) + if err != nil { + return err + } + + for _, f := range c.atCommit { + f() + } + + return nil +} + +func (c *ControllerWithEvents) Rollback(ctx context.Context) error { + c.atCommit = nil + + return c.Controller.Rollback(ctx) +} + var _ Controller = (*ControllerWithEvents)(nil) diff --git a/test/e2e/api_bulk_test.go b/test/e2e/api_bulk_test.go index 1bfdfeaa4..fd568860b 100644 --- a/test/e2e/api_bulk_test.go +++ b/test/e2e/api_bulk_test.go @@ -4,6 +4,10 @@ package test_suite import ( "github.com/formancehq/go-libs/v2/pointer" + ledger "github.com/formancehq/ledger/internal" + "github.com/formancehq/ledger/internal/bus" + ledgerevents "github.com/formancehq/ledger/pkg/events" + "github.com/nats-io/nats.go" "math/big" "time" @@ -25,14 +29,16 @@ var _ = Context("Ledger engine tests", func() { numscriptRewrite bool }{ {"default", false}, - //{"numscript rewrite", true}, + {"numscript rewrite", true}, } { Context(data.description, func() { var ( - db = UseTemplatedDatabase() - ctx = logging.TestingContext() - bulkMaxSize = 5 + db = UseTemplatedDatabase() + ctx = logging.TestingContext() + events chan *nats.Msg + bulkResponse []components.V2BulkElementResult + bulkMaxSize = 5 ) testServer := NewTestServer(func() Configuration { @@ -50,12 +56,14 @@ var _ = Context("Ledger engine tests", func() { Ledger: "default", }) Expect(err).To(BeNil()) + events = Subscribe(GinkgoT(), testServer.GetValue()) }) When("creating a bulk on a ledger", func() { var ( - now = time.Now().Round(time.Microsecond).UTC() - items []components.V2BulkElement - err error + now = time.Now().Round(time.Microsecond).UTC() + items []components.V2BulkElement + err error + atomic bool ) BeforeEach(func() { items = []components.V2BulkElement{ @@ -96,12 +104,13 @@ var _ = Context("Ledger engine tests", func() { } }) JustBeforeEach(func() { - _, err = CreateBulk(ctx, testServer.GetValue(), operations.V2CreateBulkRequest{ + bulkResponse, err = CreateBulk(ctx, testServer.GetValue(), operations.V2CreateBulkRequest{ + Atomic: pointer.For(atomic), RequestBody: items, Ledger: "default", }) }) - It("should be ok", func() { + shouldBeOk := func() { Expect(err).To(Succeed()) tx, err := GetTransaction(ctx, testServer.GetValue(), operations.V2GetTransactionRequest{ @@ -131,6 +140,19 @@ var _ = Context("Ledger engine tests", func() { Timestamp: now, InsertedAt: tx.InsertedAt, })) + By("It should send events", func() { + Eventually(events).Should(Receive(Event(ledgerevents.EventTypeCommittedTransactions))) + Eventually(events).Should(Receive(Event(ledgerevents.EventTypeSavedMetadata))) + Eventually(events).Should(Receive(Event(ledgerevents.EventTypeDeletedMetadata))) + Eventually(events).Should(Receive(Event(ledgerevents.EventTypeRevertedTransaction))) + }) + } + It("should be ok", shouldBeOk) + Context("with atomic", func() { + BeforeEach(func() { + atomic = true + }) + It("should be ok", shouldBeOk) }) Context("with exceeded batch size", func() { BeforeEach(func() { @@ -157,10 +179,9 @@ var _ = Context("Ledger engine tests", func() { }) When("creating a bulk with an error on a ledger", func() { var ( - now = time.Now().Round(time.Microsecond).UTC() - err error - bulkResponse []components.V2BulkElementResult - atomic bool + now = time.Now().Round(time.Microsecond).UTC() + err error + atomic bool ) JustBeforeEach(func() { bulkResponse, err = CreateBulk(ctx, testServer.GetValue(), operations.V2CreateBulkRequest{ @@ -218,6 +239,15 @@ var _ = Context("Ledger engine tests", func() { Expect(err).To(Succeed()) Expect(txs.Data).To(HaveLen(1)) }) + + By("Should have sent one event", func() { + Eventually(events).Should(Receive(Event(ledgerevents.EventTypeCommittedTransactions, WithPayload(bus.CommittedTransactions{ + Ledger: "default", + Transactions: []ledger.Transaction{ConvertSDKTxToCoreTX(&bulkResponse[0].V2BulkElementResultCreateTransaction.Data)}, + AccountMetadata: ledger.AccountMetadata{}, + })))) + Eventually(events).ShouldNot(Receive()) + }) }) Context("with atomic", func() { BeforeEach(func() { @@ -231,6 +261,10 @@ var _ = Context("Ledger engine tests", func() { }) Expect(err).To(Succeed()) Expect(txs.Data).To(HaveLen(0)) + + By("Should not have sent any event", func() { + Eventually(events).ShouldNot(Receive()) + }) }) }) })