diff --git a/internal/api/v1/controllers_transactions_create.go b/internal/api/v1/controllers_transactions_create.go index 0df8b3d10..7b156fe77 100644 --- a/internal/api/v1/controllers_transactions_create.go +++ b/internal/api/v1/controllers_transactions_create.go @@ -119,6 +119,7 @@ func createTransaction(w http.ResponseWriter, r *http.Request) { Metadata: payload.Metadata, } + // todo: handle missing error cases res, err := l.CreateTransaction(r.Context(), getCommandParameters(r), runScript) if err != nil { switch { diff --git a/internal/api/v1/controllers_transactions_create_test.go b/internal/api/v1/controllers_transactions_create_test.go index a05839b14..1e146ee32 100644 --- a/internal/api/v1/controllers_transactions_create_test.go +++ b/internal/api/v1/controllers_transactions_create_test.go @@ -2,6 +2,7 @@ package v1 import ( "encoding/json" + "github.com/formancehq/go-libs/time" "math/big" "net/http" "net/http/httptest" @@ -221,6 +222,7 @@ func TestTransactionsCreate(t *testing.T) { systemController, ledgerController := newTestingSystemController(t, true) if testCase.expectedStatusCode < 300 && testCase.expectedStatusCode >= 200 { + testCase.expectedRunScript.Timestamp = time.Time{} ledgerController.EXPECT(). CreateTransaction(gomock.Any(), ledgercontroller.Parameters{ DryRun: tc.expectedPreview, diff --git a/internal/api/v2/controllers_transactions_create_test.go b/internal/api/v2/controllers_transactions_create_test.go index 1be4163bf..c484ea161 100644 --- a/internal/api/v2/controllers_transactions_create_test.go +++ b/internal/api/v2/controllers_transactions_create_test.go @@ -1,6 +1,7 @@ package v2 import ( + "github.com/formancehq/go-libs/time" "math/big" "net/http" "net/http/httptest" @@ -401,6 +402,7 @@ func TestTransactionCreate(t *testing.T) { systemController, ledgerController := newTestingSystemController(t, true) if testCase.expectControllerCall { + testCase.expectedRunScript.Timestamp = time.Time{} expect := ledgerController.EXPECT(). CreateTransaction(gomock.Any(), ledgercontroller.Parameters{ DryRun: tc.expectedDryRun, diff --git a/internal/storage/bucket/migrations/0-init-schema.sql b/internal/storage/bucket/migrations/0-init-schema.sql index 576f403b9..408dd493c 100644 --- a/internal/storage/bucket/migrations/0-init-schema.sql +++ b/internal/storage/bucket/migrations/0-init-schema.sql @@ -551,11 +551,13 @@ as $$ begin insert into "{{.Bucket}}".accounts_metadata (ledger, accounts_seq, revision, date, metadata) - values (new.ledger, new.seq, (select revision + 1 - from "{{.Bucket}}".accounts_metadata - where accounts_metadata.accounts_seq = new.seq - order by revision desc - limit 1), new.updated_at, new.metadata); + values (new.ledger, new.seq, ( + select revision + 1 + from "{{.Bucket}}".accounts_metadata + where accounts_metadata.accounts_seq = new.seq + order by revision desc + limit 1 + ), new.updated_at, new.metadata); return new; end; diff --git a/internal/storage/bucket/migrations/11-stateless.sql b/internal/storage/bucket/migrations/11-stateless.sql index 7b23cacdc..9319415c6 100644 --- a/internal/storage/bucket/migrations/11-stateless.sql +++ b/internal/storage/bucket/migrations/11-stateless.sql @@ -21,6 +21,9 @@ rename attribute inputs to input; alter type "{{.Bucket}}".volumes rename attribute outputs to output; +alter table "{{.Bucket}}".transactions +add column post_commit_volumes jsonb not null ; + alter table "{{.Bucket}}".moves alter column post_commit_volumes drop not null, @@ -119,7 +122,6 @@ create function "{{.Bucket}}".set_volumes() as $$ begin - --todo: use balances table directly... new.post_commit_volumes = coalesce(( select ( (post_commit_volumes).input + case when new.is_source then 0 else new.amount end, diff --git a/internal/storage/ledger/accounts_test.go b/internal/storage/ledger/accounts_test.go index 652931cad..02e9f2c99 100644 --- a/internal/storage/ledger/accounts_test.go +++ b/internal/storage/ledger/accounts_test.go @@ -290,6 +290,7 @@ func TestGetAccount(t *testing.T) { t.Run("find account in past", func(t *testing.T) { t.Parallel() + account, err := store.GetAccount(ctx, ledgercontroller.NewGetAccountQuery("multi").WithPIT(now.Add(-30*time.Second))) require.NoError(t, err) require.Equal(t, ledger.ExpandedAccount{ diff --git a/internal/storage/ledger/balances_test.go b/internal/storage/ledger/balances_test.go index 159a1a594..9e56c2a4f 100644 --- a/internal/storage/ledger/balances_test.go +++ b/internal/storage/ledger/balances_test.go @@ -281,7 +281,7 @@ func TestUpdateBalances(t *testing.T) { AccountsSeq: world.Seq, }) require.NoError(t, err) - require.Equal(t, map[string]map[string]ledger.Volumes{ + require.Equal(t, ledger.PostCommitVolumes{ "world": { "USD/2": ledger.NewVolumesInt64(0, 100), }, @@ -295,7 +295,7 @@ func TestUpdateBalances(t *testing.T) { Output: big.NewInt(0), }) require.NoError(t, err) - require.Equal(t, map[string]map[string]ledger.Volumes{ + require.Equal(t, ledger.PostCommitVolumes{ "world": { "USD/2": ledger.NewVolumesInt64(50, 100), }, @@ -310,7 +310,7 @@ func TestUpdateBalances(t *testing.T) { AccountsSeq: world.Seq, }) require.NoError(t, err) - require.Equal(t, map[string]map[string]ledger.Volumes{ + require.Equal(t, ledger.PostCommitVolumes{ "world": { "USD/2": ledger.NewVolumesInt64(100, 150), }, diff --git a/internal/storage/ledger/moves.go b/internal/storage/ledger/moves.go index 028994a9d..6c3b9a4cf 100644 --- a/internal/storage/ledger/moves.go +++ b/internal/storage/ledger/moves.go @@ -126,29 +126,13 @@ func (m Moves) volumeUpdates() []AccountsVolumes { return ret } -func (m Moves) ComputePostCommitVolumes() TransactionsPostCommitVolumes { - ret := TransactionsPostCommitVolumes{} +func (m Moves) ComputePostCommitEffectiveVolumes() ledger.PostCommitVolumes { + ret := ledger.PostCommitVolumes{} for _, move := range m { - ret = append(ret, TransactionPostCommitVolume{ - AggregatedAccountVolume: AggregatedAccountVolume{ - Volumes: *move.PostCommitVolumes, - Asset: move.Asset, + ret = ret.Merge(ledger.PostCommitVolumes{ + move.Account: ledger.VolumesByAssets{ + move.Asset: *move.PostCommitEffectiveVolumes, }, - Account: move.Account, - }) - } - return ret -} - -func (m Moves) ComputePostCommitEffectiveVolumes() TransactionsPostCommitVolumes { - ret := TransactionsPostCommitVolumes{} - for _, move := range m { - ret = append(ret, TransactionPostCommitVolume{ - AggregatedAccountVolume: AggregatedAccountVolume{ - Volumes: *move.PostCommitEffectiveVolumes, - Asset: move.Asset, - }, - Account: move.Account, }) } return ret diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index ec6141a90..f23f57324 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -35,21 +35,21 @@ var ( type Transaction struct { bun.BaseModel `bun:"table:transactions,alias:transactions"` - Ledger string `bun:"ledger,type:varchar"` - ID int `bun:"id,type:numeric"` - Seq int `bun:"seq,scanonly"` - Timestamp *time.Time `bun:"timestamp,type:timestamp without time zone"` - Reference string `bun:"reference,type:varchar,unique,nullzero"` - Postings []ledger.Posting `bun:"postings,type:jsonb"` - Metadata metadata.Metadata `bun:"metadata,type:jsonb,default:'{}'"` - RevertedAt *time.Time `bun:"reverted_at"` - InsertedAt *time.Time `bun:"inserted_at"` - Sources []string `bun:"sources,type:jsonb"` - Destinations []string `bun:"destinations,type:jsonb"` - SourcesArray []map[string]any `bun:"sources_arrays,type:jsonb"` - DestinationsArray []map[string]any `bun:"destinations_arrays,type:jsonb"` - PostCommitEffectiveVolumes TransactionsPostCommitVolumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"` - PostCommitVolumes TransactionsPostCommitVolumes `bun:"post_commit_volumes,type:jsonb,scanonly"` + Ledger string `bun:"ledger,type:varchar"` + ID int `bun:"id,type:numeric"` + Seq int `bun:"seq,scanonly"` + Timestamp time.Time `bun:"timestamp,type:timestamp without time zone,nullzero"` + Reference string `bun:"reference,type:varchar,unique,nullzero"` + Postings []ledger.Posting `bun:"postings,type:jsonb"` + Metadata metadata.Metadata `bun:"metadata,type:jsonb,default:'{}'"` + RevertedAt *time.Time `bun:"reverted_at,type:timestamp without time zone"` + InsertedAt time.Time `bun:"inserted_at,type:timestamp without time zone,nullzero"` + Sources []string `bun:"sources,type:jsonb"` + Destinations []string `bun:"destinations,type:jsonb"` + SourcesArray []map[string]any `bun:"sources_arrays,type:jsonb"` + DestinationsArray []map[string]any `bun:"destinations_arrays,type:jsonb"` + PostCommitEffectiveVolumes ledger.PostCommitVolumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"` + PostCommitVolumes ledger.PostCommitVolumes `bun:"post_commit_volumes,type:jsonb"` } func (t Transaction) toCore() ledger.Transaction { @@ -57,46 +57,17 @@ func (t Transaction) toCore() ledger.Transaction { TransactionData: ledger.TransactionData{ Reference: t.Reference, Metadata: t.Metadata, - Timestamp: *t.Timestamp, + Timestamp: t.Timestamp, Postings: t.Postings, - InsertedAt: *t.InsertedAt, + InsertedAt: t.InsertedAt, }, ID: t.ID, Reverted: t.RevertedAt != nil && !t.RevertedAt.IsZero(), - PostCommitEffectiveVolumes: t.PostCommitEffectiveVolumes.toCore(), - PostCommitVolumes: t.PostCommitVolumes.toCore(), + PostCommitEffectiveVolumes: t.PostCommitEffectiveVolumes, + PostCommitVolumes: t.PostCommitVolumes, } } -type TransactionPostCommitVolume struct { - AggregatedAccountVolume - Account string `bun:"account"` -} - -type TransactionsPostCommitVolumes []TransactionPostCommitVolume - -func (p TransactionsPostCommitVolumes) toCore() ledger.PostCommitVolumes { - ret := ledger.PostCommitVolumes{} - for _, volumes := range p { - if _, ok := ret[volumes.Account]; !ok { - ret[volumes.Account] = map[string]ledger.Volumes{} - } - if v, ok := ret[volumes.Account][volumes.Asset]; !ok { - ret[volumes.Account][volumes.Asset] = ledger.Volumes{ - Input: volumes.Input, - Output: volumes.Output, - } - } else { - v.Input = v.Input.Add(v.Input, volumes.Input) - v.Output = v.Output.Add(v.Output, volumes.Output) - - ret[volumes.Account][volumes.Asset] = v - } - - } - return ret -} - func (s *Store) selectDistinctTransactionMetadataHistories(date *time.Time) *bun.SelectQuery { ret := s.db.NewSelect(). DistinctOn("transactions_seq"). @@ -118,6 +89,7 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti if expandVolumes && !s.ledger.HasFeature(ledger.FeaturePostCommitVolumes, "SYNC") { return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitVolumes)) } + if expandEffectiveVolumes && !s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") { return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitEffectiveVolumes)) } @@ -176,6 +148,7 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti "sources_arrays", "destinations_arrays", "reverted_at", + "post_commit_volumes", ). Where("ledger = ?", s.ledger.Name) @@ -194,28 +167,45 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti ret = ret.ColumnExpr("metadata") } - if s.ledger.HasFeature(ledger.FeaturePostCommitVolumes, "SYNC") && expandVolumes { - ret = ret. - Join( - `join (?) pcv on pcv.transactions_seq = transactions.seq`, - s.db.NewSelect(). - TableExpr(s.GetPrefixedRelationName("moves")). - Column("transactions_seq"). - ColumnExpr(`to_json(array_agg(json_build_object('account', moves.account_address, 'asset', moves.asset, 'input', (moves.post_commit_volumes).input, 'output', (moves.post_commit_volumes).output))) as post_commit_volumes`). - Group("transactions_seq"), - ). - ColumnExpr("pcv.*") - } + /** + select transactions_seq, jsonb_merge_agg(pcev::jsonb) as post_commit_effective_volumes + from ( + SELECT + distinct on (transactions_seq, account_address, asset) + "transactions_seq", + json_build_object( + moves.account_address, + json_build_object( + moves.asset, + first_value(moves.post_commit_effective_volumes) over (partition by (transactions_seq, account_address, asset) order by seq desc))) as pcev + FROM moves + ) data + group by transactions_seq; + */ if s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") && expandEffectiveVolumes { ret = ret. Join( `join (?) pcev on pcev.transactions_seq = transactions.seq`, - // todo: need to take only the last move for each account/asset s.db.NewSelect(). - TableExpr(s.GetPrefixedRelationName("moves")). Column("transactions_seq"). - ColumnExpr(`to_json(array_agg(json_build_object('account', moves.account_address, 'asset', moves.asset, 'input', (moves.post_commit_effective_volumes).input, 'output', (moves.post_commit_effective_volumes).output))) as post_commit_effective_volumes`). + ColumnExpr("jsonb_merge_agg(pcev::jsonb) as post_commit_effective_volumes"). + TableExpr( + "(?) data", + s.db.NewSelect(). + DistinctOn("transactions_seq, account_address, asset"). + ModelTableExpr(s.GetPrefixedRelationName("moves")). + Column("transactions_seq"). + ColumnExpr(` + json_build_object( + moves.account_address, + json_build_object( + moves.asset, + first_value(moves.post_commit_effective_volumes) over (partition by (transactions_seq, account_address, asset) order by seq desc) + ) + ) as pcev + `), + ). Group("transactions_seq"), ). ColumnExpr("pcev.*") @@ -299,31 +289,6 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) error { - sources := Map(tx.Postings, ledger.Posting.GetSource) - destinations := Map(tx.Postings, ledger.Posting.GetDestination) - mappedTx := &Transaction{ - Ledger: s.ledger.Name, - Postings: tx.Postings, - Metadata: tx.Metadata, - Timestamp: func() *time.Time { - if tx.Timestamp.IsZero() { - return nil - } - return &tx.Timestamp - }(), - Reference: tx.Reference, - InsertedAt: func() *time.Time { - if tx.InsertedAt.IsZero() { - return nil - } - return &tx.InsertedAt - }(), - Sources: sources, - Destinations: destinations, - SourcesArray: Map(sources, convertAddrToIndexedJSONB), - DestinationsArray: Map(destinations, convertAddrToIndexedJSONB), - } - sqlQueries := Map(tx.InvolvedAccounts(), func(from string) string { return fmt.Sprintf("select pg_advisory_xact_lock(hashtext('%s'))", fmt.Sprintf("%s_%s", s.ledger.Name, from)) }) @@ -333,20 +298,15 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e return postgres.ResolveError(err) } - err = s.insertTransaction(ctx, mappedTx) - if err != nil { - return errors.Wrap(err, "failed to insert transaction") - } - accounts := map[string]Account{} for _, address := range tx.InvolvedAccounts() { account := Account{ Ledger: s.ledger.Name, AddressArray: strings.Split(address, ":"), Address: address, - FirstUsage: *mappedTx.Timestamp, - InsertionDate: *mappedTx.InsertedAt, - UpdatedAt: *mappedTx.InsertedAt, + FirstUsage: tx.Timestamp, + InsertionDate: tx.InsertedAt, + UpdatedAt: tx.InsertedAt, Metadata: make(metadata.Metadata), } _, err := s.upsertAccount(ctx, &account) @@ -357,9 +317,32 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e accounts[address] = account } - // notes(gfyrag): keep upserting of accounts separated as some account can be created (and locked), - // while some other will not (the underlying SAVEPOINT of the storage is ROLLBACK if no rows are touched) - // maybe it could be handled by the storage + updatedVolumes, err := s.updateVolumes(ctx, volumeUpdates(s.ledger.Name, tx, accounts)...) + if err != nil { + return errors.Wrap(err, "failed to update balances") + } + + sources := Map(tx.Postings, ledger.Posting.GetSource) + destinations := Map(tx.Postings, ledger.Posting.GetDestination) + mappedTx := &Transaction{ + Ledger: s.ledger.Name, + Postings: tx.Postings, + Metadata: tx.Metadata, + Timestamp: tx.Timestamp, + Reference: tx.Reference, + InsertedAt: tx.InsertedAt, + Sources: sources, + Destinations: destinations, + SourcesArray: Map(sources, convertAddrToIndexedJSONB), + DestinationsArray: Map(destinations, convertAddrToIndexedJSONB), + PostCommitVolumes: updatedVolumes, + } + + err = s.insertTransaction(ctx, mappedTx) + if err != nil { + return errors.Wrap(err, "failed to insert transaction") + } + moves := Moves{} for _, p := range tx.Postings { moves = append(moves, []*Move{ @@ -370,8 +353,8 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e AccountAddressArray: strings.Split(p.Source, ":"), Amount: (*bunpaginate.BigInt)(p.Amount), Asset: p.Asset, - InsertionDate: *mappedTx.InsertedAt, - EffectiveDate: *mappedTx.Timestamp, + InsertionDate: tx.InsertedAt, + EffectiveDate: tx.Timestamp, TransactionSeq: mappedTx.Seq, AccountSeq: accounts[p.Source].Seq, }, @@ -381,8 +364,8 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e AccountAddressArray: strings.Split(p.Destination, ":"), Amount: (*bunpaginate.BigInt)(p.Amount), Asset: p.Asset, - InsertionDate: *mappedTx.InsertedAt, - EffectiveDate: *mappedTx.Timestamp, + InsertionDate: tx.InsertedAt, + EffectiveDate: tx.Timestamp, TransactionSeq: mappedTx.Seq, AccountSeq: accounts[p.Destination].Seq, }, @@ -393,19 +376,13 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e return errors.Wrap(err, "failed to insert moves") } - _, err = s.updateVolumes(ctx, moves.volumeUpdates()...) - if err != nil { - return errors.Wrap(err, "failed to update balances") - } - tx.ID = mappedTx.ID - tx.InsertedAt = *mappedTx.InsertedAt - tx.Timestamp = *mappedTx.Timestamp - if s.ledger.HasFeature(ledger.FeaturePostCommitVolumes, "SYNC") { - tx.PostCommitVolumes = moves.ComputePostCommitVolumes().toCore() - } + tx.PostCommitVolumes = updatedVolumes + tx.Timestamp = mappedTx.Timestamp + tx.InsertedAt = mappedTx.InsertedAt + if s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") { - tx.PostCommitEffectiveVolumes = moves.ComputePostCommitEffectiveVolumes().toCore() + tx.PostCommitEffectiveVolumes = moves.ComputePostCommitEffectiveVolumes() } return nil @@ -470,7 +447,7 @@ func (s *Store) insertTransaction(ctx context.Context, tx *Transaction) error { Model(tx). ModelTableExpr(s.GetPrefixedRelationName("transactions")). Value("id", "nextval(?)", s.GetPrefixedRelationName(fmt.Sprintf(`"transaction_id_%d"`, s.ledger.ID))). - Returning("*"). + Returning("id, seq, timestamp, inserted_at"). Exec(ctx) if err != nil { err = postgres.ResolveError(err) @@ -651,3 +628,42 @@ func filterAccountAddressOnTransactions(address string, source, destination bool return strings.Join(parts, " or ") } } + +func volumeUpdates(l string, transaction *ledger.Transaction, accounts map[string]Account) []AccountsVolumes { + aggregatedVolumes := make(map[string]map[string][]ledger.Posting) + for _, posting := range transaction.Postings { + if _, ok := aggregatedVolumes[posting.Source]; !ok { + aggregatedVolumes[posting.Source] = make(map[string][]ledger.Posting) + } + aggregatedVolumes[posting.Source][posting.Asset] = append(aggregatedVolumes[posting.Source][posting.Asset], posting) + + if _, ok := aggregatedVolumes[posting.Destination]; !ok { + aggregatedVolumes[posting.Destination] = make(map[string][]ledger.Posting) + } + aggregatedVolumes[posting.Destination][posting.Asset] = append(aggregatedVolumes[posting.Destination][posting.Asset], posting) + } + + ret := make([]AccountsVolumes, 0) + for account, movesByAsset := range aggregatedVolumes { + for asset, postings := range movesByAsset { + volumes := ledger.NewEmptyVolumes() + for _, posting := range postings { + if account == posting.Source { + volumes.Output.Add(volumes.Output, posting.Amount) + } else { + volumes.Input.Add(volumes.Input, posting.Amount) + } + } + ret = append(ret, AccountsVolumes{ + Ledger: l, + Account: account, + Asset: asset, + Input: volumes.Input, + Output: volumes.Output, + AccountsSeq: accounts[account].Seq, + }) + } + } + + return ret +} diff --git a/internal/storage/ledger/transactions_test.go b/internal/storage/ledger/transactions_test.go index d8a031e1d..2e2f21795 100644 --- a/internal/storage/ledger/transactions_test.go +++ b/internal/storage/ledger/transactions_test.go @@ -212,19 +212,17 @@ func TestTransactionsCommit(t *testing.T) { ctx := logging.TestingContext() - t.Run("inserting a transaction without timestamp should generate one", func(t *testing.T) { + t.Run("inserting some transactions", func(t *testing.T) { t.Parallel() store := newLedgerStore(t) - tx := ledger.NewTransaction().WithPostings( + tx1 := ledger.NewTransaction().WithPostings( ledger.NewPosting("account:1", "account:2", "USD", big.NewInt(100)), ) - err := store.CommitTransaction(ctx, &tx) + err := store.CommitTransaction(ctx, &tx1) require.NoError(t, err) - require.NotZero(t, tx.Timestamp) - require.NotZero(t, tx.InsertedAt) - require.Equal(t, 1, tx.ID) + require.Equal(t, 1, tx1.ID) require.Equal(t, ledger.PostCommitVolumes{ "account:1": ledger.VolumesByAssets{ "USD": ledger.Volumes{ @@ -238,8 +236,30 @@ func TestTransactionsCommit(t *testing.T) { Output: big.NewInt(0), }, }, - }, tx.PostCommitVolumes) - require.Equal(t, tx.PostCommitVolumes, tx.PostCommitEffectiveVolumes) + }, tx1.PostCommitVolumes) + require.Equal(t, tx1.PostCommitVolumes, tx1.PostCommitEffectiveVolumes) + + tx2 := ledger.NewTransaction().WithPostings( + ledger.NewPosting("account:2", "account:3", "USD", big.NewInt(100)), + ) + err = store.CommitTransaction(ctx, &tx2) + require.NoError(t, err) + require.Equal(t, 2, tx2.ID) + require.Equal(t, ledger.PostCommitVolumes{ + "account:2": ledger.VolumesByAssets{ + "USD": ledger.Volumes{ + Input: big.NewInt(100), + Output: big.NewInt(100), + }, + }, + "account:3": ledger.VolumesByAssets{ + "USD": ledger.Volumes{ + Input: big.NewInt(100), + Output: big.NewInt(0), + }, + }, + }, tx2.PostCommitVolumes) + require.Equal(t, tx2.PostCommitVolumes, tx2.PostCommitEffectiveVolumes) }) t.Run("triggering a deadlock should return appropriate postgres error", func(t *testing.T) { @@ -453,8 +473,8 @@ func TestTransactionsRevert(t *testing.T) { require.NotNil(t, revertedTx) require.True(t, revertedTx.Reverted) revertedTx.Reverted = false - tx1.PostCommitEffectiveVolumes = ledger.PostCommitVolumes{} - tx1.PostCommitVolumes = ledger.PostCommitVolumes{} + tx1.PostCommitEffectiveVolumes = nil + //tx1.PostCommitVolumes = ledger.PostCommitVolumes{} require.Equal(t, tx1, *revertedTx) // try to revert again @@ -479,7 +499,7 @@ func TestTransactionsInsert(t *testing.T) { // create a simple tx tx1 := Transaction{ Ledger: store.ledger.Name, - Timestamp: &now, + Timestamp: now, Reference: "foo", } err := store.insertTransaction(ctx, &tx1) @@ -490,7 +510,7 @@ func TestTransactionsInsert(t *testing.T) { // create another tx with the same reference tx2 := Transaction{ Ledger: store.ledger.Name, - Timestamp: &now, + Timestamp: now, Reference: "foo", } err = store.insertTransaction(ctx, &tx2) @@ -503,9 +523,6 @@ func TestTransactionsInsert(t *testing.T) { } err = store.insertTransaction(ctx, &tx3) require.NoError(t, err) - - // timestamp should be filled by the database - require.NotZero(t, tx3.Timestamp) } func TestTransactionsList(t *testing.T) { diff --git a/internal/storage/ledger/volumes.go b/internal/storage/ledger/volumes.go index feb0f0156..40f365dad 100644 --- a/internal/storage/ledger/volumes.go +++ b/internal/storage/ledger/volumes.go @@ -27,8 +27,8 @@ type AccountsVolumes struct { AccountsSeq int `bun:"accounts_seq,type:int"` } -func (s *Store) updateVolumes(ctx context.Context, accountVolumes ...AccountsVolumes) (map[string]map[string]ledger.Volumes, error) { - return tracing.TraceWithLatency(ctx, "UpdateBalances", func(ctx context.Context) (map[string]map[string]ledger.Volumes, error) { +func (s *Store) updateVolumes(ctx context.Context, accountVolumes ...AccountsVolumes) (ledger.PostCommitVolumes, error) { + return tracing.TraceWithLatency(ctx, "UpdateBalances", func(ctx context.Context) (ledger.PostCommitVolumes, error) { _, err := s.db.NewInsert(). Model(&accountVolumes). @@ -42,7 +42,7 @@ func (s *Store) updateVolumes(ctx context.Context, accountVolumes ...AccountsVol return nil, postgres.ResolveError(err) } - ret := make(map[string]map[string]ledger.Volumes) + ret := ledger.PostCommitVolumes{} for _, volumes := range accountVolumes { if _, ok := ret[volumes.Account]; !ok { ret[volumes.Account] = map[string]ledger.Volumes{} diff --git a/internal/storage/system/migrations.go b/internal/storage/system/migrations.go index 3bffb9499..b1b6db6fe 100644 --- a/internal/storage/system/migrations.go +++ b/internal/storage/system/migrations.go @@ -117,6 +117,13 @@ func getMigrator() *migrations.Migrator { return err }, }, + migrations.Migration{ + Name: "Add jsonb_merge_agg pg aggregator", + UpWithContext: func(ctx context.Context, tx bun.Tx) error { + _, err := tx.ExecContext(ctx, jsonbMerge) + return err + }, + }, ) return migrator @@ -191,3 +198,19 @@ BEGIN END; $$ LANGUAGE plpgsql; ` + +const jsonbMerge = ` +create or replace function public.jsonb_concat(a jsonb, b jsonb) returns jsonb + as 'select $1 || $2' + language sql + immutable + parallel safe +; + +create or replace aggregate public.jsonb_merge_agg(jsonb) +( + sfunc = public.jsonb_concat, + stype = jsonb, + initcond = '{}' +); +` diff --git a/internal/transaction.go b/internal/transaction.go index 3e0d8b2a5..932f01306 100644 --- a/internal/transaction.go +++ b/internal/transaction.go @@ -31,8 +31,12 @@ func (data TransactionData) WithPostings(postings ...Posting) TransactionData { } func NewTransactionData() TransactionData { + now := time.Now() return TransactionData{ - Metadata: metadata.Metadata{}, + Metadata: metadata.Metadata{}, + // todo: should be defined by the database ? + InsertedAt: now, + Timestamp: now, } } diff --git a/internal/transaction_test.go b/internal/transaction_test.go index 566e3d920..350eb6b9b 100644 --- a/internal/transaction_test.go +++ b/internal/transaction_test.go @@ -21,5 +21,7 @@ func TestReverseTransaction(t *testing.T) { ). WithTimestamp(tx.Timestamp) - require.Equal(t, expected, tx.Reverse(true)) + reversed := tx.Reverse(true) + reversed.InsertedAt = tx.InsertedAt + require.Equal(t, expected, reversed) } diff --git a/internal/volumes.go b/internal/volumes.go index e9412f684..286d4cbfe 100644 --- a/internal/volumes.go +++ b/internal/volumes.go @@ -135,3 +135,20 @@ func (a PostCommitVolumes) Copy() PostCommitVolumes { } return ret } + +func (a PostCommitVolumes) Merge(volumes PostCommitVolumes) PostCommitVolumes { + for account, volumesByAssets := range volumes { + if _, ok := a[account]; !ok { + a[account] = map[string]Volumes{} + } + for asset, volumes := range volumesByAssets { + if _, ok := a[account][asset]; !ok { + a[account][asset] = NewEmptyVolumes() + } + a[account][asset].Input.Add(a[account][asset].Input, volumes.Input) + a[account][asset].Output.Add(a[account][asset].Output, volumes.Output) + } + } + + return a +}