Skip to content

Commit

Permalink
feat: streamable bulk (#590)
Browse files Browse the repository at this point in the history
* feat: add stream at core level

* feat: refactor api part to support switch on content types

* refactor: bulk streams

* feat: dedicated bulking package

* feat: simplify

* test: refactor and test

* test: add some tests

* fix: remove useless CI step

* feat: add some traces

* chore: clean unused indexes

* fix: rolling upgrade tests
  • Loading branch information
gfyrag authored Nov 27, 2024
1 parent d7ba7d8 commit aeb7585
Show file tree
Hide file tree
Showing 94 changed files with 2,576 additions and 1,019 deletions.
6 changes: 0 additions & 6 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,6 @@ jobs:
- uses: 'actions/checkout@v4'
with:
fetch-depth: 0
- name: Tailscale
uses: tailscale/github-action@v2
with:
oauth-client-id: ${{ secrets.TS_OAUTH_CLIENT_ID }}
oauth-secret: ${{ secrets.TS_OAUTH_SECRET }}
tags: tag:ci
- name: "Deploy in staging"
env:
TAG: ${{ github.sha }}
Expand Down
5 changes: 5 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,9 @@ services:
OTEL_TRACES_EXPORTER_OTLP_INSECURE: "true"
OTEL_TRACES_BATCH: "true"
POSTGRES_URI: "postgresql://ledger:ledger@postgres/ledger?sslmode=disable"
POSTGRES_MAX_OPEN_CONNS: "40"
POSTGRES_MAX_IDLE_CONNS: "40"
POSTGRES_CONN_MAX_IDLE_TIME: "5m"
EXPERIMENTAL_FEATURES: "true"
AUTO_UPGRADE: "true"
BULK_PARALLEL: "10"
10 changes: 10 additions & 0 deletions internal/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import "github.com/formancehq/ledger/internal"
- [Variables](<#variables>)
- [func ComputeIdempotencyHash\(inputs any\) string](<#ComputeIdempotencyHash>)
- [type Account](<#Account>)
- [func \(a Account\) GetAddress\(\) string](<#Account.GetAddress>)
- [type AccountMetadata](<#AccountMetadata>)
- [type AccountsVolumes](<#AccountsVolumes>)
- [type BalancesByAssets](<#BalancesByAssets>)
Expand Down Expand Up @@ -172,6 +173,15 @@ type Account struct {
}
```

<a name="Account.GetAddress"></a>
### func \(Account\) GetAddress

```go
func (a Account) GetAddress() string
```



<a name="AccountMetadata"></a>
## type AccountMetadata

Expand Down
4 changes: 4 additions & 0 deletions internal/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type Account struct {
EffectiveVolumes VolumesByAssets `json:"effectiveVolumes,omitempty" bun:"effective_volumes,scanonly"`
}

// GetAddress returns the account address.
// NOTE(review): the Get prefix is unidiomatic Go; presumably the method
// exists to satisfy an interface declared elsewhere — confirm before renaming.
func (a Account) GetAddress() string {
	return a.Address
}

type AccountsVolumes struct {
bun.BaseModel `bun:"accounts_volumes"`

Expand Down
313 changes: 313 additions & 0 deletions internal/api/bulking/bulker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
package bulking

import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/alitto/pond"
"github.com/formancehq/go-libs/v2/logging"
ledger "github.com/formancehq/ledger/internal"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"sync/atomic"
)

// Bulker executes bulks of ledger actions (transaction creation, metadata
// add/delete, reverts) against a ledger controller, optionally atomically
// or in parallel.
type Bulker struct {
	// ctrl is the controller every bulk element is executed against.
	ctrl ledgercontroller.Controller
	// parallelism bounds concurrent element processing when parallel
	// execution is requested; 0 means sequential.
	parallelism int
	// tracer emits spans around bulk runs; defaults to a no-op tracer.
	tracer trace.Tracer
}

// run drains the bulk channel, executing each element through ctrl on a
// worker pool and pushing exactly one BulkElementResult per element to the
// result channel. Pool size is 1 unless parallel execution is both requested
// and configured. The result channel is closed once all elements have been
// processed. It reports whether at least one element failed.
func (b *Bulker) run(ctx context.Context, ctrl ledgercontroller.Controller, bulk Bulk, result chan BulkElementResult, continueOnFailure, parallel bool) bool {

	parallelism := 1
	if parallel && b.parallelism != 0 {
		parallelism = b.parallelism
	}

	wp := pond.New(parallelism, parallelism)
	hasError := atomic.Bool{}

	index := 0
	for element := range bulk {
		// Capture per-iteration copies: the submitted closure outlives the
		// iteration (and, pre Go 1.22, loop variables are shared).
		element, elementIndex := element, index
		// fix: index was never incremented, so every span carried index=0.
		index++
		wp.Submit(func() {
			ctx, span := b.tracer.Start(ctx, "Bulk:ProcessElement",
				trace.WithNewRoot(),
				trace.WithLinks(trace.LinkFromContext(ctx)),
				trace.WithAttributes(attribute.Int("index", elementIndex)),
			)
			defer span.End()

			select {
			case <-ctx.Done():
				result <- BulkElementResult{
					Error: ctx.Err(),
				}
			default:
				// Fail fast: once one element has failed, short-circuit the
				// remaining ones unless the caller opted to continue.
				if hasError.Load() && !continueOnFailure {
					result <- BulkElementResult{
						Error: context.Canceled,
					}
					return
				}
				ret, logID, err := b.processElement(ctx, ctrl, element)
				if err != nil {
					hasError.Store(true)
					span.RecordError(err)

					result <- BulkElementResult{
						Error: err,
					}

					return
				}

				result <- BulkElementResult{
					Data:  ret,
					LogID: logID,
				}
			}
		})
	}

	wp.StopAndWait()

	// All workers have finished: no further sends can occur, closing is safe.
	close(result)

	return hasError.Load()
}

// Run executes the given bulk, writing one BulkElementResult per element to
// the result channel. When bulkOptions.Atomic is set, all elements share a
// single storage transaction, committed on success and rolled back if any
// element failed. Per-element failures are reported through the result
// channel; the returned error only covers option validation and transaction
// management.
func (b *Bulker) Run(ctx context.Context, bulk Bulk, result chan BulkElementResult, bulkOptions BulkingOptions) error {

	ctx, span := b.tracer.Start(ctx, "Bulk:Run", trace.WithAttributes(
		attribute.Bool("atomic", bulkOptions.Atomic),
		attribute.Bool("parallel", bulkOptions.Parallel),
		attribute.Bool("continueOnFailure", bulkOptions.ContinueOnFailure),
		attribute.Int("parallelism", b.parallelism),
	))
	defer span.End()

	// Wrap with %w (not %s) so callers can unwrap with errors.Is/As.
	if err := bulkOptions.Validate(); err != nil {
		return fmt.Errorf("validating bulk options: %w", err)
	}

	ctrl := b.ctrl
	if bulkOptions.Atomic {
		var err error
		ctrl, err = ctrl.BeginTX(ctx, nil)
		if err != nil {
			return fmt.Errorf("error starting transaction: %w", err)
		}
	}

	hasError := b.run(ctx, ctrl, bulk, result, bulkOptions.ContinueOnFailure, bulkOptions.Parallel)
	if hasError && bulkOptions.Atomic {
		// Element errors were already delivered on the result channel; only
		// log a rollback failure instead of masking them with it.
		if rollbackErr := ctrl.Rollback(ctx); rollbackErr != nil {
			logging.FromContext(ctx).Errorf("failed to rollback transaction: %v", rollbackErr)
		}

		return nil
	}

	if bulkOptions.Atomic {
		if err := ctrl.Commit(ctx); err != nil {
			return fmt.Errorf("error committing transaction: %w", err)
		}
	}

	return nil
}

// processElement executes a single bulk element against ctrl and returns the
// action-specific payload (may be nil), the ID of the log entry written, and
// any error. The type assertions on data.Data are intentional: the element
// is expected to have been validated/typed upstream when the bulk was parsed.
func (b *Bulker) processElement(ctx context.Context, ctrl ledgercontroller.Controller, data BulkElement) (any, int, error) {

	switch data.Action {
	case ActionCreateTransaction:
		rs, err := data.Data.(TransactionRequest).ToRunScript(false)
		if err != nil {
			// %w keeps the parse error unwrappable for callers.
			return nil, 0, fmt.Errorf("error parsing element: %w", err)
		}

		log, createTransactionResult, err := ctrl.CreateTransaction(ctx, ledgercontroller.Parameters[ledgercontroller.RunScript]{
			DryRun:         false,
			IdempotencyKey: data.IdempotencyKey,
			Input:          *rs,
		})
		if err != nil {
			return nil, 0, err
		}

		// todo(next api version): no reason to return only the transaction...
		return createTransactionResult.Transaction, log.ID, nil
	case ActionAddMetadata:
		req := data.Data.(AddMetadataRequest)

		var (
			log *ledger.Log
			err error
		)
		switch req.TargetType {
		case ledger.MetaTargetTypeAccount:
			// TargetID is raw JSON: a string for accounts, an int for
			// transactions.
			address := ""
			if err := json.Unmarshal(req.TargetID, &address); err != nil {
				return nil, 0, err
			}
			log, err = ctrl.SaveAccountMetadata(ctx, ledgercontroller.Parameters[ledgercontroller.SaveAccountMetadata]{
				DryRun:         false,
				IdempotencyKey: data.IdempotencyKey,
				Input: ledgercontroller.SaveAccountMetadata{
					Address:  address,
					Metadata: req.Metadata,
				},
			})
		case ledger.MetaTargetTypeTransaction:
			transactionID := 0
			if err := json.Unmarshal(req.TargetID, &transactionID); err != nil {
				return nil, 0, err
			}
			log, err = ctrl.SaveTransactionMetadata(ctx, ledgercontroller.Parameters[ledgercontroller.SaveTransactionMetadata]{
				DryRun:         false,
				IdempotencyKey: data.IdempotencyKey,
				Input: ledgercontroller.SaveTransactionMetadata{
					TransactionID: transactionID,
					Metadata:      req.Metadata,
				},
			})
		default:
			return nil, 0, fmt.Errorf("invalid target type: %s", req.TargetType)
		}
		if err != nil {
			return nil, 0, err
		}

		return nil, log.ID, nil
	case ActionRevertTransaction:
		req := data.Data.(RevertTransactionRequest)

		log, revertTransactionResult, err := ctrl.RevertTransaction(ctx, ledgercontroller.Parameters[ledgercontroller.RevertTransaction]{
			DryRun:         false,
			IdempotencyKey: data.IdempotencyKey,
			Input: ledgercontroller.RevertTransaction{
				Force:           req.Force,
				AtEffectiveDate: req.AtEffectiveDate,
				TransactionID:   req.ID,
			},
		})
		if err != nil {
			return nil, 0, err
		}

		return revertTransactionResult.RevertedTransaction, log.ID, nil
	case ActionDeleteMetadata:
		req := data.Data.(DeleteMetadataRequest)

		var (
			log *ledger.Log
			err error
		)
		switch req.TargetType {
		case ledger.MetaTargetTypeAccount:
			address := ""
			if err := json.Unmarshal(req.TargetID, &address); err != nil {
				return nil, 0, err
			}

			log, err = ctrl.DeleteAccountMetadata(ctx, ledgercontroller.Parameters[ledgercontroller.DeleteAccountMetadata]{
				DryRun:         false,
				IdempotencyKey: data.IdempotencyKey,
				Input: ledgercontroller.DeleteAccountMetadata{
					Address: address,
					Key:     req.Key,
				},
			})
		case ledger.MetaTargetTypeTransaction:
			transactionID := 0
			if err := json.Unmarshal(req.TargetID, &transactionID); err != nil {
				return nil, 0, err
			}

			log, err = ctrl.DeleteTransactionMetadata(ctx, ledgercontroller.Parameters[ledgercontroller.DeleteTransactionMetadata]{
				DryRun:         false,
				IdempotencyKey: data.IdempotencyKey,
				Input: ledgercontroller.DeleteTransactionMetadata{
					TransactionID: transactionID,
					Key:           req.Key,
				},
			})
		default:
			return nil, 0, fmt.Errorf("unsupported target type: %s", req.TargetType)
		}
		if err != nil {
			return nil, 0, err
		}

		return nil, log.ID, nil
	default:
		// Unknown actions indicate a programming error upstream (the bulk
		// was parsed with an action this switch does not cover).
		panic("unreachable")
	}
}

// NewBulker creates a Bulker bound to the given controller. Default options
// are applied first, then caller-supplied ones, so the latter win.
func NewBulker(ctrl ledgercontroller.Controller, options ...BulkerOption) *Bulker {
	bulker := &Bulker{ctrl: ctrl}

	allOptions := make([]BulkerOption, 0, len(defaultBulkerOptions)+len(options))
	allOptions = append(allOptions, defaultBulkerOptions...)
	allOptions = append(allOptions, options...)

	for _, apply := range allOptions {
		apply(bulker)
	}

	return bulker
}

// BulkerOption configures a Bulker at construction time.
type BulkerOption func(bulker *Bulker)

// WithParallelism sets the maximum number of bulk elements processed
// concurrently when parallel execution is requested.
func WithParallelism(v int) BulkerOption {
	return func(b *Bulker) {
		b.parallelism = v
	}
}

// WithTracer sets the tracer used to instrument bulk runs.
func WithTracer(tracer trace.Tracer) BulkerOption {
	return func(b *Bulker) {
		b.tracer = tracer
	}
}

// defaultBulkerOptions are applied before any caller-supplied options:
// a no-op tracer and a parallelism of 10.
var defaultBulkerOptions = []BulkerOption{
	WithTracer(noop.Tracer{}),
	WithParallelism(10),
}

// BulkingOptions controls how a single bulk is executed by Bulker.Run.
type BulkingOptions struct {
	// ContinueOnFailure keeps processing remaining elements after one fails.
	ContinueOnFailure bool
	// Atomic runs the whole bulk inside one storage transaction.
	Atomic bool
	// Parallel allows concurrent element processing. Mutually exclusive
	// with Atomic (see Validate).
	Parallel bool
}

// Validate checks that the requested options are coherent: atomic execution
// implies a single transaction and therefore cannot run in parallel.
func (opts BulkingOptions) Validate() error {
	if !opts.Atomic || !opts.Parallel {
		return nil
	}

	return errors.New("atomic and parallel options are mutually exclusive")
}

// BulkerFactory builds a Bulker bound to a specific ledger controller.
type BulkerFactory interface {
	CreateBulker(ctrl ledgercontroller.Controller) *Bulker
}

// DefaultBulkerFactory creates bulkers sharing a fixed set of options.
type DefaultBulkerFactory struct {
	// Options are forwarded to NewBulker for every bulker created.
	Options []BulkerOption
}

// CreateBulker builds a new Bulker for ctrl using the factory's options.
func (d *DefaultBulkerFactory) CreateBulker(ctrl ledgercontroller.Controller) *Bulker {
	return NewBulker(ctrl, d.Options...)
}

// NewDefaultBulkerFactory creates a factory whose bulkers will all be
// configured with the given options.
func NewDefaultBulkerFactory(options ...BulkerOption) *DefaultBulkerFactory {
	factory := new(DefaultBulkerFactory)
	factory.Options = options

	return factory
}

// Compile-time check that DefaultBulkerFactory satisfies BulkerFactory.
var _ BulkerFactory = (*DefaultBulkerFactory)(nil)
Loading

0 comments on commit aeb7585

Please sign in to comment.