Skip to content

Commit

Permalink
feat: homogeneize volumes names
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 23, 2024
1 parent e705b65 commit 53d5611
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 123 deletions.
30 changes: 18 additions & 12 deletions internal/storage/bucket/migrations/11-stateless.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ alter table "{{.Bucket}}".transactions
alter column id
type bigint;

alter type "{{.Bucket}}".volumes
rename attribute inputs to input;

alter type "{{.Bucket}}".volumes
rename attribute outputs to output;

alter table "{{.Bucket}}".moves
alter column post_commit_volumes
drop not null,
Expand All @@ -35,24 +41,24 @@ create table "{{.Bucket}}".accounts_volumes (
accounts_seq int not null,
account varchar not null,
asset varchar not null,
inputs numeric not null,
outputs numeric not null,
input numeric not null,
output numeric not null,

primary key (ledger, account, asset)
);

create view "{{.Bucket}}".balances as
select ledger, accounts_seq, account, asset, inputs - outputs as balance
select ledger, accounts_seq, account, asset, input - output as balance
from "{{.Bucket}}".accounts_volumes;

insert into "{{.Bucket}}".accounts_volumes (ledger, accounts_seq, account, asset, inputs, outputs)
insert into "{{.Bucket}}".accounts_volumes (ledger, accounts_seq, account, asset, input, output)
select distinct on (ledger, accounts_seq, account_address, asset)
ledger,
accounts_seq,
account_address as account,
asset,
(moves.post_commit_volumes).inputs as inputs,
(moves.post_commit_volumes).outputs as outputs
(moves.post_commit_volumes).input as input,
(moves.post_commit_volumes).output as output
from (
select *
from "{{.Bucket}}".moves
Expand Down Expand Up @@ -116,8 +122,8 @@ begin
--todo: use balances table directly...
new.post_commit_volumes = coalesce((
select (
(post_commit_volumes).inputs + case when new.is_source then 0 else new.amount end,
(post_commit_volumes).outputs + case when new.is_source then new.amount else 0 end
(post_commit_volumes).input + case when new.is_source then 0 else new.amount end,
(post_commit_volumes).output + case when new.is_source then new.amount else 0 end
)
from "{{.Bucket}}".moves
where accounts_seq = new.accounts_seq
Expand All @@ -143,8 +149,8 @@ $$
begin
new.post_commit_effective_volumes = coalesce((
select (
(post_commit_effective_volumes).inputs + case when new.is_source then 0 else new.amount end,
(post_commit_effective_volumes).outputs + case when new.is_source then new.amount else 0 end
(post_commit_effective_volumes).input + case when new.is_source then 0 else new.amount end,
(post_commit_effective_volumes).output + case when new.is_source then new.amount else 0 end
)
from "{{.Bucket}}".moves
where accounts_seq = new.accounts_seq
Expand Down Expand Up @@ -172,8 +178,8 @@ begin
update "{{.Bucket}}".moves
set post_commit_effective_volumes =
(
(post_commit_effective_volumes).inputs + case when new.is_source then 0 else new.amount end,
(post_commit_effective_volumes).outputs + case when new.is_source then new.amount else 0 end
(post_commit_effective_volumes).input + case when new.is_source then 0 else new.amount end,
(post_commit_effective_volumes).output + case when new.is_source then new.amount else 0 end
)
where accounts_seq = new.accounts_seq
and asset = new.asset
Expand Down
21 changes: 8 additions & 13 deletions internal/storage/ledger/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,8 @@ import (
"github.com/uptrace/bun"
)

type Volumes struct {
Inputs *big.Int `bun:"inputs" json:"inputs"`
Outputs *big.Int `bun:"outputs" json:"outputs"`
}

type AggregatedAccountVolume struct {
Volumes
ledger.Volumes
Asset string `bun:"asset"`
}

Expand All @@ -48,12 +43,12 @@ func (volumes AggregatedAccountVolumes) toCore() ledger.VolumesByAssets {
for _, volume := range volumes {
if volumesForAsset, ok := ret[volume.Asset]; !ok {
ret[volume.Asset] = ledger.Volumes{
Inputs: new(big.Int).Set(volume.Inputs),
Outputs: new(big.Int).Set(volume.Outputs),
Input: new(big.Int).Set(volume.Input),
Output: new(big.Int).Set(volume.Output),
}
} else {
volumesForAsset.Inputs = volumesForAsset.Inputs.Add(volumesForAsset.Inputs, volume.Inputs)
volumesForAsset.Outputs = volumesForAsset.Outputs.Add(volumesForAsset.Outputs, volume.Outputs)
volumesForAsset.Input = volumesForAsset.Input.Add(volumesForAsset.Input, volume.Input)
volumesForAsset.Output = volumesForAsset.Output.Add(volumesForAsset.Output, volume.Output)

ret[volume.Asset] = volumesForAsset
}
Expand Down Expand Up @@ -114,7 +109,7 @@ func convertOperatorToSQL(operator string) string {

func (s *Store) selectBalance(date *time.Time) *bun.SelectQuery {
return s.SortMovesBySeq(date).
ColumnExpr("(post_commit_volumes).inputs - (post_commit_volumes).outputs as balance").
ColumnExpr("(post_commit_volumes).input - (post_commit_volumes).output as balance").
Limit(1)
}

Expand Down Expand Up @@ -198,7 +193,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
s.db.NewSelect().
TableExpr("(?) v", s.SelectDistinctMovesBySeq(date)).
Column("accounts_seq").
ColumnExpr(`to_json(array_agg(json_build_object('asset', v.asset, 'inputs', (v.post_commit_volumes).inputs, 'outputs', (v.post_commit_volumes).outputs))) as pcv`).
ColumnExpr(`to_json(array_agg(json_build_object('asset', v.asset, 'input', (v.post_commit_volumes).input, 'output', (v.post_commit_volumes).output))) as pcv`).
Group("accounts_seq"),
).
ColumnExpr("pcv.*")
Expand All @@ -211,7 +206,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
s.db.NewSelect().
TableExpr("(?) v", s.SelectDistinctMovesByEffectiveDate(date)).
Column("accounts_seq").
ColumnExpr(`to_json(array_agg(json_build_object('asset', v.asset, 'inputs', (v.post_commit_effective_volumes).inputs, 'outputs', (v.post_commit_effective_volumes).outputs))) as pcev`).
ColumnExpr(`to_json(array_agg(json_build_object('asset', v.asset, 'input', (v.post_commit_effective_volumes).input, 'output', (v.post_commit_effective_volumes).output))) as pcev`).
Group("accounts_seq"),
).
ColumnExpr("pcev.*")
Expand Down
6 changes: 3 additions & 3 deletions internal/storage/ledger/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool,
ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")).
Column("asset", "accounts_seq").
ColumnExpr("account as account_address").
ColumnExpr("(inputs, outputs)::"+s.GetPrefixedRelationName("volumes")+" as volumes").
ColumnExpr("(input, output)::"+s.GetPrefixedRelationName("volumes")+" as volumes").
Where("ledger = ?", s.ledger.Name)
}

Expand Down Expand Up @@ -168,7 +168,7 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool,
func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool, builder query.Builder) *bun.SelectQuery {
return s.db.NewSelect().
ModelTableExpr("(?) accounts", s.selectAccountWithVolumes(date, useInsertionDate, builder)).
ColumnExpr(`to_json(array_agg(json_build_object('asset', accounts.asset, 'inputs', (accounts.volumes).inputs, 'outputs', (accounts.volumes).outputs))) as aggregated`)
ColumnExpr(`to_json(array_agg(json_build_object('asset', accounts.asset, 'input', (accounts.volumes).input, 'output', (accounts.volumes).output))) as aggregated`)
}

func (s *Store) GetAggregatedBalances(ctx context.Context, q ledgercontroller.GetAggregatedBalanceQuery) (ledger.BalancesByAssets, error) {
Expand Down Expand Up @@ -201,7 +201,7 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ
Model(&balances).
ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")).
ColumnExpr("account, asset").
ColumnExpr("inputs - outputs as balance").
ColumnExpr("input - output as balance").
Where("("+strings.Join(conditions, ") OR (")+")", args...).
For("update").
// notes(gfyrag): keep order, it ensures consistent locking order and limit deadlocks
Expand Down
16 changes: 8 additions & 8 deletions internal/storage/ledger/balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func TestBalancesGet(t *testing.T) {
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD",
Inputs: new(big.Int),
Outputs: big.NewInt(100),
Input: new(big.Int),
Output: big.NewInt(100),
AccountsSeq: world.Seq,
})
require.NoError(t, err)
Expand Down Expand Up @@ -276,8 +276,8 @@ func TestUpdateBalances(t *testing.T) {
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Inputs: big.NewInt(0),
Outputs: big.NewInt(100),
Input: big.NewInt(0),
Output: big.NewInt(100),
AccountsSeq: world.Seq,
})
require.NoError(t, err)
Expand All @@ -291,8 +291,8 @@ func TestUpdateBalances(t *testing.T) {
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Inputs: big.NewInt(50),
Outputs: big.NewInt(0),
Input: big.NewInt(50),
Output: big.NewInt(0),
})
require.NoError(t, err)
require.Equal(t, map[string]map[string]ledger.Volumes{
Expand All @@ -305,8 +305,8 @@ func TestUpdateBalances(t *testing.T) {
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Inputs: big.NewInt(50),
Outputs: big.NewInt(50),
Input: big.NewInt(50),
Output: big.NewInt(50),
AccountsSeq: world.Seq,
})
require.NoError(t, err)
Expand Down
12 changes: 6 additions & 6 deletions internal/storage/ledger/moves.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ type Move struct {
AccountSeq int `bun:"accounts_seq,type:int"`
InsertionDate time.Time `bun:"insertion_date,type:timestamp"`
EffectiveDate time.Time `bun:"effective_date,type:timestamp"`
PostCommitVolumes *Volumes `bun:"post_commit_volumes,type:jsonb,scanonly"`
PostCommitEffectiveVolumes *Volumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"`
PostCommitVolumes *ledger.Volumes `bun:"post_commit_volumes,type:jsonb,scanonly"`
PostCommitEffectiveVolumes *ledger.Volumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"`
}

type Moves []*Move
Expand All @@ -107,17 +107,17 @@ func (m Moves) volumeUpdates() []AccountsVolumes {
volumes := ledger.NewEmptyVolumes()
for _, move := range moves {
if move.IsSource {
volumes.Outputs.Add(volumes.Outputs, (*big.Int)(move.Amount))
volumes.Output.Add(volumes.Output, (*big.Int)(move.Amount))
} else {
volumes.Inputs.Add(volumes.Inputs, (*big.Int)(move.Amount))
volumes.Input.Add(volumes.Input, (*big.Int)(move.Amount))
}
}
ret = append(ret, AccountsVolumes{
Ledger: moves[0].Ledger,
Account: account,
Asset: asset,
Inputs: volumes.Inputs,
Outputs: volumes.Outputs,
Input: volumes.Input,
Output: volumes.Output,
AccountsSeq: moves[0].AccountSeq,
})
}
Expand Down
60 changes: 30 additions & 30 deletions internal/storage/ledger/moves_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ func TestMovesInsert(t *testing.T) {
}
require.NoError(t, store.insertMoves(ctx, &m1))
require.NotNil(t, m1.PostCommitVolumes)
require.Equal(t, Volumes{
Inputs: big.NewInt(0),
Outputs: big.NewInt(100),
require.Equal(t, ledger.Volumes{
Input: big.NewInt(0),
Output: big.NewInt(100),
}, *m1.PostCommitVolumes)
require.NotNil(t, m1.PostCommitEffectiveVolumes)
require.Equal(t, Volumes{
Inputs: big.NewInt(0),
Outputs: big.NewInt(100),
require.Equal(t, ledger.Volumes{
Input: big.NewInt(0),
Output: big.NewInt(100),
}, *m1.PostCommitEffectiveVolumes)

// add a second move at t3
Expand All @@ -86,14 +86,14 @@ func TestMovesInsert(t *testing.T) {
}
require.NoError(t, store.insertMoves(ctx, &m2))
require.NotNil(t, m2.PostCommitVolumes)
require.Equal(t, Volumes{
Inputs: big.NewInt(50),
Outputs: big.NewInt(100),
require.Equal(t, ledger.Volumes{
Input: big.NewInt(50),
Output: big.NewInt(100),
}, *m2.PostCommitVolumes)
require.NotNil(t, m2.PostCommitEffectiveVolumes)
require.Equal(t, Volumes{
Inputs: big.NewInt(50),
Outputs: big.NewInt(100),
require.Equal(t, ledger.Volumes{
Input: big.NewInt(50),
Output: big.NewInt(100),
}, *m2.PostCommitEffectiveVolumes)

// add a third move at t1
Expand All @@ -111,14 +111,14 @@ func TestMovesInsert(t *testing.T) {
}
require.NoError(t, store.insertMoves(ctx, &m3))
require.NotNil(t, m3.PostCommitVolumes)
require.Equal(t, Volumes{
Inputs: big.NewInt(50),
Outputs: big.NewInt(300),
require.Equal(t, ledger.Volumes{
Input: big.NewInt(50),
Output: big.NewInt(300),
}, *m3.PostCommitVolumes)
require.NotNil(t, m3.PostCommitEffectiveVolumes)
require.Equal(t, Volumes{
Inputs: big.NewInt(0),
Outputs: big.NewInt(300),
require.Equal(t, ledger.Volumes{
Input: big.NewInt(0),
Output: big.NewInt(300),
}, *m3.PostCommitEffectiveVolumes)

// add a fourth move at t2
Expand All @@ -136,14 +136,14 @@ func TestMovesInsert(t *testing.T) {
}
require.NoError(t, store.insertMoves(ctx, &m4))
require.NotNil(t, m4.PostCommitVolumes)
require.Equal(t, Volumes{
Inputs: big.NewInt(100),
Outputs: big.NewInt(300),
require.Equal(t, ledger.Volumes{
Input: big.NewInt(100),
Output: big.NewInt(300),
}, *m4.PostCommitVolumes)
require.NotNil(t, m4.PostCommitEffectiveVolumes)
require.Equal(t, Volumes{
Inputs: big.NewInt(50),
Outputs: big.NewInt(300),
require.Equal(t, ledger.Volumes{
Input: big.NewInt(50),
Output: big.NewInt(300),
}, *m4.PostCommitEffectiveVolumes)

// add a fifth move at t4
Expand All @@ -161,14 +161,14 @@ func TestMovesInsert(t *testing.T) {
}
require.NoError(t, store.insertMoves(ctx, &m5))
require.NotNil(t, m5.PostCommitVolumes)
require.Equal(t, Volumes{
Inputs: big.NewInt(150),
Outputs: big.NewInt(300),
require.Equal(t, ledger.Volumes{
Input: big.NewInt(150),
Output: big.NewInt(300),
}, *m5.PostCommitVolumes)
require.NotNil(t, m5.PostCommitEffectiveVolumes)
require.Equal(t, Volumes{
Inputs: big.NewInt(150),
Outputs: big.NewInt(300),
require.Equal(t, ledger.Volumes{
Input: big.NewInt(150),
Output: big.NewInt(300),
}, *m5.PostCommitEffectiveVolumes)
})

Expand Down
12 changes: 6 additions & 6 deletions internal/storage/ledger/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ func (p TransactionsPostCommitVolumes) toCore() ledger.PostCommitVolumes {
}
if v, ok := ret[volumes.Account][volumes.Asset]; !ok {
ret[volumes.Account][volumes.Asset] = ledger.Volumes{
Inputs: volumes.Inputs,
Outputs: volumes.Outputs,
Input: volumes.Input,
Output: volumes.Output,
}
} else {
v.Inputs = v.Inputs.Add(v.Inputs, volumes.Inputs)
v.Outputs = v.Outputs.Add(v.Outputs, volumes.Outputs)
v.Input = v.Input.Add(v.Input, volumes.Input)
v.Output = v.Output.Add(v.Output, volumes.Output)

ret[volumes.Account][volumes.Asset] = v
}
Expand Down Expand Up @@ -201,7 +201,7 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti
s.db.NewSelect().
TableExpr(s.GetPrefixedRelationName("moves")).
Column("transactions_seq").
ColumnExpr(`to_json(array_agg(json_build_object('account', moves.account_address, 'asset', moves.asset, 'inputs', (moves.post_commit_volumes).inputs, 'outputs', (moves.post_commit_volumes).outputs))) as post_commit_volumes`).
ColumnExpr(`to_json(array_agg(json_build_object('account', moves.account_address, 'asset', moves.asset, 'input', (moves.post_commit_volumes).input, 'output', (moves.post_commit_volumes).output))) as post_commit_volumes`).
Group("transactions_seq"),
).
ColumnExpr("pcv.*")
Expand All @@ -215,7 +215,7 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti
s.db.NewSelect().
TableExpr(s.GetPrefixedRelationName("moves")).
Column("transactions_seq").
ColumnExpr(`to_json(array_agg(json_build_object('account', moves.account_address, 'asset', moves.asset, 'inputs', (moves.post_commit_effective_volumes).inputs, 'outputs', (moves.post_commit_effective_volumes).outputs))) as post_commit_effective_volumes`).
ColumnExpr(`to_json(array_agg(json_build_object('account', moves.account_address, 'asset', moves.asset, 'input', (moves.post_commit_effective_volumes).input, 'output', (moves.post_commit_effective_volumes).output))) as post_commit_effective_volumes`).
Group("transactions_seq"),
).
ColumnExpr("pcev.*")
Expand Down
Loading

0 comments on commit 53d5611

Please sign in to comment.