Skip to content

Commit

Permalink
feat: simplify locking system
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 16, 2024
1 parent 9c68380 commit 7a4504c
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 89 deletions.
14 changes: 7 additions & 7 deletions internal/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ import "github.com/formancehq/ledger/internal"

## Constants

<a name="FeatureMovesHistory"></a>
<a name="FeatureMovesHistory"></a>Current set of features: | Name | Default value | Possible configuration | Description | |\-|\-|\-|\-| | ACCOUNT\_METADATA\_HISTORY |

```go
const (
Expand Down Expand Up @@ -271,7 +271,7 @@ type BalancesByAssetsByAccounts map[string]BalancesByAssets
```

<a name="Configuration"></a>
## type [Configuration](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L148-L152>)
## type [Configuration](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L152-L156>)



Expand All @@ -284,7 +284,7 @@ type Configuration struct {
```

<a name="NewDefaultConfiguration"></a>
### func [NewDefaultConfiguration](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L179>)
### func [NewDefaultConfiguration](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L183>)

```go
func NewDefaultConfiguration() Configuration
Expand All @@ -293,7 +293,7 @@ func NewDefaultConfiguration() Configuration


<a name="Configuration.SetDefaults"></a>
### func \(\*Configuration\) [SetDefaults](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L154>)
### func \(\*Configuration\) [SetDefaults](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L158>)

```go
func (c *Configuration) SetDefaults()
Expand All @@ -302,7 +302,7 @@ func (c *Configuration) SetDefaults()


<a name="Configuration.Validate"></a>
### func \(\*Configuration\) [Validate](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L169>)
### func \(\*Configuration\) [Validate](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L173>)

```go
func (c *Configuration) Validate() error
Expand Down Expand Up @@ -421,7 +421,7 @@ func (e ErrInvalidLedgerName) Is(err error) bool


<a name="FeatureSet"></a>
## type [FeatureSet](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L136>)
## type [FeatureSet](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L140>)



Expand All @@ -430,7 +430,7 @@ type FeatureSet map[string]string
```

<a name="FeatureSet.With"></a>
### func \(FeatureSet\) [With](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L138>)
### func \(FeatureSet\) [With](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L142>)

```go
func (f FeatureSet) With(feature, value string) FeatureSet
Expand Down
90 changes: 20 additions & 70 deletions internal/storage/ledger/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
. "github.com/formancehq/go-libs/bun/bunpaginate"
"github.com/formancehq/ledger/internal/tracing"

"errors"
"github.com/formancehq/go-libs/logging"
"github.com/formancehq/go-libs/metadata"
"github.com/formancehq/go-libs/platform/postgres"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
Expand Down Expand Up @@ -300,83 +298,35 @@ func (s *Store) DeleteAccountMetadata(ctx context.Context, account, key string)
}

func (s *Store) UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error) {
var rollbacked = errors.New("rollbacked")
upserted, err := tracing.TraceWithLatency(ctx, "UpsertAccount", func(ctx context.Context) (bool, error) {
type upsertedEntity struct {
ledger.Account `bun:",extend"`
Upserted bool `bun:"upserted"`
}
upserted := &upsertedEntity{}

return tracing.TraceWithLatency(ctx, "UpsertAccount", func(ctx context.Context) (bool, error) {
upserted := false
err := s.db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error {
if err := tx.NewSelect().
With(
"ins",
tx.NewInsert().
Model(account).
ModelTableExpr(s.GetPrefixedRelationName("accounts")).
On("conflict (ledger, address) do update").
Set("first_usage = case when ? < excluded.first_usage then ? else excluded.first_usage end", account.FirstUsage, account.FirstUsage).
Set("metadata = accounts.metadata || excluded.metadata").
Set("updated_at = ?", account.UpdatedAt).
Value("ledger", "?", s.ledger.Name).
Returning("*").
Where("(? < accounts.first_usage) or not accounts.metadata @> excluded.metadata", account.FirstUsage),
).
ModelTableExpr(
"(?) account",
tx.NewSelect().
ModelTableExpr("ins").
ColumnExpr("ins.*, true as upserted").
UnionAll(
tx.NewSelect().
ModelTableExpr(s.GetPrefixedRelationName("accounts")).
ColumnExpr("*, false as upserted").
Where("address = ? and ledger = ?", account.Address, s.ledger.Name).
Limit(1),
),
).
Model(upserted).
ColumnExpr("*").
Limit(1).
Scan(ctx); err != nil {
return postgres.ResolveError(err)
ret, err := tx.NewInsert().
Model(account).
ModelTableExpr(s.GetPrefixedRelationName("accounts")).
On("conflict (ledger, address) do update").
Set("first_usage = case when ? < excluded.first_usage then ? else excluded.first_usage end", account.FirstUsage, account.FirstUsage).
Set("metadata = accounts.metadata || excluded.metadata").
Set("updated_at = ?", account.UpdatedAt).
Value("ledger", "?", s.ledger.Name).
Returning("*").
Where("(? < accounts.first_usage) or not accounts.metadata @> excluded.metadata", account.FirstUsage).
Exec(ctx)
if err != nil {
return err
}

account.FirstUsage = upserted.FirstUsage
account.InsertionDate = upserted.InsertionDate
account.UpdatedAt = upserted.UpdatedAt
account.Metadata = upserted.Metadata

if !upserted.Upserted {
// By roll-backing the transaction, we release the lock, allowing a concurrent transaction
// to use the accounts.
if err := tx.Rollback(); err != nil {
return err
}
return rollbacked
rowsModified, err := ret.RowsAffected()
if err != nil {
return err
}

upserted = rowsModified > 0
return nil
})
if err != nil && !errors.Is(err, rollbacked) {
return false, fmt.Errorf("upserting account: %w", err)
}

return upserted.Upserted, nil
return upserted, err
}, func(ctx context.Context, upserted bool) {
trace.SpanFromContext(ctx).SetAttributes(
attribute.String("address", account.Address),
attribute.Bool("upserted", upserted),
)
})
if err != nil && !errors.Is(err, rollbacked) {
return false, fmt.Errorf("failed to upsert account: %w", err)
} else if upserted {
logging.FromContext(ctx).Debugf("account upserted")
} else {
logging.FromContext(ctx).Debugf("account not modified")
}

return upserted, nil
}
11 changes: 0 additions & 11 deletions internal/storage/ledger/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/formancehq/ledger/internal/tracing"

"errors"
. "github.com/formancehq/go-libs/collectionutils"
"github.com/formancehq/go-libs/platform/postgres"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -232,16 +231,6 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti
}

func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) error {

sqlQueries := Map(tx.InvolvedAccounts(), func(from string) string {
return fmt.Sprintf("select pg_advisory_xact_lock(hashtext('%s'))", fmt.Sprintf("%s_%s", s.ledger.Name, from))
})

_, err := s.db.NewRaw(strings.Join(sqlQueries, ";")).Exec(ctx)
if err != nil {
return postgres.ResolveError(err)
}

if tx.InsertedAt.IsZero() {
tx.InsertedAt = time.Now()
}
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ var _ = Context("Ledger application lifecycle tests", func() {
count, err := db.NewSelect().
Table("pg_stat_activity").
Where("state <> 'idle' and pid <> pg_backend_pid()").
Where(`query like 'select pg_advisory_xact_lock%'`).
Where(`query like 'INSERT INTO "_default".accounts%'`).
Count(ctx)
g.Expect(err).To(BeNil())
return count
Expand Down

0 comments on commit 7a4504c

Please sign in to comment.