diff --git a/internal/README.md b/internal/README.md
index d494188d7..eaa32e484 100644
--- a/internal/README.md
+++ b/internal/README.md
@@ -112,7 +112,7 @@ import "github.com/formancehq/ledger/internal"
## Constants
-
+Current set of features: | Name | Default value | Possible configuration | Description | |\-|\-|\-|\-| | ACCOUNT\_METADATA\_HISTORY |
```go
const (
@@ -271,7 +271,7 @@ type BalancesByAssetsByAccounts map[string]BalancesByAssets
```
-## type [Configuration]()
+## type [Configuration]()
@@ -284,7 +284,7 @@ type Configuration struct {
```
-### func [NewDefaultConfiguration]()
+### func [NewDefaultConfiguration]()
```go
func NewDefaultConfiguration() Configuration
@@ -293,7 +293,7 @@ func NewDefaultConfiguration() Configuration
-### func \(\*Configuration\) [SetDefaults]()
+### func \(\*Configuration\) [SetDefaults]()
```go
func (c *Configuration) SetDefaults()
@@ -302,7 +302,7 @@ func (c *Configuration) SetDefaults()
-### func \(\*Configuration\) [Validate]()
+### func \(\*Configuration\) [Validate]()
```go
func (c *Configuration) Validate() error
@@ -421,7 +421,7 @@ func (e ErrInvalidLedgerName) Is(err error) bool
-## type [FeatureSet]()
+## type [FeatureSet]()
@@ -430,7 +430,7 @@ type FeatureSet map[string]string
```
-### func \(FeatureSet\) [With]()
+### func \(FeatureSet\) [With]()
```go
func (f FeatureSet) With(feature, value string) FeatureSet
diff --git a/internal/storage/ledger/accounts.go b/internal/storage/ledger/accounts.go
index a4b143e68..2bfa52228 100644
--- a/internal/storage/ledger/accounts.go
+++ b/internal/storage/ledger/accounts.go
@@ -9,8 +9,6 @@ import (
. "github.com/formancehq/go-libs/bun/bunpaginate"
"github.com/formancehq/ledger/internal/tracing"
- "errors"
- "github.com/formancehq/go-libs/logging"
"github.com/formancehq/go-libs/metadata"
"github.com/formancehq/go-libs/platform/postgres"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
@@ -300,83 +298,35 @@ func (s *Store) DeleteAccountMetadata(ctx context.Context, account, key string)
}
func (s *Store) UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error) {
- var rollbacked = errors.New("rollbacked")
- upserted, err := tracing.TraceWithLatency(ctx, "UpsertAccount", func(ctx context.Context) (bool, error) {
- type upsertedEntity struct {
- ledger.Account `bun:",extend"`
- Upserted bool `bun:"upserted"`
- }
- upserted := &upsertedEntity{}
-
+ return tracing.TraceWithLatency(ctx, "UpsertAccount", func(ctx context.Context) (bool, error) {
+ upserted := false
err := s.db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error {
- if err := tx.NewSelect().
- With(
- "ins",
- tx.NewInsert().
- Model(account).
- ModelTableExpr(s.GetPrefixedRelationName("accounts")).
- On("conflict (ledger, address) do update").
- Set("first_usage = case when ? < excluded.first_usage then ? else excluded.first_usage end", account.FirstUsage, account.FirstUsage).
- Set("metadata = accounts.metadata || excluded.metadata").
- Set("updated_at = ?", account.UpdatedAt).
- Value("ledger", "?", s.ledger.Name).
- Returning("*").
- Where("(? < accounts.first_usage) or not accounts.metadata @> excluded.metadata", account.FirstUsage),
- ).
- ModelTableExpr(
- "(?) account",
- tx.NewSelect().
- ModelTableExpr("ins").
- ColumnExpr("ins.*, true as upserted").
- UnionAll(
- tx.NewSelect().
- ModelTableExpr(s.GetPrefixedRelationName("accounts")).
- ColumnExpr("*, false as upserted").
- Where("address = ? and ledger = ?", account.Address, s.ledger.Name).
- Limit(1),
- ),
- ).
- Model(upserted).
- ColumnExpr("*").
- Limit(1).
- Scan(ctx); err != nil {
- return postgres.ResolveError(err)
+ ret, err := tx.NewInsert().
+ Model(account).
+ ModelTableExpr(s.GetPrefixedRelationName("accounts")).
+ On("conflict (ledger, address) do update").
+ Set("first_usage = case when ? < excluded.first_usage then ? else excluded.first_usage end", account.FirstUsage, account.FirstUsage).
+ Set("metadata = accounts.metadata || excluded.metadata").
+ Set("updated_at = ?", account.UpdatedAt).
+ Value("ledger", "?", s.ledger.Name).
+ Returning("*").
+ Where("(? < accounts.first_usage) or not accounts.metadata @> excluded.metadata", account.FirstUsage).
+ Exec(ctx)
+ if err != nil {
+ return err
}
-
- account.FirstUsage = upserted.FirstUsage
- account.InsertionDate = upserted.InsertionDate
- account.UpdatedAt = upserted.UpdatedAt
- account.Metadata = upserted.Metadata
-
- if !upserted.Upserted {
- // By roll-backing the transaction, we release the lock, allowing a concurrent transaction
- // to use the accounts.
- if err := tx.Rollback(); err != nil {
- return err
- }
- return rollbacked
+ rowsModified, err := ret.RowsAffected()
+ if err != nil {
+ return err
}
-
+ upserted = rowsModified > 0
return nil
})
- if err != nil && !errors.Is(err, rollbacked) {
- return false, fmt.Errorf("upserting account: %w", err)
- }
-
- return upserted.Upserted, nil
+ return upserted, err
}, func(ctx context.Context, upserted bool) {
trace.SpanFromContext(ctx).SetAttributes(
attribute.String("address", account.Address),
attribute.Bool("upserted", upserted),
)
})
- if err != nil && !errors.Is(err, rollbacked) {
- return false, fmt.Errorf("failed to upsert account: %w", err)
- } else if upserted {
- logging.FromContext(ctx).Debugf("account upserted")
- } else {
- logging.FromContext(ctx).Debugf("account not modified")
- }
-
- return upserted, nil
}
diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go
index ecb52e030..4da5e8d06 100644
--- a/internal/storage/ledger/transactions.go
+++ b/internal/storage/ledger/transactions.go
@@ -12,7 +12,6 @@ import (
"github.com/formancehq/ledger/internal/tracing"
"errors"
- . "github.com/formancehq/go-libs/collectionutils"
"github.com/formancehq/go-libs/platform/postgres"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"go.opentelemetry.io/otel/attribute"
@@ -232,16 +231,6 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti
}
func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) error {
-
- sqlQueries := Map(tx.InvolvedAccounts(), func(from string) string {
- return fmt.Sprintf("select pg_advisory_xact_lock(hashtext('%s'))", fmt.Sprintf("%s_%s", s.ledger.Name, from))
- })
-
- _, err := s.db.NewRaw(strings.Join(sqlQueries, ";")).Exec(ctx)
- if err != nil {
- return postgres.ResolveError(err)
- }
-
if tx.InsertedAt.IsZero() {
tx.InsertedAt = time.Now()
}
diff --git a/test/e2e/lifecycle_test.go b/test/e2e/lifecycle_test.go
index b6fe7e684..1081979c1 100644
--- a/test/e2e/lifecycle_test.go
+++ b/test/e2e/lifecycle_test.go
@@ -102,7 +102,7 @@ var _ = Context("Ledger application lifecycle tests", func() {
count, err := db.NewSelect().
Table("pg_stat_activity").
Where("state <> 'idle' and pid <> pg_backend_pid()").
- Where(`query like 'select pg_advisory_xact_lock%'`).
+ Where(`query like 'INSERT INTO "_default".accounts%'`).
Count(ctx)
g.Expect(err).To(BeNil())
return count