diff --git a/internal/storage/bucket/migrations/19-accounts-recreate-unique-index/notes.yaml b/internal/storage/bucket/migrations/19-accounts-recreate-unique-index/notes.yaml new file mode 100644 index 000000000..4b7c24021 --- /dev/null +++ b/internal/storage/bucket/migrations/19-accounts-recreate-unique-index/notes.yaml @@ -0,0 +1 @@ +name: Recreate accounts unique index diff --git a/internal/storage/bucket/migrations/19-accounts-recreate-unique-index/up.sql b/internal/storage/bucket/migrations/19-accounts-recreate-unique-index/up.sql new file mode 100644 index 000000000..1d8734023 --- /dev/null +++ b/internal/storage/bucket/migrations/19-accounts-recreate-unique-index/up.sql @@ -0,0 +1,9 @@ +-- There is already a covering index on accounts table (including seq column). +-- As we will remove the seq column in next migration, we have to create a new index without it (PG will remove it automatically in background). +-- Also, we create the index concurrently to avoid locking the table. +-- And, as there is already an index on this table, the index creation should not fail. +-- +-- We create this index in a dedicated as, as the doc mentions it (https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-MULTI-STATEMENT) +-- multi statements queries are automatically wrapped inside transaction block, and it's forbidden +-- to create index concurrently inside a transaction block. +create unique index concurrently accounts_ledger2 on "{{.Bucket}}".accounts (ledger, address) \ No newline at end of file diff --git a/internal/storage/bucket/migrations/19-clean-database/notes.yaml b/internal/storage/bucket/migrations/20-clean-database/notes.yaml similarity index 100% rename from internal/storage/bucket/migrations/19-clean-database/notes.yaml rename to internal/storage/bucket/migrations/20-clean-database/notes.yaml diff --git a/internal/storage/bucket/migrations/19-clean-database/up.sql b/internal/storage/bucket/migrations/20-clean-database/up.sql similarity index 79% rename from internal/storage/bucket/migrations/19-clean-database/up.sql rename to internal/storage/bucket/migrations/20-clean-database/up.sql index 39597ae54..92aa70f53 100644 --- a/internal/storage/bucket/migrations/19-clean-database/up.sql +++ b/internal/storage/bucket/migrations/20-clean-database/up.sql @@ -73,4 +73,54 @@ alter table accounts_metadata drop column accounts_seq; alter table transactions +drop column seq; + +alter table accounts +drop column seq; + +-- rename index create in previous migration, as the drop of the column seq of accounts table has automatically dropped the index accounts_ledger +alter index accounts_ledger2 +rename to accounts_ledger; + +create or replace function set_log_hash() + returns trigger + security definer + language plpgsql +as +$$ +declare + previousHash bytea; + marshalledAsJSON varchar; +begin + select hash into previousHash + from logs + where ledger = new.ledger + order by id desc + limit 1; + + -- select only fields participating in the hash on the backend and format json representation the same way + select '{' || + '"type":"' || new.type || '",' || + '"data":' || encode(new.memento, 'escape') || ',' || + '"date":"' || (to_json(new.date::timestamp)#>>'{}') || 'Z",' || + '"idempotencyKey":"' || coalesce(new.idempotency_key, '') || '",' || + '"id":0,' || + '"hash":null' || + '}' into marshalledAsJSON; + + new.hash = ( + select public.digest( + case + when previousHash is null + then marshalledAsJSON::bytea + else '"' || encode(previousHash::bytea, 'base64')::bytea || E'"\n' || convert_to(marshalledAsJSON, 'LATIN1')::bytea + end || E'\n', 'sha256'::text + ) + ); + + return new; +end; +$$ set search_path from current; + +alter table logs drop column seq; \ No newline at end of file diff --git a/internal/storage/driver/driver.go b/internal/storage/driver/driver.go index 6f872ac4a..ff812366e 100644 --- a/internal/storage/driver/driver.go +++ b/internal/storage/driver/driver.go @@ -2,7 +2,7 @@ package driver import ( "context" - "database/sql" + "errors" "fmt" "github.com/formancehq/go-libs/v2/metadata" "github.com/formancehq/go-libs/v2/platform/postgres" @@ -35,41 +35,26 @@ type Driver struct { func (d *Driver) createLedgerStore(ctx context.Context, db bun.IDB, l *ledger.Ledger) (*ledgerstore.Store, error) { - tx, err := db.BeginTx(ctx, &sql.TxOptions{}) - if err != nil { - return nil, fmt.Errorf("begin transaction: %w", err) - } - - b := bucket.New(tx, l.Bucket) + b := bucket.New(d.db, l.Bucket) if err := b.Migrate(ctx, d.tracer); err != nil { return nil, fmt.Errorf("migrating bucket: %w", err) } - ret, err := db.NewInsert(). + _, err := db.NewInsert(). Model(l). - Ignore(). Returning("id, added_at"). Exec(ctx) if err != nil { + if errors.Is(postgres.ResolveError(err), postgres.ErrConstraintsFailed{}) { + return nil, systemcontroller.ErrLedgerAlreadyExists + } return nil, postgres.ResolveError(err) } - affected, err := ret.RowsAffected() - if err != nil { - return nil, fmt.Errorf("creating ledger: %w", err) - } - if affected == 0 { - return nil, systemcontroller.ErrLedgerAlreadyExists - } - - if err := b.AddLedger(ctx, *l, tx); err != nil { + if err := b.AddLedger(ctx, *l, d.db); err != nil { return nil, fmt.Errorf("adding ledger to bucket: %w", err) } - if err := tx.Commit(); err != nil { - return nil, fmt.Errorf("committing sql transaction to create ledger and schemas: %w", err) - } - return ledgerstore.New( d.db, b, @@ -81,29 +66,37 @@ func (d *Driver) createLedgerStore(ctx context.Context, db bun.IDB, l *ledger.Le func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgerstore.Store, error) { - // start a transaction because we will need to create the schema and apply ledger migrations - tx, err := d.db.BeginTx(ctx, &sql.TxOptions{}) - if err != nil { - return nil, fmt.Errorf("begin transaction: %w", err) - } - defer func() { - _ = tx.Rollback() - }() - if l.Metadata == nil { l.Metadata = metadata.Metadata{} } - store, err := d.createLedgerStore(ctx, tx, l) + b := bucket.New(d.db, l.Bucket) + if err := b.Migrate(ctx, d.tracer); err != nil { + return nil, fmt.Errorf("migrating bucket: %w", err) + } + + _, err := d.db.NewInsert(). + Model(l). + Returning("id, added_at"). + Exec(ctx) if err != nil { - return nil, err + if errors.Is(postgres.ResolveError(err), postgres.ErrConstraintsFailed{}) { + return nil, systemcontroller.ErrLedgerAlreadyExists + } + return nil, postgres.ResolveError(err) } - if err := tx.Commit(); err != nil { - return nil, fmt.Errorf("committing sql transaction to create ledger schema: %w", err) + if err := b.AddLedger(ctx, *l, d.db); err != nil { + return nil, fmt.Errorf("adding ledger to bucket: %w", err) } - return store, nil + return ledgerstore.New( + d.db, + b, + *l, + ledgerstore.WithMeter(d.meter), + ledgerstore.WithTracer(d.tracer), + ), nil } func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) {