Skip to content

Commit

Permalink
feat: compute move pcv on the fly
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 16, 2024
1 parent b794b9d commit cd1cd2b
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 107 deletions.
27 changes: 0 additions & 27 deletions internal/storage/bucket/migrations/11-stateless.sql
Original file line number Diff line number Diff line change
Expand Up @@ -143,33 +143,6 @@ drop function "{{.Bucket}}".revert_transaction(_ledger character varying, _id nu
drop type "{{.Bucket}}".volumes_with_asset;
drop type "{{.Bucket}}".volumes;

create function "{{.Bucket}}".set_volumes()
returns trigger
security definer
language plpgsql
as
$$
begin
new.post_commit_volumes = coalesce((
select json_build_object(
'input', (post_commit_volumes->>'input')::numeric + case when new.is_source then 0 else new.amount end,
'output', (post_commit_volumes->>'output')::numeric + case when new.is_source then new.amount else 0 end
)
from "{{.Bucket}}".moves
where accounts_seq = new.accounts_seq
and asset = new.asset
and ledger = new.ledger
order by seq desc
limit 1
), json_build_object(
'input', case when new.is_source then 0 else new.amount end,
'output', case when new.is_source then new.amount else 0 end
));

return new;
end;
$$;

create function "{{.Bucket}}".set_effective_volumes()
returns trigger
security definer
Expand Down
15 changes: 0 additions & 15 deletions internal/storage/ledger/migrations/0-add-sequences.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,6 @@ select setval('"{{.Bucket}}"."log_id_{{.ID}}"', coalesce((
where ledger = '{{ .Name }}'
), 1)::bigint, false);


-- enable post commit volumes synchronously
{{ if .HasFeature "MOVES_HISTORY_POST_COMMIT_VOLUMES" "SYNC" }}
create index "pcv_{{.ID}}" on "{{.Bucket}}".moves (accounts_seq, asset, seq) where ledger = '{{.Name}}';

create trigger "set_volumes_{{.ID}}"
before insert
on "{{.Bucket}}"."moves"
for each row
when (
new.ledger = '{{.Name}}'
)
execute procedure "{{.Bucket}}".set_volumes();
{{ end }}

-- enable post commit effective volumes synchronously

{{ if .HasFeature "MOVES_HISTORY_POST_COMMIT_EFFECTIVE_VOLUMES" "SYNC" }}
Expand Down
25 changes: 24 additions & 1 deletion internal/storage/ledger/moves.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package ledger

import (
"context"
. "github.com/formancehq/go-libs/collectionutils"
ledger "github.com/formancehq/ledger/internal"
"math/big"
"slices"

"github.com/formancehq/go-libs/bun/bunpaginate"
"github.com/formancehq/go-libs/platform/postgres"
Expand Down Expand Up @@ -85,7 +87,7 @@ type Move struct {
AccountSeq int `bun:"accounts_seq,type:int"`
InsertionDate time.Time `bun:"insertion_date,type:timestamp"`
EffectiveDate time.Time `bun:"effective_date,type:timestamp"`
PostCommitVolumes *ledger.Volumes `bun:"post_commit_volumes,type:jsonb,scanonly"`
PostCommitVolumes *ledger.Volumes `bun:"post_commit_volumes,type:jsonb"`
PostCommitEffectiveVolumes *ledger.Volumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"`
}

Expand Down Expand Up @@ -127,13 +129,34 @@ func (m Moves) volumeUpdates() []AccountsVolumes {
}

func (m Moves) ComputePostCommitEffectiveVolumes() ledger.PostCommitVolumes {
type key struct {
Account string
Asset string
}

visited := Set[key]{}

// we need to find the more recent move for each account/asset
slices.Reverse(m)

ret := ledger.PostCommitVolumes{}
for _, move := range m {
if visited.Contains(key{
Account: move.Account,
Asset: move.Asset,
}) {
continue
}
ret = ret.Merge(ledger.PostCommitVolumes{
move.Account: ledger.VolumesByAssets{
move.Asset: *move.PostCommitEffectiveVolumes,
},
})
visited.Put(key{
Account: move.Account,
Asset: move.Asset,
})
}

return ret
}
25 changes: 0 additions & 25 deletions internal/storage/ledger/moves_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,6 @@ func TestMovesInsert(t *testing.T) {
EffectiveDate: t0,
}
require.NoError(t, store.insertMoves(ctx, &m1))
require.NotNil(t, m1.PostCommitVolumes)
require.Equal(t, ledger.Volumes{
Input: big.NewInt(0),
Output: big.NewInt(100),
}, *m1.PostCommitVolumes)
require.NotNil(t, m1.PostCommitEffectiveVolumes)
require.Equal(t, ledger.Volumes{
Input: big.NewInt(0),
Expand All @@ -85,11 +80,6 @@ func TestMovesInsert(t *testing.T) {
EffectiveDate: t3,
}
require.NoError(t, store.insertMoves(ctx, &m2))
require.NotNil(t, m2.PostCommitVolumes)
require.Equal(t, ledger.Volumes{
Input: big.NewInt(50),
Output: big.NewInt(100),
}, *m2.PostCommitVolumes)
require.NotNil(t, m2.PostCommitEffectiveVolumes)
require.Equal(t, ledger.Volumes{
Input: big.NewInt(50),
Expand All @@ -110,11 +100,6 @@ func TestMovesInsert(t *testing.T) {
EffectiveDate: t1,
}
require.NoError(t, store.insertMoves(ctx, &m3))
require.NotNil(t, m3.PostCommitVolumes)
require.Equal(t, ledger.Volumes{
Input: big.NewInt(50),
Output: big.NewInt(300),
}, *m3.PostCommitVolumes)
require.NotNil(t, m3.PostCommitEffectiveVolumes)
require.Equal(t, ledger.Volumes{
Input: big.NewInt(0),
Expand All @@ -135,11 +120,6 @@ func TestMovesInsert(t *testing.T) {
EffectiveDate: t2,
}
require.NoError(t, store.insertMoves(ctx, &m4))
require.NotNil(t, m4.PostCommitVolumes)
require.Equal(t, ledger.Volumes{
Input: big.NewInt(100),
Output: big.NewInt(300),
}, *m4.PostCommitVolumes)
require.NotNil(t, m4.PostCommitEffectiveVolumes)
require.Equal(t, ledger.Volumes{
Input: big.NewInt(50),
Expand All @@ -160,11 +140,6 @@ func TestMovesInsert(t *testing.T) {
EffectiveDate: t4,
}
require.NoError(t, store.insertMoves(ctx, &m5))
require.NotNil(t, m5.PostCommitVolumes)
require.Equal(t, ledger.Volumes{
Input: big.NewInt(150),
Output: big.NewInt(300),
}, *m5.PostCommitVolumes)
require.NotNil(t, m5.PostCommitEffectiveVolumes)
require.Equal(t, ledger.Volumes{
Input: big.NewInt(150),
Expand Down
84 changes: 50 additions & 34 deletions internal/storage/ledger/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"encoding/json"
"fmt"
"math/big"
"regexp"
"slices"
"strings"

"github.com/formancehq/ledger/internal/tracing"
Expand Down Expand Up @@ -303,7 +305,7 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
accounts[address] = account
}

updatedVolumes, err := s.updateVolumes(ctx, volumeUpdates(s.ledger.Name, tx, accounts)...)
postCommitVolumes, err := s.updateVolumes(ctx, volumeUpdates(s.ledger.Name, tx, accounts)...)
if err != nil {
return errors.Wrap(err, "failed to update balances")
}
Expand All @@ -321,44 +323,57 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
Destinations: destinations,
SourcesArray: Map(sources, convertAddrToIndexedJSONB),
DestinationsArray: Map(destinations, convertAddrToIndexedJSONB),
PostCommitVolumes: updatedVolumes,
PostCommitVolumes: postCommitVolumes,
}

err = s.insertTransaction(ctx, mappedTx)
if err != nil {
return errors.Wrap(err, "failed to insert transaction")
}

tx.ID = mappedTx.ID
tx.PostCommitVolumes = postCommitVolumes.Copy()
tx.Timestamp = mappedTx.Timestamp
tx.InsertedAt = mappedTx.InsertedAt

if s.ledger.HasFeature(ledger.FeatureMovesHistory, "ON") {
moves := Moves{}
for _, p := range tx.Postings {
moves = append(moves, []*Move{
{
Ledger: s.ledger.Name,
IsSource: true,
Account: p.Source,
AccountAddressArray: strings.Split(p.Source, ":"),
Amount: (*bunpaginate.BigInt)(p.Amount),
Asset: p.Asset,
InsertionDate: tx.InsertedAt,
EffectiveDate: tx.Timestamp,
TransactionSeq: mappedTx.Seq,
AccountSeq: accounts[p.Source].Seq,
},
{
Ledger: s.ledger.Name,
Account: p.Destination,
AccountAddressArray: strings.Split(p.Destination, ":"),
Amount: (*bunpaginate.BigInt)(p.Amount),
Asset: p.Asset,
InsertionDate: tx.InsertedAt,
EffectiveDate: tx.Timestamp,
TransactionSeq: mappedTx.Seq,
AccountSeq: accounts[p.Destination].Seq,
},
}...)
postings := tx.Postings
slices.Reverse(postings)

for _, posting := range postings {
moves = append(moves, &Move{
Ledger: s.ledger.Name,
Account: posting.Destination,
AccountAddressArray: strings.Split(posting.Destination, ":"),
Amount: (*bunpaginate.BigInt)(posting.Amount),
Asset: posting.Asset,
InsertionDate: tx.InsertedAt,
EffectiveDate: tx.Timestamp,
TransactionSeq: mappedTx.Seq,
AccountSeq: accounts[posting.Destination].Seq,
PostCommitVolumes: pointer.For(postCommitVolumes[posting.Destination][posting.Asset].Copy()),
})
postCommitVolumes.AddInput(posting.Destination, posting.Asset, new(big.Int).Neg(posting.Amount))

moves = append(moves, &Move{
Ledger: s.ledger.Name,
IsSource: true,
Account: posting.Source,
AccountAddressArray: strings.Split(posting.Source, ":"),
Amount: (*bunpaginate.BigInt)(posting.Amount),
Asset: posting.Asset,
InsertionDate: tx.InsertedAt,
EffectiveDate: tx.Timestamp,
TransactionSeq: mappedTx.Seq,
AccountSeq: accounts[posting.Source].Seq,
PostCommitVolumes: pointer.For(postCommitVolumes[posting.Source][posting.Asset].Copy()),
})
postCommitVolumes.AddOutput(posting.Source, posting.Asset, new(big.Int).Neg(posting.Amount))
}

slices.Reverse(moves)

if err := s.insertMoves(ctx, moves...); err != nil {
return errors.Wrap(err, "failed to insert moves")
}
Expand All @@ -368,11 +383,6 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
}
}

tx.ID = mappedTx.ID
tx.PostCommitVolumes = updatedVolumes
tx.Timestamp = mappedTx.Timestamp
tx.InsertedAt = mappedTx.InsertedAt

return nil
}

Expand Down Expand Up @@ -625,6 +635,10 @@ func volumeUpdates(l string, transaction *ledger.Transaction, accounts map[strin
}
aggregatedVolumes[posting.Source][posting.Asset] = append(aggregatedVolumes[posting.Source][posting.Asset], posting)

if posting.Source == posting.Destination {
continue
}

if _, ok := aggregatedVolumes[posting.Destination]; !ok {
aggregatedVolumes[posting.Destination] = make(map[string][]ledger.Posting)
}
Expand All @@ -638,10 +652,12 @@ func volumeUpdates(l string, transaction *ledger.Transaction, accounts map[strin
for _, posting := range postings {
if account == posting.Source {
volumes.Output.Add(volumes.Output, posting.Amount)
} else {
}
if account == posting.Destination {
volumes.Input.Add(volumes.Input, posting.Amount)
}
}

ret = append(ret, AccountsVolumes{
Ledger: l,
Account: account,
Expand Down
22 changes: 22 additions & 0 deletions internal/storage/ledger/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"database/sql"
"fmt"
"github.com/davecgh/go-spew/spew"
"github.com/formancehq/go-libs/platform/postgres"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"math/big"
Expand Down Expand Up @@ -262,6 +263,27 @@ func TestTransactionsCommit(t *testing.T) {
require.Equal(t, tx2.PostCommitVolumes, tx2.PostCommitEffectiveVolumes)
})

t.Run("auto send", func(t *testing.T) {
store := newLedgerStore(t)

tx3 := ledger.NewTransaction().WithPostings(
ledger.NewPosting("account:x", "account:x", "USD", big.NewInt(100)),
)
err := store.CommitTransaction(ctx, &tx3)
require.NoError(t, err)
require.Equal(t, 1, tx3.ID)
require.Equal(t, ledger.PostCommitVolumes{
"account:x": ledger.VolumesByAssets{
"USD": ledger.Volumes{
Input: big.NewInt(100),
Output: big.NewInt(100),
},
},
}, tx3.PostCommitVolumes)
spew.Dump(tx3)
require.Equal(t, tx3.PostCommitVolumes, tx3.PostCommitEffectiveVolumes)
})

t.Run("triggering a deadlock should return appropriate postgres error", func(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 3 additions & 1 deletion internal/transaction_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ledger

import (
"github.com/formancehq/go-libs/time"
"math/big"
"testing"

Expand All @@ -22,6 +23,7 @@ func TestReverseTransaction(t *testing.T) {
WithTimestamp(tx.Timestamp)

reversed := tx.Reverse(true)
reversed.InsertedAt = tx.InsertedAt
reversed.InsertedAt = time.Time{}
expected.InsertedAt = time.Time{}
require.Equal(t, expected, reversed)
}
6 changes: 3 additions & 3 deletions internal/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type Volumes struct {
Output *big.Int `json:"output"`
}

func (v Volumes) CopyWithZerosIfNeeded() Volumes {
func (v Volumes) Copy() Volumes {
return Volumes{
Input: new(big.Int).Set(v.Input),
Output: new(big.Int).Set(v.Output),
Expand Down Expand Up @@ -107,7 +107,7 @@ func (a PostCommitVolumes) AddInput(account, asset string, input *big.Int) {
},
}
} else {
volumes := assetsVolumes[asset].CopyWithZerosIfNeeded()
volumes := assetsVolumes[asset].Copy()
volumes.Input.Add(volumes.Input, input)
assetsVolumes[asset] = volumes
}
Expand All @@ -122,7 +122,7 @@ func (a PostCommitVolumes) AddOutput(account, asset string, output *big.Int) {
},
}
} else {
volumes := assetsVolumes[asset].CopyWithZerosIfNeeded()
volumes := assetsVolumes[asset].Copy()
volumes.Output.Add(volumes.Output, output)
assetsVolumes[asset] = volumes
}
Expand Down
Loading

0 comments on commit cd1cd2b

Please sign in to comment.