Skip to content

Commit

Permalink
feat: add balances aggregation by metadata (#826)
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored and flemzord committed Dec 4, 2023
1 parent 88af7b9 commit 7e5ecf1
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 21 deletions.
1 change: 0 additions & 1 deletion Earthfile
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,4 @@ lint:
SAVE ARTIFACT main.go AS LOCAL main.go

pre-commit:
BUILD --pass-args +generate-mocks
BUILD --pass-args +copy-libs
5 changes: 3 additions & 2 deletions internal/api/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/formancehq/ledger/internal"
"math/big"

ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/internal/api/backend"
"github.com/formancehq/ledger/internal/api/shared"
"github.com/formancehq/ledger/internal/engine/command"
"github.com/formancehq/stack/libs/go-libs/metadata"
"math/big"
)

const (
Expand Down
3 changes: 2 additions & 1 deletion internal/api/v2/controllers_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package v2

import (
"encoding/json"
"net/http"

"github.com/formancehq/ledger/internal/api/bulk"
"github.com/formancehq/ledger/internal/api/shared"
sharedapi "github.com/formancehq/stack/libs/go-libs/api"
"net/http"
)

func bulkHandler(w http.ResponseWriter, r *http.Request) {
Expand Down
13 changes: 7 additions & 6 deletions internal/api/v2/controllers_bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ package v2_test
import (
"bytes"
"fmt"
"math/big"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/internal/api/backend"
"github.com/formancehq/ledger/internal/api/bulk"
Expand All @@ -14,12 +21,6 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"math/big"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
)

func TestBulk(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion internal/api/v2/controllers_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package v2_test
import (
"bytes"
"fmt"
"github.com/formancehq/ledger/internal/api/shared"
"math/big"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/formancehq/ledger/internal/api/shared"

ledger "github.com/formancehq/ledger/internal"
v2 "github.com/formancehq/ledger/internal/api/v2"
"github.com/formancehq/ledger/internal/engine/command"
Expand Down
5 changes: 3 additions & 2 deletions internal/api/v2/utils.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package v2

import (
"io"
"net/http"

ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/internal/storage/ledgerstore"
"github.com/formancehq/stack/libs/go-libs/collectionutils"
"github.com/formancehq/stack/libs/go-libs/pointer"
"github.com/formancehq/stack/libs/go-libs/query"
"io"
"net/http"
)

func getPITFilter(r *http.Request) (*ledgerstore.PITFilter, error) {
Expand Down
1 change: 1 addition & 0 deletions internal/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ledger

import (
"fmt"

"github.com/formancehq/stack/libs/go-libs/metadata"
)

Expand Down
32 changes: 31 additions & 1 deletion internal/storage/ledgerstore/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"math/big"

"github.com/formancehq/ledger/internal/storage"

ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/internal/storage/paginate"
"github.com/formancehq/stack/libs/go-libs/query"
Expand All @@ -18,7 +20,7 @@ func (store *Store) GetAggregatedBalances(ctx context.Context, q *GetAggregatedB
type Temp struct {
Aggregated ledger.VolumesByAssets `bun:"aggregated,type:jsonb"`
}
return fetchAndMap[*Temp, ledger.BalancesByAssets](store, ctx,
ret, err := fetchAndMap[*Temp, ledger.BalancesByAssets](store, ctx,
func(temp *Temp) ledger.BalancesByAssets {
return temp.Aggregated.Balances()
},
Expand All @@ -31,6 +33,7 @@ func (store *Store) GetAggregatedBalances(ctx context.Context, q *GetAggregatedB
Apply(filterPIT(q.Options.Options.PIT, "insertion_date")) // todo(gfyrag): expose capability to use effective_date

if q.Options.QueryBuilder != nil {
joinOnMetadataAdded := false
subQuery, args, err := q.Options.QueryBuilder.Build(query.ContextFn(func(key, operator string, value any) (string, []any, error) {
switch {
case key == "address":
Expand All @@ -44,6 +47,25 @@ func (store *Store) GetAggregatedBalances(ctx context.Context, q *GetAggregatedB
default:
return "", nil, fmt.Errorf("unexpected type %T for column 'address'", address)
}
case metadataRegex.Match([]byte(key)):
if operator != "$match" {
return "", nil, errors.New("'metadata' column can only be used with $match")
}
match := metadataRegex.FindAllStringSubmatch(key, 3)
if !joinOnMetadataAdded {
moves = moves.Join(`left join lateral (
select metadata
from accounts_metadata am
where am.address = moves.account_address and (? is null or date <= ?)
order by revision desc
limit 1
) am on true`, q.Options.Options.PIT, q.Options.Options.PIT)
joinOnMetadataAdded = true
}

return "am.metadata @> ?", []any{map[string]any{
match[0][1]: value,
}}, nil
default:
return "", nil, fmt.Errorf("unknown key '%s' when building query", key)
}
Expand All @@ -60,6 +82,14 @@ func (store *Store) GetAggregatedBalances(ctx context.Context, q *GetAggregatedB
ColumnExpr("volumes_to_jsonb((moves.asset, (sum((moves.post_commit_volumes).inputs), sum((moves.post_commit_volumes).outputs))::volumes)) as aggregated").
Group("moves.asset")
})
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return nil, err
}
if errors.Is(err, storage.ErrNotFound) {
return ledger.BalancesByAssets{}, nil
}

return ret, nil
}

func (store *Store) GetBalance(ctx context.Context, address, asset string) (*big.Int, error) {
Expand Down
62 changes: 57 additions & 5 deletions internal/storage/ledgerstore/balances_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package ledgerstore_test

import (
"context"
"math/big"
"testing"
"time"

"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/formancehq/stack/libs/go-libs/pointer"

ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/internal/storage/ledgerstore"
internaltesting "github.com/formancehq/ledger/internal/testing"
Expand All @@ -18,6 +20,7 @@ func TestGetBalancesAggregated(t *testing.T) {
t.Parallel()
store := newLedgerStore(t)
now := ledger.Now()
ctx := logging.TestingContext()

bigInt, _ := big.NewInt(0).SetString("999999999999999999999999999999999999999999999999999999999999999999999999999999999", 10)
smallInt := big.NewInt(199)
Expand All @@ -32,24 +35,43 @@ func TestGetBalancesAggregated(t *testing.T) {
ledger.NewPosting("world", "users:2", "USD", smallInt),
).WithDate(now.Add(time.Minute)).WithIDUint64(1)

require.NoError(t, store.InsertLogs(context.Background(),
require.NoError(t, store.InsertLogs(ctx,
ledger.ChainLogs(
ledger.NewTransactionLog(tx1, map[string]metadata.Metadata{}).WithDate(tx1.Timestamp),
ledger.NewTransactionLog(tx2, map[string]metadata.Metadata{}).WithDate(tx2.Timestamp),
ledger.NewSetMetadataLog(now.Add(time.Minute), ledger.SetMetadataLogPayload{
TargetType: ledger.MetaTargetTypeAccount,
TargetID: "users:1",
Metadata: metadata.Metadata{
"category": "premium",
},
}),
ledger.NewSetMetadataLog(now.Add(time.Minute), ledger.SetMetadataLogPayload{
TargetType: ledger.MetaTargetTypeAccount,
TargetID: "users:2",
Metadata: metadata.Metadata{
"category": "premium",
},
}),
ledger.NewDeleteMetadataLog(now.Add(2*time.Minute), ledger.DeleteMetadataLogPayload{
TargetType: ledger.MetaTargetTypeAccount,
TargetID: "users:2",
Key: "category",
}),
)...))

t.Run("aggregate on all", func(t *testing.T) {
t.Parallel()
q := ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilter{}).WithPageSize(10)
cursor, err := store.GetAggregatedBalances(context.Background(), ledgerstore.NewGetAggregatedBalancesQuery(q))
cursor, err := store.GetAggregatedBalances(ctx, ledgerstore.NewGetAggregatedBalancesQuery(q))
require.NoError(t, err)
internaltesting.RequireEqual(t, ledger.BalancesByAssets{
"USD": big.NewInt(0),
}, cursor)
})
t.Run("filter on address", func(t *testing.T) {
t.Parallel()
ret, err := store.GetAggregatedBalances(context.Background(), ledgerstore.NewGetAggregatedBalancesQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilter{}).
ret, err := store.GetAggregatedBalances(ctx, ledgerstore.NewGetAggregatedBalancesQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilter{}).
WithQueryBuilder(query.Match("address", "users:")).
WithPageSize(10),
))
Expand All @@ -63,7 +85,7 @@ func TestGetBalancesAggregated(t *testing.T) {
})
t.Run("using pit", func(t *testing.T) {
t.Parallel()
ret, err := store.GetAggregatedBalances(context.Background(), ledgerstore.NewGetAggregatedBalancesQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilter{
ret, err := store.GetAggregatedBalances(ctx, ledgerstore.NewGetAggregatedBalancesQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilter{
PIT: &now,
}).
WithQueryBuilder(query.Match("address", "users:")).
Expand All @@ -76,4 +98,34 @@ func TestGetBalancesAggregated(t *testing.T) {
),
}, ret)
})
t.Run("using a metadata and pit", func(t *testing.T) {
t.Parallel()
ret, err := store.GetAggregatedBalances(ctx, ledgerstore.NewGetAggregatedBalancesQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilter{
PIT: pointer.For(now.Add(time.Minute)),
}).
WithQueryBuilder(query.Match("metadata[category]", "premium")).
WithPageSize(10)))
require.NoError(t, err)
require.Equal(t, ledger.BalancesByAssets{
"USD": big.NewInt(400),
}, ret)
})
t.Run("using a metadata without pit", func(t *testing.T) {
t.Parallel()
ret, err := store.GetAggregatedBalances(ctx, ledgerstore.NewGetAggregatedBalancesQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilter{}).
WithQueryBuilder(query.Match("metadata[category]", "premium")).
WithPageSize(10)))
require.NoError(t, err)
require.Equal(t, ledger.BalancesByAssets{
"USD": big.NewInt(2),
}, ret)
})
t.Run("when no matching", func(t *testing.T) {
t.Parallel()
ret, err := store.GetAggregatedBalances(ctx, ledgerstore.NewGetAggregatedBalancesQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilter{}).
WithQueryBuilder(query.Match("metadata[category]", "guest")).
WithPageSize(10)))
require.NoError(t, err)
require.Equal(t, ledger.BalancesByAssets{}, ret)
})
}
5 changes: 4 additions & 1 deletion internal/storage/ledgerstore/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ const (
MovesTableName = "moves"
)

var (
metadataRegex = regexp.MustCompile("metadata\\[(.+)\\]")
)

type Transaction struct {
bun.BaseModel `bun:"transactions,alias:transactions"`

Expand Down Expand Up @@ -145,7 +149,6 @@ func (store *Store) buildTransactionQuery(p PITFilterWithVolumes, query *bun.Sel
}

func (store *Store) transactionQueryContext(qb query.Builder) (string, []any, error) {
metadataRegex := regexp.MustCompile("metadata\\[(.+)\\]")

return qb.Build(query.ContextFn(func(key, operator string, value any) (string, []any, error) {
switch {
Expand Down
3 changes: 2 additions & 1 deletion internal/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package ledger

import (
"fmt"
"github.com/pkg/errors"
"math/big"

"github.com/pkg/errors"

"github.com/formancehq/stack/libs/go-libs/metadata"
)

Expand Down

0 comments on commit 7e5ecf1

Please sign in to comment.