Skip to content

Commit

Permalink
feat: remove transaction sequences
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 23, 2024
1 parent f680ac6 commit cafb3dc
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 42 deletions.
90 changes: 73 additions & 17 deletions internal/storage/bucket/migrations/11-stateless.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,25 @@ alter table "{{.Bucket}}".transactions
add column inserted_at timestamp without time zone
default (now() at time zone 'utc');

-- todo: check if still required
alter table "{{.Bucket}}".transactions
alter column timestamp
set default (now() at time zone 'utc');

alter table "{{.Bucket}}".transactions
alter column id
type bigint;
add column post_commit_volumes jsonb not null;

alter table "{{.Bucket}}".moves
add column post_commit_volumes_jsonb jsonb;

alter table "{{.Bucket}}".moves
add column post_commit_effective_volumes_jsonb jsonb;

-- todo: add migration
-- update "{{.Bucket}}".moves
-- set post_commit_volumes_jsonb = json_build_object(
-- 'input', ((moves.post_commit_volumes).inputs),
-- 'output', ((moves.post_commit_volumes).outputs)
-- );
--
-- update "{{.Bucket}}".moves
-- set post_commit_effective_volumes_jsonb = json_build_object(
-- 'input', ((moves.post_commit_effective_volumes).inputs),
-- 'output', ((moves.post_commit_effective_volumes).outputs)
-- );
alter table "{{.Bucket}}".moves
drop column transactions_seq;

alter table "{{.Bucket}}".moves
add column transactions_id bigint not null ;

alter table "{{.Bucket}}".moves
drop column post_commit_volumes;
Expand All @@ -46,24 +39,54 @@ rename post_commit_volumes_jsonb to post_commit_volumes;
alter table "{{.Bucket}}".moves
rename post_commit_effective_volumes_jsonb to post_commit_effective_volumes;

alter table "{{.Bucket}}".transactions
add column post_commit_volumes jsonb not null;

alter table "{{.Bucket}}".moves
alter column post_commit_volumes
drop not null,
alter column post_commit_effective_volumes
drop not null;

alter table "{{.Bucket}}".transactions_metadata
drop column transactions_seq;

alter table "{{.Bucket}}".transactions_metadata
add column transactions_id bigint;

alter table "{{.Bucket}}".transactions
alter column id
type bigint;

alter table "{{.Bucket}}".transactions
drop column seq;

alter table "{{.Bucket}}".logs
alter column hash
drop not null;

-- Change from jsonb to json to keep keys order and ensure consistent hashing
-- todo: check if still required
alter table "{{.Bucket}}".logs
alter column data
type json;

--drop index transactions_metadata_ledger;
--drop index transactions_metadata_revisions;

create unique index transactions_metadata_ledger on "{{.Bucket}}".transactions_metadata (ledger, transactions_id, revision);
create index transactions_metadata_revisions on "{{.Bucket}}".transactions_metadata(transactions_id asc, revision desc) include (metadata, date);

-- todo: add migration
-- update "{{.Bucket}}".moves
-- set post_commit_volumes_jsonb = json_build_object(
-- 'input', ((moves.post_commit_volumes).inputs),
-- 'output', ((moves.post_commit_volumes).outputs)
-- );
--
-- update "{{.Bucket}}".moves
-- set post_commit_effective_volumes_jsonb = json_build_object(
-- 'input', ((moves.post_commit_effective_volumes).inputs),
-- 'output', ((moves.post_commit_effective_volumes).outputs)
-- );

create table "{{.Bucket}}".accounts_volumes (
ledger varchar not null,
accounts_seq int not null,
Expand Down Expand Up @@ -230,4 +253,37 @@ begin

return new;
end;
$$;


create or replace function "{{.Bucket}}".update_transaction_metadata_history() returns trigger
security definer
language plpgsql
as
$$
begin
insert into "{{.Bucket}}".transactions_metadata (ledger, transactions_id, revision, date, metadata)
values (new.ledger, new.id, (
select revision + 1
from "{{.Bucket}}".transactions_metadata
where transactions_metadata.transactions_id = new.id and transactions_metadata.ledger = new.ledger
order by revision desc
limit 1
), new.updated_at, new.metadata);

return new;
end;
$$;

create or replace function "{{.Bucket}}".insert_transaction_metadata_history() returns trigger
security definer
language plpgsql
as
$$
begin
insert into "{{.Bucket}}".transactions_metadata (ledger, transactions_id, revision, date, metadata)
values (new.ledger, new.id, 1, new.timestamp, new.metadata);

return new;
end;
$$;
2 changes: 1 addition & 1 deletion internal/storage/ledger/moves.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ type Move struct {
bun.BaseModel `bun:"table:moves"`

Ledger string `bun:"ledger,type:varchar"`
TransactionID int `bun:"transactions_id,type:bigint"`
IsSource bool `bun:"is_source,type:bool"`
Account string `bun:"account_address,type:varchar"`
AccountAddressArray []string `bun:"account_address_array,type:jsonb"`
Amount *bunpaginate.BigInt `bun:"amount,type:numeric"`
Asset string `bun:"asset,type:varchar"`
TransactionSeq int `bun:"transactions_seq,type:int"`
AccountSeq int `bun:"accounts_seq,type:int"`
InsertionDate time.Time `bun:"insertion_date,type:timestamp"`
EffectiveDate time.Time `bun:"effective_date,type:timestamp"`
Expand Down
5 changes: 0 additions & 5 deletions internal/storage/ledger/moves_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func TestMovesInsert(t *testing.T) {
AccountAddressArray: []string{"world"},
Amount: (*bunpaginate.BigInt)(big.NewInt(100)),
Asset: "USD",
TransactionSeq: tx.Seq,
AccountSeq: account.Seq,
InsertionDate: t0,
EffectiveDate: t0,
Expand All @@ -74,7 +73,6 @@ func TestMovesInsert(t *testing.T) {
AccountAddressArray: []string{"world"},
Amount: (*bunpaginate.BigInt)(big.NewInt(50)),
Asset: "USD",
TransactionSeq: tx.Seq,
AccountSeq: account.Seq,
InsertionDate: t3,
EffectiveDate: t3,
Expand All @@ -94,7 +92,6 @@ func TestMovesInsert(t *testing.T) {
AccountAddressArray: []string{"world"},
Amount: (*bunpaginate.BigInt)(big.NewInt(200)),
Asset: "USD",
TransactionSeq: tx.Seq,
AccountSeq: account.Seq,
InsertionDate: t1,
EffectiveDate: t1,
Expand All @@ -114,7 +111,6 @@ func TestMovesInsert(t *testing.T) {
AccountAddressArray: []string{"world"},
Amount: (*bunpaginate.BigInt)(big.NewInt(50)),
Asset: "USD",
TransactionSeq: tx.Seq,
AccountSeq: account.Seq,
InsertionDate: t2,
EffectiveDate: t2,
Expand All @@ -134,7 +130,6 @@ func TestMovesInsert(t *testing.T) {
AccountAddressArray: []string{"world"},
Amount: (*bunpaginate.BigInt)(big.NewInt(50)),
Asset: "USD",
TransactionSeq: tx.Seq,
AccountSeq: account.Seq,
InsertionDate: t4,
EffectiveDate: t4,
Expand Down
29 changes: 13 additions & 16 deletions internal/storage/ledger/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type Transaction struct {

Ledger string `bun:"ledger,type:varchar"`
ID int `bun:"id,type:numeric"`
Seq int `bun:"seq,scanonly"`
Timestamp time.Time `bun:"timestamp,type:timestamp without time zone,nullzero"`
Reference string `bun:"reference,type:varchar,unique,nullzero"`
Postings []ledger.Posting `bun:"postings,type:jsonb"`
Expand Down Expand Up @@ -72,11 +71,11 @@ func (t Transaction) toCore() ledger.Transaction {

func (s *Store) selectDistinctTransactionMetadataHistories(date *time.Time) *bun.SelectQuery {
ret := s.db.NewSelect().
DistinctOn("transactions_seq").
DistinctOn("transactions_id").
ModelTableExpr(s.GetPrefixedRelationName("transactions_metadata")).
Where("ledger = ?", s.ledger.Name).
Column("transactions_seq", "metadata").
Order("transactions_seq", "revision desc")
Column("transactions_id", "metadata").
Order("transactions_id", "revision desc")

if date != nil && !date.IsZero() {
ret = ret.Where("date <= ?", date)
Expand Down Expand Up @@ -133,7 +132,6 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti
ret = ret.
ModelTableExpr(s.GetPrefixedRelationName("transactions")).
Column(
"seq",
"ledger",
"id",
"timestamp",
Expand All @@ -157,7 +155,7 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti
if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistory, "SYNC") && date != nil && !date.IsZero() {
ret = ret.
Join(
`left join (?) transactions_metadata on transactions_metadata.transactions_seq = transactions.seq`,
`left join (?) transactions_metadata on transactions_metadata.transactions_id = transactions.id`,
s.selectDistinctTransactionMetadataHistories(date),
).
ColumnExpr("coalesce(transactions_metadata.metadata, '{}'::jsonb) as metadata")
Expand All @@ -168,28 +166,28 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti
if s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") && expandEffectiveVolumes {
ret = ret.
Join(
`join (?) pcev on pcev.transactions_seq = transactions.seq`,
`join (?) pcev on pcev.transactions_id = transactions.id`,
s.db.NewSelect().
Column("transactions_seq").
Column("transactions_id").
ColumnExpr("jsonb_merge_agg(pcev::jsonb) as post_commit_effective_volumes").
TableExpr(
"(?) data",
s.db.NewSelect().
DistinctOn("transactions_seq, account_address, asset").
DistinctOn("transactions_id, account_address, asset").
ModelTableExpr(s.GetPrefixedRelationName("moves")).
Column("transactions_seq").
Column("transactions_id").
// use strings.Replace for logs
ColumnExpr(strings.Replace(`
json_build_object(
moves.account_address,
json_build_object(
moves.asset,
first_value(moves.post_commit_effective_volumes) over (partition by (transactions_seq, account_address, asset) order by seq desc)
first_value(moves.post_commit_effective_volumes) over (partition by (transactions_id, account_address, asset) order by seq desc)
)
) as pcev
`, "\n", "", -1)),
).
Group("transactions_seq"),
Group("transactions_id"),
).
ColumnExpr("pcev.*")
}
Expand All @@ -198,7 +196,6 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti
ret = s.db.NewSelect().
ModelTableExpr("(?) transactions", ret).
Column(
"seq",
"ledger",
"id",
"timestamp",
Expand Down Expand Up @@ -350,9 +347,9 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
Asset: posting.Asset,
InsertionDate: insertionDate,
EffectiveDate: tx.Timestamp,
TransactionSeq: mappedTx.Seq,
AccountSeq: accounts[posting.Destination].Seq,
PostCommitVolumes: pointer.For(postCommitVolumes[posting.Destination][posting.Asset].Copy()),
TransactionID: tx.ID,
})
postCommitVolumes.AddInput(posting.Destination, posting.Asset, new(big.Int).Neg(posting.Amount))

Expand All @@ -365,9 +362,9 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
Asset: posting.Asset,
InsertionDate: insertionDate,
EffectiveDate: tx.Timestamp,
TransactionSeq: mappedTx.Seq,
AccountSeq: accounts[posting.Source].Seq,
PostCommitVolumes: pointer.For(postCommitVolumes[posting.Source][posting.Asset].Copy()),
TransactionID: tx.ID,
})
postCommitVolumes.AddOutput(posting.Source, posting.Asset, new(big.Int).Neg(posting.Amount))
}
Expand Down Expand Up @@ -445,7 +442,7 @@ func (s *Store) insertTransaction(ctx context.Context, tx *Transaction) error {
Model(tx).
ModelTableExpr(s.GetPrefixedRelationName("transactions")).
Value("id", "nextval(?)", s.GetPrefixedRelationName(fmt.Sprintf(`"transaction_id_%d"`, s.ledger.ID))).
Returning("id, seq, timestamp, inserted_at").
Returning("id, timestamp, inserted_at").
Exec(ctx)
if err != nil {
err = postgres.ResolveError(err)
Expand Down
16 changes: 13 additions & 3 deletions internal/storage/ledger/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"context"
"database/sql"
"fmt"
"github.com/davecgh/go-spew/spew"
"github.com/formancehq/go-libs/platform/postgres"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"github.com/shomali11/xsql"
"math/big"
"testing"

Expand Down Expand Up @@ -145,6 +145,18 @@ func TestTransactionUpdateMetadata(t *testing.T) {
_, _, err = store.UpdateTransactionMetadata(ctx, tx2.ID, metadata.Metadata{"foo2": "bar2"})
require.NoError(t, err)

rows, err := store.GetDB().NewSelect().ModelTableExpr(store.GetPrefixedRelationName("transactions")).Rows(ctx)
require.NoError(t, err)

data, _ := xsql.Pretty(rows)
fmt.Println(data)

rows, err = store.GetDB().NewSelect().ModelTableExpr(store.GetPrefixedRelationName("moves")).Rows(ctx)
require.NoError(t, err)

data, _ = xsql.Pretty(rows)
fmt.Println(data)

// check that the database returns metadata
tx, err := store.GetTransaction(ctx, ledgercontroller.NewGetTransactionQuery(tx1.ID).WithExpandVolumes().WithExpandEffectiveVolumes())
require.NoError(t, err, "getting transaction should not fail")
Expand Down Expand Up @@ -280,7 +292,6 @@ func TestTransactionsCommit(t *testing.T) {
},
},
}, tx3.PostCommitVolumes)
spew.Dump(tx3)
require.Equal(t, tx3.PostCommitVolumes, tx3.PostCommitEffectiveVolumes)
})

Expand Down Expand Up @@ -527,7 +538,6 @@ func TestTransactionsInsert(t *testing.T) {
err := store.insertTransaction(ctx, &tx1)
require.NoError(t, err)
require.NotZero(t, tx1.ID)
require.NotZero(t, tx1.Seq)

// create another tx with the same reference
tx2 := Transaction{
Expand Down

0 comments on commit cafb3dc

Please sign in to comment.