From 3781bb26e612b7522c7a50591cd65965e7f84abe Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Fri, 27 Sep 2024 13:00:11 +0200 Subject: [PATCH] feat: move code --- internal/moves.go | 59 +++++++++++++++++++ .../bucket/migrations/11-stateless.sql | 1 - internal/storage/ledger/moves.go | 59 +------------------ internal/storage/ledger/moves_test.go | 10 ++-- internal/storage/ledger/transactions.go | 6 +- 5 files changed, 69 insertions(+), 66 deletions(-) create mode 100644 internal/moves.go diff --git a/internal/moves.go b/internal/moves.go new file mode 100644 index 000000000..ded782405 --- /dev/null +++ b/internal/moves.go @@ -0,0 +1,59 @@ +package ledger + +import ( + "github.com/formancehq/go-libs/bun/bunpaginate" + "github.com/formancehq/go-libs/collectionutils" + "github.com/formancehq/go-libs/time" + "github.com/uptrace/bun" + "slices" +) + +type Move struct { + bun.BaseModel `bun:"table:moves"` + + Ledger string `bun:"ledger,type:varchar"` + TransactionID int `bun:"transactions_id,type:bigint"` + IsSource bool `bun:"is_source,type:bool"` + Account string `bun:"accounts_address,type:varchar"` + Amount *bunpaginate.BigInt `bun:"amount,type:numeric"` + Asset string `bun:"asset,type:varchar"` + InsertionDate time.Time `bun:"insertion_date,type:timestamp"` + EffectiveDate time.Time `bun:"effective_date,type:timestamp"` + PostCommitVolumes *Volumes `bun:"post_commit_volumes,type:jsonb"` + PostCommitEffectiveVolumes *Volumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"` +} + +type Moves []*Move + +func (m Moves) ComputePostCommitEffectiveVolumes() PostCommitVolumes { + type key struct { + Account string + Asset string + } + + visited := collectionutils.Set[key]{} + + // we need to find the more recent move for each account/asset + slices.Reverse(m) + + ret := PostCommitVolumes{} + for _, move := range m { + if visited.Contains(key{ + Account: move.Account, + Asset: move.Asset, + }) { + continue + } + ret = ret.Merge(PostCommitVolumes{ + move.Account: VolumesByAssets{ + move.Asset: *move.PostCommitEffectiveVolumes, + }, + }) + visited.Put(key{ + Account: move.Account, + Asset: move.Asset, + }) + } + + return ret +} diff --git a/internal/storage/bucket/migrations/11-stateless.sql b/internal/storage/bucket/migrations/11-stateless.sql index 5840c2085..a1b9d2761 100644 --- a/internal/storage/bucket/migrations/11-stateless.sql +++ b/internal/storage/bucket/migrations/11-stateless.sql @@ -7,7 +7,6 @@ alter table "{{.Bucket}}".transactions add column inserted_at timestamp without time zone default (now() at time zone 'utc'); --- todo: check if still required alter table "{{.Bucket}}".transactions alter column timestamp set default (now() at time zone 'utc'); diff --git a/internal/storage/ledger/moves.go b/internal/storage/ledger/moves.go index ffade5e20..5204b6458 100644 --- a/internal/storage/ledger/moves.go +++ b/internal/storage/ledger/moves.go @@ -2,14 +2,9 @@ package ledger import ( "context" - "slices" - - . "github.com/formancehq/go-libs/collectionutils" - ledger "github.com/formancehq/ledger/internal" - - "github.com/formancehq/go-libs/bun/bunpaginate" "github.com/formancehq/go-libs/platform/postgres" "github.com/formancehq/go-libs/time" + ledger "github.com/formancehq/ledger/internal" "github.com/formancehq/ledger/internal/tracing" "github.com/uptrace/bun" ) @@ -57,7 +52,7 @@ func (s *Store) SelectDistinctMovesByEffectiveDate(date *time.Time) *bun.SelectQ return ret } -func (s *Store) insertMoves(ctx context.Context, moves ...*Move) error { +func (s *Store) insertMoves(ctx context.Context, moves ...*ledger.Move) error { _, err := tracing.TraceWithLatency(ctx, "InsertMoves", tracing.NoResult(func(ctx context.Context) error { _, err := s.db.NewInsert(). Model(&moves). @@ -71,53 +66,3 @@ func (s *Store) insertMoves(ctx context.Context, moves ...*Move) error { return err } - -type Move struct { - bun.BaseModel `bun:"table:moves"` - - Ledger string `bun:"ledger,type:varchar"` - TransactionID int `bun:"transactions_id,type:bigint"` - IsSource bool `bun:"is_source,type:bool"` - Account string `bun:"accounts_address,type:varchar"` - Amount *bunpaginate.BigInt `bun:"amount,type:numeric"` - Asset string `bun:"asset,type:varchar"` - InsertionDate time.Time `bun:"insertion_date,type:timestamp"` - EffectiveDate time.Time `bun:"effective_date,type:timestamp"` - PostCommitVolumes *ledger.Volumes `bun:"post_commit_volumes,type:jsonb"` - PostCommitEffectiveVolumes *ledger.Volumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"` -} - -type Moves []*Move - -func (m Moves) ComputePostCommitEffectiveVolumes() ledger.PostCommitVolumes { - type key struct { - Account string - Asset string - } - - visited := Set[key]{} - - // we need to find the more recent move for each account/asset - slices.Reverse(m) - - ret := ledger.PostCommitVolumes{} - for _, move := range m { - if visited.Contains(key{ - Account: move.Account, - Asset: move.Asset, - }) { - continue - } - ret = ret.Merge(ledger.PostCommitVolumes{ - move.Account: ledger.VolumesByAssets{ - move.Asset: *move.PostCommitEffectiveVolumes, - }, - }) - visited.Put(key{ - Account: move.Account, - Asset: move.Asset, - }) - } - - return ret -} diff --git a/internal/storage/ledger/moves_test.go b/internal/storage/ledger/moves_test.go index 5a8caff36..51c3c47f1 100644 --- a/internal/storage/ledger/moves_test.go +++ b/internal/storage/ledger/moves_test.go @@ -52,7 +52,7 @@ func TestMovesInsert(t *testing.T) { t4 := t3.Add(time.Hour) // insert a first tx at t0 - m1 := Move{ + m1 := ledger.Move{ Ledger: store.ledger.Name, IsSource: true, Account: "world", @@ -69,7 +69,7 @@ func TestMovesInsert(t *testing.T) { }, *m1.PostCommitEffectiveVolumes) // add a second move at t3 - m2 := Move{ + m2 := ledger.Move{ Ledger: store.ledger.Name, IsSource: false, Account: "world", @@ -86,7 +86,7 @@ func TestMovesInsert(t *testing.T) { }, *m2.PostCommitEffectiveVolumes) // add a third move at t1 - m3 := Move{ + m3 := ledger.Move{ Ledger: store.ledger.Name, IsSource: true, Account: "world", @@ -103,7 +103,7 @@ func TestMovesInsert(t *testing.T) { }, *m3.PostCommitEffectiveVolumes) // add a fourth move at t2 - m4 := Move{ + m4 := ledger.Move{ Ledger: store.ledger.Name, IsSource: false, Account: "world", @@ -120,7 +120,7 @@ func TestMovesInsert(t *testing.T) { }, *m4.PostCommitEffectiveVolumes) // add a fifth move at t4 - m5 := Move{ + m5 := ledger.Move{ Ledger: store.ledger.Name, IsSource: false, Account: "world", diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index 6ce4b18f2..f622c3523 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -272,12 +272,12 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e } if s.ledger.HasFeature(ledger.FeatureMovesHistory, "ON") { - moves := Moves{} + moves := ledger.Moves{} postings := tx.Postings slices.Reverse(postings) for _, posting := range postings { - moves = append(moves, &Move{ + moves = append(moves, &ledger.Move{ Ledger: s.ledger.Name, Account: posting.Destination, Amount: (*bunpaginate.BigInt)(posting.Amount), @@ -289,7 +289,7 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e }) postCommitVolumes.AddInput(posting.Destination, posting.Asset, new(big.Int).Neg(posting.Amount)) - moves = append(moves, &Move{ + moves = append(moves, &ledger.Move{ Ledger: s.ledger.Name, IsSource: true, Account: posting.Source,