diff --git a/go.mod b/go.mod index e8edefb61..3661ced15 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/onsi/gomega v1.34.2 github.com/pborman/uuid v1.2.1 github.com/pkg/errors v0.9.1 + github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 @@ -137,6 +138,7 @@ require ( github.com/rs/cors v1.11.1 // indirect github.com/shirou/gopsutil/v4 v4.24.8 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect + github.com/shomali11/util v0.0.0-20180607005212-e0f70fd665ff // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/tklauser/go-sysconf v0.3.14 // indirect github.com/tklauser/numcpus v0.8.0 // indirect diff --git a/go.sum b/go.sum index 74e03e712..50d8d5cbc 100644 --- a/go.sum +++ b/go.sum @@ -270,6 +270,10 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= +github.com/shomali11/util v0.0.0-20180607005212-e0f70fd665ff h1:A47HTOEURe8GFXu/9ztnUzVgBBo0NlWoKmVPmfJ4LR8= +github.com/shomali11/util v0.0.0-20180607005212-e0f70fd665ff/go.mod h1:WWE2GJM9B5UpdOiwH2val10w/pvJ2cUUQOOA/4LgOng= +github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df h1:SVCDTuzM3KEk8WBwSSw7RTPLw9ajzBaXDg39Bo6xIeU= +github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df/go.mod h1:K8jR5lDI2MGs9Ky+X2jIF4MwIslI0L8o8ijIlEq7/Vw= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= diff --git a/internal/api/v1/controllers_transactions.go b/internal/api/v1/controllers_transactions.go index 015bb03f1..7cc1c3d97 100644 --- a/internal/api/v1/controllers_transactions.go +++ b/internal/api/v1/controllers_transactions.go @@ -19,14 +19,14 @@ func mapTransactionToV1(tx ledger.Transaction) any { } } -func mapExpandedTransactionToV1(tx ledger.ExpandedTransaction) any { +func mapExpandedTransactionToV1(tx ledger.Transaction) any { return struct { - ledger.ExpandedTransaction + ledger.Transaction TxID int `json:"txid"` ID int `json:"-"` }{ - ExpandedTransaction: tx, - TxID: tx.ID, + Transaction: tx, + TxID: tx.ID, } } diff --git a/internal/api/v1/controllers_transactions_list_test.go b/internal/api/v1/controllers_transactions_list_test.go index 461eb7ccb..cc18eeb6f 100644 --- a/internal/api/v1/controllers_transactions_list_test.go +++ b/internal/api/v1/controllers_transactions_list_test.go @@ -131,21 +131,11 @@ func TestTransactionsList(t *testing.T) { testCase.expectStatusCode = http.StatusOK } - expectedCursor := bunpaginate.Cursor[ledger.ExpandedTransaction]{ - Data: []ledger.ExpandedTransaction{ - { - Transaction: ledger.NewTransaction().WithPostings( - ledger.NewPosting("world", "bank", "USD", big.NewInt(100)), - ), - PostCommitVolumes: map[string]ledger.VolumesByAssets{ - "world": { - "USD": ledger.NewEmptyVolumes().WithOutput(big.NewInt(100)), - }, - "bank": { - "USD": ledger.NewEmptyVolumes().WithInput(big.NewInt(100)), - }, - }, - }, + expectedCursor := bunpaginate.Cursor[ledger.Transaction]{ + Data: []ledger.Transaction{ + ledger.NewTransaction().WithPostings( + ledger.NewPosting("world", "bank", "USD", big.NewInt(100)), + ), }, } @@ -166,7 +156,7 @@ func TestTransactionsList(t *testing.T) { require.Equal(t, testCase.expectStatusCode, rec.Code) if testCase.expectStatusCode < 300 && testCase.expectStatusCode >= 200 { - cursor := api.DecodeCursorResponse[ledger.ExpandedTransaction](t, rec.Body) + cursor := api.DecodeCursorResponse[ledger.Transaction](t, rec.Body) require.Equal(t, expectedCursor, *cursor) } else { err := api.ErrorResponse{} diff --git a/internal/api/v1/controllers_transactions_read_test.go b/internal/api/v1/controllers_transactions_read_test.go index 07d6ed745..bb856e9ce 100644 --- a/internal/api/v1/controllers_transactions_read_test.go +++ b/internal/api/v1/controllers_transactions_read_test.go @@ -17,19 +17,9 @@ import ( func TestTransactionsRead(t *testing.T) { t.Parallel() - tx := ledger.ExpandedTransaction{ - Transaction: ledger.NewTransaction().WithPostings( - ledger.NewPosting("world", "bank", "USD", big.NewInt(100)), - ), - PostCommitVolumes: map[string]ledger.VolumesByAssets{ - "world": { - "USD": ledger.NewEmptyVolumes().WithOutput(big.NewInt(100)), - }, - "bank": { - "USD": ledger.NewEmptyVolumes().WithInput(big.NewInt(100)), - }, - }, - } + tx := ledger.NewTransaction().WithPostings( + ledger.NewPosting("world", "bank", "USD", big.NewInt(100)), + ) systemController, ledgerController := newTestingSystemController(t, true) ledgerController.EXPECT(). @@ -44,6 +34,6 @@ func TestTransactionsRead(t *testing.T) { router.ServeHTTP(rec, req) require.Equal(t, http.StatusOK, rec.Code) - response, _ := api.DecodeSingleResponse[ledger.ExpandedTransaction](t, rec.Body) + response, _ := api.DecodeSingleResponse[ledger.Transaction](t, rec.Body) require.Equal(t, tx, response) } diff --git a/internal/api/v2/controllers_transactions_list_test.go b/internal/api/v2/controllers_transactions_list_test.go index ff2c52504..66eb5a4b5 100644 --- a/internal/api/v2/controllers_transactions_list_test.go +++ b/internal/api/v2/controllers_transactions_list_test.go @@ -185,21 +185,11 @@ func TestTransactionsList(t *testing.T) { testCase.expectStatusCode = http.StatusOK } - expectedCursor := bunpaginate.Cursor[ledger.ExpandedTransaction]{ - Data: []ledger.ExpandedTransaction{ - { - Transaction: ledger.NewTransaction().WithPostings( - ledger.NewPosting("world", "bank", "USD", big.NewInt(100)), - ), - PostCommitVolumes: map[string]ledger.VolumesByAssets{ - "world": { - "USD": ledger.NewEmptyVolumes().WithOutput(big.NewInt(100)), - }, - "bank": { - "USD": ledger.NewEmptyVolumes().WithInput(big.NewInt(100)), - }, - }, - }, + expectedCursor := bunpaginate.Cursor[ledger.Transaction]{ + Data: []ledger.Transaction{ + ledger.NewTransaction().WithPostings( + ledger.NewPosting("world", "bank", "USD", big.NewInt(100)), + ), }, } @@ -225,7 +215,7 @@ func TestTransactionsList(t *testing.T) { require.Equal(t, testCase.expectStatusCode, rec.Code) if testCase.expectStatusCode < 300 && testCase.expectStatusCode >= 200 { - cursor := api.DecodeCursorResponse[ledger.ExpandedTransaction](t, rec.Body) + cursor := api.DecodeCursorResponse[ledger.Transaction](t, rec.Body) require.Equal(t, expectedCursor, *cursor) } else { err := api.ErrorResponse{} diff --git a/internal/api/v2/controllers_transactions_read_test.go b/internal/api/v2/controllers_transactions_read_test.go index 7132d82c6..7e47f8376 100644 --- a/internal/api/v2/controllers_transactions_read_test.go +++ b/internal/api/v2/controllers_transactions_read_test.go @@ -20,19 +20,9 @@ func TestTransactionsRead(t *testing.T) { now := time.Now() - tx := ledger.ExpandedTransaction{ - Transaction: ledger.NewTransaction().WithPostings( - ledger.NewPosting("world", "bank", "USD", big.NewInt(100)), - ), - PostCommitVolumes: map[string]ledger.VolumesByAssets{ - "world": { - "USD": ledger.NewEmptyVolumes().WithOutput(big.NewInt(100)), - }, - "bank": { - "USD": ledger.NewEmptyVolumes().WithInput(big.NewInt(100)), - }, - }, - } + tx := ledger.NewTransaction().WithPostings( + ledger.NewPosting("world", "bank", "USD", big.NewInt(100)), + ) query := ledgercontroller.NewGetTransactionQuery(0) query.PIT = &now @@ -50,6 +40,6 @@ func TestTransactionsRead(t *testing.T) { router.ServeHTTP(rec, req) require.Equal(t, http.StatusOK, rec.Code) - response, _ := api.DecodeSingleResponse[ledger.ExpandedTransaction](t, rec.Body) + response, _ := api.DecodeSingleResponse[ledger.Transaction](t, rec.Body) require.Equal(t, tx, response) } diff --git a/internal/controller/ledger/controller.go b/internal/controller/ledger/controller.go index c5e6502d4..286a92f4f 100644 --- a/internal/controller/ledger/controller.go +++ b/internal/controller/ledger/controller.go @@ -23,8 +23,8 @@ type Controller interface { CountAccounts(ctx context.Context, query ListAccountsQuery) (int, error) ListLogs(ctx context.Context, query GetLogsQuery) (*bunpaginate.Cursor[ledger.Log], error) CountTransactions(ctx context.Context, query ListTransactionsQuery) (int, error) - ListTransactions(ctx context.Context, query ListTransactionsQuery) (*bunpaginate.Cursor[ledger.ExpandedTransaction], error) - GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.ExpandedTransaction, error) + ListTransactions(ctx context.Context, query ListTransactionsQuery) (*bunpaginate.Cursor[ledger.Transaction], error) + GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.Transaction, error) GetVolumesWithBalances(ctx context.Context, q GetVolumesWithBalancesQuery) (*bunpaginate.Cursor[ledger.VolumesWithBalanceByAssetByAccount], error) GetAggregatedBalances(ctx context.Context, q GetAggregatedBalanceQuery) (ledger.BalancesByAssets, error) diff --git a/internal/controller/ledger/controller_default.go b/internal/controller/ledger/controller_default.go index 69c948f48..d6c4bb6b3 100644 --- a/internal/controller/ledger/controller_default.go +++ b/internal/controller/ledger/controller_default.go @@ -124,8 +124,8 @@ func (ctrl *DefaultController) forgeLog(ctx context.Context, parameters Paramete } } -func (ctrl *DefaultController) ListTransactions(ctx context.Context, q ListTransactionsQuery) (*bunpaginate.Cursor[ledger.ExpandedTransaction], error) { - return tracing.Trace(ctx, "ListTransactions", func(ctx context.Context) (*bunpaginate.Cursor[ledger.ExpandedTransaction], error) { +func (ctrl *DefaultController) ListTransactions(ctx context.Context, q ListTransactionsQuery) (*bunpaginate.Cursor[ledger.Transaction], error) { + return tracing.Trace(ctx, "ListTransactions", func(ctx context.Context) (*bunpaginate.Cursor[ledger.Transaction], error) { txs, err := ctrl.store.ListTransactions(ctx, q) return txs, err }) @@ -138,8 +138,8 @@ func (ctrl *DefaultController) CountTransactions(ctx context.Context, q ListTran }) } -func (ctrl *DefaultController) GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.ExpandedTransaction, error) { - return tracing.Trace(ctx, "GetTransaction", func(ctx context.Context) (*ledger.ExpandedTransaction, error) { +func (ctrl *DefaultController) GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.Transaction, error) { + return tracing.Trace(ctx, "GetTransaction", func(ctx context.Context) (*ledger.Transaction, error) { tx, err := ctrl.store.GetTransaction(ctx, query) return tx, err }) diff --git a/internal/controller/ledger/controller_default_test.go b/internal/controller/ledger/controller_default_test.go index 7d6d9cd4c..fe3338745 100644 --- a/internal/controller/ledger/controller_default_test.go +++ b/internal/controller/ledger/controller_default_test.go @@ -192,7 +192,7 @@ func TestListTransactions(t *testing.T) { listener := NewMockListener(ctrl) ctx := logging.TestingContext() - cursor := &bunpaginate.Cursor[ledger.ExpandedTransaction]{} + cursor := &bunpaginate.Cursor[ledger.Transaction]{} query := NewListTransactionsQuery(NewPaginatedQueryOptions[PITFilterWithVolumes](PITFilterWithVolumes{})) store.EXPECT(). ListTransactions(gomock.Any(), query). @@ -229,7 +229,7 @@ func TestGetTransaction(t *testing.T) { listener := NewMockListener(ctrl) ctx := logging.TestingContext() - tx := ledger.ExpandedTransaction{} + tx := ledger.Transaction{} query := NewGetTransactionQuery(0) store.EXPECT(). GetTransaction(gomock.Any(), query). diff --git a/internal/controller/ledger/controller_generated.go b/internal/controller/ledger/controller_generated.go index 2586a29dc..dbf93e7be 100644 --- a/internal/controller/ledger/controller_generated.go +++ b/internal/controller/ledger/controller_generated.go @@ -191,10 +191,10 @@ func (mr *MockControllerMockRecorder) GetStats(ctx any) *gomock.Call { } // GetTransaction mocks base method. -func (m *MockController) GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.ExpandedTransaction, error) { +func (m *MockController) GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.Transaction, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetTransaction", ctx, query) - ret0, _ := ret[0].(*ledger.ExpandedTransaction) + ret0, _ := ret[0].(*ledger.Transaction) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -280,10 +280,10 @@ func (mr *MockControllerMockRecorder) ListLogs(ctx, query any) *gomock.Call { } // ListTransactions mocks base method. -func (m *MockController) ListTransactions(ctx context.Context, query ListTransactionsQuery) (*bunpaginate.Cursor[ledger.ExpandedTransaction], error) { +func (m *MockController) ListTransactions(ctx context.Context, query ListTransactionsQuery) (*bunpaginate.Cursor[ledger.Transaction], error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListTransactions", ctx, query) - ret0, _ := ret[0].(*bunpaginate.Cursor[ledger.ExpandedTransaction]) + ret0, _ := ret[0].(*bunpaginate.Cursor[ledger.Transaction]) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/internal/controller/ledger/store.go b/internal/controller/ledger/store.go index 91f9fca6d..2f9fd7f78 100644 --- a/internal/controller/ledger/store.go +++ b/internal/controller/ledger/store.go @@ -52,9 +52,9 @@ type Store interface { ListLogs(ctx context.Context, q GetLogsQuery) (*bunpaginate.Cursor[ledger.Log], error) ReadLogWithIdempotencyKey(ctx context.Context, ik string) (*ledger.Log, error) - ListTransactions(ctx context.Context, q ListTransactionsQuery) (*bunpaginate.Cursor[ledger.ExpandedTransaction], error) + ListTransactions(ctx context.Context, q ListTransactionsQuery) (*bunpaginate.Cursor[ledger.Transaction], error) CountTransactions(ctx context.Context, q ListTransactionsQuery) (int, error) - GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.ExpandedTransaction, error) + GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.Transaction, error) CountAccounts(ctx context.Context, a ListAccountsQuery) (int, error) ListAccounts(ctx context.Context, a ListAccountsQuery) (*bunpaginate.Cursor[ledger.ExpandedAccount], error) GetAccount(ctx context.Context, q GetAccountQuery) (*ledger.ExpandedAccount, error) diff --git a/internal/controller/ledger/store_generated.go b/internal/controller/ledger/store_generated.go index 229d2f984..dc09e6c49 100644 --- a/internal/controller/ledger/store_generated.go +++ b/internal/controller/ledger/store_generated.go @@ -319,10 +319,10 @@ func (mr *MockStoreMockRecorder) GetMigrationsInfo(ctx any) *gomock.Call { } // GetTransaction mocks base method. -func (m *MockStore) GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.ExpandedTransaction, error) { +func (m *MockStore) GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.Transaction, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetTransaction", ctx, query) - ret0, _ := ret[0].(*ledger.ExpandedTransaction) + ret0, _ := ret[0].(*ledger.Transaction) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -394,10 +394,10 @@ func (mr *MockStoreMockRecorder) ListLogs(ctx, q any) *gomock.Call { } // ListTransactions mocks base method. -func (m *MockStore) ListTransactions(ctx context.Context, q ListTransactionsQuery) (*bunpaginate.Cursor[ledger.ExpandedTransaction], error) { +func (m *MockStore) ListTransactions(ctx context.Context, q ListTransactionsQuery) (*bunpaginate.Cursor[ledger.Transaction], error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListTransactions", ctx, q) - ret0, _ := ret[0].(*bunpaginate.Cursor[ledger.ExpandedTransaction]) + ret0, _ := ret[0].(*bunpaginate.Cursor[ledger.Transaction]) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/internal/log.go b/internal/log.go index 3556dbeec..1fe2bd617 100644 --- a/internal/log.go +++ b/internal/log.go @@ -148,6 +148,32 @@ type NewTransactionLogPayload struct { AccountMetadata AccountMetadata `json:"accountMetadata"` } +// MarshalJSON override default json marshalling +// We don't want to store pc(v)e on the logs +// because : +// 1. It can change (effective only) +// 2. They are not part of the decision-making process +func (p NewTransactionLogPayload) MarshalJSON() ([]byte, error) { + // strip MarshalJSON of Transaction + type aux Transaction + type tx struct { + aux + PostCommitVolumes PostCommitVolumes `json:"postCommitVolumes,omitempty"` + PostCommitEffectiveVolumes PostCommitVolumes `json:"postCommitEffectiveVolumes,omitempty"` + } + + return json.Marshal(struct { + Transaction tx `json:"transaction"` + AccountMetadata AccountMetadata `json:"accountMetadata"` + }{ + Transaction: tx{ + aux: aux(p.Transaction), + }, + AccountMetadata: p.AccountMetadata, + }) + +} + func NewTransactionLog(tx Transaction, accountMetadata AccountMetadata) Log { if accountMetadata == nil { accountMetadata = AccountMetadata{} diff --git a/internal/storage/ledger/logs.go b/internal/storage/ledger/logs.go index c9557cb68..bfe2552e4 100644 --- a/internal/storage/ledger/logs.go +++ b/internal/storage/ledger/logs.go @@ -5,7 +5,6 @@ import ( "database/sql/driver" "encoding/json" "fmt" - "github.com/formancehq/ledger/internal/tracing" "github.com/formancehq/go-libs/bun/bunpaginate" @@ -73,6 +72,7 @@ func (s *Store) InsertLog(ctx context.Context, log *ledger.Log) error { } _, err := tracing.TraceWithLatency(ctx, "InsertLog", tracing.NoResult(func(ctx context.Context) error { + data, err := json.Marshal(log.Data) if err != nil { return errors.Wrap(err, "failed to marshal log data") diff --git a/internal/storage/ledger/logs_test.go b/internal/storage/ledger/logs_test.go index a3fe9a136..fe2c50fbd 100644 --- a/internal/storage/ledger/logs_test.go +++ b/internal/storage/ledger/logs_test.go @@ -22,6 +22,8 @@ import ( "github.com/stretchr/testify/require" ) +// todo: add log hash test with ledger v2 + func TestInsertLog(t *testing.T) { t.Parallel() diff --git a/internal/storage/ledger/moves.go b/internal/storage/ledger/moves.go index 8481f5ba5..3fdd367de 100644 --- a/internal/storage/ledger/moves.go +++ b/internal/storage/ledger/moves.go @@ -84,8 +84,8 @@ type Move struct { AccountSeq int `bun:"accounts_seq,type:int"` InsertionDate time.Time `bun:"insertion_date,type:timestamp"` EffectiveDate time.Time `bun:"effective_date,type:timestamp"` - PostCommitVolumes Volumes `bun:"post_commit_volumes,type:jsonb,scanonly"` - PostCommitEffectiveVolumes Volumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"` + PostCommitVolumes *Volumes `bun:"post_commit_volumes,type:jsonb,scanonly"` + PostCommitEffectiveVolumes *Volumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"` } type Moves []*Move @@ -108,3 +108,31 @@ func (m Moves) BalanceUpdates() map[string]map[string]*big.Int { return ret } + +func (m Moves) ComputePostCommitVolumes() TransactionsPostCommitVolumes { + ret := TransactionsPostCommitVolumes{} + for _, move := range m { + ret = append(ret, TransactionPostCommitVolume{ + AggregatedAccountVolume: AggregatedAccountVolume{ + Volumes: *move.PostCommitVolumes, + Asset: move.Asset, + }, + Account: move.Account, + }) + } + return ret +} + +func (m Moves) ComputePostCommitEffectiveVolumes() TransactionsPostCommitVolumes { + ret := TransactionsPostCommitVolumes{} + for _, move := range m { + ret = append(ret, TransactionPostCommitVolume{ + AggregatedAccountVolume: AggregatedAccountVolume{ + Volumes: *move.PostCommitEffectiveVolumes, + Asset: move.Asset, + }, + Account: move.Account, + }) + } + return ret +} diff --git a/internal/storage/ledger/moves_test.go b/internal/storage/ledger/moves_test.go index a623f8f9c..d596c6b86 100644 --- a/internal/storage/ledger/moves_test.go +++ b/internal/storage/ledger/moves_test.go @@ -19,182 +19,196 @@ import ( "testing" ) -func TestMovesPostCommitVolumesComputation(t *testing.T) { - t.Parallel() - - store := newLedgerStore(t) - ctx := logging.TestingContext() - - wp := pond.New(10, 10) - for i := 0; i < 1000; i++ { - wp.Submit(func() { - for { - sqlTx, err := store.GetDB().BeginTx(ctx, &sql.TxOptions{}) - require.NoError(t, err) - storeCP := store.WithDB(sqlTx) - - src := fmt.Sprintf("accounts:%d", rand.Intn(1000000)) - dst := fmt.Sprintf("accounts:%d", rand.Intn(1000000)) - - tx := ledger.NewTransaction().WithPostings( - ledger.NewPosting(src, dst, "USD", big.NewInt(1)), - ) - err = storeCP.CommitTransaction(ctx, &tx) - if errors.Is(err, postgres.ErrDeadlockDetected) { - require.NoError(t, sqlTx.Rollback()) - continue - } - require.NoError(t, err) - require.NoError(t, sqlTx.Commit()) - return - } - }) - } - wp.StopAndWait() - - aggregatedBalances, err := store.GetAggregatedBalances(ctx, ledgercontroller.NewGetAggregatedBalancesQuery(ledgercontroller.PITFilter{}, nil, true)) - require.NoError(t, err) - RequireEqual(t, ledger.BalancesByAssets{ - "USD": big.NewInt(0), - }, aggregatedBalances) -} - func TestMovesInsert(t *testing.T) { t.Parallel() - store := newLedgerStore(t) - ctx := logging.TestingContext() - - tx := Transaction{} - require.NoError(t, store.insertTransaction(ctx, &tx)) - - account := &Account{} - _, err := store.upsertAccount(ctx, account) - require.NoError(t, err) - - now := time.Now() - - // we will insert 5 tx at five different timestamps - // t0 ---------> t1 ---------> t2 ---------> t3 ----------> t4 - // m1 ---------> m3 ---------> m4 ---------> m2 ----------> m5 - t0 := now - t1 := t0.Add(time.Hour) - t2 := t1.Add(time.Hour) - t3 := t2.Add(time.Hour) - t4 := t3.Add(time.Hour) - - // insert a first tx at t0 - m1 := Move{ - Ledger: store.ledger.Name, - IsSource: true, - Account: "world", - AccountAddressArray: []string{"world"}, - Amount: (*bunpaginate.BigInt)(big.NewInt(100)), - Asset: "USD", - TransactionSeq: tx.Seq, - AccountSeq: account.Seq, - InsertionDate: t0, - EffectiveDate: t0, - } - require.NoError(t, store.insertMoves(ctx, &m1)) - require.Equal(t, Volumes{ - Inputs: big.NewInt(0), - Outputs: big.NewInt(100), - }, m1.PostCommitVolumes) - require.Equal(t, Volumes{ - Inputs: big.NewInt(0), - Outputs: big.NewInt(100), - }, m1.PostCommitEffectiveVolumes) - - // add a second move at t3 - m2 := Move{ - Ledger: store.ledger.Name, - IsSource: false, - Account: "world", - AccountAddressArray: []string{"world"}, - Amount: (*bunpaginate.BigInt)(big.NewInt(50)), - Asset: "USD", - TransactionSeq: tx.Seq, - AccountSeq: account.Seq, - InsertionDate: t3, - EffectiveDate: t3, - } - require.NoError(t, store.insertMoves(ctx, &m2)) - require.Equal(t, Volumes{ - Inputs: big.NewInt(50), - Outputs: big.NewInt(100), - }, m2.PostCommitVolumes) - require.Equal(t, Volumes{ - Inputs: big.NewInt(50), - Outputs: big.NewInt(100), - }, m2.PostCommitEffectiveVolumes) - - // add a third move at t1 - m3 := Move{ - Ledger: store.ledger.Name, - IsSource: true, - Account: "world", - AccountAddressArray: []string{"world"}, - Amount: (*bunpaginate.BigInt)(big.NewInt(200)), - Asset: "USD", - TransactionSeq: tx.Seq, - AccountSeq: account.Seq, - InsertionDate: t1, - EffectiveDate: t1, - } - require.NoError(t, store.insertMoves(ctx, &m3)) - require.Equal(t, Volumes{ - Inputs: big.NewInt(50), - Outputs: big.NewInt(300), - }, m3.PostCommitVolumes) - require.Equal(t, Volumes{ - Inputs: big.NewInt(0), - Outputs: big.NewInt(300), - }, m3.PostCommitEffectiveVolumes) - - // add a fourth move at t2 - m4 := Move{ - Ledger: store.ledger.Name, - IsSource: false, - Account: "world", - AccountAddressArray: []string{"world"}, - Amount: (*bunpaginate.BigInt)(big.NewInt(50)), - Asset: "USD", - TransactionSeq: tx.Seq, - AccountSeq: account.Seq, - InsertionDate: t2, - EffectiveDate: t2, - } - require.NoError(t, store.insertMoves(ctx, &m4)) - require.Equal(t, Volumes{ - Inputs: big.NewInt(100), - Outputs: big.NewInt(300), - }, m4.PostCommitVolumes) - require.Equal(t, Volumes{ - Inputs: big.NewInt(50), - Outputs: big.NewInt(300), - }, m4.PostCommitEffectiveVolumes) - - // add a fifth move at t4 - m5 := Move{ - Ledger: store.ledger.Name, - IsSource: false, - Account: "world", - AccountAddressArray: []string{"world"}, - Amount: (*bunpaginate.BigInt)(big.NewInt(50)), - Asset: "USD", - TransactionSeq: tx.Seq, - AccountSeq: account.Seq, - InsertionDate: t4, - EffectiveDate: t4, - } - require.NoError(t, store.insertMoves(ctx, &m5)) - require.Equal(t, Volumes{ - Inputs: big.NewInt(150), - Outputs: big.NewInt(300), - }, m5.PostCommitVolumes) - require.Equal(t, Volumes{ - Inputs: big.NewInt(150), - Outputs: big.NewInt(300), - }, m5.PostCommitEffectiveVolumes) + t.Run("nominal", func(t *testing.T) { + t.Parallel() + + store := newLedgerStore(t) + ctx := logging.TestingContext() + + tx := Transaction{} + require.NoError(t, store.insertTransaction(ctx, &tx)) + + account := &Account{} + _, err := store.upsertAccount(ctx, account) + require.NoError(t, err) + + now := time.Now() + + // we will insert 5 tx at five different timestamps + // t0 ---------> t1 ---------> t2 ---------> t3 ----------> t4 + // m1 ---------> m3 ---------> m4 ---------> m2 ----------> m5 + t0 := now + t1 := t0.Add(time.Hour) + t2 := t1.Add(time.Hour) + t3 := t2.Add(time.Hour) + t4 := t3.Add(time.Hour) + + // insert a first tx at t0 + m1 := Move{ + Ledger: store.ledger.Name, + IsSource: true, + Account: "world", + AccountAddressArray: []string{"world"}, + Amount: (*bunpaginate.BigInt)(big.NewInt(100)), + Asset: "USD", + TransactionSeq: tx.Seq, + AccountSeq: account.Seq, + InsertionDate: t0, + EffectiveDate: t0, + } + require.NoError(t, store.insertMoves(ctx, &m1)) + require.NotNil(t, m1.PostCommitVolumes) + require.Equal(t, Volumes{ + Inputs: big.NewInt(0), + Outputs: big.NewInt(100), + }, *m1.PostCommitVolumes) + require.NotNil(t, m1.PostCommitEffectiveVolumes) + require.Equal(t, Volumes{ + Inputs: big.NewInt(0), + Outputs: big.NewInt(100), + }, *m1.PostCommitEffectiveVolumes) + + // add a second move at t3 + m2 := Move{ + Ledger: store.ledger.Name, + IsSource: false, + Account: "world", + AccountAddressArray: []string{"world"}, + Amount: (*bunpaginate.BigInt)(big.NewInt(50)), + Asset: "USD", + TransactionSeq: tx.Seq, + AccountSeq: account.Seq, + InsertionDate: t3, + EffectiveDate: t3, + } + require.NoError(t, store.insertMoves(ctx, &m2)) + require.NotNil(t, m2.PostCommitVolumes) + require.Equal(t, Volumes{ + Inputs: big.NewInt(50), + Outputs: big.NewInt(100), + }, *m2.PostCommitVolumes) + require.NotNil(t, m2.PostCommitEffectiveVolumes) + require.Equal(t, Volumes{ + Inputs: big.NewInt(50), + Outputs: big.NewInt(100), + }, *m2.PostCommitEffectiveVolumes) + + // add a third move at t1 + m3 := Move{ + Ledger: store.ledger.Name, + IsSource: true, + Account: "world", + AccountAddressArray: []string{"world"}, + Amount: (*bunpaginate.BigInt)(big.NewInt(200)), + Asset: "USD", + TransactionSeq: tx.Seq, + AccountSeq: account.Seq, + InsertionDate: t1, + EffectiveDate: t1, + } + require.NoError(t, store.insertMoves(ctx, &m3)) + require.NotNil(t, m3.PostCommitVolumes) + require.Equal(t, Volumes{ + Inputs: big.NewInt(50), + Outputs: big.NewInt(300), + }, *m3.PostCommitVolumes) + require.NotNil(t, m3.PostCommitEffectiveVolumes) + require.Equal(t, Volumes{ + Inputs: big.NewInt(0), + Outputs: big.NewInt(300), + }, *m3.PostCommitEffectiveVolumes) + + // add a fourth move at t2 + m4 := Move{ + Ledger: store.ledger.Name, + IsSource: false, + Account: "world", + AccountAddressArray: []string{"world"}, + Amount: (*bunpaginate.BigInt)(big.NewInt(50)), + Asset: "USD", + TransactionSeq: tx.Seq, + AccountSeq: account.Seq, + InsertionDate: t2, + EffectiveDate: t2, + } + require.NoError(t, store.insertMoves(ctx, &m4)) + require.NotNil(t, m4.PostCommitVolumes) + require.Equal(t, Volumes{ + Inputs: big.NewInt(100), + Outputs: big.NewInt(300), + }, *m4.PostCommitVolumes) + require.NotNil(t, m4.PostCommitEffectiveVolumes) + require.Equal(t, Volumes{ + Inputs: big.NewInt(50), + Outputs: big.NewInt(300), + }, *m4.PostCommitEffectiveVolumes) + + // add a fifth move at t4 + m5 := Move{ + Ledger: store.ledger.Name, + IsSource: false, + Account: "world", + AccountAddressArray: []string{"world"}, + Amount: (*bunpaginate.BigInt)(big.NewInt(50)), + Asset: "USD", + TransactionSeq: tx.Seq, + AccountSeq: account.Seq, + InsertionDate: t4, + EffectiveDate: t4, + } + require.NoError(t, store.insertMoves(ctx, &m5)) + require.NotNil(t, m5.PostCommitVolumes) + require.Equal(t, Volumes{ + Inputs: big.NewInt(150), + Outputs: big.NewInt(300), + }, *m5.PostCommitVolumes) + require.NotNil(t, m5.PostCommitEffectiveVolumes) + require.Equal(t, Volumes{ + Inputs: big.NewInt(150), + Outputs: big.NewInt(300), + }, *m5.PostCommitEffectiveVolumes) + }) + + t.Run("with high concurrency", func(t *testing.T) { + t.Parallel() + + store := newLedgerStore(t) + ctx := logging.TestingContext() + + wp := pond.New(10, 10) + for i := 0; i < 1000; i++ { + wp.Submit(func() { + for { + sqlTx, err := store.GetDB().BeginTx(ctx, &sql.TxOptions{}) + require.NoError(t, err) + storeCP := store.WithDB(sqlTx) + + src := fmt.Sprintf("accounts:%d", rand.Intn(1000000)) + dst := fmt.Sprintf("accounts:%d", rand.Intn(1000000)) + + tx := ledger.NewTransaction().WithPostings( + ledger.NewPosting(src, dst, "USD", big.NewInt(1)), + ) + err = storeCP.CommitTransaction(ctx, &tx) + if errors.Is(err, postgres.ErrDeadlockDetected) { + require.NoError(t, sqlTx.Rollback()) + continue + } + require.NoError(t, err) + require.NoError(t, sqlTx.Commit()) + return + } + }) + } + wp.StopAndWait() + + aggregatedBalances, err := store.GetAggregatedBalances(ctx, ledgercontroller.NewGetAggregatedBalancesQuery(ledgercontroller.PITFilter{}, nil, true)) + require.NoError(t, err) + RequireEqual(t, ledger.BalancesByAssets{ + "USD": big.NewInt(0), + }, aggregatedBalances) + }) } diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index feb60f5a2..fad0d2600 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -35,19 +35,21 @@ var ( type Transaction struct { bun.BaseModel `bun:"table:transactions,alias:transactions"` - Ledger string `bun:"ledger,type:varchar"` - ID int `bun:"id,type:numeric"` - Seq int `bun:"seq,scanonly"` - Timestamp *time.Time `bun:"timestamp,type:timestamp without time zone"` - Reference string `bun:"reference,type:varchar,unique,nullzero"` - Postings []ledger.Posting `bun:"postings,type:jsonb"` - Metadata metadata.Metadata `bun:"metadata,type:jsonb,default:'{}'"` - RevertedAt *time.Time `bun:"reverted_at"` - InsertedAt *time.Time `bun:"inserted_at"` - Sources []string `bun:"sources,type:jsonb"` - Destinations []string `bun:"destinations,type:jsonb"` - SourcesArray []map[string]any `bun:"sources_arrays,type:jsonb"` - DestinationsArray []map[string]any `bun:"destinations_arrays,type:jsonb"` + Ledger string `bun:"ledger,type:varchar"` + ID int `bun:"id,type:numeric"` + Seq int `bun:"seq,scanonly"` + Timestamp *time.Time `bun:"timestamp,type:timestamp without time zone"` + Reference string `bun:"reference,type:varchar,unique,nullzero"` + Postings []ledger.Posting `bun:"postings,type:jsonb"` + Metadata metadata.Metadata `bun:"metadata,type:jsonb,default:'{}'"` + RevertedAt *time.Time `bun:"reverted_at"` + InsertedAt *time.Time `bun:"inserted_at"` + Sources []string `bun:"sources,type:jsonb"` + Destinations []string `bun:"destinations,type:jsonb"` + SourcesArray []map[string]any `bun:"sources_arrays,type:jsonb"` + DestinationsArray []map[string]any `bun:"destinations_arrays,type:jsonb"` + PostCommitEffectiveVolumes TransactionsPostCommitVolumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"` + PostCommitVolumes TransactionsPostCommitVolumes `bun:"post_commit_volumes,type:jsonb,scanonly"` } func (t Transaction) toCore() ledger.Transaction { @@ -59,8 +61,10 @@ func (t Transaction) toCore() ledger.Transaction { Postings: t.Postings, InsertedAt: *t.InsertedAt, }, - ID: t.ID, - Reverted: t.RevertedAt != nil && !t.RevertedAt.IsZero(), + ID: t.ID, + Reverted: t.RevertedAt != nil && !t.RevertedAt.IsZero(), + PostCommitEffectiveVolumes: t.PostCommitEffectiveVolumes.toCore(), + PostCommitVolumes: t.PostCommitVolumes.toCore(), } } @@ -93,21 +97,6 @@ func (p TransactionsPostCommitVolumes) toCore() ledger.PostCommitVolumes { return ret } -type ExpandedTransaction struct { - Transaction `bun:",extend"` - - PostCommitEffectiveVolumes TransactionsPostCommitVolumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"` - PostCommitVolumes TransactionsPostCommitVolumes `bun:"post_commit_volumes,type:jsonb,scanonly"` -} - -func (t ExpandedTransaction) toCore() ledger.ExpandedTransaction { - return ledger.ExpandedTransaction{ - Transaction: t.Transaction.toCore(), - PostCommitEffectiveVolumes: t.PostCommitEffectiveVolumes.toCore(), - PostCommitVolumes: t.PostCommitVolumes.toCore(), - } -} - func (s *Store) selectDistinctTransactionMetadataHistories(date *time.Time) *bun.SelectQuery { ret := s.db.NewSelect(). DistinctOn("transactions_seq"). @@ -412,13 +401,19 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e tx.ID = mappedTx.ID tx.InsertedAt = *mappedTx.InsertedAt tx.Timestamp = *mappedTx.Timestamp + if s.ledger.HasFeature(ledger.FeaturePostCommitVolumes, "SYNC") { + tx.PostCommitVolumes = moves.ComputePostCommitVolumes().toCore() + } + if s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") { + tx.PostCommitEffectiveVolumes = moves.ComputePostCommitEffectiveVolumes().toCore() + } return nil } -func (s *Store) ListTransactions(ctx context.Context, q ledgercontroller.ListTransactionsQuery) (*bunpaginate.Cursor[ledger.ExpandedTransaction], error) { - return tracing.Trace(ctx, "ListTransactions", func(ctx context.Context) (*bunpaginate.Cursor[ledger.ExpandedTransaction], error) { - cursor, err := bunpaginate.UsingColumn[ledgercontroller.PaginatedQueryOptions[ledgercontroller.PITFilterWithVolumes], ExpandedTransaction]( +func (s *Store) ListTransactions(ctx context.Context, q ledgercontroller.ListTransactionsQuery) (*bunpaginate.Cursor[ledger.Transaction], error) { + return tracing.Trace(ctx, "ListTransactions", func(ctx context.Context) (*bunpaginate.Cursor[ledger.Transaction], error) { + cursor, err := bunpaginate.UsingColumn[ledgercontroller.PaginatedQueryOptions[ledgercontroller.PITFilterWithVolumes], Transaction]( ctx, s.selectTransactions( q.Options.Options.PIT, @@ -432,7 +427,7 @@ func (s *Store) ListTransactions(ctx context.Context, q ledgercontroller.ListTra return nil, err } - return bunpaginate.MapCursor(cursor, ExpandedTransaction.toCore), nil + return bunpaginate.MapCursor(cursor, Transaction.toCore), nil }) } @@ -449,9 +444,9 @@ func (s *Store) CountTransactions(ctx context.Context, q ledgercontroller.ListTr }) } -func (s *Store) GetTransaction(ctx context.Context, filter ledgercontroller.GetTransactionQuery) (*ledger.ExpandedTransaction, error) { - return tracing.TraceWithLatency(ctx, "GetTransaction", func(ctx context.Context) (*ledger.ExpandedTransaction, error) { - ret := &ExpandedTransaction{} +func (s *Store) GetTransaction(ctx context.Context, filter ledgercontroller.GetTransactionQuery) (*ledger.Transaction, error) { + return tracing.TraceWithLatency(ctx, "GetTransaction", func(ctx context.Context) (*ledger.Transaction, error) { + ret := &Transaction{} if err := s.selectTransactions( filter.PIT, filter.ExpandVolumes, diff --git a/internal/storage/ledger/transactions_test.go b/internal/storage/ledger/transactions_test.go index 0ae92e2ee..eef00269c 100644 --- a/internal/storage/ledger/transactions_test.go +++ b/internal/storage/ledger/transactions_test.go @@ -6,7 +6,6 @@ import ( "context" "database/sql" "fmt" - "github.com/formancehq/go-libs/collectionutils" "github.com/formancehq/go-libs/platform/postgres" ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" "math/big" @@ -226,6 +225,21 @@ func TestTransactionsCommit(t *testing.T) { require.NotZero(t, tx.Timestamp) require.NotZero(t, tx.InsertedAt) require.Equal(t, 1, tx.ID) + require.Equal(t, ledger.PostCommitVolumes{ + "account:1": ledger.VolumesByAssets{ + "USD": ledger.Volumes{ + Inputs: big.NewInt(0), + Outputs: big.NewInt(100), + }, + }, + "account:2": ledger.VolumesByAssets{ + "USD": ledger.Volumes{ + Inputs: big.NewInt(100), + Outputs: big.NewInt(0), + }, + }, + }, tx.PostCommitVolumes) + require.Equal(t, tx.PostCommitVolumes, tx.PostCommitEffectiveVolumes) }) t.Run("triggering a deadlock should return appropriate postgres error", func(t *testing.T) { @@ -426,6 +440,8 @@ func TestTransactionsRevert(t *testing.T) { require.NotNil(t, revertedTx) require.True(t, revertedTx.Reverted) revertedTx.Reverted = false + tx1.PostCommitEffectiveVolumes = ledger.PostCommitVolumes{} + tx1.PostCommitVolumes = ledger.PostCommitVolumes{} require.Equal(t, tx1, *revertedTx) // try to revert again @@ -504,27 +520,37 @@ func TestTransactionsList(t *testing.T) { err = store.CommitTransaction(ctx, &tx2) require.NoError(t, err) - tx3 := ledger.NewTransaction(). + tx3BeforeRevert := ledger.NewTransaction(). WithPostings( ledger.NewPosting("world", "users:marley", "USD", big.NewInt(100)), ). WithMetadata(metadata.Metadata{"category": "3"}). WithTimestamp(now.Add(-time.Hour)) - err = store.CommitTransaction(ctx, &tx3) + err = store.CommitTransaction(ctx, &tx3BeforeRevert) require.NoError(t, err) - tx3AfterRevert, hasBeenReverted, err := store.RevertTransaction(ctx, tx3.ID) + _, hasBeenReverted, err := store.RevertTransaction(ctx, tx3BeforeRevert.ID) require.NoError(t, err) require.True(t, hasBeenReverted) - tx4 := tx3.Reverse(false).WithTimestamp(now) + tx4 := tx3BeforeRevert.Reverse(false).WithTimestamp(now) err = store.CommitTransaction(ctx, &tx4) require.NoError(t, err) - tx3AfterRevert, _, err = store.UpdateTransactionMetadata(ctx, tx3AfterRevert.ID, metadata.Metadata{ + _, _, err = store.UpdateTransactionMetadata(ctx, tx3BeforeRevert.ID, metadata.Metadata{ "additional_metadata": "true", }) + // refresh tx3 + // we can't take the result of the call on RevertTransaction nor UpdateTransactionMetadata as the result does not contains pc(e)v + tx3 := func() ledger.Transaction { + tx3, err := store.GetTransaction(ctx, ledgercontroller.NewGetTransactionQuery(tx3BeforeRevert.ID). + WithExpandVolumes(). + WithExpandEffectiveVolumes()) + require.NoError(t, err) + return *tx3 + }() + tx5 := ledger.NewTransaction(). WithPostings( ledger.NewPosting("users:marley", "sellers:amazon", "USD", big.NewInt(100)), @@ -543,7 +569,7 @@ func TestTransactionsList(t *testing.T) { { name: "nominal", query: ledgercontroller.NewPaginatedQueryOptions(ledgercontroller.PITFilterWithVolumes{}), - expected: []ledger.Transaction{tx5, tx4, *tx3AfterRevert, tx2, tx1}, + expected: []ledger.Transaction{tx5, tx4, tx3, tx2, tx1}, }, { name: "address filter", @@ -561,7 +587,7 @@ func TestTransactionsList(t *testing.T) { name: "address filter using segment", query: ledgercontroller.NewPaginatedQueryOptions(ledgercontroller.PITFilterWithVolumes{}). WithQueryBuilder(query.Match("account", "users:")), - expected: []ledger.Transaction{tx5, tx4, *tx3AfterRevert}, + expected: []ledger.Transaction{tx5, tx4, tx3}, }, { name: "filter using metadata", @@ -576,7 +602,7 @@ func TestTransactionsList(t *testing.T) { PIT: pointer.For(now.Add(-time.Hour)), }, }), - expected: []ledger.Transaction{tx3, tx2, tx1}, + expected: []ledger.Transaction{tx3BeforeRevert, tx2, tx1}, }, { name: "filter using invalid key", @@ -588,13 +614,13 @@ func TestTransactionsList(t *testing.T) { name: "reverted transactions", query: ledgercontroller.NewPaginatedQueryOptions(ledgercontroller.PITFilterWithVolumes{}). WithQueryBuilder(query.Match("reverted", true)), - expected: []ledger.Transaction{*tx3AfterRevert}, + expected: []ledger.Transaction{tx3}, }, { name: "filter using exists metadata", query: ledgercontroller.NewPaginatedQueryOptions(ledgercontroller.PITFilterWithVolumes{}). WithQueryBuilder(query.Exists("metadata", "category")), - expected: []ledger.Transaction{*tx3AfterRevert, tx2, tx1}, + expected: []ledger.Transaction{tx3, tx2, tx1}, }, { name: "filter using exists metadata and pit", @@ -620,14 +646,15 @@ func TestTransactionsList(t *testing.T) { t.Parallel() tc.query.Options.ExpandVolumes = true - tc.query.Options.ExpandEffectiveVolumes = false + tc.query.Options.ExpandEffectiveVolumes = true + cursor, err := store.ListTransactions(ctx, ledgercontroller.NewListTransactionsQuery(tc.query)) if tc.expectError != nil { require.True(t, errors.Is(err, tc.expectError)) } else { require.NoError(t, err) require.Len(t, cursor.Data, len(tc.expected)) - RequireEqual(t, tc.expected, collectionutils.Map(cursor.Data, ledger.ExpandedTransaction.Base)) + RequireEqual(t, tc.expected, cursor.Data) count, err := store.CountTransactions(ctx, ledgercontroller.NewListTransactionsQuery(tc.query)) require.NoError(t, err) diff --git a/internal/transaction.go b/internal/transaction.go index 5bc1c37d4..3e0d8b2a5 100644 --- a/internal/transaction.go +++ b/internal/transaction.go @@ -38,8 +38,10 @@ func NewTransactionData() TransactionData { type Transaction struct { TransactionData - ID int `json:"id"` - Reverted bool `json:"reverted"` + ID int `json:"id"` + Reverted bool `json:"reverted"` + PostCommitVolumes PostCommitVolumes `json:"postCommitVolumes,omitempty"` + PostCommitEffectiveVolumes PostCommitVolumes `json:"postCommitEffectiveVolumes,omitempty"` } func (tx Transaction) Reverse(atEffectiveDate bool) Transaction { @@ -103,24 +105,8 @@ func (tx Transaction) InvolvedAccounts() []string { return slices.Compact(ret) } -func NewTransaction() Transaction { - return Transaction{ - TransactionData: NewTransactionData(), - } -} - -type ExpandedTransaction struct { - Transaction - PostCommitVolumes PostCommitVolumes `json:"postCommitVolumes,omitempty"` - PostCommitEffectiveVolumes PostCommitVolumes `json:"postCommitEffectiveVolumes,omitempty"` -} - -func (t ExpandedTransaction) Base() Transaction { - return t.Transaction -} - -func (t ExpandedTransaction) MarshalJSON() ([]byte, error) { - type Aux ExpandedTransaction +func (tx Transaction) MarshalJSON() ([]byte, error) { + type Aux Transaction type Ret struct { Aux @@ -132,19 +118,19 @@ func (t ExpandedTransaction) MarshalJSON() ([]byte, error) { preCommitVolumes PostCommitVolumes preCommitEffectiveVolumes PostCommitVolumes ) - if len(t.PostCommitVolumes) > 0 { - if t.PostCommitVolumes != nil { - preCommitVolumes = t.PostCommitVolumes.Copy() - for _, posting := range t.Postings { + if len(tx.PostCommitVolumes) > 0 { + if tx.PostCommitVolumes != nil { + preCommitVolumes = tx.PostCommitVolumes.Copy() + for _, posting := range tx.Postings { preCommitVolumes.AddOutput(posting.Source, posting.Asset, big.NewInt(0).Neg(posting.Amount)) preCommitVolumes.AddInput(posting.Destination, posting.Asset, big.NewInt(0).Neg(posting.Amount)) } } } - if len(t.PostCommitEffectiveVolumes) > 0 { - if t.PostCommitEffectiveVolumes != nil { - preCommitEffectiveVolumes = t.PostCommitEffectiveVolumes.Copy() - for _, posting := range t.Postings { + if len(tx.PostCommitEffectiveVolumes) > 0 { + if tx.PostCommitEffectiveVolumes != nil { + preCommitEffectiveVolumes = tx.PostCommitEffectiveVolumes.Copy() + for _, posting := range tx.Postings { preCommitEffectiveVolumes.AddOutput(posting.Source, posting.Asset, big.NewInt(0).Neg(posting.Amount)) preCommitEffectiveVolumes.AddInput(posting.Destination, posting.Asset, big.NewInt(0).Neg(posting.Amount)) } @@ -152,12 +138,18 @@ func (t ExpandedTransaction) MarshalJSON() ([]byte, error) { } return json.Marshal(&Ret{ - Aux: Aux(t), + Aux: Aux(tx), PreCommitVolumes: preCommitVolumes, PreCommitEffectiveVolumes: preCommitEffectiveVolumes, }) } +func NewTransaction() Transaction { + return Transaction{ + TransactionData: NewTransactionData(), + } +} + type TransactionRequest struct { Postings Postings `json:"postings"` Script ScriptV1 `json:"script"` diff --git a/test/performance/env_remote_ledger_test.go b/test/performance/env_remote_ledger_test.go index c5fcb3577..aff7a9227 100644 --- a/test/performance/env_remote_ledger_test.go +++ b/test/performance/env_remote_ledger_test.go @@ -23,6 +23,7 @@ type RemoteLedgerEnvFactory struct { func (r *RemoteLedgerEnvFactory) Create(ctx context.Context, b *testing.B, ledger ledger.Ledger) Env { + // todo: use standalone sdk only client := ledgerclient.New( ledgerclient.WithClient(r.httpClient), ledgerclient.WithServerURL(r.ledgerURL),