diff --git a/internal/storage/ledger/volumes.go b/internal/storage/ledger/volumes.go index 95853fa9b..24635d5a2 100644 --- a/internal/storage/ledger/volumes.go +++ b/internal/storage/ledger/volumes.go @@ -100,39 +100,14 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL } } - ret = ret. + selectVolumes := s.db.NewSelect(). ColumnExpr("accounts_address as address"). Column("asset"). ColumnExpr("sum(case when not is_source then amount else 0 end) as input"). ColumnExpr("sum(case when is_source then amount else 0 end) as output"). ColumnExpr("sum(case when not is_source then amount else -amount end) as balance"). - ModelTableExpr(s.GetPrefixedRelationName("moves")) - - if needSegmentAddress { - ret = ret. - Join( - "join (?) accounts on accounts.address = moves.accounts_address", - s.db.NewSelect(). - ModelTableExpr(s.GetPrefixedRelationName("accounts")). - Where("ledger = ?", s.ledger.Name). - Column("address_array", "address"), - ). - Column("accounts.address_array") - } - - // todo: handle with pit by using accounts_metadata - if useMetadata { - ret = ret. - Join( - "join lateral (?) accounts on true", - s.db.NewSelect(). - Column("metadata"). - ModelTableExpr(s.GetPrefixedRelationName("accounts")). - Where("accounts.address = moves.accounts_address"), - ). - ColumnExpr("accounts.metadata as metadata"). - Group("accounts.metadata") - } + ModelTableExpr(s.GetPrefixedRelationName("moves")). + GroupExpr("accounts_address, asset") dateFilterColumn := "effective_date" if useInsertionDate { @@ -140,36 +115,56 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL } if pit != nil && !pit.IsZero() { - ret = ret.Where(dateFilterColumn+" <= ?", pit) + selectVolumes = selectVolumes.Where(dateFilterColumn+" <= ?", pit) } if oot != nil && !oot.IsZero() { - ret = ret.Where(dateFilterColumn+" >= ?", oot) + selectVolumes = selectVolumes.Where(dateFilterColumn+" >= ?", oot) } - ret = ret.GroupExpr("accounts_address, asset") - if needSegmentAddress { - ret = ret.GroupExpr("address_array") - } + ret = ret. + ModelTableExpr("(?) volumes", selectVolumes). + Column("address", "asset", "input", "output", "balance") - globalQuery := s.db.NewSelect() - globalQuery = globalQuery. - With("query", ret). - ModelTableExpr("query") + if needSegmentAddress { + selectAccount := s.db.NewSelect(). + ModelTableExpr(s.GetPrefixedRelationName("accounts")). + Where("ledger = ? and address = volumes.address", s.ledger.Name). + Column("address_array") + if useMetadata && (pit == nil || pit.IsZero()) { + selectAccount = selectAccount.Column("metadata") + } - if groupLevel > 0 { - globalQuery = globalQuery. - ColumnExpr(fmt.Sprintf(`(array_to_string((string_to_array(address, ':'))[1:LEAST(array_length(string_to_array(address, ':'),1),%d)],':')) as account`, groupLevel)). - ColumnExpr("asset"). - ColumnExpr("sum(input) as input"). - ColumnExpr("sum(output) as output"). - ColumnExpr("sum(balance) as balance"). - GroupExpr("account, asset") - } else { - globalQuery = globalQuery.ColumnExpr("address as account, asset, input, output, balance") + ret = ret. + Join("join lateral (?) accounts on true", selectAccount). + Column("accounts.address_array") + if useMetadata && (pit == nil || pit.IsZero()) { + ret = ret.Column("accounts.metadata") + } } if useMetadata { - globalQuery = globalQuery.Column("metadata") + switch { + case needSegmentAddress && (pit == nil || pit.IsZero()): + // nothing to do, already handled earlier + case !needSegmentAddress && (pit == nil || pit.IsZero()): + selectAccount := s.db.NewSelect(). + ModelTableExpr(s.GetPrefixedRelationName("accounts")). + Where("ledger = ? and address = volumes.address", s.ledger.Name). + Column("metadata") + + ret = ret. + Join("join lateral (?) accounts on true", selectAccount). + Column("accounts.metadata") + case pit != nil && !pit.IsZero(): + selectAccountMetadata := s.db.NewSelect(). + Column("metadata"). + ModelTableExpr(s.GetPrefixedRelationName("accounts_metadata")). + Where("ledger = ? and accounts_address = volumes.address and date <= ?", s.ledger.Name, pit) + + ret = ret. + Join("join lateral (?) accounts_metadata on true", selectAccountMetadata). + Column("accounts_metadata.metadata") + } } if q != nil { @@ -195,7 +190,24 @@ func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupL if err != nil { return ret.Err(err) } - globalQuery = globalQuery.Where(where, args...) + ret = ret.Where(where, args...) + } + + globalQuery := s.db.NewSelect() + globalQuery = globalQuery. + With("query", ret). + ModelTableExpr("query") + + if groupLevel > 0 { + globalQuery = globalQuery. + ColumnExpr(fmt.Sprintf(`(array_to_string((string_to_array(address, ':'))[1:LEAST(array_length(string_to_array(address, ':'),1),%d)],':')) as account`, groupLevel)). + ColumnExpr("asset"). + ColumnExpr("sum(input) as input"). + ColumnExpr("sum(output) as output"). + ColumnExpr("sum(balance) as balance"). + GroupExpr("account, asset") + } else { + globalQuery = globalQuery.ColumnExpr("address as account, asset, input, output, balance") } return globalQuery diff --git a/internal/storage/ledger/volumes_test.go b/internal/storage/ledger/volumes_test.go index 2e4c7c07f..a5589b8f6 100644 --- a/internal/storage/ledger/volumes_test.go +++ b/internal/storage/ledger/volumes_test.go @@ -4,6 +4,7 @@ package ledger_test import ( "database/sql" + "github.com/formancehq/go-libs/pointer" "math/big" "testing" libtime "time" @@ -22,7 +23,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestGetVolumesWithBalances(t *testing.T) { +func TestVolumesList(t *testing.T) { t.Parallel() store := newLedgerStore(t) now := time.Now() @@ -102,76 +103,7 @@ func TestGetVolumesWithBalances(t *testing.T) { err = store.CommitTransaction(ctx, &tx8) require.NoError(t, err) - //require.NoError(t, store.InsertLogs(ctx, - // ledger.ChainLogs( - // //ledger.NewSetMetadataOnAccountLog(time.Now(), "account:1", metadata.Metadata{"category": "1"}).WithTimestamp(now), - // //ledger.NewSetMetadataOnAccountLog(time.Now(), "account:2", metadata.Metadata{"category": "2"}).WithTimestamp(now), - // //ledger.NewSetMetadataOnAccountLog(time.Now(), "world", metadata.Metadata{"foo": "bar"}).WithTimestamp(now), - // //ledger.NewLog( - // // ledger.NewTransaction(). - // // WithPostings(ledger.NewPosting("world", "account:1", "USD", big.NewInt(100))). - // // WithTimestamp(now.Add(-4*time.Minute)), - // // map[string]metadata.Metadata{}, - // //).WithTimestamp(now.Add(4*time.Minute)), - // // - // //ledger.NewLog( - // // ledger.NewTransaction(). - // // WithPostings(ledger.NewPosting("world", "account:1", "USD", big.NewInt(100))). - // // WithIDUint64(1). - // // WithTimestamp(now.Add(-3*time.Minute)), - // // map[string]metadata.Metadata{}, - // //).WithTimestamp(now.Add(3*time.Minute)), - // - // //ledger.NewLog( - // // ledger.NewTransaction(). - // // WithPostings(ledger.NewPosting("account:1", "bank", "USD", big.NewInt(50))). - // // WithTimestamp(now.Add(-2*time.Minute)). - // // WithIDUint64(2), - // // map[string]metadata.Metadata{}, - // //).WithTimestamp(now.Add(2*time.Minute)), - // - // //ledger.NewLog( - // // ledger.NewTransaction(). - // // WithPostings(ledger.NewPosting("world", "account:1", "USD", big.NewInt(0))). - // // WithTimestamp(now.Add(-time.Minute)). - // // WithIDUint64(3), - // // map[string]metadata.Metadata{}, - // //).WithTimestamp(now.Add(1*time.Minute)), - // - // //ledger.NewLog( - // // ledger.NewTransaction(). - // // WithPostings(ledger.NewPosting("world", "account:2", "USD", big.NewInt(50))). - // // WithTimestamp(now).WithIDUint64(4), - // // map[string]metadata.Metadata{}, - // //).WithTimestamp(now), - // - // //ledger.NewLog( - // // ledger.NewTransaction(). - // // WithPostings(ledger.NewPosting("world", "account:2", "USD", big.NewInt(50))). - // // WithIDUint64(5). - // // WithTimestamp(now.Add(1*time.Minute)), - // // map[string]metadata.Metadata{}, - // //).WithTimestamp(now.Add(-1*time.Minute)), - // - // //ledger.NewLog( - // // ledger.NewTransaction(). - // // WithPostings(ledger.NewPosting("account:2", "bank", "USD", big.NewInt(50))). - // // WithTimestamp(now.Add(2*time.Minute)). - // // WithIDUint64(6), - // // map[string]metadata.Metadata{}, - // //).WithTimestamp(now.Add(-2*time.Minute)), - // - // //ledger.NewLog( - // // ledger.NewTransaction(). - // // WithPostings(ledger.NewPosting("world", "account:2", "USD", big.NewInt(25))). - // // WithTimestamp(now.Add(3*time.Minute)). - // // WithIDUint64(7), - // // map[string]metadata.Metadata{}, - // //).WithTimestamp(now.Add(-3*time.Minute)), - // )..., - //)) - - t.Run("Get All Volumes with Balance for Insertion date", func(t *testing.T) { + t.Run("Get all volumes with balance for insertion date", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions(ledgercontroller.FiltersForVolumes{UseInsertionDate: true}))) require.NoError(t, err) @@ -179,7 +111,7 @@ func TestGetVolumesWithBalances(t *testing.T) { require.Len(t, volumes.Data, 4) }) - t.Run("Get All Volumes with Balance for Effective date", func(t *testing.T) { + t.Run("Get all volumes with balance for effective date", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions(ledgercontroller.FiltersForVolumes{UseInsertionDate: false}))) require.NoError(t, err) @@ -187,7 +119,7 @@ func TestGetVolumesWithBalances(t *testing.T) { require.Len(t, volumes.Data, 4) }) - t.Run("Get All Volumes with Balance for Insertion date with previous pit", func(t *testing.T) { + t.Run("Get all volumes with balance for insertion date with previous pit", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( ledgercontroller.FiltersForVolumes{ @@ -208,7 +140,7 @@ func TestGetVolumesWithBalances(t *testing.T) { }, volumes.Data[0]) }) - t.Run("Get All Volumes with Balance for Insertion date with futur pit", func(t *testing.T) { + t.Run("Get all volumes with balance for insertion date with futur pit", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( ledgercontroller.FiltersForVolumes{ @@ -220,7 +152,7 @@ func TestGetVolumesWithBalances(t *testing.T) { require.Len(t, volumes.Data, 4) }) - t.Run("Get All Volumes with Balance for Insertion date with previous oot", func(t *testing.T) { + t.Run("Get all volumes with balance for insertion date with previous oot", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( ledgercontroller.FiltersForVolumes{ @@ -232,7 +164,7 @@ func TestGetVolumesWithBalances(t *testing.T) { require.Len(t, volumes.Data, 4) }) - t.Run("Get All Volumes with Balance for Insertion date with futur oot", func(t *testing.T) { + t.Run("Get all volumes with balance for insertion date with future oot", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( ledgercontroller.FiltersForVolumes{ @@ -253,7 +185,7 @@ func TestGetVolumesWithBalances(t *testing.T) { }, volumes.Data[0]) }) - t.Run("Get All Volumes with Balance for Effective date with previous pit", func(t *testing.T) { + t.Run("Get all volumes with balance for effective date with previous pit", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( ledgercontroller.FiltersForVolumes{ @@ -274,7 +206,7 @@ func TestGetVolumesWithBalances(t *testing.T) { }, volumes.Data[0]) }) - t.Run("Get All Volumes with Balance for Effective date with futur pit", func(t *testing.T) { + t.Run("Get all volumes with balance for effective date with futur pit", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( ledgercontroller.FiltersForVolumes{ @@ -286,7 +218,7 @@ func TestGetVolumesWithBalances(t *testing.T) { require.Len(t, volumes.Data, 4) }) - t.Run("Get All Volumes with Balance for Effective date with previous oot", func(t *testing.T) { + t.Run("Get all volumes with balance for effective date with previous oot", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( ledgercontroller.FiltersForVolumes{ @@ -298,7 +230,7 @@ func TestGetVolumesWithBalances(t *testing.T) { require.Len(t, volumes.Data, 4) }) - t.Run("Get All Volumes with Balance for effective date with futur oot", func(t *testing.T) { + t.Run("Get all volumes with balance for effective date with futur oot", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( ledgercontroller.FiltersForVolumes{ @@ -319,7 +251,7 @@ func TestGetVolumesWithBalances(t *testing.T) { }, volumes.Data[0]) }) - t.Run("Get All Volumes with Balance for insertion date with futur PIT and now OOT", func(t *testing.T) { + t.Run("Get all volumes with balance for insertion date with future PIT and now OOT", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( ledgercontroller.FiltersForVolumes{ @@ -341,7 +273,7 @@ func TestGetVolumesWithBalances(t *testing.T) { }) - t.Run("Get All Volumes with Balance for insertion date with previous OOT and now PIT", func(t *testing.T) { + t.Run("Get all volumes with balance for insertion date with previous OOT and now PIT", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( ledgercontroller.FiltersForVolumes{ @@ -363,7 +295,7 @@ func TestGetVolumesWithBalances(t *testing.T) { }) - t.Run("Get All Volumes with Balance for effective date with futur PIT and now OOT", func(t *testing.T) { + t.Run("Get all volumes with balance for effective date with future PIT and now OOT", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( ledgercontroller.FiltersForVolumes{ @@ -384,7 +316,7 @@ func TestGetVolumesWithBalances(t *testing.T) { }, volumes.Data[0]) }) - t.Run("Get All Volumes with Balance for insertion date with previous OOT and now PIT", func(t *testing.T) { + t.Run("Get all volumes with balance for insertion date with previous OOT and now PIT", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( ledgercontroller.FiltersForVolumes{ @@ -457,7 +389,6 @@ func TestGetVolumesWithBalances(t *testing.T) { require.NoError(t, err) require.Len(t, volumes.Data, 2) - }) t.Run("Using exists metadata filter 2", func(t *testing.T) { @@ -471,11 +402,10 @@ func TestGetVolumesWithBalances(t *testing.T) { require.NoError(t, err) require.Len(t, volumes.Data, 1) - }) } -func TestAggGetVolumesWithBalances(t *testing.T) { +func TestVolumesAggregate(t *testing.T) { t.Parallel() store := newLedgerStore(t) now := time.Now() @@ -527,7 +457,13 @@ func TestAggGetVolumesWithBalances(t *testing.T) { err = store.CommitTransaction(ctx, &tx7) require.NoError(t, err) - t.Run("Aggregation Volumes with Balance for GroupLvl 0", func(t *testing.T) { + require.NoError(t, store.UpdateAccountsMetadata(ctx, map[string]metadata.Metadata{ + "account:1:1": { + "foo": "bar", + }, + })) + + t.Run("Aggregation Volumes with balance for GroupLvl 0", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery( ledgercontroller.NewPaginatedQueryOptions( @@ -540,7 +476,7 @@ func TestAggGetVolumesWithBalances(t *testing.T) { require.Len(t, volumes.Data, 7) }) - t.Run("Aggregation Volumes with Balance for GroupLvl 1", func(t *testing.T) { + t.Run("Aggregation Volumes with balance for GroupLvl 1", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery( ledgercontroller.NewPaginatedQueryOptions( @@ -553,7 +489,7 @@ func TestAggGetVolumesWithBalances(t *testing.T) { require.Len(t, volumes.Data, 2) }) - t.Run("Aggregation Volumes with Balance for GroupLvl 2", func(t *testing.T) { + t.Run("Aggregation Volumes with balance for GroupLvl 2", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery( ledgercontroller.NewPaginatedQueryOptions( @@ -566,7 +502,7 @@ func TestAggGetVolumesWithBalances(t *testing.T) { require.Len(t, volumes.Data, 4) }) - t.Run("Aggregation Volumes with Balance for GroupLvl 3", func(t *testing.T) { + t.Run("Aggregation Volumes with balance for GroupLvl 3", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery( ledgercontroller.NewPaginatedQueryOptions( @@ -579,7 +515,7 @@ func TestAggGetVolumesWithBalances(t *testing.T) { require.Len(t, volumes.Data, 7) }) - t.Run("Aggregation Volumes with Balance for GroupLvl 1 && PIT && OOT && effectiveDate", func(t *testing.T) { + t.Run("Aggregation Volumes with balance for GroupLvl 1 && PIT && OOT && effectiveDate", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery( @@ -614,7 +550,7 @@ func TestAggGetVolumesWithBalances(t *testing.T) { }) }) - t.Run("Aggregation Volumes with Balance for GroupLvl 1 && PIT && OOT && effectiveDate && Balance Filter 1", func(t *testing.T) { + t.Run("Aggregation Volumes with balance for GroupLvl 1 && PIT && OOT && effectiveDate && Balance Filter 1", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery( ledgercontroller.NewPaginatedQueryOptions( @@ -641,7 +577,7 @@ func TestAggGetVolumesWithBalances(t *testing.T) { }) }) - t.Run("Aggregation Volumes with Balance for GroupLvl 1 && Balance Filter 2", func(t *testing.T) { + t.Run("Aggregation Volumes with balance for GroupLvl 1 && Balance Filter 2", func(t *testing.T) { t.Parallel() volumes, err := store.GetVolumesWithBalances(ctx, ledgercontroller.NewGetVolumesWithBalancesQuery( ledgercontroller.NewPaginatedQueryOptions( @@ -684,6 +620,61 @@ func TestAggGetVolumesWithBalances(t *testing.T) { }, }) }) + t.Run("filter using account matching, metadata, and group", func(t *testing.T) { + t.Parallel() + + volumes, err := store.GetVolumesWithBalances(ctx, + ledgercontroller.NewGetVolumesWithBalancesQuery( + ledgercontroller.NewPaginatedQueryOptions( + ledgercontroller.FiltersForVolumes{ + GroupLvl: 1, + }).WithQueryBuilder(query.And( + query.Match("account", "account::"), + query.Match("metadata[foo]", "bar"), + ))), + ) + + require.NoError(t, err) + require.Len(t, volumes.Data, 1) + }) + + t.Run("filter using account matching, metadata, and group and PIT", func(t *testing.T) { + t.Parallel() + + volumes, err := store.GetVolumesWithBalances(ctx, + ledgercontroller.NewGetVolumesWithBalancesQuery( + ledgercontroller.NewPaginatedQueryOptions( + ledgercontroller.FiltersForVolumes{ + GroupLvl: 1, + PITFilter: ledgercontroller.PITFilter{ + PIT: pointer.For(now.Add(time.Minute)), + }, + }).WithQueryBuilder(query.And( + query.Match("account", "account::"), + query.Match("metadata[foo]", "bar"), + ))), + ) + + require.NoError(t, err) + require.Len(t, volumes.Data, 1) + }) + + t.Run("filter using metadata matching only", func(t *testing.T) { + t.Parallel() + + volumes, err := store.GetVolumesWithBalances(ctx, + ledgercontroller.NewGetVolumesWithBalancesQuery( + ledgercontroller.NewPaginatedQueryOptions( + ledgercontroller.FiltersForVolumes{ + GroupLvl: 1, + }).WithQueryBuilder(query.And( + query.Match("metadata[foo]", "bar"), + ))), + ) + + require.NoError(t, err) + require.Len(t, volumes.Data, 1) + }) } func TestUpdateVolumes(t *testing.T) {