Skip to content

Commit

Permalink
feat: import legacy store
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 23, 2024
1 parent 6a3031d commit e2b7b5b
Show file tree
Hide file tree
Showing 21 changed files with 2,884 additions and 8 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ go.work*
*.jar
node_modules
dist
vendor
vendor
worktrees
20 changes: 20 additions & 0 deletions internal/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ import "github.com/formancehq/ledger/internal"
- [func \(v Volumes\) Copy\(\) Volumes](<#Volumes.Copy>)
- [func \(Volumes\) JSONSchemaExtend\(schema \*jsonschema.Schema\)](<#Volumes.JSONSchemaExtend>)
- [func \(v Volumes\) MarshalJSON\(\) \(\[\]byte, error\)](<#Volumes.MarshalJSON>)
- [func \(v \*Volumes\) Scan\(src interface\{\}\) error](<#Volumes.Scan>)
- [func \(v Volumes\) Value\(\) \(driver.Value, error\)](<#Volumes.Value>)
- [type VolumesByAssets](<#VolumesByAssets>)
- [func \(v VolumesByAssets\) Balances\(\) BalancesByAssets](<#VolumesByAssets.Balances>)
- [type VolumesWithBalance](<#VolumesWithBalance>)
Expand Down Expand Up @@ -1149,6 +1151,24 @@ func (v Volumes) MarshalJSON() ([]byte, error)



<a name="Volumes.Scan"></a>
### func \(\*Volumes\) Scan

```go
func (v *Volumes) Scan(src interface{}) error
```



<a name="Volumes.Value"></a>
### func \(Volumes\) Value

```go
func (v Volumes) Value() (driver.Value, error)
```



<a name="VolumesByAssets"></a>
## type VolumesByAssets

Expand Down
162 changes: 161 additions & 1 deletion internal/storage/bucket/migrations/11-make-stateless.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,171 @@ add column transactions_id bigint,
alter column post_commit_volumes drop not null,
alter column post_commit_effective_volumes drop not null,
alter column insertion_date set default (now() at time zone 'utc'),
alter column effective_date set default (now() at time zone 'utc');
alter column effective_date set default (now() at time zone 'utc'),
alter column account_address_array drop not null;

alter table moves
rename column account_address to accounts_address;

alter table moves
rename column account_address_array to accounts_address_array;

-- since the column `account_address` has been renamed to `accounts_address`, we need to update the function
create or replace function get_aggregated_volumes_for_transaction(_ledger varchar, tx numeric) returns jsonb
stable
language sql
as
$$
select aggregate_objects(jsonb_build_object(data.accounts_address, data.aggregated))
from (
select distinct on (move.accounts_address, move.asset)
move.accounts_address,
volumes_to_jsonb((move.asset, first(move.post_commit_volumes))) as aggregated
from (select * from moves order by seq desc) move
where move.transactions_seq = tx and
ledger = _ledger
group by move.accounts_address, move.asset
) data
$$ set search_path from current;

create or replace function get_aggregated_effective_volumes_for_transaction(_ledger varchar, tx numeric) returns jsonb
stable
language sql
as
$$
select aggregate_objects(jsonb_build_object(data.accounts_address, data.aggregated))
from (
select distinct on (move.accounts_address, move.asset)
move.accounts_address,
volumes_to_jsonb((move.asset, first(move.post_commit_effective_volumes))) as aggregated
from (select * from moves order by seq desc) move
where move.transactions_seq = tx
and ledger = _ledger
group by move.accounts_address, move.asset
) data
$$ set search_path from current;

create or replace function get_all_account_effective_volumes(_ledger varchar, _account varchar, _before timestamp default null)
returns setof volumes_with_asset
language sql
stable
as
$$
with all_assets as (select v.v as asset
from get_all_assets(_ledger) v),
moves as (select m.*
from all_assets assets
join lateral (
select *
from moves s
where (_before is null or s.effective_date <= _before)
and s.accounts_address = _account
and s.asset = assets.asset
and s.ledger = _ledger
order by effective_date desc, seq desc
limit 1
) m on true)
select moves.asset, moves.post_commit_effective_volumes
from moves
$$ set search_path from current;

create or replace function get_all_account_volumes(_ledger varchar, _account varchar, _before timestamp default null)
returns setof volumes_with_asset
language sql
stable
as
$$
with all_assets as (select v.v as asset
from get_all_assets(_ledger) v),
moves as (select m.*
from all_assets assets
join lateral (
select *
from moves s
where (_before is null or s.insertion_date <= _before)
and s.accounts_address = _account
and s.asset = assets.asset
and s.ledger = _ledger
order by seq desc
limit 1
) m on true)
select moves.asset, moves.post_commit_volumes
from moves
$$ set search_path from current;

-- notes(gfyrag): temporary trigger to be able to handle writes on the old schema (the code does not specify this anymore)
create or replace function set_compat_on_move()
returns trigger
security definer
language plpgsql
as
$$
begin
new.transactions_seq = (
select seq
from transactions
where id = new.transactions_id and ledger = new.ledger
);
new.accounts_seq = (
select seq
from accounts
where address = new.accounts_address and ledger = new.ledger
);
new.accounts_address_array = to_json(string_to_array(new.accounts_address, ':'));

return new;
end;
$$ set search_path from current;

create trigger set_compat_on_move
before insert on moves
for each row
execute procedure set_compat_on_move();

create or replace function set_compat_on_accounts_metadata()
returns trigger
security definer
language plpgsql
as
$$
begin
new.accounts_seq = (
select seq
from accounts
where address = new.accounts_address and ledger = new.ledger
);

return new;
end;
$$ set search_path from current;

create trigger set_compat_on_accounts_metadata
before insert on accounts_metadata
for each row
execute procedure set_compat_on_accounts_metadata();

create or replace function set_compat_on_transactions_metadata()
returns trigger
security definer
language plpgsql
as
$$
begin
new.transactions_seq = (
select seq
from transactions
where id = new.transactions_id and ledger = new.ledger
);

return new;
end;
$$ set search_path from current;

create trigger set_compat_on_transactions_metadata
before insert on transactions_metadata
for each row
execute procedure set_compat_on_transactions_metadata();

alter table transactions
add column post_commit_volumes jsonb,
add column inserted_at timestamp without time zone default (now() at time zone 'utc'),
Expand Down
2 changes: 0 additions & 2 deletions internal/storage/ledger/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,6 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
}
}

s.DumpQuery(context.Background(), ret)

return ret
}

Expand Down
Loading

0 comments on commit e2b7b5b

Please sign in to comment.