Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: streamable bulk #590

Merged
merged 11 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,6 @@ jobs:
- uses: 'actions/checkout@v4'
with:
fetch-depth: 0
- name: Tailscale
uses: tailscale/github-action@v2
with:
oauth-client-id: ${{ secrets.TS_OAUTH_CLIENT_ID }}
oauth-secret: ${{ secrets.TS_OAUTH_SECRET }}
tags: tag:ci
- name: "Deploy in staging"
env:
TAG: ${{ github.sha }}
Expand Down
5 changes: 5 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,9 @@ services:
OTEL_TRACES_EXPORTER_OTLP_INSECURE: "true"
OTEL_TRACES_BATCH: "true"
POSTGRES_URI: "postgresql://ledger:ledger@postgres/ledger?sslmode=disable"
POSTGRES_MAX_OPEN_CONNS: "40"
POSTGRES_MAX_IDLE_CONNS: "40"
POSTGRES_CONN_MAX_IDLE_TIME: "5m"
EXPERIMENTAL_FEATURES: "true"
AUTO_UPGRADE: "true"
BULK_PARALLEL: "10"
10 changes: 10 additions & 0 deletions internal/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import "github.com/formancehq/ledger/internal"
- [Variables](<#variables>)
- [func ComputeIdempotencyHash\(inputs any\) string](<#ComputeIdempotencyHash>)
- [type Account](<#Account>)
- [func \(a Account\) GetAddress\(\) string](<#Account.GetAddress>)
- [type AccountMetadata](<#AccountMetadata>)
- [type AccountsVolumes](<#AccountsVolumes>)
- [type BalancesByAssets](<#BalancesByAssets>)
Expand Down Expand Up @@ -172,6 +173,15 @@ type Account struct {
}
```

<a name="Account.GetAddress"></a>
### func \(Account\) GetAddress

```go
func (a Account) GetAddress() string
```



<a name="AccountMetadata"></a>
## type AccountMetadata

Expand Down
4 changes: 4 additions & 0 deletions internal/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type Account struct {
EffectiveVolumes VolumesByAssets `json:"effectiveVolumes,omitempty" bun:"effective_volumes,scanonly"`
}

func (a Account) GetAddress() string {
return a.Address
}

type AccountsVolumes struct {
bun.BaseModel `bun:"accounts_volumes"`

Expand Down
313 changes: 313 additions & 0 deletions internal/api/bulking/bulker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
package bulking

import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/alitto/pond"
"github.com/formancehq/go-libs/v2/logging"
ledger "github.com/formancehq/ledger/internal"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"sync/atomic"
)

type Bulker struct {
ctrl ledgercontroller.Controller
parallelism int
tracer trace.Tracer
}

func (b *Bulker) run(ctx context.Context, ctrl ledgercontroller.Controller, bulk Bulk, result chan BulkElementResult, continueOnFailure, parallel bool) bool {

parallelism := 1
if parallel && b.parallelism != 0 {
parallelism = b.parallelism
}

wp := pond.New(parallelism, parallelism)
hasError := atomic.Bool{}

index := 0
for element := range bulk {
wp.Submit(func() {
ctx, span := b.tracer.Start(ctx, "Bulk:ProcessElement",
trace.WithNewRoot(),
trace.WithLinks(trace.LinkFromContext(ctx)),
trace.WithAttributes(attribute.Int("index", index)),
)
defer span.End()

select {
case <-ctx.Done():
result <- BulkElementResult{
Error: ctx.Err(),
}
default:
if hasError.Load() && !continueOnFailure {
result <- BulkElementResult{
Error: context.Canceled,
}
return
}
ret, logID, err := b.processElement(ctx, ctrl, element)
if err != nil {
hasError.Store(true)
span.RecordError(err)

result <- BulkElementResult{
Error: err,
}

return
}

result <- BulkElementResult{
Data: ret,
LogID: logID,
}
}

})
}

wp.StopAndWait()

defer close(result)

return hasError.Load()
}

func (b *Bulker) Run(ctx context.Context, bulk Bulk, result chan BulkElementResult, bulkOptions BulkingOptions) error {

ctx, span := b.tracer.Start(ctx, "Bulk:Run", trace.WithAttributes(
attribute.Bool("atomic", bulkOptions.Atomic),
attribute.Bool("parallel", bulkOptions.Parallel),
attribute.Bool("continueOnFailure", bulkOptions.ContinueOnFailure),
attribute.Int("parallelism", b.parallelism),
))
defer span.End()

if err := bulkOptions.Validate(); err != nil {
return fmt.Errorf("validating bulk options: %s", err)
}

ctrl := b.ctrl
if bulkOptions.Atomic {
var err error
ctrl, err = ctrl.BeginTX(ctx, nil)
if err != nil {
return fmt.Errorf("error starting transaction: %s", err)
}
}

hasError := b.run(ctx, ctrl, bulk, result, bulkOptions.ContinueOnFailure, bulkOptions.Parallel)
if hasError && bulkOptions.Atomic {
if rollbackErr := ctrl.Rollback(ctx); rollbackErr != nil {
logging.FromContext(ctx).Errorf("failed to rollback transaction: %v", rollbackErr)
}

return nil
}

if bulkOptions.Atomic {
if err := ctrl.Commit(ctx); err != nil {
return fmt.Errorf("error committing transaction: %s", err)
}
}

return nil
}

func (b *Bulker) processElement(ctx context.Context, ctrl ledgercontroller.Controller, data BulkElement) (any, int, error) {

switch data.Action {
case ActionCreateTransaction:
rs, err := data.Data.(TransactionRequest).ToRunScript(false)
if err != nil {
return nil, 0, fmt.Errorf("error parsing element: %s", err)
}

log, createTransactionResult, err := ctrl.CreateTransaction(ctx, ledgercontroller.Parameters[ledgercontroller.RunScript]{
DryRun: false,
IdempotencyKey: data.IdempotencyKey,
Input: *rs,
})
if err != nil {
return nil, 0, err
}

// todo(next api version): no reason to return only the transaction...
return createTransactionResult.Transaction, log.ID, nil
case ActionAddMetadata:
req := data.Data.(AddMetadataRequest)

var (
log *ledger.Log
err error
)
switch req.TargetType {
case ledger.MetaTargetTypeAccount:
address := ""
if err := json.Unmarshal(req.TargetID, &address); err != nil {
return nil, 0, err
}
log, err = ctrl.SaveAccountMetadata(ctx, ledgercontroller.Parameters[ledgercontroller.SaveAccountMetadata]{
DryRun: false,
IdempotencyKey: data.IdempotencyKey,
Input: ledgercontroller.SaveAccountMetadata{
Address: address,
Metadata: req.Metadata,
},
})
case ledger.MetaTargetTypeTransaction:
transactionID := 0
if err := json.Unmarshal(req.TargetID, &transactionID); err != nil {
return nil, 0, err
}
log, err = ctrl.SaveTransactionMetadata(ctx, ledgercontroller.Parameters[ledgercontroller.SaveTransactionMetadata]{
DryRun: false,
IdempotencyKey: data.IdempotencyKey,
Input: ledgercontroller.SaveTransactionMetadata{
TransactionID: transactionID,
Metadata: req.Metadata,
},
})
default:
return nil, 0, fmt.Errorf("invalid target type: %s", req.TargetType)
}
if err != nil {
return nil, 0, err
}

return nil, log.ID, nil
case ActionRevertTransaction:
req := data.Data.(RevertTransactionRequest)

log, revertTransactionResult, err := ctrl.RevertTransaction(ctx, ledgercontroller.Parameters[ledgercontroller.RevertTransaction]{
DryRun: false,
IdempotencyKey: data.IdempotencyKey,
Input: ledgercontroller.RevertTransaction{
Force: req.Force,
AtEffectiveDate: req.AtEffectiveDate,
TransactionID: req.ID,
},
})
if err != nil {
return nil, 0, err
}

return revertTransactionResult.RevertedTransaction, log.ID, nil
case ActionDeleteMetadata:
req := data.Data.(DeleteMetadataRequest)

var (
log *ledger.Log
err error
)
switch req.TargetType {
case ledger.MetaTargetTypeAccount:
address := ""
if err := json.Unmarshal(req.TargetID, &address); err != nil {
return nil, 0, err
}

log, err = ctrl.DeleteAccountMetadata(ctx, ledgercontroller.Parameters[ledgercontroller.DeleteAccountMetadata]{
DryRun: false,
IdempotencyKey: data.IdempotencyKey,
Input: ledgercontroller.DeleteAccountMetadata{
Address: address,
Key: req.Key,
},
})
case ledger.MetaTargetTypeTransaction:
transactionID := 0
if err := json.Unmarshal(req.TargetID, &transactionID); err != nil {
return nil, 0, err
}

log, err = ctrl.DeleteTransactionMetadata(ctx, ledgercontroller.Parameters[ledgercontroller.DeleteTransactionMetadata]{
DryRun: false,
IdempotencyKey: data.IdempotencyKey,
Input: ledgercontroller.DeleteTransactionMetadata{
TransactionID: transactionID,
Key: req.Key,
},
})
default:
return nil, 0, fmt.Errorf("unsupported target type: %s", req.TargetType)
}
if err != nil {
return nil, 0, err
}

return nil, log.ID, nil
default:
panic("unreachable")
}
}

func NewBulker(ctrl ledgercontroller.Controller, options ...BulkerOption) *Bulker {
ret := &Bulker{ctrl: ctrl}
for _, option := range append(defaultBulkerOptions, options...) {
option(ret)
}

return ret
}

type BulkerOption func(bulker *Bulker)

func WithParallelism(v int) BulkerOption {
return func(options *Bulker) {
options.parallelism = v
}
}

func WithTracer(tracer trace.Tracer) BulkerOption {
return func(options *Bulker) {
options.tracer = tracer
}
}

var defaultBulkerOptions = []BulkerOption{
WithTracer(noop.Tracer{}),
WithParallelism(10),
}

type BulkingOptions struct {
ContinueOnFailure bool
Atomic bool
Parallel bool
}

func (opts BulkingOptions) Validate() error {
if opts.Atomic && opts.Parallel {
return errors.New("atomic and parallel options are mutually exclusive")
}

return nil
}

type BulkerFactory interface {
CreateBulker(ctrl ledgercontroller.Controller) *Bulker
}

type DefaultBulkerFactory struct {
Options []BulkerOption
}

func (d *DefaultBulkerFactory) CreateBulker(ctrl ledgercontroller.Controller) *Bulker {
return NewBulker(ctrl, d.Options...)
}

func NewDefaultBulkerFactory(options ...BulkerOption) *DefaultBulkerFactory {
return &DefaultBulkerFactory{
Options: options,
}
}

var _ BulkerFactory = (*DefaultBulkerFactory)(nil)
Loading
Loading