From d84cc02bc20152a5a94424107b539a5b3519cc42 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Sat, 23 Nov 2024 13:26:47 +0100 Subject: [PATCH] feat(performance): batch accounts insertions --- .../controller/ledger/controller_default.go | 2 +- internal/controller/ledger/store.go | 2 +- .../controller/ledger/store_generated_test.go | 22 +++--- internal/storage/ledger/accounts.go | 68 +++++++------------ internal/storage/ledger/accounts_test.go | 25 ++++--- internal/storage/ledger/balances_test.go | 4 +- internal/storage/ledger/legacy/adapters.go | 4 +- internal/storage/ledger/moves_test.go | 2 +- internal/storage/ledger/store.go | 8 +-- internal/storage/ledger/transactions.go | 2 +- 10 files changed, 65 insertions(+), 74 deletions(-) diff --git a/internal/controller/ledger/controller_default.go b/internal/controller/ledger/controller_default.go index ae114e6c4..a8a3f26ef 100644 --- a/internal/controller/ledger/controller_default.go +++ b/internal/controller/ledger/controller_default.go @@ -444,7 +444,7 @@ func (ctrl *DefaultController) SaveTransactionMetadata(ctx context.Context, para } func (ctrl *DefaultController) saveAccountMetadata(ctx context.Context, store Store, parameters Parameters[SaveAccountMetadata]) (*ledger.SavedMetadata, error) { - if _, err := store.UpsertAccount(ctx, &ledger.Account{ + if err := store.UpsertAccounts(ctx, &ledger.Account{ Address: parameters.Input.Address, Metadata: parameters.Input.Metadata, }); err != nil { diff --git a/internal/controller/ledger/store.go b/internal/controller/ledger/store.go index c442293fe..9a4579419 100644 --- a/internal/controller/ledger/store.go +++ b/internal/controller/ledger/store.go @@ -46,7 +46,7 @@ type Store interface { DeleteTransactionMetadata(ctx context.Context, transactionID int, key string) (*ledger.Transaction, bool, error) UpdateAccountsMetadata(ctx context.Context, m map[string]metadata.Metadata) error // UpsertAccount returns a boolean indicating if the account was upserted - UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error) + UpsertAccounts(ctx context.Context, accounts ...*ledger.Account) error DeleteAccountMetadata(ctx context.Context, address, key string) error InsertLog(ctx context.Context, log *ledger.Log) error diff --git a/internal/controller/ledger/store_generated_test.go b/internal/controller/ledger/store_generated_test.go index ff5d1465d..0244559a7 100644 --- a/internal/controller/ledger/store_generated_test.go +++ b/internal/controller/ledger/store_generated_test.go @@ -412,17 +412,21 @@ func (mr *MockStoreMockRecorder) UpdateTransactionMetadata(ctx, transactionID, m return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTransactionMetadata", reflect.TypeOf((*MockStore)(nil).UpdateTransactionMetadata), ctx, transactionID, m) } -// UpsertAccount mocks base method. -func (m *MockStore) UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error) { +// UpsertAccounts mocks base method. +func (m *MockStore) UpsertAccounts(ctx context.Context, accounts ...*ledger.Account) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpsertAccount", ctx, account) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(error) - return ret0, ret1 + varargs := []any{ctx} + for _, a := range accounts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "UpsertAccounts", varargs...) + ret0, _ := ret[0].(error) + return ret0 } -// UpsertAccount indicates an expected call of UpsertAccount. -func (mr *MockStoreMockRecorder) UpsertAccount(ctx, account any) *gomock.Call { +// UpsertAccounts indicates an expected call of UpsertAccounts. +func (mr *MockStoreMockRecorder) UpsertAccounts(ctx any, accounts ...any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpsertAccount", reflect.TypeOf((*MockStore)(nil).UpsertAccount), ctx, account) + varargs := append([]any{ctx}, accounts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpsertAccounts", reflect.TypeOf((*MockStore)(nil).UpsertAccounts), varargs...) } diff --git a/internal/storage/ledger/accounts.go b/internal/storage/ledger/accounts.go index 1e34ca469..6b1972ce3 100644 --- a/internal/storage/ledger/accounts.go +++ b/internal/storage/ledger/accounts.go @@ -2,7 +2,6 @@ package ledger import ( "context" - "database/sql" "fmt" . "github.com/formancehq/go-libs/v2/bun/bunpaginate" "github.com/formancehq/ledger/pkg/features" @@ -12,11 +11,8 @@ import ( "github.com/formancehq/go-libs/v2/metadata" "github.com/formancehq/go-libs/v2/platform/postgres" - ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" - "github.com/formancehq/go-libs/v2/time" + ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" "github.com/formancehq/go-libs/v2/query" ledger "github.com/formancehq/ledger/internal" @@ -333,45 +329,29 @@ func (s *Store) DeleteAccountMetadata(ctx context.Context, account, key string) return err } -// todo: since we update first balances of an accounts in the transaction process, we can avoid nested sql txs -// while upserting account and upsert them all in one shot -func (s *Store) UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error) { - return tracing.TraceWithMetric( +func (s *Store) UpsertAccounts(ctx context.Context, accounts ...*ledger.Account) error { + return tracing.SkipResult(tracing.TraceWithMetric( ctx, - "UpsertAccount", + "UpsertAccounts", s.tracer, - s.upsertAccountHistogram, - func(ctx context.Context) (bool, error) { - upserted := false - err := s.db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error { - ret, err := tx.NewInsert(). - Model(account). - ModelTableExpr(s.GetPrefixedRelationName("accounts")). - On("conflict (ledger, address) do update"). - Set("first_usage = case when ? < excluded.first_usage then ? else excluded.first_usage end", account.FirstUsage, account.FirstUsage). - Set("metadata = accounts.metadata || excluded.metadata"). - Set("updated_at = excluded.updated_at"). - Value("ledger", "?", s.ledger.Name). - Returning("*"). - Where("(? < accounts.first_usage) or not accounts.metadata @> excluded.metadata", account.FirstUsage). - Exec(ctx) - if err != nil { - return err - } - rowsModified, err := ret.RowsAffected() - if err != nil { - return err - } - upserted = rowsModified > 0 - return nil - }) - return upserted, postgres.ResolveError(err) - }, - func(ctx context.Context, upserted bool) { - trace.SpanFromContext(ctx).SetAttributes( - attribute.String("address", account.Address), - attribute.Bool("upserted", upserted), - ) - }, - ) + s.upsertAccountsHistogram, + tracing.NoResult(func(ctx context.Context) error { + _, err := s.db.NewInsert(). + Model(&accounts). + ModelTableExpr(s.GetPrefixedRelationName("accounts")). + On("conflict (ledger, address) do update"). + Set("first_usage = case when excluded.first_usage < accounts.first_usage then excluded.first_usage else accounts.first_usage end"). + Set("metadata = accounts.metadata || excluded.metadata"). + Set("updated_at = excluded.updated_at"). + Value("ledger", "?", s.ledger.Name). + Returning("*"). + Where("(excluded.first_usage < accounts.first_usage) or not accounts.metadata @> excluded.metadata"). + Exec(ctx) + if err != nil { + return fmt.Errorf("upserting accounts: %w", postgres.ResolveError(err)) + } + + return nil + }), + )) } diff --git a/internal/storage/ledger/accounts_test.go b/internal/storage/ledger/accounts_test.go index 5277f5fba..ef0c2f6d1 100644 --- a/internal/storage/ledger/accounts_test.go +++ b/internal/storage/ledger/accounts_test.go @@ -402,22 +402,30 @@ func TestAccountsUpsert(t *testing.T) { store := newLedgerStore(t) ctx := logging.TestingContext() - account := ledger.Account{ + account1 := ledger.Account{ Address: "foo", } + account2 := ledger.Account{ + Address: "foo2", + } + // Initial insert - upserted, err := store.UpsertAccount(ctx, &account) + err := store.UpsertAccounts(ctx, &account1, &account2) require.NoError(t, err) - require.True(t, upserted) - require.NotEmpty(t, account.FirstUsage) - require.NotEmpty(t, account.InsertionDate) - require.NotEmpty(t, account.UpdatedAt) + + require.NotEmpty(t, account1.FirstUsage) + require.NotEmpty(t, account1.InsertionDate) + require.NotEmpty(t, account1.UpdatedAt) + + require.NotEmpty(t, account2.FirstUsage) + require.NotEmpty(t, account2.InsertionDate) + require.NotEmpty(t, account2.UpdatedAt) now := time.Now() // Reset the account model - account = ledger.Account{ + account1 = ledger.Account{ Address: "foo", // The account will be upserted on the timeline after its initial usage. // The upsert should not modify anything, but, it should retrieve and load the account entity @@ -427,7 +435,6 @@ func TestAccountsUpsert(t *testing.T) { } // Upsert with no modification - upserted, err = store.UpsertAccount(ctx, &account) + err = store.UpsertAccounts(ctx, &account1) require.NoError(t, err) - require.False(t, upserted) } diff --git a/internal/storage/ledger/balances_test.go b/internal/storage/ledger/balances_test.go index 24a6c9bbc..5b32c9aaa 100644 --- a/internal/storage/ledger/balances_test.go +++ b/internal/storage/ledger/balances_test.go @@ -33,7 +33,7 @@ func TestBalancesGet(t *testing.T) { UpdatedAt: time.Now(), FirstUsage: time.Now(), } - _, err := store.UpsertAccount(ctx, world) + err := store.UpsertAccounts(ctx, world) require.NoError(t, err) _, err = store.UpdateVolumes(ctx, ledger.AccountsVolumes{ @@ -146,7 +146,7 @@ func TestBalancesGet(t *testing.T) { InsertionDate: tx.InsertedAt, UpdatedAt: tx.InsertedAt, } - _, err = store.UpsertAccount(ctx, &bankAccount) + err = store.UpsertAccounts(ctx, &bankAccount) require.NoError(t, err) err = store.InsertMoves(ctx, &ledger.Move{ diff --git a/internal/storage/ledger/legacy/adapters.go b/internal/storage/ledger/legacy/adapters.go index cb0d20eb3..807bb1678 100644 --- a/internal/storage/ledger/legacy/adapters.go +++ b/internal/storage/ledger/legacy/adapters.go @@ -46,8 +46,8 @@ func (d *DefaultStoreAdapter) UpdateAccountsMetadata(ctx context.Context, m map[ return d.newStore.UpdateAccountsMetadata(ctx, m) } -func (d *DefaultStoreAdapter) UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error) { - return d.newStore.UpsertAccount(ctx, account) +func (d *DefaultStoreAdapter) UpsertAccounts(ctx context.Context, accounts ... *ledger.Account) error { + return d.newStore.UpsertAccounts(ctx, accounts...) } func (d *DefaultStoreAdapter) DeleteAccountMetadata(ctx context.Context, address, key string) error { diff --git a/internal/storage/ledger/moves_test.go b/internal/storage/ledger/moves_test.go index e667ee8de..02ace80c4 100644 --- a/internal/storage/ledger/moves_test.go +++ b/internal/storage/ledger/moves_test.go @@ -38,7 +38,7 @@ func TestMovesInsert(t *testing.T) { account := &ledger.Account{ Address: "world", } - _, err := store.UpsertAccount(ctx, account) + err := store.UpsertAccounts(ctx, account) require.NoError(t, err) now := time.Now() diff --git a/internal/storage/ledger/store.go b/internal/storage/ledger/store.go index 87bcfeb44..b15a0eafc 100644 --- a/internal/storage/ledger/store.go +++ b/internal/storage/ledger/store.go @@ -31,9 +31,9 @@ type Store struct { getAccountHistogram metric.Int64Histogram countAccountsHistogram metric.Int64Histogram updateAccountsMetadataHistogram metric.Int64Histogram - deleteAccountMetadataHistogram metric.Int64Histogram - upsertAccountHistogram metric.Int64Histogram - getBalancesHistogram metric.Int64Histogram + deleteAccountMetadataHistogram metric.Int64Histogram + upsertAccountsHistogram metric.Int64Histogram + getBalancesHistogram metric.Int64Histogram insertLogHistogram metric.Int64Histogram listLogsHistogram metric.Int64Histogram readLogWithIdempotencyKeyHistogram metric.Int64Histogram @@ -154,7 +154,7 @@ func New(db bun.IDB, bucket bucket.Bucket, ledger ledger.Ledger, opts ...Option) panic(err) } - ret.upsertAccountHistogram, err = ret.meter.Int64Histogram("store.upsertAccount") + ret.upsertAccountsHistogram, err = ret.meter.Int64Histogram("store.upsertAccounts") if err != nil { panic(err) } diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index 3626c5dc1..4600a9642 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -255,7 +255,7 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e } for _, address := range tx.InvolvedAccounts() { - _, err := s.UpsertAccount(ctx, &ledger.Account{ + err := s.UpsertAccounts(ctx, &ledger.Account{ Address: address, FirstUsage: tx.Timestamp, Metadata: make(metadata.Metadata),