From 7a4504cde33082bef11b574157c68bf80ea0211a Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Thu, 3 Oct 2024 16:01:05 +0200 Subject: [PATCH] feat: simplify locking system --- internal/README.md | 14 ++-- internal/storage/ledger/accounts.go | 90 ++++++------------------- internal/storage/ledger/transactions.go | 11 --- test/e2e/lifecycle_test.go | 2 +- 4 files changed, 28 insertions(+), 89 deletions(-) diff --git a/internal/README.md b/internal/README.md index d494188d7..eaa32e484 100644 --- a/internal/README.md +++ b/internal/README.md @@ -112,7 +112,7 @@ import "github.com/formancehq/ledger/internal" ## Constants - +Current set of features: | Name | Default value | Possible configuration | Description | |\-|\-|\-|\-| | ACCOUNT\_METADATA\_HISTORY | ```go const ( @@ -271,7 +271,7 @@ type BalancesByAssetsByAccounts map[string]BalancesByAssets ``` -## type [Configuration]() +## type [Configuration]() @@ -284,7 +284,7 @@ type Configuration struct { ``` -### func [NewDefaultConfiguration]() +### func [NewDefaultConfiguration]() ```go func NewDefaultConfiguration() Configuration @@ -293,7 +293,7 @@ func NewDefaultConfiguration() Configuration -### func \(\*Configuration\) [SetDefaults]() +### func \(\*Configuration\) [SetDefaults]() ```go func (c *Configuration) SetDefaults() @@ -302,7 +302,7 @@ func (c *Configuration) SetDefaults() -### func \(\*Configuration\) [Validate]() +### func \(\*Configuration\) [Validate]() ```go func (c *Configuration) Validate() error @@ -421,7 +421,7 @@ func (e ErrInvalidLedgerName) Is(err error) bool -## type [FeatureSet]() +## type [FeatureSet]() @@ -430,7 +430,7 @@ type FeatureSet map[string]string ``` -### func \(FeatureSet\) [With]() +### func \(FeatureSet\) [With]() ```go func (f FeatureSet) With(feature, value string) FeatureSet diff --git a/internal/storage/ledger/accounts.go b/internal/storage/ledger/accounts.go index a4b143e68..2bfa52228 100644 --- a/internal/storage/ledger/accounts.go +++ b/internal/storage/ledger/accounts.go @@ -9,8 +9,6 @@ import ( . "github.com/formancehq/go-libs/bun/bunpaginate" "github.com/formancehq/ledger/internal/tracing" - "errors" - "github.com/formancehq/go-libs/logging" "github.com/formancehq/go-libs/metadata" "github.com/formancehq/go-libs/platform/postgres" ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" @@ -300,83 +298,35 @@ func (s *Store) DeleteAccountMetadata(ctx context.Context, account, key string) } func (s *Store) UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error) { - var rollbacked = errors.New("rollbacked") - upserted, err := tracing.TraceWithLatency(ctx, "UpsertAccount", func(ctx context.Context) (bool, error) { - type upsertedEntity struct { - ledger.Account `bun:",extend"` - Upserted bool `bun:"upserted"` - } - upserted := &upsertedEntity{} - + return tracing.TraceWithLatency(ctx, "UpsertAccount", func(ctx context.Context) (bool, error) { + upserted := false err := s.db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error { - if err := tx.NewSelect(). - With( - "ins", - tx.NewInsert(). - Model(account). - ModelTableExpr(s.GetPrefixedRelationName("accounts")). - On("conflict (ledger, address) do update"). - Set("first_usage = case when ? < excluded.first_usage then ? else excluded.first_usage end", account.FirstUsage, account.FirstUsage). - Set("metadata = accounts.metadata || excluded.metadata"). - Set("updated_at = ?", account.UpdatedAt). - Value("ledger", "?", s.ledger.Name). - Returning("*"). - Where("(? < accounts.first_usage) or not accounts.metadata @> excluded.metadata", account.FirstUsage), - ). - ModelTableExpr( - "(?) account", - tx.NewSelect(). - ModelTableExpr("ins"). - ColumnExpr("ins.*, true as upserted"). - UnionAll( - tx.NewSelect(). - ModelTableExpr(s.GetPrefixedRelationName("accounts")). - ColumnExpr("*, false as upserted"). - Where("address = ? and ledger = ?", account.Address, s.ledger.Name). - Limit(1), - ), - ). - Model(upserted). - ColumnExpr("*"). - Limit(1). - Scan(ctx); err != nil { - return postgres.ResolveError(err) + ret, err := tx.NewInsert(). + Model(account). + ModelTableExpr(s.GetPrefixedRelationName("accounts")). + On("conflict (ledger, address) do update"). + Set("first_usage = case when ? < excluded.first_usage then ? else excluded.first_usage end", account.FirstUsage, account.FirstUsage). + Set("metadata = accounts.metadata || excluded.metadata"). + Set("updated_at = ?", account.UpdatedAt). + Value("ledger", "?", s.ledger.Name). + Returning("*"). + Where("(? < accounts.first_usage) or not accounts.metadata @> excluded.metadata", account.FirstUsage). + Exec(ctx) + if err != nil { + return err } - - account.FirstUsage = upserted.FirstUsage - account.InsertionDate = upserted.InsertionDate - account.UpdatedAt = upserted.UpdatedAt - account.Metadata = upserted.Metadata - - if !upserted.Upserted { - // By roll-backing the transaction, we release the lock, allowing a concurrent transaction - // to use the accounts. - if err := tx.Rollback(); err != nil { - return err - } - return rollbacked + rowsModified, err := ret.RowsAffected() + if err != nil { + return err } - + upserted = rowsModified > 0 return nil }) - if err != nil && !errors.Is(err, rollbacked) { - return false, fmt.Errorf("upserting account: %w", err) - } - - return upserted.Upserted, nil + return upserted, err }, func(ctx context.Context, upserted bool) { trace.SpanFromContext(ctx).SetAttributes( attribute.String("address", account.Address), attribute.Bool("upserted", upserted), ) }) - if err != nil && !errors.Is(err, rollbacked) { - return false, fmt.Errorf("failed to upsert account: %w", err) - } else if upserted { - logging.FromContext(ctx).Debugf("account upserted") - } else { - logging.FromContext(ctx).Debugf("account not modified") - } - - return upserted, nil } diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index ecb52e030..4da5e8d06 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -12,7 +12,6 @@ import ( "github.com/formancehq/ledger/internal/tracing" "errors" - . "github.com/formancehq/go-libs/collectionutils" "github.com/formancehq/go-libs/platform/postgres" ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" "go.opentelemetry.io/otel/attribute" @@ -232,16 +231,6 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti } func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) error { - - sqlQueries := Map(tx.InvolvedAccounts(), func(from string) string { - return fmt.Sprintf("select pg_advisory_xact_lock(hashtext('%s'))", fmt.Sprintf("%s_%s", s.ledger.Name, from)) - }) - - _, err := s.db.NewRaw(strings.Join(sqlQueries, ";")).Exec(ctx) - if err != nil { - return postgres.ResolveError(err) - } - if tx.InsertedAt.IsZero() { tx.InsertedAt = time.Now() } diff --git a/test/e2e/lifecycle_test.go b/test/e2e/lifecycle_test.go index b6fe7e684..1081979c1 100644 --- a/test/e2e/lifecycle_test.go +++ b/test/e2e/lifecycle_test.go @@ -102,7 +102,7 @@ var _ = Context("Ledger application lifecycle tests", func() { count, err := db.NewSelect(). Table("pg_stat_activity"). Where("state <> 'idle' and pid <> pg_backend_pid()"). - Where(`query like 'select pg_advisory_xact_lock%'`). + Where(`query like 'INSERT INTO "_default".accounts%'`). Count(ctx) g.Expect(err).To(BeNil()) return count