Skip to content

Commit

Permalink
feat: remove ExpandedTransaction and fetch pc(e)v when inserting moves
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 23, 2024
1 parent e2fe629 commit 675f32c
Show file tree
Hide file tree
Showing 22 changed files with 390 additions and 339 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/onsi/gomega v1.34.2
github.com/pborman/uuid v1.2.1
github.com/pkg/errors v0.9.1
github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -137,6 +138,7 @@ require (
github.com/rs/cors v1.11.1 // indirect
github.com/shirou/gopsutil/v4 v4.24.8 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/shomali11/util v0.0.0-20180607005212-e0f70fd665ff // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/tklauser/go-sysconf v0.3.14 // indirect
github.com/tklauser/numcpus v0.8.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
github.com/shomali11/util v0.0.0-20180607005212-e0f70fd665ff h1:A47HTOEURe8GFXu/9ztnUzVgBBo0NlWoKmVPmfJ4LR8=
github.com/shomali11/util v0.0.0-20180607005212-e0f70fd665ff/go.mod h1:WWE2GJM9B5UpdOiwH2val10w/pvJ2cUUQOOA/4LgOng=
github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df h1:SVCDTuzM3KEk8WBwSSw7RTPLw9ajzBaXDg39Bo6xIeU=
github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df/go.mod h1:K8jR5lDI2MGs9Ky+X2jIF4MwIslI0L8o8ijIlEq7/Vw=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
Expand Down
8 changes: 4 additions & 4 deletions internal/api/v1/controllers_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ func mapTransactionToV1(tx ledger.Transaction) any {
}
}

func mapExpandedTransactionToV1(tx ledger.ExpandedTransaction) any {
func mapExpandedTransactionToV1(tx ledger.Transaction) any {
return struct {
ledger.ExpandedTransaction
ledger.Transaction
TxID int `json:"txid"`
ID int `json:"-"`
}{
ExpandedTransaction: tx,
TxID: tx.ID,
Transaction: tx,
TxID: tx.ID,
}
}

Expand Down
22 changes: 6 additions & 16 deletions internal/api/v1/controllers_transactions_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,11 @@ func TestTransactionsList(t *testing.T) {
testCase.expectStatusCode = http.StatusOK
}

expectedCursor := bunpaginate.Cursor[ledger.ExpandedTransaction]{
Data: []ledger.ExpandedTransaction{
{
Transaction: ledger.NewTransaction().WithPostings(
ledger.NewPosting("world", "bank", "USD", big.NewInt(100)),
),
PostCommitVolumes: map[string]ledger.VolumesByAssets{
"world": {
"USD": ledger.NewEmptyVolumes().WithOutput(big.NewInt(100)),
},
"bank": {
"USD": ledger.NewEmptyVolumes().WithInput(big.NewInt(100)),
},
},
},
expectedCursor := bunpaginate.Cursor[ledger.Transaction]{
Data: []ledger.Transaction{
ledger.NewTransaction().WithPostings(
ledger.NewPosting("world", "bank", "USD", big.NewInt(100)),
),
},
}

Expand All @@ -166,7 +156,7 @@ func TestTransactionsList(t *testing.T) {

require.Equal(t, testCase.expectStatusCode, rec.Code)
if testCase.expectStatusCode < 300 && testCase.expectStatusCode >= 200 {
cursor := api.DecodeCursorResponse[ledger.ExpandedTransaction](t, rec.Body)
cursor := api.DecodeCursorResponse[ledger.Transaction](t, rec.Body)
require.Equal(t, expectedCursor, *cursor)
} else {
err := api.ErrorResponse{}
Expand Down
18 changes: 4 additions & 14 deletions internal/api/v1/controllers_transactions_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,9 @@ import (
func TestTransactionsRead(t *testing.T) {
t.Parallel()

tx := ledger.ExpandedTransaction{
Transaction: ledger.NewTransaction().WithPostings(
ledger.NewPosting("world", "bank", "USD", big.NewInt(100)),
),
PostCommitVolumes: map[string]ledger.VolumesByAssets{
"world": {
"USD": ledger.NewEmptyVolumes().WithOutput(big.NewInt(100)),
},
"bank": {
"USD": ledger.NewEmptyVolumes().WithInput(big.NewInt(100)),
},
},
}
tx := ledger.NewTransaction().WithPostings(
ledger.NewPosting("world", "bank", "USD", big.NewInt(100)),
)

systemController, ledgerController := newTestingSystemController(t, true)
ledgerController.EXPECT().
Expand All @@ -44,6 +34,6 @@ func TestTransactionsRead(t *testing.T) {
router.ServeHTTP(rec, req)

require.Equal(t, http.StatusOK, rec.Code)
response, _ := api.DecodeSingleResponse[ledger.ExpandedTransaction](t, rec.Body)
response, _ := api.DecodeSingleResponse[ledger.Transaction](t, rec.Body)
require.Equal(t, tx, response)
}
22 changes: 6 additions & 16 deletions internal/api/v2/controllers_transactions_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,21 +185,11 @@ func TestTransactionsList(t *testing.T) {
testCase.expectStatusCode = http.StatusOK
}

expectedCursor := bunpaginate.Cursor[ledger.ExpandedTransaction]{
Data: []ledger.ExpandedTransaction{
{
Transaction: ledger.NewTransaction().WithPostings(
ledger.NewPosting("world", "bank", "USD", big.NewInt(100)),
),
PostCommitVolumes: map[string]ledger.VolumesByAssets{
"world": {
"USD": ledger.NewEmptyVolumes().WithOutput(big.NewInt(100)),
},
"bank": {
"USD": ledger.NewEmptyVolumes().WithInput(big.NewInt(100)),
},
},
},
expectedCursor := bunpaginate.Cursor[ledger.Transaction]{
Data: []ledger.Transaction{
ledger.NewTransaction().WithPostings(
ledger.NewPosting("world", "bank", "USD", big.NewInt(100)),
),
},
}

Expand All @@ -225,7 +215,7 @@ func TestTransactionsList(t *testing.T) {

require.Equal(t, testCase.expectStatusCode, rec.Code)
if testCase.expectStatusCode < 300 && testCase.expectStatusCode >= 200 {
cursor := api.DecodeCursorResponse[ledger.ExpandedTransaction](t, rec.Body)
cursor := api.DecodeCursorResponse[ledger.Transaction](t, rec.Body)
require.Equal(t, expectedCursor, *cursor)
} else {
err := api.ErrorResponse{}
Expand Down
18 changes: 4 additions & 14 deletions internal/api/v2/controllers_transactions_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,9 @@ func TestTransactionsRead(t *testing.T) {

now := time.Now()

tx := ledger.ExpandedTransaction{
Transaction: ledger.NewTransaction().WithPostings(
ledger.NewPosting("world", "bank", "USD", big.NewInt(100)),
),
PostCommitVolumes: map[string]ledger.VolumesByAssets{
"world": {
"USD": ledger.NewEmptyVolumes().WithOutput(big.NewInt(100)),
},
"bank": {
"USD": ledger.NewEmptyVolumes().WithInput(big.NewInt(100)),
},
},
}
tx := ledger.NewTransaction().WithPostings(
ledger.NewPosting("world", "bank", "USD", big.NewInt(100)),
)

query := ledgercontroller.NewGetTransactionQuery(0)
query.PIT = &now
Expand All @@ -50,6 +40,6 @@ func TestTransactionsRead(t *testing.T) {
router.ServeHTTP(rec, req)

require.Equal(t, http.StatusOK, rec.Code)
response, _ := api.DecodeSingleResponse[ledger.ExpandedTransaction](t, rec.Body)
response, _ := api.DecodeSingleResponse[ledger.Transaction](t, rec.Body)
require.Equal(t, tx, response)
}
4 changes: 2 additions & 2 deletions internal/controller/ledger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type Controller interface {
CountAccounts(ctx context.Context, query ListAccountsQuery) (int, error)
ListLogs(ctx context.Context, query GetLogsQuery) (*bunpaginate.Cursor[ledger.Log], error)
CountTransactions(ctx context.Context, query ListTransactionsQuery) (int, error)
ListTransactions(ctx context.Context, query ListTransactionsQuery) (*bunpaginate.Cursor[ledger.ExpandedTransaction], error)
GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.ExpandedTransaction, error)
ListTransactions(ctx context.Context, query ListTransactionsQuery) (*bunpaginate.Cursor[ledger.Transaction], error)
GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.Transaction, error)
GetVolumesWithBalances(ctx context.Context, q GetVolumesWithBalancesQuery) (*bunpaginate.Cursor[ledger.VolumesWithBalanceByAssetByAccount], error)
GetAggregatedBalances(ctx context.Context, q GetAggregatedBalanceQuery) (ledger.BalancesByAssets, error)

Expand Down
8 changes: 4 additions & 4 deletions internal/controller/ledger/controller_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ func (ctrl *DefaultController) forgeLog(ctx context.Context, parameters Paramete
}
}

func (ctrl *DefaultController) ListTransactions(ctx context.Context, q ListTransactionsQuery) (*bunpaginate.Cursor[ledger.ExpandedTransaction], error) {
return tracing.Trace(ctx, "ListTransactions", func(ctx context.Context) (*bunpaginate.Cursor[ledger.ExpandedTransaction], error) {
func (ctrl *DefaultController) ListTransactions(ctx context.Context, q ListTransactionsQuery) (*bunpaginate.Cursor[ledger.Transaction], error) {
return tracing.Trace(ctx, "ListTransactions", func(ctx context.Context) (*bunpaginate.Cursor[ledger.Transaction], error) {
txs, err := ctrl.store.ListTransactions(ctx, q)
return txs, err
})
Expand All @@ -138,8 +138,8 @@ func (ctrl *DefaultController) CountTransactions(ctx context.Context, q ListTran
})
}

func (ctrl *DefaultController) GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.ExpandedTransaction, error) {
return tracing.Trace(ctx, "GetTransaction", func(ctx context.Context) (*ledger.ExpandedTransaction, error) {
func (ctrl *DefaultController) GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.Transaction, error) {
return tracing.Trace(ctx, "GetTransaction", func(ctx context.Context) (*ledger.Transaction, error) {
tx, err := ctrl.store.GetTransaction(ctx, query)
return tx, err
})
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/ledger/controller_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestListTransactions(t *testing.T) {
listener := NewMockListener(ctrl)
ctx := logging.TestingContext()

cursor := &bunpaginate.Cursor[ledger.ExpandedTransaction]{}
cursor := &bunpaginate.Cursor[ledger.Transaction]{}
query := NewListTransactionsQuery(NewPaginatedQueryOptions[PITFilterWithVolumes](PITFilterWithVolumes{}))
store.EXPECT().
ListTransactions(gomock.Any(), query).
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestGetTransaction(t *testing.T) {
listener := NewMockListener(ctrl)
ctx := logging.TestingContext()

tx := ledger.ExpandedTransaction{}
tx := ledger.Transaction{}
query := NewGetTransactionQuery(0)
store.EXPECT().
GetTransaction(gomock.Any(), query).
Expand Down
8 changes: 4 additions & 4 deletions internal/controller/ledger/controller_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/controller/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ type Store interface {
ListLogs(ctx context.Context, q GetLogsQuery) (*bunpaginate.Cursor[ledger.Log], error)
ReadLogWithIdempotencyKey(ctx context.Context, ik string) (*ledger.Log, error)

ListTransactions(ctx context.Context, q ListTransactionsQuery) (*bunpaginate.Cursor[ledger.ExpandedTransaction], error)
ListTransactions(ctx context.Context, q ListTransactionsQuery) (*bunpaginate.Cursor[ledger.Transaction], error)
CountTransactions(ctx context.Context, q ListTransactionsQuery) (int, error)
GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.ExpandedTransaction, error)
GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.Transaction, error)
CountAccounts(ctx context.Context, a ListAccountsQuery) (int, error)
ListAccounts(ctx context.Context, a ListAccountsQuery) (*bunpaginate.Cursor[ledger.ExpandedAccount], error)
GetAccount(ctx context.Context, q GetAccountQuery) (*ledger.ExpandedAccount, error)
Expand Down
8 changes: 4 additions & 4 deletions internal/controller/ledger/store_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions internal/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,32 @@ type NewTransactionLogPayload struct {
AccountMetadata AccountMetadata `json:"accountMetadata"`
}

// MarshalJSON override default json marshalling
// We don't want to store pc(v)e on the logs
// because :
// 1. It can change (effective only)
// 2. They are not part of the decision-making process
func (p NewTransactionLogPayload) MarshalJSON() ([]byte, error) {
// strip MarshalJSON of Transaction
type aux Transaction
type tx struct {
aux
PostCommitVolumes PostCommitVolumes `json:"postCommitVolumes,omitempty"`
PostCommitEffectiveVolumes PostCommitVolumes `json:"postCommitEffectiveVolumes,omitempty"`
}

return json.Marshal(struct {
Transaction tx `json:"transaction"`
AccountMetadata AccountMetadata `json:"accountMetadata"`
}{
Transaction: tx{
aux: aux(p.Transaction),
},
AccountMetadata: p.AccountMetadata,
})

}

func NewTransactionLog(tx Transaction, accountMetadata AccountMetadata) Log {
if accountMetadata == nil {
accountMetadata = AccountMetadata{}
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/ledger/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"database/sql/driver"
"encoding/json"
"fmt"

"github.com/formancehq/ledger/internal/tracing"

"github.com/formancehq/go-libs/bun/bunpaginate"
Expand Down Expand Up @@ -73,6 +72,7 @@ func (s *Store) InsertLog(ctx context.Context, log *ledger.Log) error {
}

_, err := tracing.TraceWithLatency(ctx, "InsertLog", tracing.NoResult(func(ctx context.Context) error {

data, err := json.Marshal(log.Data)
if err != nil {
return errors.Wrap(err, "failed to marshal log data")
Expand Down
2 changes: 2 additions & 0 deletions internal/storage/ledger/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/stretchr/testify/require"
)

// todo: add log hash test with ledger v2

func TestInsertLog(t *testing.T) {
t.Parallel()

Expand Down
32 changes: 30 additions & 2 deletions internal/storage/ledger/moves.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ type Move struct {
AccountSeq int `bun:"accounts_seq,type:int"`
InsertionDate time.Time `bun:"insertion_date,type:timestamp"`
EffectiveDate time.Time `bun:"effective_date,type:timestamp"`
PostCommitVolumes Volumes `bun:"post_commit_volumes,type:jsonb,scanonly"`
PostCommitEffectiveVolumes Volumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"`
PostCommitVolumes *Volumes `bun:"post_commit_volumes,type:jsonb,scanonly"`
PostCommitEffectiveVolumes *Volumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"`
}

type Moves []*Move
Expand All @@ -108,3 +108,31 @@ func (m Moves) BalanceUpdates() map[string]map[string]*big.Int {

return ret
}

func (m Moves) ComputePostCommitVolumes() TransactionsPostCommitVolumes {
ret := TransactionsPostCommitVolumes{}
for _, move := range m {
ret = append(ret, TransactionPostCommitVolume{
AggregatedAccountVolume: AggregatedAccountVolume{
Volumes: *move.PostCommitVolumes,
Asset: move.Asset,
},
Account: move.Account,
})
}
return ret
}

func (m Moves) ComputePostCommitEffectiveVolumes() TransactionsPostCommitVolumes {
ret := TransactionsPostCommitVolumes{}
for _, move := range m {
ret = append(ret, TransactionPostCommitVolume{
AggregatedAccountVolume: AggregatedAccountVolume{
Volumes: *move.PostCommitEffectiveVolumes,
Asset: move.Asset,
},
Account: move.Account,
})
}
return ret
}
Loading

0 comments on commit 675f32c

Please sign in to comment.