Skip to content

Commit

Permalink
chore: simplify some code
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 23, 2024
1 parent a759184 commit 0a30efe
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 102 deletions.
7 changes: 4 additions & 3 deletions internal/api/v1/controllers_logs_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/formancehq/go-libs/api"
"github.com/formancehq/go-libs/auth"
"github.com/formancehq/go-libs/bun/bunpaginate"
"github.com/formancehq/go-libs/metadata"
"github.com/formancehq/go-libs/query"
"github.com/formancehq/go-libs/time"
ledger "github.com/formancehq/ledger/internal"
Expand Down Expand Up @@ -77,8 +76,10 @@ func TestGetLogs(t *testing.T) {

expectedCursor := bunpaginate.Cursor[ledger.Log]{
Data: []ledger.Log{
ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{}).
ChainLog(nil),
ledger.NewTransactionLog(ledger.CreatedTransaction{
Transaction: ledger.NewTransaction(),
AccountMetadata: ledger.AccountMetadata{},
}).ChainLog(nil),
},
}

Expand Down
6 changes: 4 additions & 2 deletions internal/api/v2/controllers_logs_export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/formancehq/go-libs/api"
"github.com/formancehq/go-libs/auth"
"github.com/formancehq/go-libs/metadata"
ledger "github.com/formancehq/ledger/internal"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"github.com/pkg/errors"
Expand Down Expand Up @@ -46,7 +45,10 @@ func TestLogsExport(t *testing.T) {
tc.expectStatusCode = http.StatusOK
}

log := ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{})
log := ledger.NewTransactionLog(ledger.CreatedTransaction{
Transaction: ledger.NewTransaction(),
AccountMetadata: ledger.AccountMetadata{},
})

systemController, ledgerController := newTestingSystemController(t, true)
ledgerController.EXPECT().
Expand Down
6 changes: 4 additions & 2 deletions internal/api/v2/controllers_logs_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/formancehq/go-libs/api"
"github.com/formancehq/go-libs/auth"
"github.com/formancehq/go-libs/metadata"
ledger "github.com/formancehq/ledger/internal"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -47,7 +46,10 @@ func TestLogsImport(t *testing.T) {
tc.expectStatusCode = http.StatusNoContent
}

log := ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{})
log := ledger.NewTransactionLog(ledger.CreatedTransaction{
Transaction: ledger.NewTransaction(),
AccountMetadata: ledger.AccountMetadata{},
})

systemController, ledgerController := newTestingSystemController(t, true)
ledgerController.EXPECT().
Expand Down
6 changes: 4 additions & 2 deletions internal/api/v2/controllers_logs_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/formancehq/go-libs/api"
"github.com/formancehq/go-libs/auth"
"github.com/formancehq/go-libs/bun/bunpaginate"
"github.com/formancehq/go-libs/metadata"
"github.com/formancehq/go-libs/query"
"github.com/formancehq/go-libs/time"
ledger "github.com/formancehq/ledger/internal"
Expand Down Expand Up @@ -113,7 +112,10 @@ func TestGetLogs(t *testing.T) {

expectedCursor := bunpaginate.Cursor[ledger.Log]{
Data: []ledger.Log{
ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{}).
ledger.NewTransactionLog(ledger.CreatedTransaction{
Transaction: ledger.NewTransaction(),
AccountMetadata: ledger.AccountMetadata{},
}).
ChainLog(nil),
},
}
Expand Down
111 changes: 61 additions & 50 deletions internal/controller/ledger/controller_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/formancehq/go-libs/metadata"
"github.com/formancehq/go-libs/pointer"
"github.com/google/uuid"
// todo: remove as it is in maintenance mode
"github.com/pkg/errors"

ledger "github.com/formancehq/ledger/internal"
Expand All @@ -42,6 +43,7 @@ func NewDefaultController(
ledger: ledger,
machineFactory: machineFactory,
}

return ret
}

Expand Down Expand Up @@ -138,7 +140,7 @@ func (ctrl *DefaultController) importLog(ctx context.Context, sqlTx TX, log ledg
if err != nil {
return errors.Wrap(err, "failed to revert transaction")
}
case ledger.SetMetadata:
case ledger.SavedMetadata:
switch payload.TargetType {
case ledger.MetaTargetTypeTransaction:
if _, _, err := sqlTx.UpdateTransactionMetadata(ctx, payload.TargetID.(int), payload.Metadata); err != nil {
Expand Down Expand Up @@ -215,16 +217,16 @@ func (ctrl *DefaultController) CreateTransaction(ctx context.Context, parameters
return nil, errors.Wrap(err, "failed to compile script")
}

log, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input RunScript) (*ledger.Log, error) {
output, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input RunScript) (*ledger.Log, *ledger.CreatedTransaction, error) {
result, err := tracing.TraceWithLatency(ctx, "ExecuteMachine", func(ctx context.Context) (*MachineResult, error) {
return m.Execute(ctx, newVmStoreAdapter(sqlTX), input.Vars)
})
if err != nil {
return nil, errors.Wrap(err, "failed to execute program")
return nil, nil, errors.Wrap(err, "failed to execute program")
}

if len(result.Postings) == 0 {
return nil, ErrNoPostings
return nil, nil, ErrNoPostings
}

finalMetadata := result.Metadata
Expand All @@ -233,7 +235,7 @@ func (ctrl *DefaultController) CreateTransaction(ctx context.Context, parameters
}
for k, v := range input.Metadata {
if finalMetadata[k] != "" {
return nil, newErrMetadataOverride(k)
return nil, nil, newErrMetadataOverride(k)
}
finalMetadata[k] = v
}
Expand All @@ -252,51 +254,49 @@ func (ctrl *DefaultController) CreateTransaction(ctx context.Context, parameters
WithReference(input.Reference)
err = sqlTX.CommitTransaction(ctx, &transaction)
if err != nil {
return nil, err
return nil, nil, err
}

if len(result.AccountMetadata) > 0 {
if err := sqlTX.UpdateAccountsMetadata(ctx, result.AccountMetadata); err != nil {
return nil, errors.Wrapf(err, "updating metadata of account '%s'", Keys(result.AccountMetadata))
return nil, nil, errors.Wrapf(err, "updating metadata of account '%s'", Keys(result.AccountMetadata))
}
}

return pointer.For(ledger.NewTransactionLog(transaction, result.AccountMetadata)), err
createdTransaction := ledger.CreatedTransaction{
Transaction: transaction,
AccountMetadata: result.AccountMetadata,
}

return pointer.For(ledger.NewTransactionLog(createdTransaction)), &createdTransaction, err
})
if err != nil {
return nil, err
}

transaction := log.Data.(ledger.CreatedTransaction).Transaction
accountMetadata := log.Data.(ledger.CreatedTransaction).AccountMetadata

return &ledger.CreatedTransaction{
Transaction: transaction,
AccountMetadata: accountMetadata,
}, nil
return output, nil
}

func (ctrl *DefaultController) RevertTransaction(ctx context.Context, parameters Parameters[RevertTransaction]) (*ledger.RevertedTransaction, error) {
var originalTransaction *ledger.Transaction
log, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input RevertTransaction) (*ledger.Log, error) {
return forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input RevertTransaction) (*ledger.Log, *ledger.RevertedTransaction, error) {

var (
hasBeenReverted bool
err error
)
originalTransaction, hasBeenReverted, err = sqlTX.RevertTransaction(ctx, input.TransactionID)
originalTransaction, hasBeenReverted, err := sqlTX.RevertTransaction(ctx, input.TransactionID)
if err != nil {
return nil, err
return nil, nil, err
}
if !hasBeenReverted {
return nil, newErrAlreadyReverted(input.TransactionID)
return nil, nil, newErrAlreadyReverted(input.TransactionID)
}

bq := originalTransaction.InvolvedAccountAndAssets()

balances, err := sqlTX.GetBalances(ctx, bq)
if err != nil {
return nil, errors.Wrap(err, "failed to get balances")
return nil, nil, errors.Wrap(err, "failed to get balances")
}

reversedTx := originalTransaction.Reverse()
Expand Down Expand Up @@ -324,88 +324,99 @@ func (ctrl *DefaultController) RevertTransaction(ctx context.Context, parameters
if finalBalance.Cmp(new(big.Int)) < 0 {
// todo(waiting): break dependency on machine package
// notes(gfyrag): wait for the new interpreter
return nil, machine.NewErrInsufficientFund("insufficient fund for %s/%s", account, asset)
return nil, nil, machine.NewErrInsufficientFund("insufficient fund for %s/%s", account, asset)
}
}
}
}

err = sqlTX.CommitTransaction(ctx, &reversedTx)
if err != nil {
return nil, errors.Wrap(err, "failed to insert transaction")
return nil, nil, errors.Wrap(err, "failed to insert transaction")
}

return pointer.For(ledger.NewRevertedTransactionLog(*originalTransaction, reversedTx)), nil
return pointer.For(ledger.NewRevertedTransactionLog(*originalTransaction, reversedTx)), &ledger.RevertedTransaction{
RevertedTransaction: *originalTransaction,
RevertTransaction: reversedTx,
}, nil
})
if err != nil {
return nil, err
}

return &ledger.RevertedTransaction{
RevertedTransaction: *originalTransaction,
RevertTransaction: log.Data.(ledger.RevertedTransaction).RevertTransaction,
}, nil
}

func (ctrl *DefaultController) SaveTransactionMetadata(ctx context.Context, parameters Parameters[SaveTransactionMetadata]) error {
_, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input SaveTransactionMetadata) (*ledger.Log, error) {
_, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input SaveTransactionMetadata) (*ledger.Log, *ledger.SavedMetadata, error) {
if _, _, err := sqlTX.UpdateTransactionMetadata(ctx, input.TransactionID, input.Metadata); err != nil {
return nil, err
return nil, nil, err
}

return pointer.For(ledger.NewSetMetadataOnTransactionLog(input.TransactionID, input.Metadata)), nil
})
if err != nil {
return err
}
setMetadata := ledger.SavedMetadata{
TargetType: ledger.MetaTargetTypeTransaction,
TargetID: parameters.Input.TransactionID,
Metadata: parameters.Input.Metadata,
}

return nil
return pointer.For(ledger.NewSetMetadataOnTransactionLog(setMetadata)), &setMetadata, nil
})
return err
}

func (ctrl *DefaultController) SaveAccountMetadata(ctx context.Context, parameters Parameters[SaveAccountMetadata]) error {
now := time.Now()
_, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input SaveAccountMetadata) (*ledger.Log, error) {
_, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input SaveAccountMetadata) (*ledger.Log, *ledger.SavedMetadata, error) {
if _, err := sqlTX.UpsertAccount(ctx, &ledger.Account{
Address: input.Address,
Metadata: input.Metadata,
FirstUsage: now,
InsertionDate: now,
UpdatedAt: now,
}); err != nil {
return nil, err
return nil, nil, err
}

setMetadata := ledger.SavedMetadata{
TargetType: ledger.MetaTargetTypeAccount,
TargetID: parameters.Input.Address,
Metadata: parameters.Input.Metadata,
}

return pointer.For(ledger.NewSetMetadataOnAccountLog(input.Address, input.Metadata)), nil
return pointer.For(ledger.NewSetMetadataOnAccountLog(setMetadata)), &setMetadata, nil
})

return err
}

func (ctrl *DefaultController) DeleteTransactionMetadata(ctx context.Context, parameters Parameters[DeleteTransactionMetadata]) error {
_, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input DeleteTransactionMetadata) (*ledger.Log, error) {
_, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input DeleteTransactionMetadata) (*ledger.Log, *ledger.DeletedMetadata, error) {
_, modified, err := sqlTX.DeleteTransactionMetadata(ctx, input.TransactionID, input.Key)
if err != nil {
return nil, err
return nil, nil, err
}

if !modified {
return nil, postgres.ErrNotFound
return nil, nil, postgres.ErrNotFound
}

return pointer.For(ledger.NewDeleteTransactionMetadataLog(input.TransactionID, input.Key)), nil
return pointer.For(ledger.NewDeleteTransactionMetadataLog(input.TransactionID, input.Key)), &ledger.DeletedMetadata{
TargetType: ledger.MetaTargetTypeTransaction,
TargetID: parameters.Input.TransactionID,
Key: parameters.Input.Key,
}, nil
})

return err
}

func (ctrl *DefaultController) DeleteAccountMetadata(ctx context.Context, parameters Parameters[DeleteAccountMetadata]) error {
_, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input DeleteAccountMetadata) (*ledger.Log, error) {
_, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input DeleteAccountMetadata) (*ledger.Log, *ledger.DeletedMetadata, error) {
err := sqlTX.DeleteAccountMetadata(ctx, input.Address, input.Key)
if err != nil {
return nil, err
return nil, nil, err
}

return pointer.For(ledger.NewDeleteAccountMetadataLog(input.Address, input.Key)), nil
return pointer.For(ledger.NewDeleteAccountMetadataLog(input.Address, input.Key)), &ledger.DeletedMetadata{
TargetType: ledger.MetaTargetTypeAccount,
TargetID: parameters.Input.Address,
Key: parameters.Input.Key,
}, nil
})
return err
}
Expand Down
22 changes: 13 additions & 9 deletions internal/controller/ledger/log_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@ import (
"context"
"github.com/formancehq/go-libs/logging"
"github.com/formancehq/go-libs/platform/postgres"
"github.com/formancehq/go-libs/pointer"
ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/internal/tracing"
"github.com/pkg/errors"
)

func runTx[INPUT any](ctx context.Context, store Store, parameters Parameters[INPUT], fn func(ctx context.Context, sqlTX TX, input INPUT) (*ledger.Log, error)) (*ledger.Log, error) {
var log *ledger.Log
func runTx[INPUT, OUTPUT any](ctx context.Context, store Store, parameters Parameters[INPUT], fn func(ctx context.Context, sqlTX TX, input INPUT) (*ledger.Log, *OUTPUT, error)) (*OUTPUT, error) {
var (
log *ledger.Log
output *OUTPUT
)
err := store.WithTX(ctx, nil, func(tx TX) (commit bool, err error) {
log, err = fn(ctx, tx, parameters.Input)
log, output, err = fn(ctx, tx, parameters.Input)
if err != nil {
return false, err
}
Expand All @@ -33,14 +37,14 @@ func runTx[INPUT any](ctx context.Context, store Store, parameters Parameters[IN

return true, nil
})
return log, err
return output, err
}

// todo: handle too many clients error
// notes(gfyrag): how?
// By retrying? Is the server already overloaded? Add a limit on the retries number?
// Ask the client to retry later?
func forgeLog[INPUT any](ctx context.Context, store Store, parameters Parameters[INPUT], fn func(ctx context.Context, sqlTX TX, input INPUT) (*ledger.Log, error)) (*ledger.Log, error) {
func forgeLog[INPUT, OUTPUT any](ctx context.Context, store Store, parameters Parameters[INPUT], fn func(ctx context.Context, sqlTX TX, input INPUT) (*ledger.Log, *OUTPUT, error)) (*OUTPUT, error) {
if parameters.IdempotencyKey != "" {
log, err := store.ReadLogWithIdempotencyKey(ctx, parameters.IdempotencyKey)
if err != nil && !errors.Is(err, postgres.ErrNotFound) {
Expand All @@ -51,12 +55,12 @@ func forgeLog[INPUT any](ctx context.Context, store Store, parameters Parameters
return nil, newErrInvalidIdempotencyInputs(log.IdempotencyKey, log.IdempotencyHash, computedHash)
}

return log, nil
return pointer.For(log.Data.(OUTPUT)), nil
}
}

for {
log, err := runTx(ctx, store, parameters, fn)
output, err := runTx(ctx, store, parameters, fn)
if err != nil {
switch {
case errors.Is(err, postgres.ErrDeadlockDetected):
Expand All @@ -73,12 +77,12 @@ func forgeLog[INPUT any](ctx context.Context, store Store, parameters Parameters
return nil, err
}

return log, nil
return pointer.For(log.Data.(OUTPUT)), nil
default:
return nil, errors.Wrap(err, "unexpected error while forging log")
}
}

return log, nil
return output, nil
}
}
Loading

0 comments on commit 0a30efe

Please sign in to comment.