Skip to content

Commit

Permalink
feat: simplify some internals
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 16, 2024
1 parent d807d38 commit 9e843fd
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 163 deletions.
325 changes: 165 additions & 160 deletions internal/controller/ledger/controller_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ type DefaultController struct {
executeMachineHistogram metric.Int64Histogram
deadLockCounter metric.Int64Counter

createTransactionLp *logProcessor[RunScript, ledger.CreatedTransaction]
revertTransactionLp *logProcessor[RevertTransaction, ledger.RevertedTransaction]
saveTransactionMetadataLp *logProcessor[SaveTransactionMetadata, ledger.SavedMetadata]
saveAccountMetadataLp *logProcessor[SaveAccountMetadata, ledger.SavedMetadata]
deleteTransactionMetadata *logProcessor[DeleteTransactionMetadata, ledger.DeletedMetadata]
deleteAccountMetadata *logProcessor[DeleteAccountMetadata, ledger.DeletedMetadata]
createTransactionLp *logProcessor[RunScript, ledger.CreatedTransaction]
revertTransactionLp *logProcessor[RevertTransaction, ledger.RevertedTransaction]
saveTransactionMetadataLp *logProcessor[SaveTransactionMetadata, ledger.SavedMetadata]
saveAccountMetadataLp *logProcessor[SaveAccountMetadata, ledger.SavedMetadata]
deleteTransactionMetadataLp *logProcessor[DeleteTransactionMetadata, ledger.DeletedMetadata]
deleteAccountMetadataLp *logProcessor[DeleteAccountMetadata, ledger.DeletedMetadata]
}

func NewDefaultController(
Expand Down Expand Up @@ -78,8 +78,8 @@ func NewDefaultController(
ret.revertTransactionLp = newLogProcessor[RevertTransaction, ledger.RevertedTransaction]("RevertTransaction", ret.deadLockCounter)
ret.saveTransactionMetadataLp = newLogProcessor[SaveTransactionMetadata, ledger.SavedMetadata]("SaveTransactionMetadata", ret.deadLockCounter)
ret.saveAccountMetadataLp = newLogProcessor[SaveAccountMetadata, ledger.SavedMetadata]("SaveAccountMetadata", ret.deadLockCounter)
ret.deleteTransactionMetadata = newLogProcessor[DeleteTransactionMetadata, ledger.DeletedMetadata]("DeleteTransactionMetadata", ret.deadLockCounter)
ret.deleteAccountMetadata = newLogProcessor[DeleteAccountMetadata, ledger.DeletedMetadata]("DeleteAccountMetadata", ret.deadLockCounter)
ret.deleteTransactionMetadataLp = newLogProcessor[DeleteTransactionMetadata, ledger.DeletedMetadata]("DeleteTransactionMetadata", ret.deadLockCounter)
ret.deleteAccountMetadataLp = newLogProcessor[DeleteAccountMetadata, ledger.DeletedMetadata]("DeleteAccountMetadata", ret.deadLockCounter)

return ret
}
Expand Down Expand Up @@ -245,7 +245,7 @@ func (ctrl *DefaultController) GetVolumesWithBalances(ctx context.Context, q Get
return ctrl.store.GetVolumesWithBalances(ctx, q)
}

func (ctrl *DefaultController) CreateTransaction(ctx context.Context, parameters Parameters[RunScript]) (*ledger.CreatedTransaction, error) {
func (ctrl *DefaultController) createTransaction(ctx context.Context, sqlTX TX, parameters Parameters[RunScript]) (*ledger.CreatedTransaction, error) {

logger := logging.FromContext(ctx).WithField("req", uuid.NewString()[:8])
ctx = logging.ContextWithLogger(ctx, logger)
Expand All @@ -255,196 +255,201 @@ func (ctrl *DefaultController) CreateTransaction(ctx context.Context, parameters
return nil, fmt.Errorf("failed to compile script: %w", err)
}

output, err := ctrl.createTransactionLp.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input RunScript) (*ledger.CreatedTransaction, error) {
result, err := tracing.TraceWithMetric(
ctx,
"ExecuteMachine",
ctrl.tracer,
ctrl.executeMachineHistogram,
func(ctx context.Context) (*MachineResult, error) {
return m.Execute(ctx, sqlTX, input.Vars)
},
)
if err != nil {
return nil, fmt.Errorf("failed to execute program: %w", err)
}

if len(result.Postings) == 0 {
return nil, ErrNoPostings
}

finalMetadata := result.Metadata
if finalMetadata == nil {
finalMetadata = metadata.Metadata{}
}
for k, v := range input.Metadata {
if finalMetadata[k] != "" {
return nil, newErrMetadataOverride(k)
}
finalMetadata[k] = v
}
result, err := tracing.TraceWithMetric(
ctx,
"ExecuteMachine",
ctrl.tracer,
ctrl.executeMachineHistogram,
func(ctx context.Context) (*MachineResult, error) {
return m.Execute(ctx, sqlTX, parameters.Input.Vars)
},
)
if err != nil {
return nil, fmt.Errorf("failed to execute program: %w", err)
}

transaction := ledger.NewTransaction().
WithPostings(result.Postings...).
WithMetadata(finalMetadata).
WithTimestamp(input.Timestamp).
WithReference(input.Reference)
err = sqlTX.CommitTransaction(ctx, &transaction)
if err != nil {
return nil, err
}
if len(result.Postings) == 0 {
return nil, ErrNoPostings
}

if len(result.AccountMetadata) > 0 {
if err := sqlTX.UpdateAccountsMetadata(ctx, result.AccountMetadata); err != nil {
return nil, fmt.Errorf("updating metadata of account '%s': %w", Keys(result.AccountMetadata), err)
}
finalMetadata := result.Metadata
if finalMetadata == nil {
finalMetadata = metadata.Metadata{}
}
for k, v := range parameters.Input.Metadata {
if finalMetadata[k] != "" {
return nil, newErrMetadataOverride(k)
}
finalMetadata[k] = v
}

return &ledger.CreatedTransaction{
Transaction: transaction,
AccountMetadata: result.AccountMetadata,
}, err
})
transaction := ledger.NewTransaction().
WithPostings(result.Postings...).
WithMetadata(finalMetadata).
WithTimestamp(parameters.Input.Timestamp).
WithReference(parameters.Input.Reference)
err = sqlTX.CommitTransaction(ctx, &transaction)
if err != nil {
return nil, err
}

return output, nil
if len(result.AccountMetadata) > 0 {
if err := sqlTX.UpdateAccountsMetadata(ctx, result.AccountMetadata); err != nil {
return nil, fmt.Errorf("updating metadata of account '%s': %w", Keys(result.AccountMetadata), err)
}
}

return &ledger.CreatedTransaction{
Transaction: transaction,
AccountMetadata: result.AccountMetadata,
}, err
}

func (ctrl *DefaultController) RevertTransaction(ctx context.Context, parameters Parameters[RevertTransaction]) (*ledger.RevertedTransaction, error) {
return ctrl.revertTransactionLp.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input RevertTransaction) (*ledger.RevertedTransaction, error) {
func (ctrl *DefaultController) CreateTransaction(ctx context.Context, parameters Parameters[RunScript]) (*ledger.CreatedTransaction, error) {
return ctrl.createTransactionLp.forgeLog(ctx, ctrl.store, parameters, ctrl.createTransaction)
}

var (
hasBeenReverted bool
err error
)
originalTransaction, hasBeenReverted, err := sqlTX.RevertTransaction(ctx, input.TransactionID)
if err != nil {
return nil, err
}
if !hasBeenReverted {
return nil, newErrAlreadyReverted(input.TransactionID)
}
func (ctrl *DefaultController) revertTransaction(ctx context.Context, sqlTX TX, parameters Parameters[RevertTransaction]) (*ledger.RevertedTransaction, error) {
var (
hasBeenReverted bool
err error
)
originalTransaction, hasBeenReverted, err := sqlTX.RevertTransaction(ctx, parameters.Input.TransactionID)
if err != nil {
return nil, err
}
if !hasBeenReverted {
return nil, newErrAlreadyReverted(parameters.Input.TransactionID)
}

bq := originalTransaction.InvolvedAccountAndAssets()
bq := originalTransaction.InvolvedAccountAndAssets()

balances, err := sqlTX.GetBalances(ctx, bq)
if err != nil {
return nil, fmt.Errorf("failed to get balances: %w", err)
}
balances, err := sqlTX.GetBalances(ctx, bq)
if err != nil {
return nil, fmt.Errorf("failed to get balances: %w", err)
}

reversedTx := originalTransaction.Reverse()
if input.AtEffectiveDate {
reversedTx = reversedTx.WithTimestamp(originalTransaction.Timestamp)
} else {
reversedTx = reversedTx.WithTimestamp(*originalTransaction.RevertedAt)
}
reversedTx := originalTransaction.Reverse()
if parameters.Input.AtEffectiveDate {
reversedTx = reversedTx.WithTimestamp(originalTransaction.Timestamp)
} else {
reversedTx = reversedTx.WithTimestamp(*originalTransaction.RevertedAt)
}

// Check balances after the revert, all balances must be greater than 0
if !input.Force {
for _, posting := range reversedTx.Postings {
balances[posting.Source][posting.Asset] = balances[posting.Source][posting.Asset].Add(
balances[posting.Source][posting.Asset],
big.NewInt(0).Neg(posting.Amount),
)
balances[posting.Destination][posting.Destination] = balances[posting.Destination][posting.Asset].Add(
balances[posting.Destination][posting.Asset],
big.NewInt(0).Set(posting.Amount),
)
}
// Check balances after the revert, all balances must be greater than 0
if !parameters.Input.Force {
for _, posting := range reversedTx.Postings {
balances[posting.Source][posting.Asset] = balances[posting.Source][posting.Asset].Add(
balances[posting.Source][posting.Asset],
big.NewInt(0).Neg(posting.Amount),
)
balances[posting.Destination][posting.Destination] = balances[posting.Destination][posting.Asset].Add(
balances[posting.Destination][posting.Asset],
big.NewInt(0).Set(posting.Amount),
)
}

for account, forAccount := range balances {
for asset, finalBalance := range forAccount {
if finalBalance.Cmp(new(big.Int)) < 0 {
// todo(waiting): break dependency on machine package
// notes(gfyrag): wait for the new interpreter
return nil, machine.NewErrInsufficientFund("insufficient fund for %s/%s", account, asset)
}
for account, forAccount := range balances {
for asset, finalBalance := range forAccount {
if finalBalance.Cmp(new(big.Int)) < 0 {
// todo(waiting): break dependency on machine package
// notes(gfyrag): wait for the new interpreter
return nil, machine.NewErrInsufficientFund("insufficient fund for %s/%s", account, asset)
}
}
}
}

err = sqlTX.CommitTransaction(ctx, &reversedTx)
if err != nil {
return nil, fmt.Errorf("failed to insert transaction: %w", err)
}
err = sqlTX.CommitTransaction(ctx, &reversedTx)
if err != nil {
return nil, fmt.Errorf("failed to insert transaction: %w", err)
}

return &ledger.RevertedTransaction{
RevertedTransaction: *originalTransaction,
RevertTransaction: reversedTx,
}, nil
})
return &ledger.RevertedTransaction{
RevertedTransaction: *originalTransaction,
RevertTransaction: reversedTx,
}, nil
}

func (ctrl *DefaultController) SaveTransactionMetadata(ctx context.Context, parameters Parameters[SaveTransactionMetadata]) error {
_, err := ctrl.saveTransactionMetadataLp.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input SaveTransactionMetadata) (*ledger.SavedMetadata, error) {
if _, _, err := sqlTX.UpdateTransactionMetadata(ctx, input.TransactionID, input.Metadata); err != nil {
return nil, err
}
func (ctrl *DefaultController) RevertTransaction(ctx context.Context, parameters Parameters[RevertTransaction]) (*ledger.RevertedTransaction, error) {
return ctrl.revertTransactionLp.forgeLog(ctx, ctrl.store, parameters, ctrl.revertTransaction)
}

return &ledger.SavedMetadata{
TargetType: ledger.MetaTargetTypeTransaction,
TargetID: parameters.Input.TransactionID,
Metadata: parameters.Input.Metadata,
}, nil
})
func (ctrl *DefaultController) saveTransactionMetadata(ctx context.Context, sqlTX TX, parameters Parameters[SaveTransactionMetadata]) (*ledger.SavedMetadata, error) {
if _, _, err := sqlTX.UpdateTransactionMetadata(ctx, parameters.Input.TransactionID, parameters.Input.Metadata); err != nil {
return nil, err
}

return &ledger.SavedMetadata{
TargetType: ledger.MetaTargetTypeTransaction,
TargetID: parameters.Input.TransactionID,
Metadata: parameters.Input.Metadata,
}, nil
}

func (ctrl *DefaultController) SaveTransactionMetadata(ctx context.Context, parameters Parameters[SaveTransactionMetadata]) error {
_, err := ctrl.saveTransactionMetadataLp.forgeLog(ctx, ctrl.store, parameters, ctrl.saveTransactionMetadata)
return err
}

func (ctrl *DefaultController) SaveAccountMetadata(ctx context.Context, parameters Parameters[SaveAccountMetadata]) error {
_, err := ctrl.saveAccountMetadataLp.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input SaveAccountMetadata) (*ledger.SavedMetadata, error) {
if _, err := sqlTX.UpsertAccount(ctx, &ledger.Account{
Address: input.Address,
Metadata: input.Metadata,
}); err != nil {
return nil, err
}
func (ctrl *DefaultController) saveAccountMetadata(ctx context.Context, sqlTX TX, parameters Parameters[SaveAccountMetadata]) (*ledger.SavedMetadata, error) {
if _, err := sqlTX.UpsertAccount(ctx, &ledger.Account{
Address: parameters.Input.Address,
Metadata: parameters.Input.Metadata,
}); err != nil {
return nil, err
}

return &ledger.SavedMetadata{
TargetType: ledger.MetaTargetTypeAccount,
TargetID: parameters.Input.Address,
Metadata: parameters.Input.Metadata,
}, nil
})
return &ledger.SavedMetadata{
TargetType: ledger.MetaTargetTypeAccount,
TargetID: parameters.Input.Address,
Metadata: parameters.Input.Metadata,
}, nil
}

func (ctrl *DefaultController) SaveAccountMetadata(ctx context.Context, parameters Parameters[SaveAccountMetadata]) error {
_, err := ctrl.saveAccountMetadataLp.forgeLog(ctx, ctrl.store, parameters, ctrl.saveAccountMetadata)

return err
}

func (ctrl *DefaultController) DeleteTransactionMetadata(ctx context.Context, parameters Parameters[DeleteTransactionMetadata]) error {
_, err := ctrl.deleteTransactionMetadata.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input DeleteTransactionMetadata) (*ledger.DeletedMetadata, error) {
_, modified, err := sqlTX.DeleteTransactionMetadata(ctx, input.TransactionID, input.Key)
if err != nil {
return nil, err
}
func (ctrl *DefaultController) deleteTransactionMetadata(ctx context.Context, sqlTX TX, parameters Parameters[DeleteTransactionMetadata]) (*ledger.DeletedMetadata, error) {
_, modified, err := sqlTX.DeleteTransactionMetadata(ctx, parameters.Input.TransactionID, parameters.Input.Key)
if err != nil {
return nil, err
}

if !modified {
return nil, postgres.ErrNotFound
}
if !modified {
return nil, postgres.ErrNotFound
}

return &ledger.DeletedMetadata{
TargetType: ledger.MetaTargetTypeTransaction,
TargetID: parameters.Input.TransactionID,
Key: parameters.Input.Key,
}, nil
})
return &ledger.DeletedMetadata{
TargetType: ledger.MetaTargetTypeTransaction,
TargetID: parameters.Input.TransactionID,
Key: parameters.Input.Key,
}, nil
}

func (ctrl *DefaultController) DeleteTransactionMetadata(ctx context.Context, parameters Parameters[DeleteTransactionMetadata]) error {
_, err := ctrl.deleteTransactionMetadataLp.forgeLog(ctx, ctrl.store, parameters, ctrl.deleteTransactionMetadata)
return err
}

func (ctrl *DefaultController) DeleteAccountMetadata(ctx context.Context, parameters Parameters[DeleteAccountMetadata]) error {
_, err := ctrl.deleteAccountMetadata.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input DeleteAccountMetadata) (*ledger.DeletedMetadata, error) {
err := sqlTX.DeleteAccountMetadata(ctx, input.Address, input.Key)
if err != nil {
return nil, err
}
func (ctrl *DefaultController) deleteAccountMetadata(ctx context.Context, sqlTX TX, parameters Parameters[DeleteAccountMetadata]) (*ledger.DeletedMetadata, error) {
err := sqlTX.DeleteAccountMetadata(ctx, parameters.Input.Address, parameters.Input.Key)
if err != nil {
return nil, err
}

return &ledger.DeletedMetadata{
TargetType: ledger.MetaTargetTypeAccount,
TargetID: parameters.Input.Address,
Key: parameters.Input.Key,
}, nil
})
return &ledger.DeletedMetadata{
TargetType: ledger.MetaTargetTypeAccount,
TargetID: parameters.Input.Address,
Key: parameters.Input.Key,
}, nil
}

func (ctrl *DefaultController) DeleteAccountMetadata(ctx context.Context, parameters Parameters[DeleteAccountMetadata]) error {
_, err := ctrl.deleteAccountMetadataLp.forgeLog(ctx, ctrl.store, parameters, ctrl.deleteAccountMetadata)
return err
}

Expand Down
Loading

0 comments on commit 9e843fd

Please sign in to comment.