Skip to content

Commit

Permalink
Merge pull request #586 from formancehq/feat/performance-accounts-ins…
Browse files Browse the repository at this point in the history
…ertions

feat(performance): batch accounts insertions
  • Loading branch information
gfyrag authored Nov 25, 2024
2 parents 819405f + 7300c7a commit 26eaec1
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 97 deletions.
2 changes: 1 addition & 1 deletion internal/controller/ledger/controller_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (ctrl *DefaultController) SaveTransactionMetadata(ctx context.Context, para
}

func (ctrl *DefaultController) saveAccountMetadata(ctx context.Context, store Store, parameters Parameters[SaveAccountMetadata]) (*ledger.SavedMetadata, error) {
if _, err := store.UpsertAccount(ctx, &ledger.Account{
if err := store.UpsertAccounts(ctx, &ledger.Account{
Address: parameters.Input.Address,
Metadata: parameters.Input.Metadata,
}); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Store interface {
DeleteTransactionMetadata(ctx context.Context, transactionID int, key string) (*ledger.Transaction, bool, error)
UpdateAccountsMetadata(ctx context.Context, m map[string]metadata.Metadata) error
// UpsertAccount returns a boolean indicating if the account was upserted
UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error)
UpsertAccounts(ctx context.Context, accounts ...*ledger.Account) error
DeleteAccountMetadata(ctx context.Context, address, key string) error
InsertLog(ctx context.Context, log *ledger.Log) error

Expand Down
22 changes: 13 additions & 9 deletions internal/controller/ledger/store_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 24 additions & 44 deletions internal/storage/ledger/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ledger

import (
"context"
"database/sql"
"fmt"
. "github.com/formancehq/go-libs/v2/bun/bunpaginate"
"github.com/formancehq/ledger/pkg/features"
Expand All @@ -12,11 +11,8 @@ import (

"github.com/formancehq/go-libs/v2/metadata"
"github.com/formancehq/go-libs/v2/platform/postgres"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/formancehq/go-libs/v2/time"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"

"github.com/formancehq/go-libs/v2/query"
ledger "github.com/formancehq/ledger/internal"
Expand Down Expand Up @@ -333,45 +329,29 @@ func (s *Store) DeleteAccountMetadata(ctx context.Context, account, key string)
return err
}

// todo: since we update first balances of an accounts in the transaction process, we can avoid nested sql txs
// while upserting account and upsert them all in one shot
func (s *Store) UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error) {
return tracing.TraceWithMetric(
func (s *Store) UpsertAccounts(ctx context.Context, accounts ...*ledger.Account) error {
return tracing.SkipResult(tracing.TraceWithMetric(
ctx,
"UpsertAccount",
"UpsertAccounts",
s.tracer,
s.upsertAccountHistogram,
func(ctx context.Context) (bool, error) {
upserted := false
err := s.db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error {
ret, err := tx.NewInsert().
Model(account).
ModelTableExpr(s.GetPrefixedRelationName("accounts")).
On("conflict (ledger, address) do update").
Set("first_usage = case when ? < excluded.first_usage then ? else excluded.first_usage end", account.FirstUsage, account.FirstUsage).
Set("metadata = accounts.metadata || excluded.metadata").
Set("updated_at = excluded.updated_at").
Value("ledger", "?", s.ledger.Name).
Returning("*").
Where("(? < accounts.first_usage) or not accounts.metadata @> excluded.metadata", account.FirstUsage).
Exec(ctx)
if err != nil {
return err
}
rowsModified, err := ret.RowsAffected()
if err != nil {
return err
}
upserted = rowsModified > 0
return nil
})
return upserted, postgres.ResolveError(err)
},
func(ctx context.Context, upserted bool) {
trace.SpanFromContext(ctx).SetAttributes(
attribute.String("address", account.Address),
attribute.Bool("upserted", upserted),
)
},
)
s.upsertAccountsHistogram,
tracing.NoResult(func(ctx context.Context) error {
_, err := s.db.NewInsert().
Model(&accounts).
ModelTableExpr(s.GetPrefixedRelationName("accounts")).
On("conflict (ledger, address) do update").
Set("first_usage = case when excluded.first_usage < accounts.first_usage then excluded.first_usage else accounts.first_usage end").
Set("metadata = accounts.metadata || excluded.metadata").
Set("updated_at = excluded.updated_at").
Value("ledger", "?", s.ledger.Name).
Returning("*").
Where("(excluded.first_usage < accounts.first_usage) or not accounts.metadata @> excluded.metadata").
Exec(ctx)
if err != nil {
return fmt.Errorf("upserting accounts: %w", postgres.ResolveError(err))
}

return nil
}),
))
}
25 changes: 16 additions & 9 deletions internal/storage/ledger/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,22 +402,30 @@ func TestAccountsUpsert(t *testing.T) {
store := newLedgerStore(t)
ctx := logging.TestingContext()

account := ledger.Account{
account1 := ledger.Account{
Address: "foo",
}

account2 := ledger.Account{
Address: "foo2",
}

// Initial insert
upserted, err := store.UpsertAccount(ctx, &account)
err := store.UpsertAccounts(ctx, &account1, &account2)
require.NoError(t, err)
require.True(t, upserted)
require.NotEmpty(t, account.FirstUsage)
require.NotEmpty(t, account.InsertionDate)
require.NotEmpty(t, account.UpdatedAt)

require.NotEmpty(t, account1.FirstUsage)
require.NotEmpty(t, account1.InsertionDate)
require.NotEmpty(t, account1.UpdatedAt)

require.NotEmpty(t, account2.FirstUsage)
require.NotEmpty(t, account2.InsertionDate)
require.NotEmpty(t, account2.UpdatedAt)

now := time.Now()

// Reset the account model
account = ledger.Account{
account1 = ledger.Account{
Address: "foo",
// The account will be upserted on the timeline after its initial usage.
// The upsert should not modify anything, but, it should retrieve and load the account entity
Expand All @@ -427,7 +435,6 @@ func TestAccountsUpsert(t *testing.T) {
}

// Upsert with no modification
upserted, err = store.UpsertAccount(ctx, &account)
err = store.UpsertAccounts(ctx, &account1)
require.NoError(t, err)
require.False(t, upserted)
}
4 changes: 2 additions & 2 deletions internal/storage/ledger/balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestBalancesGet(t *testing.T) {
UpdatedAt: time.Now(),
FirstUsage: time.Now(),
}
_, err := store.UpsertAccount(ctx, world)
err := store.UpsertAccounts(ctx, world)
require.NoError(t, err)

_, err = store.UpdateVolumes(ctx, ledger.AccountsVolumes{
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestBalancesGet(t *testing.T) {
InsertionDate: tx.InsertedAt,
UpdatedAt: tx.InsertedAt,
}
_, err = store.UpsertAccount(ctx, &bankAccount)
err = store.UpsertAccounts(ctx, &bankAccount)
require.NoError(t, err)

err = store.InsertMoves(ctx, &ledger.Move{
Expand Down
4 changes: 2 additions & 2 deletions internal/storage/ledger/legacy/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (d *DefaultStoreAdapter) UpdateAccountsMetadata(ctx context.Context, m map[
return d.newStore.UpdateAccountsMetadata(ctx, m)
}

func (d *DefaultStoreAdapter) UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error) {
return d.newStore.UpsertAccount(ctx, account)
func (d *DefaultStoreAdapter) UpsertAccounts(ctx context.Context, accounts ... *ledger.Account) error {
return d.newStore.UpsertAccounts(ctx, accounts...)
}

func (d *DefaultStoreAdapter) DeleteAccountMetadata(ctx context.Context, address, key string) error {
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/ledger/moves_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestMovesInsert(t *testing.T) {
account := &ledger.Account{
Address: "world",
}
_, err := store.UpsertAccount(ctx, account)
err := store.UpsertAccounts(ctx, account)
require.NoError(t, err)

now := time.Now()
Expand Down
8 changes: 4 additions & 4 deletions internal/storage/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ type Store struct {
getAccountHistogram metric.Int64Histogram
countAccountsHistogram metric.Int64Histogram
updateAccountsMetadataHistogram metric.Int64Histogram
deleteAccountMetadataHistogram metric.Int64Histogram
upsertAccountHistogram metric.Int64Histogram
getBalancesHistogram metric.Int64Histogram
deleteAccountMetadataHistogram metric.Int64Histogram
upsertAccountsHistogram metric.Int64Histogram
getBalancesHistogram metric.Int64Histogram
insertLogHistogram metric.Int64Histogram
listLogsHistogram metric.Int64Histogram
readLogWithIdempotencyKeyHistogram metric.Int64Histogram
Expand Down Expand Up @@ -154,7 +154,7 @@ func New(db bun.IDB, bucket bucket.Bucket, ledger ledger.Ledger, opts ...Option)
panic(err)
}

ret.upsertAccountHistogram, err = ret.meter.Int64Histogram("store.upsertAccount")
ret.upsertAccountsHistogram, err = ret.meter.Int64Histogram("store.upsertAccounts")
if err != nil {
panic(err)
}
Expand Down
12 changes: 6 additions & 6 deletions internal/storage/ledger/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/formancehq/go-libs/v2/collectionutils"
"github.com/formancehq/ledger/pkg/features"
"math/big"
"regexp"
Expand Down Expand Up @@ -254,15 +255,15 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
return fmt.Errorf("failed to insert transaction: %w", err)
}

for _, address := range tx.InvolvedAccounts() {
_, err := s.UpsertAccount(ctx, &ledger.Account{
err = s.UpsertAccounts(ctx, collectionutils.Map(tx.InvolvedAccounts(), func(address string) *ledger.Account {
return &ledger.Account{
Address: address,
FirstUsage: tx.Timestamp,
Metadata: make(metadata.Metadata),
})
if err != nil {
return fmt.Errorf("upserting account: %w", err)
}
})...)
if err != nil {
return fmt.Errorf("upserting accounts: %w", err)
}

if s.ledger.HasFeature(features.FeatureMovesHistory, "ON") {
Expand Down Expand Up @@ -302,7 +303,6 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
}

if s.ledger.HasFeature(features.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") {
// todo: tx is inserted earlier!
tx.PostCommitEffectiveVolumes = moves.ComputePostCommitEffectiveVolumes()
}
}
Expand Down
58 changes: 53 additions & 5 deletions pkg/generate/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/formancehq/ledger/pkg/client/models/operations"
"github.com/google/uuid"
"math/big"
"os"
"path/filepath"
"time"
)

Expand Down Expand Up @@ -185,12 +187,24 @@ func (r Action) Apply(ctx context.Context, client *client.V2, l string) (*Result
return &Result{response.V2BulkResponse.Data[0]}, nil
}

type NextOptions struct {
Globals map[string]any
}

type NextOption func(options *NextOptions)

func WithNextGlobals(globals map[string]any) NextOption {
return func(options *NextOptions) {
options.Globals = globals
}
}

type Generator struct {
next func(int) (*Action, error)
next func(int, ...NextOption) (*Action, error)
}

func (g *Generator) Next(iteration int) (*Action, error) {
return g.next(iteration)
func (g *Generator) Next(iteration int, options ...NextOption) (*Action, error) {
return g.next(iteration, options...)
}

func NewGenerator(script string, opts ...Option) (*Generator, error) {
Expand Down Expand Up @@ -221,14 +235,41 @@ func NewGenerator(script string, opts ...Option) (*Generator, error) {
return nil, err
}

err = runtime.Set("read_file", func(path string) string {
fmt.Println("read file", path)
f, err := os.ReadFile(filepath.Join(cfg.rootPath, path))
if err != nil {
panic(err)
}

return string(f)
})
if err != nil {
return nil, err
}

var next func(int) map[string]any
err = runtime.ExportTo(runtime.Get("next"), &next)
if err != nil {
panic(err)
}

return &Generator{
next: func(i int) (*Action, error) {
next: func(i int, options ...NextOption) (*Action, error) {

nextOptions := NextOptions{}
for _, option := range options {
option(&nextOptions)
}

if nextOptions.Globals != nil {
for k, v := range nextOptions.Globals {
if err := runtime.Set(k, v); err != nil {
return nil, fmt.Errorf("failed to set global variable %s: %w", k, err)
}
}
}

ret := next(i)

var (
Expand Down Expand Up @@ -281,7 +322,8 @@ func NewGenerator(script string, opts ...Option) (*Generator, error) {
}

type config struct {
globals map[string]any
globals map[string]any
rootPath string
}

type Option func(*config)
Expand All @@ -291,3 +333,9 @@ func WithGlobals(globals map[string]any) Option {
c.globals = globals
}
}

func WithRootPath(path string) Option {
return func(c *config) {
c.rootPath = path
}
}
Loading

0 comments on commit 26eaec1

Please sign in to comment.