From f606b884d884493928046cd3ac1eee7fcf69f4a6 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Thu, 26 Sep 2024 00:54:00 +0200 Subject: [PATCH] feat: remove transaction sequences --- .../bucket/migrations/11-stateless.sql | 90 +++++++++++++++---- internal/storage/ledger/moves.go | 2 +- internal/storage/ledger/moves_test.go | 5 -- internal/storage/ledger/transactions.go | 29 +++--- internal/storage/ledger/transactions_test.go | 16 +++- 5 files changed, 100 insertions(+), 42 deletions(-) diff --git a/internal/storage/bucket/migrations/11-stateless.sql b/internal/storage/bucket/migrations/11-stateless.sql index cf0d535d4..51d8087d7 100644 --- a/internal/storage/bucket/migrations/11-stateless.sql +++ b/internal/storage/bucket/migrations/11-stateless.sql @@ -7,13 +7,13 @@ alter table "{{.Bucket}}".transactions add column inserted_at timestamp without time zone default (now() at time zone 'utc'); +-- todo: check if still required alter table "{{.Bucket}}".transactions alter column timestamp set default (now() at time zone 'utc'); alter table "{{.Bucket}}".transactions -alter column id -type bigint; +add column post_commit_volumes jsonb not null; alter table "{{.Bucket}}".moves add column post_commit_volumes_jsonb jsonb; @@ -21,18 +21,11 @@ add column post_commit_volumes_jsonb jsonb; alter table "{{.Bucket}}".moves add column post_commit_effective_volumes_jsonb jsonb; --- todo: add migration --- update "{{.Bucket}}".moves --- set post_commit_volumes_jsonb = json_build_object( --- 'input', ((moves.post_commit_volumes).inputs), --- 'output', ((moves.post_commit_volumes).outputs) --- ); --- --- update "{{.Bucket}}".moves --- set post_commit_effective_volumes_jsonb = json_build_object( --- 'input', ((moves.post_commit_effective_volumes).inputs), --- 'output', ((moves.post_commit_effective_volumes).outputs) --- ); +alter table "{{.Bucket}}".moves +drop column transactions_seq; + +alter table "{{.Bucket}}".moves +add column transactions_id bigint not null ; alter table "{{.Bucket}}".moves drop column post_commit_volumes; @@ -46,24 +39,54 @@ rename post_commit_volumes_jsonb to post_commit_volumes; alter table "{{.Bucket}}".moves rename post_commit_effective_volumes_jsonb to post_commit_effective_volumes; -alter table "{{.Bucket}}".transactions -add column post_commit_volumes jsonb not null; - alter table "{{.Bucket}}".moves alter column post_commit_volumes drop not null, alter column post_commit_effective_volumes drop not null; +alter table "{{.Bucket}}".transactions_metadata +drop column transactions_seq; + +alter table "{{.Bucket}}".transactions_metadata +add column transactions_id bigint; + +alter table "{{.Bucket}}".transactions +alter column id +type bigint; + +alter table "{{.Bucket}}".transactions +drop column seq; + alter table "{{.Bucket}}".logs alter column hash drop not null; -- Change from jsonb to json to keep keys order and ensure consistent hashing +-- todo: check if still required alter table "{{.Bucket}}".logs alter column data type json; +--drop index transactions_metadata_ledger; +--drop index transactions_metadata_revisions; + +create unique index transactions_metadata_ledger on "{{.Bucket}}".transactions_metadata (ledger, transactions_id, revision); +create index transactions_metadata_revisions on "{{.Bucket}}".transactions_metadata(transactions_id asc, revision desc) include (metadata, date); + +-- todo: add migration +-- update "{{.Bucket}}".moves +-- set post_commit_volumes_jsonb = json_build_object( +-- 'input', ((moves.post_commit_volumes).inputs), +-- 'output', 
((moves.post_commit_volumes).outputs) +-- ); +-- +-- update "{{.Bucket}}".moves +-- set post_commit_effective_volumes_jsonb = json_build_object( +-- 'input', ((moves.post_commit_effective_volumes).inputs), +-- 'output', ((moves.post_commit_effective_volumes).outputs) +-- ); + create table "{{.Bucket}}".accounts_volumes ( ledger varchar not null, accounts_seq int not null, @@ -230,4 +253,37 @@ begin return new; end; +$$; + + +create or replace function "{{.Bucket}}".update_transaction_metadata_history() returns trigger + security definer + language plpgsql +as +$$ +begin + insert into "{{.Bucket}}".transactions_metadata (ledger, transactions_id, revision, date, metadata) + values (new.ledger, new.id, ( + select revision + 1 + from "{{.Bucket}}".transactions_metadata + where transactions_metadata.transactions_id = new.id and transactions_metadata.ledger = new.ledger + order by revision desc + limit 1 + ), new.updated_at, new.metadata); + + return new; +end; +$$; + +create or replace function "{{.Bucket}}".insert_transaction_metadata_history() returns trigger + security definer + language plpgsql +as +$$ +begin + insert into "{{.Bucket}}".transactions_metadata (ledger, transactions_id, revision, date, metadata) + values (new.ledger, new.id, 1, new.timestamp, new.metadata); + + return new; +end; $$; \ No newline at end of file diff --git a/internal/storage/ledger/moves.go b/internal/storage/ledger/moves.go index 70d66321e..03e54577c 100644 --- a/internal/storage/ledger/moves.go +++ b/internal/storage/ledger/moves.go @@ -78,12 +78,12 @@ type Move struct { bun.BaseModel `bun:"table:moves"` Ledger string `bun:"ledger,type:varchar"` + TransactionID int `bun:"transactions_id,type:bigint"` IsSource bool `bun:"is_source,type:bool"` Account string `bun:"account_address,type:varchar"` AccountAddressArray []string `bun:"account_address_array,type:jsonb"` Amount *bunpaginate.BigInt `bun:"amount,type:numeric"` Asset string `bun:"asset,type:varchar"` - TransactionSeq int `bun:"transactions_seq,type:int"` AccountSeq int `bun:"accounts_seq,type:int"` InsertionDate time.Time `bun:"insertion_date,type:timestamp"` EffectiveDate time.Time `bun:"effective_date,type:timestamp"` diff --git a/internal/storage/ledger/moves_test.go b/internal/storage/ledger/moves_test.go index c9dac5b5e..160d3c967 100644 --- a/internal/storage/ledger/moves_test.go +++ b/internal/storage/ledger/moves_test.go @@ -54,7 +54,6 @@ func TestMovesInsert(t *testing.T) { AccountAddressArray: []string{"world"}, Amount: (*bunpaginate.BigInt)(big.NewInt(100)), Asset: "USD", - TransactionSeq: tx.Seq, AccountSeq: account.Seq, InsertionDate: t0, EffectiveDate: t0, @@ -74,7 +73,6 @@ func TestMovesInsert(t *testing.T) { AccountAddressArray: []string{"world"}, Amount: (*bunpaginate.BigInt)(big.NewInt(50)), Asset: "USD", - TransactionSeq: tx.Seq, AccountSeq: account.Seq, InsertionDate: t3, EffectiveDate: t3, @@ -94,7 +92,6 @@ func TestMovesInsert(t *testing.T) { AccountAddressArray: []string{"world"}, Amount: (*bunpaginate.BigInt)(big.NewInt(200)), Asset: "USD", - TransactionSeq: tx.Seq, AccountSeq: account.Seq, InsertionDate: t1, EffectiveDate: t1, @@ -114,7 +111,6 @@ func TestMovesInsert(t *testing.T) { AccountAddressArray: []string{"world"}, Amount: (*bunpaginate.BigInt)(big.NewInt(50)), Asset: "USD", - TransactionSeq: tx.Seq, AccountSeq: account.Seq, InsertionDate: t2, EffectiveDate: t2, @@ -134,7 +130,6 @@ func TestMovesInsert(t *testing.T) { AccountAddressArray: []string{"world"}, Amount: (*bunpaginate.BigInt)(big.NewInt(50)), Asset: "USD", - 
TransactionSeq: tx.Seq, AccountSeq: account.Seq, InsertionDate: t4, EffectiveDate: t4, diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index 098ed0edb..db18355f9 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -39,7 +39,6 @@ type Transaction struct { Ledger string `bun:"ledger,type:varchar"` ID int `bun:"id,type:numeric"` - Seq int `bun:"seq,scanonly"` Timestamp time.Time `bun:"timestamp,type:timestamp without time zone,nullzero"` Reference string `bun:"reference,type:varchar,unique,nullzero"` Postings []ledger.Posting `bun:"postings,type:jsonb"` @@ -72,11 +71,11 @@ func (t Transaction) toCore() ledger.Transaction { func (s *Store) selectDistinctTransactionMetadataHistories(date *time.Time) *bun.SelectQuery { ret := s.db.NewSelect(). - DistinctOn("transactions_seq"). + DistinctOn("transactions_id"). ModelTableExpr(s.GetPrefixedRelationName("transactions_metadata")). Where("ledger = ?", s.ledger.Name). - Column("transactions_seq", "metadata"). - Order("transactions_seq", "revision desc") + Column("transactions_id", "metadata"). + Order("transactions_id", "revision desc") if date != nil && !date.IsZero() { ret = ret.Where("date <= ?", date) @@ -133,7 +132,6 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti ret = ret. ModelTableExpr(s.GetPrefixedRelationName("transactions")). Column( - "seq", "ledger", "id", "timestamp", @@ -157,7 +155,7 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistory, "SYNC") && date != nil && !date.IsZero() { ret = ret. Join( - `left join (?) transactions_metadata on transactions_metadata.transactions_seq = transactions.seq`, + `left join (?) transactions_metadata on transactions_metadata.transactions_id = transactions.id`, s.selectDistinctTransactionMetadataHistories(date), ). ColumnExpr("coalesce(transactions_metadata.metadata, '{}'::jsonb) as metadata") @@ -168,28 +166,28 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti if s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") && expandEffectiveVolumes { ret = ret. Join( - `join (?) pcev on pcev.transactions_seq = transactions.seq`, + `join (?) pcev on pcev.transactions_id = transactions.id`, s.db.NewSelect(). - Column("transactions_seq"). + Column("transactions_id"). ColumnExpr("jsonb_merge_agg(pcev::jsonb) as post_commit_effective_volumes"). TableExpr( "(?) data", s.db.NewSelect(). - DistinctOn("transactions_seq, account_address, asset"). + DistinctOn("transactions_id, account_address, asset"). ModelTableExpr(s.GetPrefixedRelationName("moves")). - Column("transactions_seq"). + Column("transactions_id"). // use strings.Replace for logs ColumnExpr(strings.Replace(` json_build_object( moves.account_address, json_build_object( moves.asset, - first_value(moves.post_commit_effective_volumes) over (partition by (transactions_seq, account_address, asset) order by seq desc) + first_value(moves.post_commit_effective_volumes) over (partition by (transactions_id, account_address, asset) order by seq desc) ) ) as pcev `, "\n", "", -1)), ). - Group("transactions_seq"), + Group("transactions_id"), ). ColumnExpr("pcev.*") } @@ -198,7 +196,6 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti ret = s.db.NewSelect(). ModelTableExpr("(?) transactions", ret). 
Column( - "seq", "ledger", "id", "timestamp", @@ -350,9 +347,9 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e Asset: posting.Asset, InsertionDate: insertionDate, EffectiveDate: tx.Timestamp, - TransactionSeq: mappedTx.Seq, AccountSeq: accounts[posting.Destination].Seq, PostCommitVolumes: pointer.For(postCommitVolumes[posting.Destination][posting.Asset].Copy()), + TransactionID: tx.ID, }) postCommitVolumes.AddInput(posting.Destination, posting.Asset, new(big.Int).Neg(posting.Amount)) @@ -365,9 +362,9 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e Asset: posting.Asset, InsertionDate: insertionDate, EffectiveDate: tx.Timestamp, - TransactionSeq: mappedTx.Seq, AccountSeq: accounts[posting.Source].Seq, PostCommitVolumes: pointer.For(postCommitVolumes[posting.Source][posting.Asset].Copy()), + TransactionID: tx.ID, }) postCommitVolumes.AddOutput(posting.Source, posting.Asset, new(big.Int).Neg(posting.Amount)) } @@ -445,7 +442,7 @@ func (s *Store) insertTransaction(ctx context.Context, tx *Transaction) error { Model(tx). ModelTableExpr(s.GetPrefixedRelationName("transactions")). Value("id", "nextval(?)", s.GetPrefixedRelationName(fmt.Sprintf(`"transaction_id_%d"`, s.ledger.ID))). - Returning("id, seq, timestamp, inserted_at"). + Returning("id, timestamp, inserted_at"). Exec(ctx) if err != nil { err = postgres.ResolveError(err) diff --git a/internal/storage/ledger/transactions_test.go b/internal/storage/ledger/transactions_test.go index d45b40841..a4bbb09fb 100644 --- a/internal/storage/ledger/transactions_test.go +++ b/internal/storage/ledger/transactions_test.go @@ -6,9 +6,9 @@ import ( "context" "database/sql" "fmt" - "github.com/davecgh/go-spew/spew" "github.com/formancehq/go-libs/platform/postgres" ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" + "github.com/shomali11/xsql" "math/big" "testing" @@ -145,6 +145,18 @@ func TestTransactionUpdateMetadata(t *testing.T) { _, _, err = store.UpdateTransactionMetadata(ctx, tx2.ID, metadata.Metadata{"foo2": "bar2"}) require.NoError(t, err) + rows, err := store.GetDB().NewSelect().ModelTableExpr(store.GetPrefixedRelationName("transactions")).Rows(ctx) + require.NoError(t, err) + + data, _ := xsql.Pretty(rows) + fmt.Println(data) + + rows, err = store.GetDB().NewSelect().ModelTableExpr(store.GetPrefixedRelationName("moves")).Rows(ctx) + require.NoError(t, err) + + data, _ = xsql.Pretty(rows) + fmt.Println(data) + // check that the database returns metadata tx, err := store.GetTransaction(ctx, ledgercontroller.NewGetTransactionQuery(tx1.ID).WithExpandVolumes().WithExpandEffectiveVolumes()) require.NoError(t, err, "getting transaction should not fail") @@ -280,7 +292,6 @@ func TestTransactionsCommit(t *testing.T) { }, }, }, tx3.PostCommitVolumes) - spew.Dump(tx3) require.Equal(t, tx3.PostCommitVolumes, tx3.PostCommitEffectiveVolumes) }) @@ -527,7 +538,6 @@ func TestTransactionsInsert(t *testing.T) { err := store.insertTransaction(ctx, &tx1) require.NoError(t, err) require.NotZero(t, tx1.ID) - require.NotZero(t, tx1.Seq) // create another tx with the same reference tx2 := Transaction{
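
The jsonb volume backfill for the moves table is left as a commented-out "todo: add migration" block in 11-stateless.sql above. A minimal sketch of that backfill, assuming it would run while the legacy composite post_commit_volumes / post_commit_effective_volumes columns and the new *_jsonb columns still coexist (i.e. before the drop column / rename statements later in the same migration); it reuses the statements from the commented block, with an explicit ::jsonb cast added:

-- Sketch only (not part of the patch): populate the new jsonb columns from the
-- legacy composite-type columns while both sets of columns still exist.
update "{{.Bucket}}".moves
set post_commit_volumes_jsonb = json_build_object(
    'input', (moves.post_commit_volumes).inputs,
    'output', (moves.post_commit_volumes).outputs
)::jsonb;

update "{{.Bucket}}".moves
set post_commit_effective_volumes_jsonb = json_build_object(
    'input', (moves.post_commit_effective_volumes).inputs,
    'output', (moves.post_commit_effective_volumes).outputs
)::jsonb;

A similar backfill would presumably be needed for the new moves.transactions_id column (added not null while transactions_seq is dropped), but that mapping is not spelled out in the patch, so it is not sketched here.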