Skip to content

Commit

Permalink
feat(performance): batch accounts insertions
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Nov 23, 2024
1 parent 819405f commit d84cc02
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 74 deletions.
2 changes: 1 addition & 1 deletion internal/controller/ledger/controller_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (ctrl *DefaultController) SaveTransactionMetadata(ctx context.Context, para
}

func (ctrl *DefaultController) saveAccountMetadata(ctx context.Context, store Store, parameters Parameters[SaveAccountMetadata]) (*ledger.SavedMetadata, error) {
if _, err := store.UpsertAccount(ctx, &ledger.Account{
if err := store.UpsertAccounts(ctx, &ledger.Account{
Address: parameters.Input.Address,
Metadata: parameters.Input.Metadata,
}); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Store interface {
DeleteTransactionMetadata(ctx context.Context, transactionID int, key string) (*ledger.Transaction, bool, error)
UpdateAccountsMetadata(ctx context.Context, m map[string]metadata.Metadata) error
// UpsertAccount returns a boolean indicating if the account was upserted
UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error)
UpsertAccounts(ctx context.Context, accounts ...*ledger.Account) error
DeleteAccountMetadata(ctx context.Context, address, key string) error
InsertLog(ctx context.Context, log *ledger.Log) error

Expand Down
22 changes: 13 additions & 9 deletions internal/controller/ledger/store_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 24 additions & 44 deletions internal/storage/ledger/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ledger

import (
"context"
"database/sql"
"fmt"
. "github.com/formancehq/go-libs/v2/bun/bunpaginate"
"github.com/formancehq/ledger/pkg/features"
Expand All @@ -12,11 +11,8 @@ import (

"github.com/formancehq/go-libs/v2/metadata"
"github.com/formancehq/go-libs/v2/platform/postgres"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/formancehq/go-libs/v2/time"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"

"github.com/formancehq/go-libs/v2/query"
ledger "github.com/formancehq/ledger/internal"
Expand Down Expand Up @@ -333,45 +329,29 @@ func (s *Store) DeleteAccountMetadata(ctx context.Context, account, key string)
return err
}

// todo: since we update first balances of an accounts in the transaction process, we can avoid nested sql txs
// while upserting account and upsert them all in one shot
func (s *Store) UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error) {
return tracing.TraceWithMetric(
func (s *Store) UpsertAccounts(ctx context.Context, accounts ...*ledger.Account) error {
return tracing.SkipResult(tracing.TraceWithMetric(
ctx,
"UpsertAccount",
"UpsertAccounts",
s.tracer,
s.upsertAccountHistogram,
func(ctx context.Context) (bool, error) {
upserted := false
err := s.db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error {
ret, err := tx.NewInsert().
Model(account).
ModelTableExpr(s.GetPrefixedRelationName("accounts")).
On("conflict (ledger, address) do update").
Set("first_usage = case when ? < excluded.first_usage then ? else excluded.first_usage end", account.FirstUsage, account.FirstUsage).
Set("metadata = accounts.metadata || excluded.metadata").
Set("updated_at = excluded.updated_at").
Value("ledger", "?", s.ledger.Name).
Returning("*").
Where("(? < accounts.first_usage) or not accounts.metadata @> excluded.metadata", account.FirstUsage).
Exec(ctx)
if err != nil {
return err
}
rowsModified, err := ret.RowsAffected()
if err != nil {
return err
}
upserted = rowsModified > 0
return nil
})
return upserted, postgres.ResolveError(err)
},
func(ctx context.Context, upserted bool) {
trace.SpanFromContext(ctx).SetAttributes(
attribute.String("address", account.Address),
attribute.Bool("upserted", upserted),
)
},
)
s.upsertAccountsHistogram,
tracing.NoResult(func(ctx context.Context) error {
_, err := s.db.NewInsert().
Model(&accounts).
ModelTableExpr(s.GetPrefixedRelationName("accounts")).
On("conflict (ledger, address) do update").
Set("first_usage = case when excluded.first_usage < accounts.first_usage then excluded.first_usage else accounts.first_usage end").
Set("metadata = accounts.metadata || excluded.metadata").
Set("updated_at = excluded.updated_at").
Value("ledger", "?", s.ledger.Name).
Returning("*").
Where("(excluded.first_usage < accounts.first_usage) or not accounts.metadata @> excluded.metadata").
Exec(ctx)
if err != nil {
return fmt.Errorf("upserting accounts: %w", postgres.ResolveError(err))
}

return nil
}),
))
}
25 changes: 16 additions & 9 deletions internal/storage/ledger/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,22 +402,30 @@ func TestAccountsUpsert(t *testing.T) {
store := newLedgerStore(t)
ctx := logging.TestingContext()

account := ledger.Account{
account1 := ledger.Account{
Address: "foo",
}

account2 := ledger.Account{
Address: "foo2",
}

// Initial insert
upserted, err := store.UpsertAccount(ctx, &account)
err := store.UpsertAccounts(ctx, &account1, &account2)
require.NoError(t, err)
require.True(t, upserted)
require.NotEmpty(t, account.FirstUsage)
require.NotEmpty(t, account.InsertionDate)
require.NotEmpty(t, account.UpdatedAt)

require.NotEmpty(t, account1.FirstUsage)
require.NotEmpty(t, account1.InsertionDate)
require.NotEmpty(t, account1.UpdatedAt)

require.NotEmpty(t, account2.FirstUsage)
require.NotEmpty(t, account2.InsertionDate)
require.NotEmpty(t, account2.UpdatedAt)

now := time.Now()

// Reset the account model
account = ledger.Account{
account1 = ledger.Account{
Address: "foo",
// The account will be upserted on the timeline after its initial usage.
// The upsert should not modify anything, but, it should retrieve and load the account entity
Expand All @@ -427,7 +435,6 @@ func TestAccountsUpsert(t *testing.T) {
}

// Upsert with no modification
upserted, err = store.UpsertAccount(ctx, &account)
err = store.UpsertAccounts(ctx, &account1)
require.NoError(t, err)
require.False(t, upserted)
}
4 changes: 2 additions & 2 deletions internal/storage/ledger/balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestBalancesGet(t *testing.T) {
UpdatedAt: time.Now(),
FirstUsage: time.Now(),
}
_, err := store.UpsertAccount(ctx, world)
err := store.UpsertAccounts(ctx, world)
require.NoError(t, err)

_, err = store.UpdateVolumes(ctx, ledger.AccountsVolumes{
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestBalancesGet(t *testing.T) {
InsertionDate: tx.InsertedAt,
UpdatedAt: tx.InsertedAt,
}
_, err = store.UpsertAccount(ctx, &bankAccount)
err = store.UpsertAccounts(ctx, &bankAccount)
require.NoError(t, err)

err = store.InsertMoves(ctx, &ledger.Move{
Expand Down
4 changes: 2 additions & 2 deletions internal/storage/ledger/legacy/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (d *DefaultStoreAdapter) UpdateAccountsMetadata(ctx context.Context, m map[
return d.newStore.UpdateAccountsMetadata(ctx, m)
}

func (d *DefaultStoreAdapter) UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error) {
return d.newStore.UpsertAccount(ctx, account)
func (d *DefaultStoreAdapter) UpsertAccounts(ctx context.Context, accounts ... *ledger.Account) error {
return d.newStore.UpsertAccounts(ctx, accounts...)
}

func (d *DefaultStoreAdapter) DeleteAccountMetadata(ctx context.Context, address, key string) error {
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/ledger/moves_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestMovesInsert(t *testing.T) {
account := &ledger.Account{
Address: "world",
}
_, err := store.UpsertAccount(ctx, account)
err := store.UpsertAccounts(ctx, account)
require.NoError(t, err)

now := time.Now()
Expand Down
8 changes: 4 additions & 4 deletions internal/storage/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ type Store struct {
getAccountHistogram metric.Int64Histogram
countAccountsHistogram metric.Int64Histogram
updateAccountsMetadataHistogram metric.Int64Histogram
deleteAccountMetadataHistogram metric.Int64Histogram
upsertAccountHistogram metric.Int64Histogram
getBalancesHistogram metric.Int64Histogram
deleteAccountMetadataHistogram metric.Int64Histogram
upsertAccountsHistogram metric.Int64Histogram
getBalancesHistogram metric.Int64Histogram
insertLogHistogram metric.Int64Histogram
listLogsHistogram metric.Int64Histogram
readLogWithIdempotencyKeyHistogram metric.Int64Histogram
Expand Down Expand Up @@ -154,7 +154,7 @@ func New(db bun.IDB, bucket bucket.Bucket, ledger ledger.Ledger, opts ...Option)
panic(err)
}

ret.upsertAccountHistogram, err = ret.meter.Int64Histogram("store.upsertAccount")
ret.upsertAccountsHistogram, err = ret.meter.Int64Histogram("store.upsertAccounts")
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/ledger/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
}

for _, address := range tx.InvolvedAccounts() {
_, err := s.UpsertAccount(ctx, &ledger.Account{
err := s.UpsertAccounts(ctx, &ledger.Account{
Address: address,
FirstUsage: tx.Timestamp,
Metadata: make(metadata.Metadata),
Expand Down

0 comments on commit d84cc02

Please sign in to comment.