Skip to content

Commit

Permalink
fix: recreate account index concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 22, 2024
1 parent 28cc2b0 commit dac31bf
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Recreate accounts unique index
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- There is already a covering index on accounts table (including seq column).
-- As we will remove the seq column in next migration, we have to create a new index without it (PG will remove it automatically in background).
-- Also, we create the index concurrently to avoid locking the table.
-- And, as there is already an index on this table, the index creation should not fail.
--
-- We create this index in a dedicated migration because, as the doc mentions (https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-MULTI-STATEMENT),
-- multi statements queries are automatically wrapped inside transaction block, and it's forbidden
-- to create index concurrently inside a transaction block.
--
-- Keeping this file a single statement (no trailing semicolon, no other statements) is therefore
-- deliberate: it must not be turned into a multi-statement batch.
-- "{{.Bucket}}" is substituted with the target schema name by the migration templating (Go text/template syntax).
create unique index concurrently accounts_ledger2 on "{{.Bucket}}".accounts (ledger, address)
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,54 @@ alter table accounts_metadata
drop column accounts_seq;

-- Drop the legacy seq column from transactions and accounts.
alter table transactions
drop column seq;

alter table accounts
drop column seq;

-- Rename the index created in the previous migration: dropping the seq column of the accounts
-- table automatically dropped the old covering index accounts_ledger, freeing up the name.
alter index accounts_ledger2
rename to accounts_ledger;

-- set_log_hash chains each new logs row to the previous one of the same ledger:
-- new.hash = sha256 over a canonical JSON rendering of the entry, prefixed with the
-- previous log's hash (base64) when one exists. This forms a tamper-evident chain.
-- NOTE(review): the JSON string built below must stay byte-identical to the hashing
-- performed on the backend side — confirm against the Go implementation before touching it.
create or replace function set_log_hash()
returns trigger
security definer
language plpgsql
as
$$
declare
previousHash bytea;
marshalledAsJSON varchar;
begin
-- Hash of the most recent log of the same ledger; null when this is the first log.
select hash into previousHash
from logs
where ledger = new.ledger
order by id desc
limit 1;

-- select only fields participating in the hash on the backend and format json representation the same way
-- (id is fixed to 0 and hash to null so the representation is independent of both).
select '{' ||
'"type":"' || new.type || '",' ||
'"data":' || encode(new.memento, 'escape') || ',' ||
'"date":"' || (to_json(new.date::timestamp)#>>'{}') || 'Z",' ||
'"idempotencyKey":"' || coalesce(new.idempotency_key, '') || '",' ||
'"id":0,' ||
'"hash":null' ||
'}' into marshalledAsJSON;

-- First log: hash the JSON alone. Otherwise prepend the quoted base64 of the previous
-- hash and a newline, then the JSON (converted via LATIN1 to get its raw bytes),
-- and terminate with a newline before digesting with sha256.
new.hash = (
select public.digest(
case
when previousHash is null
then marshalledAsJSON::bytea
else '"' || encode(previousHash::bytea, 'base64')::bytea || E'"\n' || convert_to(marshalledAsJSON, 'LATIN1')::bytea
end || E'\n', 'sha256'::text
)
);

return new;
end;
$$ set search_path from current;

-- The seq column of logs is no longer used by the hashing trigger above; drop it.
alter table logs
drop column seq;
65 changes: 29 additions & 36 deletions internal/storage/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package driver

import (
"context"
"database/sql"
"errors"
"fmt"
"github.com/formancehq/go-libs/v2/metadata"
"github.com/formancehq/go-libs/v2/platform/postgres"
Expand Down Expand Up @@ -35,41 +35,26 @@ type Driver struct {

func (d *Driver) createLedgerStore(ctx context.Context, db bun.IDB, l *ledger.Ledger) (*ledgerstore.Store, error) {

tx, err := db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
return nil, fmt.Errorf("begin transaction: %w", err)
}

b := bucket.New(tx, l.Bucket)
b := bucket.New(d.db, l.Bucket)
if err := b.Migrate(ctx, d.tracer); err != nil {
return nil, fmt.Errorf("migrating bucket: %w", err)
}

ret, err := db.NewInsert().
_, err := db.NewInsert().
Model(l).
Ignore().
Returning("id, added_at").
Exec(ctx)
if err != nil {
if errors.Is(postgres.ResolveError(err), postgres.ErrConstraintsFailed{}) {
return nil, systemcontroller.ErrLedgerAlreadyExists
}
return nil, postgres.ResolveError(err)
}

affected, err := ret.RowsAffected()
if err != nil {
return nil, fmt.Errorf("creating ledger: %w", err)
}
if affected == 0 {
return nil, systemcontroller.ErrLedgerAlreadyExists
}

if err := b.AddLedger(ctx, *l, tx); err != nil {
if err := b.AddLedger(ctx, *l, d.db); err != nil {
return nil, fmt.Errorf("adding ledger to bucket: %w", err)
}

if err := tx.Commit(); err != nil {
return nil, fmt.Errorf("committing sql transaction to create ledger and schemas: %w", err)
}

return ledgerstore.New(
d.db,
b,
Expand All @@ -81,29 +66,37 @@ func (d *Driver) createLedgerStore(ctx context.Context, db bun.IDB, l *ledger.Le

func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgerstore.Store, error) {

// start a transaction because we will need to create the schema and apply ledger migrations
tx, err := d.db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
return nil, fmt.Errorf("begin transaction: %w", err)
}
defer func() {
_ = tx.Rollback()
}()

if l.Metadata == nil {
l.Metadata = metadata.Metadata{}
}

store, err := d.createLedgerStore(ctx, tx, l)
b := bucket.New(d.db, l.Bucket)
if err := b.Migrate(ctx, d.tracer); err != nil {
return nil, fmt.Errorf("migrating bucket: %w", err)
}

_, err := d.db.NewInsert().
Model(l).
Returning("id, added_at").
Exec(ctx)
if err != nil {
return nil, err
if errors.Is(postgres.ResolveError(err), postgres.ErrConstraintsFailed{}) {
return nil, systemcontroller.ErrLedgerAlreadyExists
}
return nil, postgres.ResolveError(err)
}

if err := tx.Commit(); err != nil {
return nil, fmt.Errorf("committing sql transaction to create ledger schema: %w", err)
if err := b.AddLedger(ctx, *l, d.db); err != nil {
return nil, fmt.Errorf("adding ledger to bucket: %w", err)
}

return store, nil
return ledgerstore.New(
d.db,
b,
*l,
ledgerstore.WithMeter(d.meter),
ledgerstore.WithTracer(d.tracer),
), nil
}

func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) {
Expand Down

0 comments on commit dac31bf

Please sign in to comment.