Skip to content

Commit

Permalink
feat: move code
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 16, 2024
1 parent 9580de2 commit 4dd8f2a
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 66 deletions.
59 changes: 59 additions & 0 deletions internal/moves.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package ledger

import (
"github.com/formancehq/go-libs/bun/bunpaginate"
"github.com/formancehq/go-libs/collectionutils"
"github.com/formancehq/go-libs/time"
"github.com/uptrace/bun"
"slices"
)

type Move struct {
bun.BaseModel `bun:"table:moves"`

Ledger string `bun:"ledger,type:varchar"`
TransactionID int `bun:"transactions_id,type:bigint"`
IsSource bool `bun:"is_source,type:bool"`
Account string `bun:"accounts_address,type:varchar"`
Amount *bunpaginate.BigInt `bun:"amount,type:numeric"`
Asset string `bun:"asset,type:varchar"`
InsertionDate time.Time `bun:"insertion_date,type:timestamp"`
EffectiveDate time.Time `bun:"effective_date,type:timestamp"`
PostCommitVolumes *Volumes `bun:"post_commit_volumes,type:jsonb"`
PostCommitEffectiveVolumes *Volumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"`
}

type Moves []*Move

func (m Moves) ComputePostCommitEffectiveVolumes() PostCommitVolumes {
type key struct {
Account string
Asset string
}

visited := collectionutils.Set[key]{}

// we need to find the more recent move for each account/asset
slices.Reverse(m)

ret := PostCommitVolumes{}
for _, move := range m {
if visited.Contains(key{
Account: move.Account,
Asset: move.Asset,
}) {
continue
}
ret = ret.Merge(PostCommitVolumes{
move.Account: VolumesByAssets{
move.Asset: *move.PostCommitEffectiveVolumes,
},
})
visited.Put(key{
Account: move.Account,
Asset: move.Asset,
})
}

return ret
}
1 change: 0 additions & 1 deletion internal/storage/bucket/migrations/11-stateless.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ alter table "{{.Bucket}}".transactions
add column inserted_at timestamp without time zone
default (now() at time zone 'utc');

-- todo: check if still required
alter table "{{.Bucket}}".transactions
alter column timestamp
set default (now() at time zone 'utc');
Expand Down
59 changes: 2 additions & 57 deletions internal/storage/ledger/moves.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,9 @@ package ledger

import (
"context"
"slices"

. "github.com/formancehq/go-libs/collectionutils"
ledger "github.com/formancehq/ledger/internal"

"github.com/formancehq/go-libs/bun/bunpaginate"
"github.com/formancehq/go-libs/platform/postgres"
"github.com/formancehq/go-libs/time"
ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/internal/tracing"
"github.com/uptrace/bun"
)
Expand Down Expand Up @@ -57,7 +52,7 @@ func (s *Store) SelectDistinctMovesByEffectiveDate(date *time.Time) *bun.SelectQ
return ret
}

func (s *Store) insertMoves(ctx context.Context, moves ...*Move) error {
func (s *Store) insertMoves(ctx context.Context, moves ...*ledger.Move) error {
_, err := tracing.TraceWithLatency(ctx, "InsertMoves", tracing.NoResult(func(ctx context.Context) error {
_, err := s.db.NewInsert().
Model(&moves).
Expand All @@ -71,53 +66,3 @@ func (s *Store) insertMoves(ctx context.Context, moves ...*Move) error {

return err
}

type Move struct {
bun.BaseModel `bun:"table:moves"`

Ledger string `bun:"ledger,type:varchar"`
TransactionID int `bun:"transactions_id,type:bigint"`
IsSource bool `bun:"is_source,type:bool"`
Account string `bun:"accounts_address,type:varchar"`
Amount *bunpaginate.BigInt `bun:"amount,type:numeric"`
Asset string `bun:"asset,type:varchar"`
InsertionDate time.Time `bun:"insertion_date,type:timestamp"`
EffectiveDate time.Time `bun:"effective_date,type:timestamp"`
PostCommitVolumes *ledger.Volumes `bun:"post_commit_volumes,type:jsonb"`
PostCommitEffectiveVolumes *ledger.Volumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"`
}

type Moves []*Move

func (m Moves) ComputePostCommitEffectiveVolumes() ledger.PostCommitVolumes {
type key struct {
Account string
Asset string
}

visited := Set[key]{}

// we need to find the more recent move for each account/asset
slices.Reverse(m)

ret := ledger.PostCommitVolumes{}
for _, move := range m {
if visited.Contains(key{
Account: move.Account,
Asset: move.Asset,
}) {
continue
}
ret = ret.Merge(ledger.PostCommitVolumes{
move.Account: ledger.VolumesByAssets{
move.Asset: *move.PostCommitEffectiveVolumes,
},
})
visited.Put(key{
Account: move.Account,
Asset: move.Asset,
})
}

return ret
}
10 changes: 5 additions & 5 deletions internal/storage/ledger/moves_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestMovesInsert(t *testing.T) {
t4 := t3.Add(time.Hour)

// insert a first tx at t0
m1 := Move{
m1 := ledger.Move{
Ledger: store.ledger.Name,
IsSource: true,
Account: "world",
Expand All @@ -69,7 +69,7 @@ func TestMovesInsert(t *testing.T) {
}, *m1.PostCommitEffectiveVolumes)

// add a second move at t3
m2 := Move{
m2 := ledger.Move{
Ledger: store.ledger.Name,
IsSource: false,
Account: "world",
Expand All @@ -86,7 +86,7 @@ func TestMovesInsert(t *testing.T) {
}, *m2.PostCommitEffectiveVolumes)

// add a third move at t1
m3 := Move{
m3 := ledger.Move{
Ledger: store.ledger.Name,
IsSource: true,
Account: "world",
Expand All @@ -103,7 +103,7 @@ func TestMovesInsert(t *testing.T) {
}, *m3.PostCommitEffectiveVolumes)

// add a fourth move at t2
m4 := Move{
m4 := ledger.Move{
Ledger: store.ledger.Name,
IsSource: false,
Account: "world",
Expand All @@ -120,7 +120,7 @@ func TestMovesInsert(t *testing.T) {
}, *m4.PostCommitEffectiveVolumes)

// add a fifth move at t4
m5 := Move{
m5 := ledger.Move{
Ledger: store.ledger.Name,
IsSource: false,
Account: "world",
Expand Down
6 changes: 3 additions & 3 deletions internal/storage/ledger/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,12 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
}

if s.ledger.HasFeature(ledger.FeatureMovesHistory, "ON") {
moves := Moves{}
moves := ledger.Moves{}
postings := tx.Postings
slices.Reverse(postings)

for _, posting := range postings {
moves = append(moves, &Move{
moves = append(moves, &ledger.Move{
Ledger: s.ledger.Name,
Account: posting.Destination,
Amount: (*bunpaginate.BigInt)(posting.Amount),
Expand All @@ -289,7 +289,7 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
})
postCommitVolumes.AddInput(posting.Destination, posting.Asset, new(big.Int).Neg(posting.Amount))

moves = append(moves, &Move{
moves = append(moves, &ledger.Move{
Ledger: s.ledger.Name,
IsSource: true,
Account: posting.Source,
Expand Down

0 comments on commit 4dd8f2a

Please sign in to comment.