Skip to content

Commit

Permalink
chore: clean compat layer from v2.1 (#518)
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored Jan 3, 2025
1 parent 98707ec commit 7e4611b
Show file tree
Hide file tree
Showing 32 changed files with 250 additions and 3,109 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ require (
github.com/onsi/gomega v1.35.1
github.com/ory/dockertest/v3 v3.11.0
github.com/pborman/uuid v1.2.1
github.com/pkg/errors v0.9.1
github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
Expand All @@ -55,6 +54,7 @@ require gopkg.in/yaml.v3 v3.0.1 // indirect
require (
github.com/hashicorp/go-hclog v1.6.3 // indirect
github.com/jackc/pgxlisten v0.0.0-20241106001234-1d6f6656415c // indirect
github.com/pkg/errors v0.9.1 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
)

Expand Down
2 changes: 1 addition & 1 deletion internal/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ type BalancesByAssetsByAccounts map[string]BalancesByAssets
```go
type Configuration struct {
Bucket string `json:"bucket" bun:"bucket,type:varchar(255)"`
Metadata metadata.Metadata `json:"metadata" bun:"metadata,type:jsonb"`
Metadata metadata.Metadata `json:"metadata" bun:"metadata,type:jsonb,nullzero"`
Features features.FeatureSet `json:"features" bun:"features,type:jsonb"`
}
```
Expand Down
2 changes: 1 addition & 1 deletion internal/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ var (

type Configuration struct {
Bucket string `json:"bucket" bun:"bucket,type:varchar(255)"`
Metadata metadata.Metadata `json:"metadata" bun:"metadata,type:jsonb"`
Metadata metadata.Metadata `json:"metadata" bun:"metadata,type:jsonb,nullzero"`
Features features.FeatureSet `json:"features" bun:"features,type:jsonb"`
}

Expand Down
10 changes: 5 additions & 5 deletions internal/storage/bucket/default_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
)

// stateless version (+1 regarding directory name, as migrations start from 1 in the lib)
const MinimalSchemaVersion = 12
const MinimalSchemaVersion = 27

type DefaultBucket struct {
name string
db *bun.DB
name string
db *bun.DB
tracer trace.Tracer
}

Expand Down Expand Up @@ -81,8 +81,8 @@ func (b *DefaultBucket) AddLedger(ctx context.Context, l ledger.Ledger) error {

func NewDefault(db *bun.DB, tracer trace.Tracer, name string) *DefaultBucket {
return &DefaultBucket{
db: db,
name: name,
db: db,
name: name,
tracer: tracer,
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Recreate accounts unique index
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- There is already a covering index on accounts table (including seq column).
-- As we will remove the seq column in next migration, we have to create a new index without it (PG will remove it automatically in background).
-- Also, we create the index concurrently to avoid locking the table.
-- And, as there is already an index on this table, the index creation should not fail.
--
-- We create this index in a dedicated as, as the doc mentions it (https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-MULTI-STATEMENT)
-- multi statements queries are automatically wrapped inside transaction block, and it's forbidden
-- to create index concurrently inside a transaction block.
create unique index concurrently accounts_ledger2 on "{{.Schema}}".accounts (ledger, address)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Clean not used columns in database
124 changes: 124 additions & 0 deletions internal/storage/bucket/migrations/27-clean-database/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
set search_path = '{{.Schema}}';

-- Clean all useless function/aggregates/indexes inherited from stateful version.
drop aggregate aggregate_objects(jsonb);
drop aggregate first(anyelement);

drop function array_distinct(anyarray);
drop function insert_posting(_transaction_seq bigint, _ledger character varying, _insertion_date timestamp without time zone, _effective_date timestamp without time zone, posting jsonb, _account_metadata jsonb);
drop function upsert_account(_ledger character varying, _address character varying, _metadata jsonb, _date timestamp without time zone, _first_usage timestamp without time zone);
drop function get_latest_move_for_account_and_asset(_ledger character varying, _account_address character varying, _asset character varying, _before timestamp without time zone);
drop function update_transaction_metadata(_ledger character varying, _id numeric, _metadata jsonb, _date timestamp without time zone);
drop function delete_account_metadata(_ledger character varying, _address character varying, _key character varying, _date timestamp without time zone);
drop function delete_transaction_metadata(_ledger character varying, _id numeric, _key character varying, _date timestamp without time zone);
drop function balance_from_volumes(v volumes);
drop function get_all_account_volumes(_ledger character varying, _account character varying, _before timestamp without time zone);
drop function first_agg(anyelement, anyelement);
drop function volumes_to_jsonb(v volumes_with_asset);
drop function get_account_aggregated_effective_volumes(_ledger character varying, _account_address character varying, _before timestamp without time zone);
drop function handle_log();
drop function get_account_aggregated_volumes(_ledger character varying, _account_address character varying, _before timestamp without time zone);
drop function get_aggregated_volumes_for_transaction(_ledger character varying, tx numeric);
drop function insert_move(_transactions_seq bigint, _ledger character varying, _insertion_date timestamp without time zone, _effective_date timestamp without time zone, _account_address character varying, _asset character varying, _amount numeric, _is_source boolean, _account_exists boolean);
drop function get_all_assets(_ledger character varying);
drop function insert_transaction(_ledger character varying, data jsonb, _date timestamp without time zone, _account_metadata jsonb);
drop function get_all_account_effective_volumes(_ledger character varying, _account character varying, _before timestamp without time zone);
drop function get_account_balance(_ledger character varying, _account character varying, _asset character varying, _before timestamp without time zone);
drop function get_aggregated_effective_volumes_for_transaction(_ledger character varying, tx numeric);
drop function aggregate_ledger_volumes(_ledger character varying, _before timestamp without time zone, _accounts character varying[], _assets character varying[] );
drop function get_transaction(_ledger character varying, _id numeric, _before timestamp without time zone);
drop function revert_transaction(_ledger character varying, _id numeric, _date timestamp without time zone);

drop index transactions_sources_arrays;
drop index transactions_destinations_arrays;
drop index transactions_sources;
drop index transactions_destinations;

-- We will remove some triggers writing these columns (set_compat_xxx) later in this file.
-- When these triggers will be removed, there is a little moment where the columns will not be filled and constraints
-- still checked by the database.
-- So, we drop the not null constraint before removing the triggers.
-- Once the triggers removed, we will be able to drop the columns.
alter table moves
alter column transactions_seq drop not null,
alter column accounts_seq drop not null,
alter column accounts_address_array drop not null;

alter table transactions_metadata
alter column transactions_seq drop not null;

alter table accounts_metadata
alter column accounts_seq drop not null;

-- Now, the columns are nullable, we can drop the trigger
drop trigger set_compat_on_move on moves;
drop trigger set_compat_on_accounts_metadata on accounts_metadata;
drop trigger set_compat_on_transactions_metadata on transactions_metadata;
drop function set_compat_on_move();
drop function set_compat_on_accounts_metadata();
drop function set_compat_on_transactions_metadata();

-- Finally remove the columns
alter table moves
drop column transactions_seq,
drop column accounts_seq,
drop column accounts_address_array;

alter table transactions_metadata
drop column transactions_seq;

alter table accounts_metadata
drop column accounts_seq;

alter table transactions
drop column seq;

alter table accounts
drop column seq;

-- rename index create in previous migration, as the drop of the column seq of accounts table has automatically dropped the index accounts_ledger
alter index accounts_ledger2
rename to accounts_ledger;

create or replace function set_log_hash()
returns trigger
security definer
language plpgsql
as
$$
declare
previousHash bytea;
marshalledAsJSON varchar;
begin
select hash into previousHash
from logs
where ledger = new.ledger
order by id desc
limit 1;

-- select only fields participating in the hash on the backend and format json representation the same way
select '{' ||
'"type":"' || new.type || '",' ||
'"data":' || encode(new.memento, 'escape') || ',' ||
'"date":"' || (to_json(new.date::timestamp)#>>'{}') || 'Z",' ||
'"idempotencyKey":"' || coalesce(new.idempotency_key, '') || '",' ||
'"id":0,' ||
'"hash":null' ||
'}' into marshalledAsJSON;

new.hash = (
select public.digest(
case
when previousHash is null
then marshalledAsJSON::bytea
else '"' || encode(previousHash::bytea, 'base64')::bytea || E'"\n' || convert_to(marshalledAsJSON, 'LATIN1')::bytea
end || E'\n', 'sha256'::text
)
);

return new;
end;
$$ set search_path from current;

alter table logs
drop column seq;
10 changes: 2 additions & 8 deletions internal/storage/driver/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package driver

import (
"context"
"fmt"
ledgerstore "github.com/formancehq/ledger/internal/storage/ledger/legacy"
ledgerstore "github.com/formancehq/ledger/internal/storage/ledger"

ledger "github.com/formancehq/ledger/internal"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
Expand All @@ -20,12 +19,7 @@ func (d *DefaultStorageDriverAdapter) OpenLedger(ctx context.Context, name strin
return nil, nil, err
}

isUpToDate, err := store.GetBucket().IsUpToDate(ctx)
if err != nil {
return nil, nil, fmt.Errorf("checking if bucket is up to date: %w", err)
}

return ledgerstore.NewDefaultStoreAdapter(isUpToDate, store), l, nil
return ledgerstore.NewDefaultStoreAdapter(store), l, nil
}

func (d *DefaultStorageDriverAdapter) CreateLedger(ctx context.Context, l *ledger.Ledger) error {
Expand Down
4 changes: 0 additions & 4 deletions internal/storage/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ type Driver struct {

func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgerstore.Store, error) {

if l.Metadata == nil {
l.Metadata = metadata.Metadata{}
}

b := d.bucketFactory.Create(l.Bucket)
isInitialized, err := b.IsInitialized(ctx)
if err != nil {
Expand Down
51 changes: 51 additions & 0 deletions internal/storage/ledger/adapters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package ledger

import (
"context"
"database/sql"
ledger "github.com/formancehq/ledger/internal"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
)

type TX struct {
*Store
}

type DefaultStoreAdapter struct {
*Store
}

func (d *DefaultStoreAdapter) IsUpToDate(ctx context.Context) (bool, error) {
return d.HasMinimalVersion(ctx)
}

func (d *DefaultStoreAdapter) BeginTX(ctx context.Context, opts *sql.TxOptions) (ledgercontroller.Store, error) {
store, err := d.Store.BeginTX(ctx, opts)
if err != nil {
return nil, err
}

return &DefaultStoreAdapter{
Store: store,
}, nil
}

func (d *DefaultStoreAdapter) Commit() error {
return d.Store.Commit()
}

func (d *DefaultStoreAdapter) Rollback() error {
return d.Store.Rollback()
}

func (d *DefaultStoreAdapter) AggregatedBalances() ledgercontroller.Resource[ledger.AggregatedVolumes, ledgercontroller.GetAggregatedVolumesOptions] {
return d.AggregatedVolumes()
}

func NewDefaultStoreAdapter(store *Store) *DefaultStoreAdapter {
return &DefaultStoreAdapter{
Store: store,
}
}

var _ ledgercontroller.Store = (*DefaultStoreAdapter)(nil)
67 changes: 15 additions & 52 deletions internal/storage/ledger/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,62 +49,25 @@ func (store *Store) GetBalances(ctx context.Context, query ledgercontroller.Bala
}
}

// Try to insert volumes using last move (to keep compat with previous version) or 0 values.
// This way, if the account has a 0 balance at this point, it will be locked as any other accounts.
// If the complete sql transaction fails, the account volumes will not be inserted.
selectMoves := store.db.NewSelect().
ModelTableExpr(store.GetPrefixedRelationName("moves")).
DistinctOn("accounts_address, asset").
Column("accounts_address", "asset").
ColumnExpr("first_value(post_commit_volumes) over (partition by accounts_address, asset order by seq desc) as post_commit_volumes").
ColumnExpr("first_value(ledger) over (partition by accounts_address, asset order by seq desc) as ledger").
Where("("+strings.Join(conditions, ") OR (")+")", args...)

zeroValuesAndMoves := store.db.NewSelect().
TableExpr("(?) data", selectMoves).
Column("ledger", "accounts_address", "asset").
ColumnExpr("(post_commit_volumes).inputs as input").
ColumnExpr("(post_commit_volumes).outputs as output").
UnionAll(
store.db.NewSelect().
TableExpr(
"(?) data",
store.db.NewSelect().NewValues(&accountsVolumes),
).
Column("*"),
)

zeroValueOrMoves := store.db.NewSelect().
TableExpr("(?) data", zeroValuesAndMoves).
Column("ledger", "accounts_address", "asset", "input", "output").
DistinctOn("ledger, accounts_address, asset")

insertDefaultValue := store.db.NewInsert().
TableExpr(store.GetPrefixedRelationName("accounts_volumes")).
TableExpr("(" + zeroValueOrMoves.String() + ") data").
On("conflict (ledger, accounts_address, asset) do nothing").
Returning("ledger, accounts_address, asset, input, output")

selectExistingValues := store.db.NewSelect().
err := store.db.NewSelect().
With(
"ins",
// Try to insert volumes with 0 values.
// This way, if the account has a 0 balance at this point, it will be locked as any other accounts.
// It the complete sql transaction fail, the account volumes will not be inserted.
store.db.NewInsert().
Model(&accountsVolumes).
ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")).
On("conflict do nothing"),
).
Model(&accountsVolumes).
ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")).
Column("ledger", "accounts_address", "asset", "input", "output").
Column("accounts_address", "asset", "input", "output").
Where("("+strings.Join(conditions, ") OR (")+")", args...).
For("update").
// notes(gfyrag): Keep order, it ensures consistent locking order and limit deadlocks
Order("accounts_address", "asset")

finalQuery := store.db.NewSelect().
With("inserted", insertDefaultValue).
With("existing", selectExistingValues).
ModelTableExpr(
"(?) accounts_volumes",
store.db.NewSelect().
ModelTableExpr("inserted").
UnionAll(store.db.NewSelect().ModelTableExpr("existing")),
).
Model(&accountsVolumes)

err := finalQuery.Scan(ctx)
Order("accounts_address", "asset").
Scan(ctx)
if err != nil {
return nil, postgres.ResolveError(err)
}
Expand Down
Loading

0 comments on commit 7e4611b

Please sign in to comment.