diff --git a/go.mod b/go.mod
index 25ae8ebcd..24cc4c36f 100644
--- a/go.mod
+++ b/go.mod
@@ -13,7 +13,7 @@ require (
 	github.com/alitto/pond v1.9.2
 	github.com/antlr/antlr4/runtime/Go/antlr v1.4.10
 	github.com/bluele/gcache v0.0.2
-	github.com/formancehq/go-libs/v2 v2.1.3-0.20241017104005-ad844342baae
+	github.com/formancehq/go-libs/v2 v2.0.0-20241017113509-22db708b22a5
 	github.com/formancehq/stack/ledger/client v0.0.0-00010101000000-000000000000
 	github.com/go-chi/chi/v5 v5.1.0
 	github.com/go-chi/cors v1.2.1
diff --git a/go.sum b/go.sum
index f569201ba..facdfa7a2 100644
--- a/go.sum
+++ b/go.sum
@@ -95,6 +95,10 @@ github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4=
 github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI=
 github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
+github.com/formancehq/go-libs/v2 v2.0.0-20241017113509-22db708b22a5 h1:uwsBFFeyh7dI3O3EnoGuyy569WsRXFhNjEaSD4AN+eg=
+github.com/formancehq/go-libs/v2 v2.0.0-20241017113509-22db708b22a5/go.mod h1:LgxayMN6wgAQbkB3ioBDTHOVMKp1rC6Q55M1CvG44xY=
+github.com/formancehq/go-libs/v2 v2.1.2 h1:AL5A2LgpepFU6+ovvkRXkYUfupPY46BsRHp0M6SFTUY=
+github.com/formancehq/go-libs/v2 v2.1.2/go.mod h1:+JVecYnhf7xTbbz65nwxBDehdMx1JgHIitvnxXYuuAI=
 github.com/formancehq/go-libs/v2 v2.1.3-0.20241017101710-5a4a6a4cb50d h1:tZZo5yypip02aAkbkp6kGDC8Lvo032DlE/0yVhu8iNY=
 github.com/formancehq/go-libs/v2 v2.1.3-0.20241017101710-5a4a6a4cb50d/go.mod h1:LgxayMN6wgAQbkB3ioBDTHOVMKp1rC6Q55M1CvG44xY=
 github.com/formancehq/go-libs/v2 v2.1.3-0.20241017104005-ad844342baae h1:29GM6mMhC/7mVsbZJ6qgvQEGAcLSNXwWD3DNKo2Kikk=
diff --git a/internal/storage/bucket/bucket.go b/internal/storage/bucket/bucket.go
index af0c1001f..73777c500 100644
--- a/internal/storage/bucket/bucket.go
+++ b/internal/storage/bucket/bucket.go
@@ -1,13 +1,16 @@
 package bucket
 
 import (
+	"bytes"
 	"context"
 	_ "embed"
-	"go.opentelemetry.io/otel/trace"
-
 	"errors"
+	"fmt"
 	"github.com/formancehq/go-libs/v2/migrations"
+	ledger "github.com/formancehq/ledger/internal"
 	"github.com/uptrace/bun"
+	"go.opentelemetry.io/otel/trace"
+	"text/template"
 )
 
 type Bucket struct {
@@ -27,9 +30,167 @@ func (b *Bucket) IsUpToDate(ctx context.Context) (bool, error) {
 	return ret, err
 }
 
+func (b *Bucket) GetMigrationsInfo(ctx context.Context) ([]migrations.Info, error) {
+	return GetMigrator(b.name).GetMigrations(ctx, b.db)
+}
+
+func (b *Bucket) AddLedger(ctx context.Context, l ledger.Ledger, db bun.IDB) error {
+
+	tpl := template.Must(template.New("sql").Parse(addLedgerTpl))
+	buf := bytes.NewBuffer(nil)
+	if err := tpl.Execute(buf, l); err != nil {
+		return fmt.Errorf("executing template: %w", err)
+	}
+
+	_, err := db.ExecContext(ctx, buf.String())
+	if err != nil {
+		return fmt.Errorf("executing sql: %w", err)
+	}
+
+	return nil
+}
+
 func New(db bun.IDB, name string) *Bucket {
 	return &Bucket{
 		db:   db,
 		name: name,
 	}
 }
+
+const addLedgerTpl = `
+-- create a sequence for transactions by ledger instead of a sequence of the table as we want to have contiguous ids
+-- notes: we can still have "holes" on ids since a sql transaction can be reverted after a usage of the sequence
+create sequence "{{.Bucket}}"."transaction_id_{{.ID}}" owned by "{{.Bucket}}".transactions.id;
+select 
setval('"{{.Bucket}}"."transaction_id_{{.ID}}"', coalesce(( + select max(id) + 1 + from "{{.Bucket}}".transactions + where ledger = '{{ .Name }}' +), 1)::bigint, false); + +-- create a sequence for logs by ledger instead of a sequence of the table as we want to have contiguous ids +-- notes: we can still have "holes" on id since a sql transaction can be reverted after a usage of the sequence +create sequence "{{.Bucket}}"."log_id_{{.ID}}" owned by "{{.Bucket}}".logs.id; +select setval('"{{.Bucket}}"."log_id_{{.ID}}"', coalesce(( + select max(id) + 1 + from "{{.Bucket}}".logs + where ledger = '{{ .Name }}' +), 1)::bigint, false); + +-- enable post commit effective volumes synchronously + +{{ if .HasFeature "MOVES_HISTORY_POST_COMMIT_EFFECTIVE_VOLUMES" "SYNC" }} +create index "pcev_{{.ID}}" on "{{.Bucket}}".moves (accounts_address, asset, effective_date desc) where ledger = '{{.Name}}'; + +create trigger "set_effective_volumes_{{.ID}}" +before insert +on "{{.Bucket}}"."moves" +for each row +when ( + new.ledger = '{{.Name}}' +) +execute procedure "{{.Bucket}}".set_effective_volumes(); + +create trigger "update_effective_volumes_{{.ID}}" +after insert +on "{{.Bucket}}"."moves" +for each row +when ( + new.ledger = '{{.Name}}' +) +execute procedure "{{.Bucket}}".update_effective_volumes(); +{{ end }} + +-- logs hash + +{{ if .HasFeature "HASH_LOGS" "SYNC" }} +create trigger "set_log_hash_{{.ID}}" +before insert +on "{{.Bucket}}"."logs" +for each row +when ( + new.ledger = '{{.Name}}' +) +execute procedure "{{.Bucket}}".set_log_hash(); +{{ end }} + +{{ if .HasFeature "ACCOUNT_METADATA_HISTORY" "SYNC" }} +create trigger "update_account_metadata_history_{{.ID}}" +after update +on "{{.Bucket}}"."accounts" +for each row +when ( + new.ledger = '{{.Name}}' +) +execute procedure "{{.Bucket}}".update_account_metadata_history(); + +create trigger "insert_account_metadata_history_{{.ID}}" +after insert +on "{{.Bucket}}"."accounts" +for each row +when ( + new.ledger = '{{.Name}}' +) +execute procedure "{{.Bucket}}".insert_account_metadata_history(); +{{ end }} + +{{ if .HasFeature "TRANSACTION_METADATA_HISTORY" "SYNC" }} +create trigger "update_transaction_metadata_history_{{.ID}}" +after update +on "{{.Bucket}}"."transactions" +for each row +when ( + new.ledger = '{{.Name}}' +) +execute procedure "{{.Bucket}}".update_transaction_metadata_history(); + +create trigger "insert_transaction_metadata_history_{{.ID}}" +after insert +on "{{.Bucket}}"."transactions" +for each row +when ( + new.ledger = '{{.Name}}' +) +execute procedure "{{.Bucket}}".insert_transaction_metadata_history(); +{{ end }} + +{{ if .HasFeature "INDEX_TRANSACTION_ACCOUNTS" "ON" }} +create index "transactions_sources_{{.ID}}" on "{{.Bucket}}".transactions using gin (sources jsonb_path_ops) where ledger = '{{.Name}}'; +create index "transactions_destinations_{{.ID}}" on "{{.Bucket}}".transactions using gin (destinations jsonb_path_ops) where ledger = '{{.Name}}'; +create trigger "transaction_set_addresses_{{.ID}}" + before insert + on "{{.Bucket}}"."transactions" + for each row + when ( + new.ledger = '{{.Name}}' + ) +execute procedure "{{.Bucket}}".set_transaction_addresses(); +{{ end }} + +{{ if .HasFeature "INDEX_ADDRESS_SEGMENTS" "ON" }} +create index "accounts_address_array_{{.ID}}" on "{{.Bucket}}".accounts using gin (address_array jsonb_ops) where ledger = '{{.Name}}'; +create index "accounts_address_array_length_{{.ID}}" on "{{.Bucket}}".accounts (jsonb_array_length(address_array)) where ledger = '{{.Name}}'; + +create trigger 
"accounts_set_address_array_{{.ID}}" + before insert + on "{{.Bucket}}"."accounts" + for each row + when ( + new.ledger = '{{.Name}}' + ) +execute procedure "{{.Bucket}}".set_address_array_for_account(); + +{{ if .HasFeature "INDEX_TRANSACTION_ACCOUNTS" "ON" }} +create index "transactions_sources_arrays_{{.ID}}" on "{{.Bucket}}".transactions using gin (sources_arrays jsonb_path_ops) where ledger = '{{.Name}}'; +create index "transactions_destinations_arrays_{{.ID}}" on "{{.Bucket}}".transactions using gin (destinations_arrays jsonb_path_ops) where ledger = '{{.Name}}'; + +create trigger "transaction_set_addresses_segments_{{.ID}}" + before insert + on "{{.Bucket}}"."transactions" + for each row + when ( + new.ledger = '{{.Name}}' + ) +execute procedure "{{.Bucket}}".set_transaction_addresses_segments(); +{{ end }} +{{ end }} +` diff --git a/internal/storage/bucket/migrations/34-set-ledger-specifics.sql b/internal/storage/bucket/migrations/34-set-ledger-specifics.sql new file mode 100644 index 000000000..a909691da --- /dev/null +++ b/internal/storage/bucket/migrations/34-set-ledger-specifics.sql @@ -0,0 +1,79 @@ +DO +$do$ + declare + ledger record; + vsql text; + BEGIN + for ledger in select * from _system.ledgers where bucket = '{{.Bucket}}' loop + -- create a sequence for transactions by ledger instead of a sequence of the table as we want to have contiguous ids + -- notes: we can still have "holes" on ids since a sql transaction can be reverted after a usage of the sequence + + vsql = 'create sequence "{{.Bucket}}"."transaction_id_' || ledger.id || '" owned by "{{.Bucket}}".transactions.id'; + execute vsql; + + vsql = 'select setval("{{.Bucket}}"."transaction_id_' || ledger.id || '", coalesce((select max(id) + 1 from "{{.Bucket}}".transactions where ledger = ledger.name), 1)::bigint, false)'; + execute vsql; + + -- create a sequence for logs by ledger instead of a sequence of the table as we want to have contiguous ids + -- notes: we can still have "holes" on id since a sql transaction can be reverted after a usage of the sequence + vsql = 'create sequence "{{.Bucket}}"."log_id_' || ledger.id || '" owned by "{{.Bucket}}".logs.id'; + execute vsql; + + vsql = 'select setval("{{.Bucket}}"."log_id_' || ledger.id || '", coalesce((select max(id) + 1 from "{{.Bucket}}".logs where ledger = ledger.name), 1)::bigint, false)'; + execute vsql; + + -- enable post commit effective volumes synchronously + vsql = 'create index "pcev_' || ledger.id || '" on "{{.Bucket}}".moves (accounts_address, asset, effective_date desc) where ledger = ledger.name'; + execute vsql; + + vsql = 'create trigger "set_effective_volumes_' || ledger.id || '" before insert on "{{.Bucket}}".moves for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".set_effective_volumes()'; + execute vsql; + + vsql = 'create trigger "update_effective_volumes_' || ledger.id || '" after insert on "{{.Bucket}}".moves for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".update_effective_volumes()'; + execute vsql; + + -- logs hash + vsql = 'create trigger "set_log_hash_' || ledger.id || '" before insert on "{{.Bucket}}".logs for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".set_log_hash()'; + execute vsql; + + vsql = 'create trigger "update_account_metadata_history_' || ledger.id || '" after update on "{{.Bucket}}"."accounts" for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".update_account_metadata_history()'; + execute vsql; + + vsql = 'create 
trigger "insert_account_metadata_history_' || ledger.id || '" after insert on "{{.Bucket}}"."accounts" for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".insert_account_metadata_history()'; + execute vsql; + + vsql = 'create trigger "update_transaction_metadata_history_' || ledger.id || '" after update on "{{.Bucket}}"."transactions" for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".update_transaction_metadata_history()'; + execute vsql; + + vsql = 'create trigger "insert_transaction_metadata_history_' || ledger.id || '" after insert on "{{.Bucket}}"."transactions" for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".insert_transaction_metadata_history()'; + execute vsql; + + vsql = 'create index "transactions_sources_' || ledger.id || '" on "{{.Bucket}}".transactions using gin (sources jsonb_path_ops) where ledger = ledger.name'; + execute vsql; + + vsql = 'create index "transactions_destinations_' || ledger.id || '" on "{{.Bucket}}".transactions using gin (destinations jsonb_path_ops) where ledger = ledger.name'; + execute vsql; + + vsql = 'create trigger "transaction_set_addresses_' || ledger.id || '" before insert on "{{.Bucket}}".transactions for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".set_transaction_addresses()'; + execute vsql; + + vsql = 'create index "accounts_address_array_' || ledger.id || '" on "{{.Bucket}}".accounts using gin (address_array jsonb_ops) where ledger = ledger.name'; + execute vsql; + + vsql = 'create index "accounts_address_array_length_' || ledger.id || '" on "{{.Bucket}}".accounts (jsonb_array_length(address_array)) where ledger = ledger.name'; + execute vsql; + + vsql = 'create trigger "accounts_set_address_array_' || ledger.id || '" before insert on "{{.Bucket}}".accounts for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".set_address_array_for_account()'; + execute vsql; + + vsql = 'create index "transactions_sources_arrays_' || ledger.id || '" on "{{.Bucket}}".transactions using gin (sources_arrays jsonb_path_ops) where ledger = ledger.name'; + execute vsql; + + vsql = 'create index "transactions_destinations_arrays_' || ledger.id || '" on "{{.Bucket}}".transactions using gin (destinations_arrays jsonb_path_ops) where ledger = ledger.name'; + execute vsql; + + vsql = 'create trigger "transaction_set_addresses_segments_' || ledger.id || '" before insert on "{{.Bucket}}"."transactions" for each row when (new.ledger = ledger.name) execute procedure "{{.Bucket}}".set_transaction_addresses_segments()'; + execute vsql; + end loop; + END +$do$; \ No newline at end of file diff --git a/internal/storage/driver/driver.go b/internal/storage/driver/driver.go index 08b20c43b..41242f328 100644 --- a/internal/storage/driver/driver.go +++ b/internal/storage/driver/driver.go @@ -4,16 +4,15 @@ import ( "context" "database/sql" "fmt" - . 
"github.com/formancehq/go-libs/v2/collectionutils" + "github.com/formancehq/go-libs/v2/collectionutils" "github.com/formancehq/go-libs/v2/metadata" "github.com/formancehq/go-libs/v2/platform/postgres" + systemcontroller "github.com/formancehq/ledger/internal/controller/system" "go.opentelemetry.io/otel/metric" noopmetrics "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" nooptracer "go.opentelemetry.io/otel/trace/noop" - systemcontroller "github.com/formancehq/ledger/internal/controller/system" - ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" "github.com/formancehq/go-libs/v2/bun/bunpaginate" @@ -35,20 +34,37 @@ type Driver struct { meter metric.Meter } -func (d *Driver) createLedgerStore(ctx context.Context, db bun.IDB, ledger ledger.Ledger) (*ledgerstore.Store, error) { +func (d *Driver) createLedgerStore(ctx context.Context, db bun.IDB, l *ledger.Ledger) (*ledgerstore.Store, error) { tx, err := db.BeginTx(ctx, &sql.TxOptions{}) if err != nil { return nil, fmt.Errorf("begin transaction: %w", err) } - b := bucket.New(tx, ledger.Bucket) + b := bucket.New(tx, l.Bucket) if err := b.Migrate(ctx, d.tracer); err != nil { return nil, fmt.Errorf("migrating bucket: %w", err) } - if err := ledgerstore.Migrate(ctx, d.tracer, tx, ledger); err != nil { - return nil, fmt.Errorf("failed to migrate ledger store: %w", err) + ret, err := db.NewInsert(). + Model(l). + Ignore(). + Returning("id, added_at"). + Exec(ctx) + if err != nil { + return nil, postgres.ResolveError(err) + } + + affected, err := ret.RowsAffected() + if err != nil { + return nil, fmt.Errorf("creating ledger: %w", err) + } + if affected == 0 { + return nil, systemcontroller.ErrLedgerAlreadyExists + } + + if err := b.AddLedger(ctx, *l, tx); err != nil { + return nil, fmt.Errorf("adding ledger to bucket: %w", err) } if err := tx.Commit(); err != nil { @@ -57,7 +73,8 @@ func (d *Driver) createLedgerStore(ctx context.Context, db bun.IDB, ledger ledge return ledgerstore.New( d.db, - ledger, + b, + *l, ledgerstore.WithMeter(d.meter), ledgerstore.WithTracer(d.tracer), ), nil @@ -78,24 +95,7 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto l.Metadata = metadata.Metadata{} } - ret, err := d.db.NewInsert(). - Model(l). - Ignore(). - Returning("id, added_at"). 
- Exec(ctx) - if err != nil { - return nil, postgres.ResolveError(err) - } - - affected, err := ret.RowsAffected() - if err != nil { - return nil, fmt.Errorf("creating ledger: %w", err) - } - if affected == 0 { - return nil, systemcontroller.ErrLedgerAlreadyExists - } - - store, err := d.createLedgerStore(ctx, tx, *l) + store, err := d.createLedgerStore(ctx, tx, l) if err != nil { return nil, err } @@ -119,6 +119,7 @@ func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Stor return ledgerstore.New( d.db, + bucket.New(d.db, ret.Bucket), *ret, ledgerstore.WithMeter(d.meter), ledgerstore.WithTracer(d.tracer), @@ -184,14 +185,14 @@ func (d *Driver) UpgradeBucket(ctx context.Context, name string) error { func (d *Driver) UpgradeAllBuckets(ctx context.Context) error { - bucketsNames := Set[string]{} + buckets := collectionutils.Set[string]{} err := bunpaginate.Iterate(ctx, ledgercontroller.NewListLedgersQuery(10), func(ctx context.Context, q ledgercontroller.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error) { return d.ListLedgers(ctx, q) }, func(cursor *bunpaginate.Cursor[ledger.Ledger]) error { - for _, name := range cursor.Data { - bucketsNames.Put(name.Bucket) + for _, l := range cursor.Data { + buckets.Put(l.Bucket) } return nil }) @@ -199,7 +200,7 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context) error { return err } - for _, bucketName := range Keys(bucketsNames) { + for _, bucketName := range collectionutils.Keys(buckets) { b := bucket.New(d.db, bucketName) logging.FromContext(ctx).Infof("Upgrading bucket '%s'", bucketName) @@ -211,26 +212,6 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context) error { return nil } -func (d *Driver) UpgradeAllLedgers(ctx context.Context) error { - err := bunpaginate.Iterate(ctx, ledgercontroller.NewListLedgersQuery(10), - func(ctx context.Context, q ledgercontroller.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error) { - return d.ListLedgers(ctx, q) - }, - func(cursor *bunpaginate.Cursor[ledger.Ledger]) error { - for _, ledger := range cursor.Data { - if err := ledgerstore.Migrate(ctx, d.tracer, d.db, ledger); err != nil { - return err - } - } - return nil - }) - if err != nil { - return err - } - - return nil -} - func New(db *bun.DB, opts ...Option) *Driver { ret := &Driver{ db: db, diff --git a/internal/storage/driver/driver_test.go b/internal/storage/driver/driver_test.go index 7a775a74c..1a96f3e12 100644 --- a/internal/storage/driver/driver_test.go +++ b/internal/storage/driver/driver_test.go @@ -37,7 +37,6 @@ func TestUpgradeAllLedgers(t *testing.T) { } require.NoError(t, d.UpgradeAllBuckets(ctx)) - require.NoError(t, d.UpgradeAllLedgers(ctx)) } func TestLedgersCreate(t *testing.T) { diff --git a/internal/storage/ledger/main_test.go b/internal/storage/ledger/main_test.go index fdd178f30..2fc0f904b 100644 --- a/internal/storage/ledger/main_test.go +++ b/internal/storage/ledger/main_test.go @@ -87,9 +87,9 @@ func newLedgerStore(t T) *ledgerstore.Store { b := bucket.New(db, ledgerName) require.NoError(t, b.Migrate(ctx, noop.Tracer{})) - require.NoError(t, ledgerstore.Migrate(ctx, noop.Tracer{}, db, l)) + require.NoError(t, b.AddLedger(ctx, l, db)) - return ledgerstore.New(db, l) + return ledgerstore.New(db, b, l) } func bigIntComparer(v1 *big.Int, v2 *big.Int) bool { diff --git a/internal/storage/ledger/migrations.go b/internal/storage/ledger/migrations.go deleted file mode 100644 index aeb31b529..000000000 --- a/internal/storage/ledger/migrations.go +++ /dev/null @@ -1,43 +0,0 @@ -package 
ledger - -import ( - "bytes" - "context" - "embed" - "fmt" - "go.opentelemetry.io/otel/trace" - "text/template" - - "github.com/formancehq/go-libs/v2/migrations" - ledger "github.com/formancehq/ledger/internal" - "github.com/uptrace/bun" -) - -//go:embed migrations -var migrationsDir embed.FS - -func getMigrator(ledger ledger.Ledger) *migrations.Migrator { - migrator := migrations.NewMigrator( - migrations.WithSchema(ledger.Bucket, false), - migrations.WithTableName(fmt.Sprintf("migrations_%s", ledger.Name)), - ) - migrator.RegisterMigrationsFromFileSystem(migrationsDir, "migrations", func(s string) string { - buf := bytes.NewBufferString("") - - t := template.Must(template.New("migration").Parse(s)) - if err := t.Execute(buf, ledger); err != nil { - panic(err) - } - - return buf.String() - }) - - return migrator -} - -func Migrate(ctx context.Context, tracer trace.Tracer, db bun.IDB, ledger ledger.Ledger) error { - ctx, span := tracer.Start(ctx, "Migrate ledger") - defer span.End() - - return getMigrator(ledger).Up(ctx, db) -} diff --git a/internal/storage/ledger/migrations/0-add-sequences.sql b/internal/storage/ledger/migrations/0-add-sequences.sql deleted file mode 100644 index aa709231b..000000000 --- a/internal/storage/ledger/migrations/0-add-sequences.sql +++ /dev/null @@ -1,136 +0,0 @@ - --- create a sequence for transactions by ledger instead of a sequence of the table as we want to have contiguous ids --- notes: we can still have "holes" on ids since a sql transaction can be reverted after a usage of the sequence -create sequence "{{.Bucket}}"."transaction_id_{{.ID}}" owned by "{{.Bucket}}".transactions.id; -select setval('"{{.Bucket}}"."transaction_id_{{.ID}}"', coalesce(( - select max(id) + 1 - from "{{.Bucket}}".transactions - where ledger = '{{ .Name }}' -), 1)::bigint, false); - --- create a sequence for logs by ledger instead of a sequence of the table as we want to have contiguous ids --- notes: we can still have "holes" on id since a sql transaction can be reverted after a usage of the sequence -create sequence "{{.Bucket}}"."log_id_{{.ID}}" owned by "{{.Bucket}}".logs.id; -select setval('"{{.Bucket}}"."log_id_{{.ID}}"', coalesce(( - select max(id) + 1 - from "{{.Bucket}}".logs - where ledger = '{{ .Name }}' -), 1)::bigint, false); - --- enable post commit effective volumes synchronously - -{{ if .HasFeature "MOVES_HISTORY_POST_COMMIT_EFFECTIVE_VOLUMES" "SYNC" }} -create index "pcev_{{.ID}}" on "{{.Bucket}}".moves (accounts_address, asset, effective_date desc) where ledger = '{{.Name}}'; - -create trigger "set_effective_volumes_{{.ID}}" -before insert -on "{{.Bucket}}"."moves" -for each row -when ( - new.ledger = '{{.Name}}' -) -execute procedure "{{.Bucket}}".set_effective_volumes(); - -create trigger "update_effective_volumes_{{.ID}}" -after insert -on "{{.Bucket}}"."moves" -for each row -when ( - new.ledger = '{{.Name}}' -) -execute procedure "{{.Bucket}}".update_effective_volumes(); -{{ end }} - --- logs hash - -{{ if .HasFeature "HASH_LOGS" "SYNC" }} -create trigger "set_log_hash_{{.ID}}" -before insert -on "{{.Bucket}}"."logs" -for each row -when ( - new.ledger = '{{.Name}}' -) -execute procedure "{{.Bucket}}".set_log_hash(); -{{ end }} - -{{ if .HasFeature "ACCOUNT_METADATA_HISTORY" "SYNC" }} -create trigger "update_account_metadata_history_{{.ID}}" -after update -on "{{.Bucket}}"."accounts" -for each row -when ( - new.ledger = '{{.Name}}' -) -execute procedure "{{.Bucket}}".update_account_metadata_history(); - -create trigger 
"insert_account_metadata_history_{{.ID}}" -after insert -on "{{.Bucket}}"."accounts" -for each row -when ( - new.ledger = '{{.Name}}' -) -execute procedure "{{.Bucket}}".insert_account_metadata_history(); -{{ end }} - -{{ if .HasFeature "TRANSACTION_METADATA_HISTORY" "SYNC" }} -create trigger "update_transaction_metadata_history_{{.ID}}" -after update -on "{{.Bucket}}"."transactions" -for each row -when ( - new.ledger = '{{.Name}}' -) -execute procedure "{{.Bucket}}".update_transaction_metadata_history(); - -create trigger "insert_transaction_metadata_history_{{.ID}}" -after insert -on "{{.Bucket}}"."transactions" -for each row -when ( - new.ledger = '{{.Name}}' -) -execute procedure "{{.Bucket}}".insert_transaction_metadata_history(); -{{ end }} - -{{ if .HasFeature "INDEX_TRANSACTION_ACCOUNTS" "ON" }} -create index "transactions_sources_{{.ID}}" on "{{.Bucket}}".transactions using gin (sources jsonb_path_ops) where ledger = '{{.Name}}'; -create index "transactions_destinations_{{.ID}}" on "{{.Bucket}}".transactions using gin (destinations jsonb_path_ops) where ledger = '{{.Name}}'; -create trigger "transaction_set_addresses_{{.ID}}" - before insert - on "{{.Bucket}}"."transactions" - for each row - when ( - new.ledger = '{{.Name}}' - ) -execute procedure "{{.Bucket}}".set_transaction_addresses(); -{{ end }} - -{{ if .HasFeature "INDEX_ADDRESS_SEGMENTS" "ON" }} -create index "accounts_address_array_{{.ID}}" on "{{.Bucket}}".accounts using gin (address_array jsonb_ops) where ledger = '{{.Name}}'; -create index "accounts_address_array_length_{{.ID}}" on "{{.Bucket}}".accounts (jsonb_array_length(address_array)) where ledger = '{{.Name}}'; - -create trigger "accounts_set_address_array_{{.ID}}" - before insert - on "{{.Bucket}}"."accounts" - for each row - when ( - new.ledger = '{{.Name}}' - ) -execute procedure "{{.Bucket}}".set_address_array_for_account(); - -{{ if .HasFeature "INDEX_TRANSACTION_ACCOUNTS" "ON" }} -create index "transactions_sources_arrays_{{.ID}}" on "{{.Bucket}}".transactions using gin (sources_arrays jsonb_path_ops) where ledger = '{{.Name}}'; -create index "transactions_destinations_arrays_{{.ID}}" on "{{.Bucket}}".transactions using gin (destinations_arrays jsonb_path_ops) where ledger = '{{.Name}}'; - -create trigger "transaction_set_addresses_segments_{{.ID}}" - before insert - on "{{.Bucket}}"."transactions" - for each row - when ( - new.ledger = '{{.Name}}' - ) -execute procedure "{{.Bucket}}".set_transaction_addresses_segments(); -{{ end }} -{{ end }} \ No newline at end of file diff --git a/internal/storage/ledger/store.go b/internal/storage/ledger/store.go index 19891df8f..67026a761 100644 --- a/internal/storage/ledger/store.go +++ b/internal/storage/ledger/store.go @@ -3,23 +3,22 @@ package ledger import ( "context" "fmt" + "github.com/formancehq/go-libs/v2/migrations" "github.com/formancehq/go-libs/v2/platform/postgres" + "github.com/formancehq/ledger/internal/storage/bucket" "go.opentelemetry.io/otel/metric" noopmetrics "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" nooptracer "go.opentelemetry.io/otel/trace/noop" - "github.com/formancehq/ledger/internal/tracing" - "errors" - "github.com/formancehq/go-libs/v2/migrations" ledger "github.com/formancehq/ledger/internal" - "github.com/formancehq/ledger/internal/storage/bucket" "github.com/uptrace/bun" ) type Store struct { db bun.IDB + bucket *bucket.Bucket ledger ledger.Ledger tracer trace.Tracer @@ -62,45 +61,6 @@ func (s 
*Store) WithDB(db bun.IDB) *Store { return &ret } -// todo: merge with bucket migration info -// todo: add test -func (s *Store) GetMigrationsInfo(ctx context.Context) ([]migrations.Info, error) { - return getMigrator(s.ledger).GetMigrations(ctx, s.db) -} - -func (s *Store) IsUpToDate(ctx context.Context) (bool, error) { - bucketUpToDate, err := tracing.TraceWithMetric( - ctx, - "CheckBucketSchema", - s.tracer, - s.checkBucketSchemaHistogram, - func(ctx context.Context) (bool, error) { - return bucket.New(s.db, s.ledger.Bucket).IsUpToDate(ctx) - }, - ) - if err != nil { - return false, fmt.Errorf("failed to check if bucket is up to date: %w", err) - } - if !bucketUpToDate { - return false, nil - } - - ret, err := tracing.TraceWithMetric( - ctx, - "CheckLedgerSchema", - s.tracer, - s.checkLedgerSchemaHistogram, - func(ctx context.Context) (bool, error) { - return getMigrator(s.ledger).IsUpToDate(ctx, s.db) - }, - ) - if err != nil && errors.Is(err, migrations.ErrMissingVersionTable) { - return false, nil - } - - return ret, err -} - func (s *Store) validateAddressFilter(operator string, value any) error { if operator != "$match" { return errors.New("'address' column can only be used with $match") @@ -119,10 +79,11 @@ func (s *Store) LockLedger(ctx context.Context) error { return postgres.ResolveError(err) } -func New(db bun.IDB, ledger ledger.Ledger, opts ...Option) *Store { +func New(db bun.IDB, bucket *bucket.Bucket, ledger ledger.Ledger, opts ...Option) *Store { ret := &Store{ db: db, ledger: ledger, + bucket: bucket, } for _, opt := range append(defaultOptions, opts...) { opt(ret) @@ -221,6 +182,14 @@ func New(db bun.IDB, ledger ledger.Ledger, opts ...Option) *Store { return ret } +func (s *Store) IsUpToDate(ctx context.Context) (bool, error) { + return s.bucket.IsUpToDate(ctx) +} + +func (s *Store) GetMigrationsInfo(ctx context.Context) ([]migrations.Info, error) { + return s.bucket.GetMigrationsInfo(ctx) +} + type Option func(s *Store) func WithMeter(meter metric.Meter) Option { diff --git a/test/migrations/upgrade_test.go b/test/migrations/upgrade_test.go index 964d7846a..2abbad014 100644 --- a/test/migrations/upgrade_test.go +++ b/test/migrations/upgrade_test.go @@ -53,7 +53,7 @@ func TestMigrations(t *testing.T) { // Migrate database driver := driver.New(db) require.NoError(t, driver.Initialize(ctx)) - require.NoError(t, driver.UpgradeAllLedgers(ctx)) + require.NoError(t, driver.UpgradeAllBuckets(ctx)) } func copyDatabase(t *testing.T, dockerPool *docker.Pool, source, destination string) {
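
Reviewer note: the sketch below is not part of the diff. It shows how the reworked storage pieces fit together: bucket-level migrations replace the deleted per-ledger migrator (ledgerstore.Migrate and internal/storage/ledger/migrations), Bucket.AddLedger renders addLedgerTpl to create the ledger-scoped sequences, triggers and indexes, and the store is now built with a reference to its bucket. The wrapper name openStore and the use of a noop tracer are illustrative assumptions, mirroring internal/storage/ledger/main_test.go above.

// Usage sketch under stated assumptions: an already-opened *bun.DB and a
// ledger.Ledger whose Bucket, Name and ID fields are populated. openStore
// itself is not a function in this codebase.
package example

import (
	"context"

	ledger "github.com/formancehq/ledger/internal"
	"github.com/formancehq/ledger/internal/storage/bucket"
	ledgerstore "github.com/formancehq/ledger/internal/storage/ledger"
	"github.com/uptrace/bun"
	"go.opentelemetry.io/otel/trace/noop"
)

func openStore(ctx context.Context, db *bun.DB, l ledger.Ledger) (*ledgerstore.Store, error) {
	// Bucket-level schema migrations, shared by every ledger in the bucket.
	b := bucket.New(db, l.Bucket)
	if err := b.Migrate(ctx, noop.Tracer{}); err != nil {
		return nil, err
	}
	// Ledger-specific DDL (sequences, triggers, indexes) rendered from
	// addLedgerTpl; previously applied as a per-ledger migration.
	if err := b.AddLedger(ctx, l, db); err != nil {
		return nil, err
	}
	// The store carries its bucket; Store.IsUpToDate and
	// Store.GetMigrationsInfo now delegate to it.
	return ledgerstore.New(db, b, l), nil
}

In the driver, Driver.createLedgerStore runs the equivalent steps inside a transaction and additionally inserts the ledger row (returning ErrLedgerAlreadyExists when no row is affected), so standalone use like the above is mainly suited to tests.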