Skip to content

Commit

Permalink
feat: convert post_commit_volumes to jsonb
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 23, 2024
1 parent 68f47ca commit 0a5dbbc
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 29 deletions.
1 change: 1 addition & 0 deletions internal/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ type Ledger struct {
}

func (l Ledger) HasFeature(feature, value string) bool {
// todo: to avoid development error we could check if the value is possible
return l.Features[feature] == value
}

Expand Down
75 changes: 51 additions & 24 deletions internal/storage/bucket/migrations/11-stateless.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,39 @@ alter table "{{.Bucket}}".transactions
alter column id
type bigint;

alter type "{{.Bucket}}".volumes
rename attribute inputs to input;
alter table "{{.Bucket}}".moves
add column post_commit_volumes_jsonb jsonb;

alter table "{{.Bucket}}".moves
add column post_commit_effective_volumes_jsonb jsonb;

-- todo: add migration
-- update "{{.Bucket}}".moves
-- set post_commit_volumes_jsonb = json_build_object(
-- 'input', ((moves.post_commit_volumes).inputs),
-- 'output', ((moves.post_commit_volumes).outputs)
-- );
--
-- update "{{.Bucket}}".moves
-- set post_commit_effective_volumes_jsonb = json_build_object(
-- 'input', ((moves.post_commit_effective_volumes).inputs),
-- 'output', ((moves.post_commit_effective_volumes).outputs)
-- );

alter table "{{.Bucket}}".moves
drop column post_commit_volumes;

alter type "{{.Bucket}}".volumes
rename attribute outputs to output;
alter table "{{.Bucket}}".moves
drop column post_commit_effective_volumes;

alter table "{{.Bucket}}".moves
rename post_commit_volumes_jsonb to post_commit_volumes;

alter table "{{.Bucket}}".moves
rename post_commit_effective_volumes_jsonb to post_commit_effective_volumes;

alter table "{{.Bucket}}".transactions
add column post_commit_volumes jsonb not null ;
add column post_commit_volumes jsonb not null;

alter table "{{.Bucket}}".moves
alter column post_commit_volumes
Expand Down Expand Up @@ -60,8 +85,8 @@ select distinct on (ledger, accounts_seq, account_address, asset)
accounts_seq,
account_address as account,
asset,
(moves.post_commit_volumes).input as input,
(moves.post_commit_volumes).output as output
(moves.post_commit_volumes->>'input')::numeric as input,
(moves.post_commit_volumes->>'output')::numeric as output
from (
select *
from "{{.Bucket}}".moves
Expand Down Expand Up @@ -115,6 +140,9 @@ drop function "{{.Bucket}}".get_transaction(_ledger character varying, _id numer
drop function "{{.Bucket}}".explode_address(_address character varying);
drop function "{{.Bucket}}".revert_transaction(_ledger character varying, _id numeric, _date timestamp without time zone);

drop type "{{.Bucket}}".volumes_with_asset;
drop type "{{.Bucket}}".volumes;

create function "{{.Bucket}}".set_volumes()
returns trigger
security definer
Expand All @@ -123,19 +151,19 @@ as
$$
begin
new.post_commit_volumes = coalesce((
select (
(post_commit_volumes).input + case when new.is_source then 0 else new.amount end,
(post_commit_volumes).output + case when new.is_source then new.amount else 0 end
select json_build_object(
'input', (post_commit_volumes->>'input')::numeric + case when new.is_source then 0 else new.amount end,
'output', (post_commit_volumes->>'output')::numeric + case when new.is_source then new.amount else 0 end
)
from "{{.Bucket}}".moves
where accounts_seq = new.accounts_seq
and asset = new.asset
and ledger = new.ledger
order by seq desc
limit 1
), (
case when new.is_source then 0 else new.amount end,
case when new.is_source then new.amount else 0 end
), json_build_object(
'input', case when new.is_source then 0 else new.amount end,
'output', case when new.is_source then new.amount else 0 end
));

return new;
Expand All @@ -150,9 +178,9 @@ as
$$
begin
new.post_commit_effective_volumes = coalesce((
select (
(post_commit_effective_volumes).input + case when new.is_source then 0 else new.amount end,
(post_commit_effective_volumes).output + case when new.is_source then new.amount else 0 end
select json_build_object(
'input', (post_commit_effective_volumes->>'input')::numeric + case when new.is_source then 0 else new.amount end,
'output', (post_commit_effective_volumes->>'output')::numeric + case when new.is_source then new.amount else 0 end
)
from "{{.Bucket}}".moves
where accounts_seq = new.accounts_seq
Expand All @@ -161,9 +189,9 @@ begin
and (effective_date < new.effective_date or (effective_date = new.effective_date and seq < new.seq))
order by effective_date desc, seq desc
limit 1
), (
case when new.is_source then 0 else new.amount end,
case when new.is_source then new.amount else 0 end
), json_build_object(
'input', case when new.is_source then 0 else new.amount end,
'output', case when new.is_source then new.amount else 0 end
));

return new;
Expand All @@ -178,11 +206,10 @@ as
$$
begin
update "{{.Bucket}}".moves
set post_commit_effective_volumes =
(
(post_commit_effective_volumes).input + case when new.is_source then 0 else new.amount end,
(post_commit_effective_volumes).output + case when new.is_source then new.amount else 0 end
)
set post_commit_effective_volumes = json_build_object(
'input', (post_commit_effective_volumes->>'input')::numeric + case when new.is_source then 0 else new.amount end,
'output', (post_commit_effective_volumes->>'output')::numeric + case when new.is_source then new.amount else 0 end
)
where accounts_seq = new.accounts_seq
and asset = new.asset
and effective_date > new.effective_date
Expand Down
6 changes: 3 additions & 3 deletions internal/storage/ledger/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func convertOperatorToSQL(operator string) string {

func (s *Store) selectBalance(date *time.Time) *bun.SelectQuery {
return s.SortMovesBySeq(date).
ColumnExpr("(post_commit_volumes).input - (post_commit_volumes).output as balance").
ColumnExpr("(post_commit_volumes->>'input')::numeric - (post_commit_volumes->>'output')::numeric as balance").
Limit(1)
}

Expand Down Expand Up @@ -195,7 +195,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
s.db.NewSelect().
TableExpr("(?) v", s.SelectDistinctMovesBySeq(date)).
Column("accounts_seq").
ColumnExpr(`to_json(array_agg(json_build_object('asset', v.asset, 'input', (v.post_commit_volumes).input, 'output', (v.post_commit_volumes).output))) as pcv`).
ColumnExpr(`to_json(array_agg(json_build_object('asset', v.asset, 'input', (v.post_commit_volumes->>'input')::numeric, 'output', (v.post_commit_volumes->>'output')::numeric))) as pcv`).
Group("accounts_seq"),
).
ColumnExpr("pcv.*")
Expand All @@ -208,7 +208,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
s.db.NewSelect().
TableExpr("(?) v", s.SelectDistinctMovesByEffectiveDate(date)).
Column("accounts_seq").
ColumnExpr(`to_json(array_agg(json_build_object('asset', v.asset, 'input', (v.post_commit_effective_volumes).input, 'output', (v.post_commit_effective_volumes).output))) as pcev`).
ColumnExpr(`to_json(array_agg(json_build_object('asset', v.asset, 'input', (v.post_commit_effective_volumes->>'input')::numeric, 'output', (v.post_commit_effective_volumes->>'output')::numeric))) as pcev`).
Group("accounts_seq"),
).
ColumnExpr("pcev.*")
Expand Down
4 changes: 2 additions & 2 deletions internal/storage/ledger/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool,
ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")).
Column("asset", "accounts_seq").
ColumnExpr("account as account_address").
ColumnExpr("(input, output)::"+s.GetPrefixedRelationName("volumes")+" as volumes").
ColumnExpr("json_build_object('input', input, 'output', output) as volumes").
Where("ledger = ?", s.ledger.Name)
}

Expand Down Expand Up @@ -169,7 +169,7 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool,
func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool, builder query.Builder) *bun.SelectQuery {
return s.db.NewSelect().
ModelTableExpr("(?) accounts", s.selectAccountWithVolumes(date, useInsertionDate, builder)).
ColumnExpr(`to_json(array_agg(json_build_object('asset', accounts.asset, 'input', (accounts.volumes).input, 'output', (accounts.volumes).output))) as aggregated`)
ColumnExpr(`to_json(array_agg(json_build_object('asset', accounts.asset, 'input', (accounts.volumes->>'input')::numeric, 'output', (accounts.volumes->>'output')::numeric))) as aggregated`)
}

func (s *Store) GetAggregatedBalances(ctx context.Context, q ledgercontroller.GetAggregatedBalanceQuery) (ledger.BalancesByAssets, error) {
Expand Down

0 comments on commit 0a5dbbc

Please sign in to comment.