Skip to content

Commit

Permalink
feat: merge stores migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 23, 2024
1 parent 0cff551 commit 8ddee28
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 280 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/alitto/pond v1.9.2
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10
github.com/bluele/gcache v0.0.2
github.com/formancehq/go-libs/v2 v2.1.3-0.20241017104005-ad844342baae
github.com/formancehq/go-libs/v2 v2.0.0-20241017113509-22db708b22a5
github.com/formancehq/stack/ledger/client v0.0.0-00010101000000-000000000000
github.com/go-chi/chi/v5 v5.1.0
github.com/go-chi/cors v1.2.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4=
github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/formancehq/go-libs/v2 v2.0.0-20241017113509-22db708b22a5 h1:uwsBFFeyh7dI3O3EnoGuyy569WsRXFhNjEaSD4AN+eg=
github.com/formancehq/go-libs/v2 v2.0.0-20241017113509-22db708b22a5/go.mod h1:LgxayMN6wgAQbkB3ioBDTHOVMKp1rC6Q55M1CvG44xY=
github.com/formancehq/go-libs/v2 v2.1.2 h1:AL5A2LgpepFU6+ovvkRXkYUfupPY46BsRHp0M6SFTUY=
github.com/formancehq/go-libs/v2 v2.1.2/go.mod h1:+JVecYnhf7xTbbz65nwxBDehdMx1JgHIitvnxXYuuAI=
github.com/formancehq/go-libs/v2 v2.1.3-0.20241017101710-5a4a6a4cb50d h1:tZZo5yypip02aAkbkp6kGDC8Lvo032DlE/0yVhu8iNY=
github.com/formancehq/go-libs/v2 v2.1.3-0.20241017101710-5a4a6a4cb50d/go.mod h1:LgxayMN6wgAQbkB3ioBDTHOVMKp1rC6Q55M1CvG44xY=
github.com/formancehq/go-libs/v2 v2.1.3-0.20241017104005-ad844342baae h1:29GM6mMhC/7mVsbZJ6qgvQEGAcLSNXwWD3DNKo2Kikk=
Expand Down
165 changes: 163 additions & 2 deletions internal/storage/bucket/bucket.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package bucket

import (
"bytes"
"context"
_ "embed"
"go.opentelemetry.io/otel/trace"

"errors"
"fmt"
"github.com/formancehq/go-libs/v2/migrations"
ledger "github.com/formancehq/ledger/internal"
"github.com/uptrace/bun"
"go.opentelemetry.io/otel/trace"
"text/template"
)

type Bucket struct {
Expand All @@ -27,9 +30,167 @@ func (b *Bucket) IsUpToDate(ctx context.Context) (bool, error) {
return ret, err
}

// GetMigrationsInfo reports the state of every migration registered for this
// bucket, as tracked by the bucket's migrator.
func (b *Bucket) GetMigrationsInfo(ctx context.Context) ([]migrations.Info, error) {
	migrator := GetMigrator(b.name)
	return migrator.GetMigrations(ctx, b.db)
}

// AddLedger provisions the per-ledger database objects (sequences, and the
// feature-gated indexes and triggers) for ledger l by rendering addLedgerTpl
// with the ledger's data and executing the resulting SQL on db.
//
// It returns an error if the template cannot be parsed or rendered, or if the
// generated SQL fails to execute.
func (b *Bucket) AddLedger(ctx context.Context, l ledger.Ledger, db bun.IDB) error {

	// Return the parse error instead of template.Must: library code should not
	// panic, even on a broken template.
	tpl, err := template.New("sql").Parse(addLedgerTpl)
	if err != nil {
		return fmt.Errorf("parsing template: %w", err)
	}

	buf := bytes.NewBuffer(nil)
	if err := tpl.Execute(buf, l); err != nil {
		return fmt.Errorf("executing template: %w", err)
	}

	if _, err := db.ExecContext(ctx, buf.String()); err != nil {
		return fmt.Errorf("executing sql: %w", err)
	}

	return nil
}

// New returns a Bucket bound to the given database handle and bucket name.
func New(db bun.IDB, name string) *Bucket {
	b := &Bucket{
		name: name,
		db:   db,
	}
	return b
}

// addLedgerTpl is the SQL executed by (*Bucket).AddLedger when a ledger is
// created. It is rendered with text/template against a ledger.Ledger value:
// {{.Bucket}}, {{.ID}} and {{.Name}} come from that value, and the
// {{.HasFeature}} guards enable the optional indexes/triggers only for the
// features the ledger has turned on.
const addLedgerTpl = `
-- create a sequence for transactions by ledger instead of a sequence of the table as we want to have contiguous ids
-- notes: we can still have "holes" on ids since a sql transaction can be reverted after a usage of the sequence
create sequence "{{.Bucket}}"."transaction_id_{{.ID}}" owned by "{{.Bucket}}".transactions.id;
select setval('"{{.Bucket}}"."transaction_id_{{.ID}}"', coalesce((
select max(id) + 1
from "{{.Bucket}}".transactions
where ledger = '{{ .Name }}'
), 1)::bigint, false);
-- create a sequence for logs by ledger instead of a sequence of the table as we want to have contiguous ids
-- notes: we can still have "holes" on id since a sql transaction can be reverted after a usage of the sequence
create sequence "{{.Bucket}}"."log_id_{{.ID}}" owned by "{{.Bucket}}".logs.id;
select setval('"{{.Bucket}}"."log_id_{{.ID}}"', coalesce((
select max(id) + 1
from "{{.Bucket}}".logs
where ledger = '{{ .Name }}'
), 1)::bigint, false);
-- enable post commit effective volumes synchronously
{{ if .HasFeature "MOVES_HISTORY_POST_COMMIT_EFFECTIVE_VOLUMES" "SYNC" }}
create index "pcev_{{.ID}}" on "{{.Bucket}}".moves (accounts_address, asset, effective_date desc) where ledger = '{{.Name}}';
create trigger "set_effective_volumes_{{.ID}}"
before insert
on "{{.Bucket}}"."moves"
for each row
when (
new.ledger = '{{.Name}}'
)
execute procedure "{{.Bucket}}".set_effective_volumes();
create trigger "update_effective_volumes_{{.ID}}"
after insert
on "{{.Bucket}}"."moves"
for each row
when (
new.ledger = '{{.Name}}'
)
execute procedure "{{.Bucket}}".update_effective_volumes();
{{ end }}
-- logs hash
{{ if .HasFeature "HASH_LOGS" "SYNC" }}
create trigger "set_log_hash_{{.ID}}"
before insert
on "{{.Bucket}}"."logs"
for each row
when (
new.ledger = '{{.Name}}'
)
execute procedure "{{.Bucket}}".set_log_hash();
{{ end }}
{{ if .HasFeature "ACCOUNT_METADATA_HISTORY" "SYNC" }}
create trigger "update_account_metadata_history_{{.ID}}"
after update
on "{{.Bucket}}"."accounts"
for each row
when (
new.ledger = '{{.Name}}'
)
execute procedure "{{.Bucket}}".update_account_metadata_history();
create trigger "insert_account_metadata_history_{{.ID}}"
after insert
on "{{.Bucket}}"."accounts"
for each row
when (
new.ledger = '{{.Name}}'
)
execute procedure "{{.Bucket}}".insert_account_metadata_history();
{{ end }}
{{ if .HasFeature "TRANSACTION_METADATA_HISTORY" "SYNC" }}
create trigger "update_transaction_metadata_history_{{.ID}}"
after update
on "{{.Bucket}}"."transactions"
for each row
when (
new.ledger = '{{.Name}}'
)
execute procedure "{{.Bucket}}".update_transaction_metadata_history();
create trigger "insert_transaction_metadata_history_{{.ID}}"
after insert
on "{{.Bucket}}"."transactions"
for each row
when (
new.ledger = '{{.Name}}'
)
execute procedure "{{.Bucket}}".insert_transaction_metadata_history();
{{ end }}
{{ if .HasFeature "INDEX_TRANSACTION_ACCOUNTS" "ON" }}
create index "transactions_sources_{{.ID}}" on "{{.Bucket}}".transactions using gin (sources jsonb_path_ops) where ledger = '{{.Name}}';
create index "transactions_destinations_{{.ID}}" on "{{.Bucket}}".transactions using gin (destinations jsonb_path_ops) where ledger = '{{.Name}}';
create trigger "transaction_set_addresses_{{.ID}}"
before insert
on "{{.Bucket}}"."transactions"
for each row
when (
new.ledger = '{{.Name}}'
)
execute procedure "{{.Bucket}}".set_transaction_addresses();
{{ end }}
{{ if .HasFeature "INDEX_ADDRESS_SEGMENTS" "ON" }}
create index "accounts_address_array_{{.ID}}" on "{{.Bucket}}".accounts using gin (address_array jsonb_ops) where ledger = '{{.Name}}';
create index "accounts_address_array_length_{{.ID}}" on "{{.Bucket}}".accounts (jsonb_array_length(address_array)) where ledger = '{{.Name}}';
create trigger "accounts_set_address_array_{{.ID}}"
before insert
on "{{.Bucket}}"."accounts"
for each row
when (
new.ledger = '{{.Name}}'
)
execute procedure "{{.Bucket}}".set_address_array_for_account();
{{ if .HasFeature "INDEX_TRANSACTION_ACCOUNTS" "ON" }}
create index "transactions_sources_arrays_{{.ID}}" on "{{.Bucket}}".transactions using gin (sources_arrays jsonb_path_ops) where ledger = '{{.Name}}';
create index "transactions_destinations_arrays_{{.ID}}" on "{{.Bucket}}".transactions using gin (destinations_arrays jsonb_path_ops) where ledger = '{{.Name}}';
create trigger "transaction_set_addresses_segments_{{.ID}}"
before insert
on "{{.Bucket}}"."transactions"
for each row
when (
new.ledger = '{{.Name}}'
)
execute procedure "{{.Bucket}}".set_transaction_addresses_segments();
{{ end }}
{{ end }}
`
79 changes: 79 additions & 0 deletions internal/storage/bucket/migrations/34-set-ledger-specifics.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
DO
$do$
declare
    ledger record;
    vsql text;
BEGIN
    for ledger in select * from _system.ledgers where bucket = '{{.Bucket}}' loop
        -- Statements are built with format(): the previous string concatenation
        -- left the text "ledger.name" inside the executed SQL (an undefined
        -- reference at execution time) and passed a double-quoted identifier to
        -- setval(), which requires a string literal. %s injects the ledger id
        -- into object names; %L injects the ledger name as a quoted literal.

        -- create a sequence for transactions by ledger instead of a sequence of the table as we want to have contiguous ids
        -- notes: we can still have "holes" on ids since a sql transaction can be reverted after a usage of the sequence
        vsql = format('create sequence "{{.Bucket}}"."transaction_id_%s" owned by "{{.Bucket}}".transactions.id', ledger.id);
        execute vsql;

        vsql = format('select setval(''"{{.Bucket}}"."transaction_id_%s"'', coalesce((select max(id) + 1 from "{{.Bucket}}".transactions where ledger = %L), 1)::bigint, false)', ledger.id, ledger.name);
        execute vsql;

        -- create a sequence for logs by ledger instead of a sequence of the table as we want to have contiguous ids
        -- notes: we can still have "holes" on id since a sql transaction can be reverted after a usage of the sequence
        vsql = format('create sequence "{{.Bucket}}"."log_id_%s" owned by "{{.Bucket}}".logs.id', ledger.id);
        execute vsql;

        vsql = format('select setval(''"{{.Bucket}}"."log_id_%s"'', coalesce((select max(id) + 1 from "{{.Bucket}}".logs where ledger = %L), 1)::bigint, false)', ledger.id, ledger.name);
        execute vsql;

        -- enable post commit effective volumes synchronously
        vsql = format('create index "pcev_%s" on "{{.Bucket}}".moves (accounts_address, asset, effective_date desc) where ledger = %L', ledger.id, ledger.name);
        execute vsql;

        vsql = format('create trigger "set_effective_volumes_%s" before insert on "{{.Bucket}}".moves for each row when (new.ledger = %L) execute procedure "{{.Bucket}}".set_effective_volumes()', ledger.id, ledger.name);
        execute vsql;

        vsql = format('create trigger "update_effective_volumes_%s" after insert on "{{.Bucket}}".moves for each row when (new.ledger = %L) execute procedure "{{.Bucket}}".update_effective_volumes()', ledger.id, ledger.name);
        execute vsql;

        -- logs hash
        vsql = format('create trigger "set_log_hash_%s" before insert on "{{.Bucket}}".logs for each row when (new.ledger = %L) execute procedure "{{.Bucket}}".set_log_hash()', ledger.id, ledger.name);
        execute vsql;

        vsql = format('create trigger "update_account_metadata_history_%s" after update on "{{.Bucket}}"."accounts" for each row when (new.ledger = %L) execute procedure "{{.Bucket}}".update_account_metadata_history()', ledger.id, ledger.name);
        execute vsql;

        vsql = format('create trigger "insert_account_metadata_history_%s" after insert on "{{.Bucket}}"."accounts" for each row when (new.ledger = %L) execute procedure "{{.Bucket}}".insert_account_metadata_history()', ledger.id, ledger.name);
        execute vsql;

        vsql = format('create trigger "update_transaction_metadata_history_%s" after update on "{{.Bucket}}"."transactions" for each row when (new.ledger = %L) execute procedure "{{.Bucket}}".update_transaction_metadata_history()', ledger.id, ledger.name);
        execute vsql;

        vsql = format('create trigger "insert_transaction_metadata_history_%s" after insert on "{{.Bucket}}"."transactions" for each row when (new.ledger = %L) execute procedure "{{.Bucket}}".insert_transaction_metadata_history()', ledger.id, ledger.name);
        execute vsql;

        vsql = format('create index "transactions_sources_%s" on "{{.Bucket}}".transactions using gin (sources jsonb_path_ops) where ledger = %L', ledger.id, ledger.name);
        execute vsql;

        vsql = format('create index "transactions_destinations_%s" on "{{.Bucket}}".transactions using gin (destinations jsonb_path_ops) where ledger = %L', ledger.id, ledger.name);
        execute vsql;

        vsql = format('create trigger "transaction_set_addresses_%s" before insert on "{{.Bucket}}".transactions for each row when (new.ledger = %L) execute procedure "{{.Bucket}}".set_transaction_addresses()', ledger.id, ledger.name);
        execute vsql;

        vsql = format('create index "accounts_address_array_%s" on "{{.Bucket}}".accounts using gin (address_array jsonb_ops) where ledger = %L', ledger.id, ledger.name);
        execute vsql;

        vsql = format('create index "accounts_address_array_length_%s" on "{{.Bucket}}".accounts (jsonb_array_length(address_array)) where ledger = %L', ledger.id, ledger.name);
        execute vsql;

        vsql = format('create trigger "accounts_set_address_array_%s" before insert on "{{.Bucket}}".accounts for each row when (new.ledger = %L) execute procedure "{{.Bucket}}".set_address_array_for_account()', ledger.id, ledger.name);
        execute vsql;

        vsql = format('create index "transactions_sources_arrays_%s" on "{{.Bucket}}".transactions using gin (sources_arrays jsonb_path_ops) where ledger = %L', ledger.id, ledger.name);
        execute vsql;

        vsql = format('create index "transactions_destinations_arrays_%s" on "{{.Bucket}}".transactions using gin (destinations_arrays jsonb_path_ops) where ledger = %L', ledger.id, ledger.name);
        execute vsql;

        vsql = format('create trigger "transaction_set_addresses_segments_%s" before insert on "{{.Bucket}}"."transactions" for each row when (new.ledger = %L) execute procedure "{{.Bucket}}".set_transaction_addresses_segments()', ledger.id, ledger.name);
        execute vsql;
    end loop;
END
$do$;
Loading

0 comments on commit 8ddee28

Please sign in to comment.