Skip to content

Commit

Permalink
feat: add limit for select in query init ledgers (#223)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored and flemzord committed May 12, 2023
1 parent f63ba72 commit 46fe475
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 56 deletions.
3 changes: 3 additions & 0 deletions cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func resolveOptions(v *viper.Viper, userOptions ...fx.Option) []fx.Option {
EvictionRetainDelay: v.GetDuration(cacheEvictionRetainDelay),
EvictionPeriod: v.GetDuration(cacheEvictionPeriodFlag),
},
Query: ledger.QueryConfiguration{
LimitReadLogs: v.GetInt(queryLimitReadLogsFlag),
},
}),
)

Expand Down
2 changes: 2 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
const (
cacheEvictionPeriodFlag = "cache-eviction-period"
cacheEvictionRetainDelay = "cache-eviction-retain-delay"
queryLimitReadLogsFlag = "query-limit-read-logs"
)

func NewServe() *cobra.Command {
Expand All @@ -29,5 +30,6 @@ func NewServe() *cobra.Command {
}
cmd.Flags().Duration(cacheEvictionPeriodFlag, cache.DefaultEvictionPeriod, "Cache eviction period")
cmd.Flags().Duration(cacheEvictionRetainDelay, cache.DefaultRetainDelay, "Cache retain delay")
cmd.Flags().Int(queryLimitReadLogsFlag, 10000, "Query limit read logs")
return cmd
}
8 changes: 8 additions & 0 deletions pkg/ledger/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@ type CacheConfiguration struct {
EvictionPeriod time.Duration
}

type QueryConfiguration struct {
LimitReadLogs int
}

type Configuration struct {
AllowPastTimestamp bool
Cache CacheConfiguration
Query QueryConfiguration
}

func Module(configuration Configuration) fx.Option {
Expand All @@ -41,5 +46,8 @@ func Module(configuration Configuration) fx.Option {
fx.Provide(fx.Annotate(monitor.NewNoOpMonitor, fx.As(new(monitor.Monitor)))),
fx.Provide(fx.Annotate(metrics.NewNoOpMetricsRegistry, fx.As(new(metrics.GlobalMetricsRegistry)))),
query.InitModule(),
fx.Decorate(func() *query.InitLedgerConfig {
return query.NewInitLedgerConfig(configuration.Query.LimitReadLogs)
}),
)
}
129 changes: 82 additions & 47 deletions pkg/ledger/query/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,56 +11,37 @@ import (
"golang.org/x/sync/errgroup"
)

func initLedger(
ctx context.Context,
ledgerName string,
store Store,
monitor monitor.Monitor,
metricsRegistry metrics.PerLedgerMetricsRegistry,
) (uint64, error) {
if !store.IsInitialized() {
return 0, nil
}

nextLogIDToProcess, err := store.GetNextLogID(ctx)
if err != nil && !storage.IsNotFoundError(err) {
return 0, errorsutil.NewError(ErrStorage,
errors.Wrap(err, "reading last log"))
}

logs, err := store.ReadLogsStartingFromID(ctx, nextLogIDToProcess)
if err != nil {
return 0, errorsutil.NewError(ErrStorage,
errors.Wrap(err, "reading logs since last ID"))
}

if len(logs) == 0 {
return 0, nil
}

if err := processLogs(ctx, ledgerName, store, monitor, logs...); err != nil {
return 0, errors.Wrap(err, "processing logs")
}
type InitLedger struct {
cfg *InitLedgerConfig
driver storage.Driver
monitor monitor.Monitor
metricsRegistry metrics.PerLedgerMetricsRegistry
}

metricsRegistry.QueryProcessedLogs().Add(ctx, int64(len(logs)))
type InitLedgerConfig struct {
LimitReadLogs int
}

if err := store.UpdateNextLogID(ctx, logs[len(logs)-1].ID+1); err != nil {
return 0, errorsutil.NewError(ErrStorage,
errors.Wrap(err, "updating last read log"))
func NewInitLedgerConfig(limitReadLogs int) *InitLedgerConfig {
return &InitLedgerConfig{
LimitReadLogs: limitReadLogs,
}
lastProcessedLogID := logs[len(logs)-1].ID

return lastProcessedLogID, nil
}

type InitLedger struct {
driver storage.Driver
monitor monitor.Monitor
metricsRegistry metrics.PerLedgerMetricsRegistry
func NewDefaultInitLedgerConfig() *InitLedgerConfig {
return &InitLedgerConfig{
LimitReadLogs: 10000,
}
}

func (iq InitLedger) initLedgers(ctx context.Context) error {
ledgers, err := iq.driver.GetSystemStore().ListLedgers(ctx)
func initLedgers(
ctx context.Context,
cfg *InitLedgerConfig,
driver storage.Driver,
monitor monitor.Monitor,
metricsRegistry metrics.PerLedgerMetricsRegistry,
) error {
ledgers, err := driver.GetSystemStore().ListLedgers(ctx)
if err != nil {
return err
}
Expand All @@ -69,7 +50,7 @@ func (iq InitLedger) initLedgers(ctx context.Context) error {
for _, ledger := range ledgers {
_ledger := ledger
eg.Go(func() error {
store, _, err := iq.driver.GetLedgerStore(ctxGroup, _ledger, false)
store, _, err := driver.GetLedgerStore(ctxGroup, _ledger, false)
if err != nil && !storage.IsNotFoundError(err) {
return err
}
Expand All @@ -84,10 +65,11 @@ func (iq InitLedger) initLedgers(ctx context.Context) error {

if _, err := initLedger(
ctxGroup,
cfg,
_ledger,
NewDefaultStore(store),
iq.monitor,
iq.metricsRegistry,
monitor,
metricsRegistry,
); err != nil {
return err
}
Expand All @@ -99,8 +81,61 @@ func (iq InitLedger) initLedgers(ctx context.Context) error {
return eg.Wait()
}

func NewInitLedgers(driver storage.Driver, monitor monitor.Monitor) *InitLedger {
func initLedger(
ctx context.Context,
cfg *InitLedgerConfig,
ledgerName string,
store Store,
monitor monitor.Monitor,
metricsRegistry metrics.PerLedgerMetricsRegistry,
) (uint64, error) {
if !store.IsInitialized() {
return 0, nil
}

lastReadLogID, err := store.GetNextLogID(ctx)
if err != nil && !storage.IsNotFoundError(err) {
return 0, errorsutil.NewError(ErrStorage,
errors.Wrap(err, "reading last log"))
}

lastProcessedLogID := uint64(0)
for {
logs, err := store.ReadLogsRange(ctx, lastReadLogID, lastReadLogID+uint64(cfg.LimitReadLogs))
if err != nil {
return 0, errorsutil.NewError(ErrStorage,
errors.Wrap(err, "reading logs since last ID"))
}

if len(logs) == 0 {
// No logs, nothing to do
return lastProcessedLogID, nil
}

if err := processLogs(ctx, ledgerName, store, monitor, logs...); err != nil {
return 0, errors.Wrap(err, "processing logs")
}

metricsRegistry.QueryProcessedLogs().Add(ctx, int64(len(logs)))

if err := store.UpdateNextLogID(ctx, logs[len(logs)-1].ID+1); err != nil {
return 0, errorsutil.NewError(ErrStorage,
errors.Wrap(err, "updating last read log"))
}
lastReadLogID = logs[len(logs)-1].ID + 1
lastProcessedLogID = logs[len(logs)-1].ID

if len(logs) < cfg.LimitReadLogs {
// Nothing to do anymore, no need to read more logs
return lastProcessedLogID, nil
}

}
}

func NewInitLedgers(cfg *InitLedgerConfig, driver storage.Driver, monitor monitor.Monitor) *InitLedger {
return &InitLedger{
cfg: cfg,
driver: driver,
monitor: monitor,
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/ledger/query/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@ import (

func InitModule() fx.Option {
return fx.Options(
fx.Provide(NewDefaultInitLedgerConfig),
fx.Provide(NewInitLedgers),
fx.Invoke(func(lc fx.Lifecycle, initQuery *InitLedger) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return initQuery.initLedgers(ctx)
return initLedgers(
ctx,
initQuery.cfg,
initQuery.driver,
initQuery.monitor,
initQuery.metricsRegistry,
)
},
})
}),
Expand Down
90 changes: 90 additions & 0 deletions pkg/ledger/query/module_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package query

import (
"context"
"math/big"
"testing"

"github.com/formancehq/ledger/pkg/core"
"github.com/formancehq/ledger/pkg/ledger/monitor"
"github.com/formancehq/ledger/pkg/opentelemetry/metrics"
"github.com/formancehq/stack/libs/go-libs/metadata"
"github.com/stretchr/testify/require"
)

func TestInitQuery(t *testing.T) {
t.Parallel()

now := core.Now()

tx0 := core.NewTransaction().WithPostings(
core.NewPosting("world", "bank", "USD/2", big.NewInt(100)),
)
tx1 := core.NewTransaction().WithPostings(
core.NewPosting("bank", "user:1", "USD/2", big.NewInt(10)),
)

appliedMetadataOnTX1 := metadata.Metadata{
"paymentID": "1234",
}
appliedMetadataOnAccount := metadata.Metadata{
"category": "gold",
}

log0 := core.NewTransactionLog(tx0, nil)
log0.ID = 0
log1 := core.NewTransactionLog(tx1, nil)
log1.ID = 1
log2 := core.NewSetMetadataLog(now, core.SetMetadataLogPayload{
TargetType: core.MetaTargetTypeTransaction,
TargetID: tx1.ID,
Metadata: appliedMetadataOnTX1,
})
log2.ID = 2
log3 := core.NewSetMetadataLog(now, core.SetMetadataLogPayload{
TargetType: core.MetaTargetTypeAccount,
TargetID: "bank",
Metadata: appliedMetadataOnAccount,
})
log3.ID = 3
log4 := core.NewSetMetadataLog(now, core.SetMetadataLogPayload{
TargetType: core.MetaTargetTypeAccount,
TargetID: "another:account",
Metadata: appliedMetadataOnAccount,
})
log4.ID = 4

logs := []core.Log{
log0,
log1,
log2,
log3,
log4,
}

ledgerStore := &mockStore{
accounts: map[string]*core.AccountWithVolumes{},
logs: logs,
}

nextLogID, err := ledgerStore.GetNextLogID(context.Background())
require.NoError(t, err)
require.Equal(t, uint64(0), nextLogID)

lastProcessedId, err := initLedger(
context.Background(),
&InitLedgerConfig{
LimitReadLogs: 2,
},
"default_test",
ledgerStore,
monitor.NewNoOpMonitor(),
metrics.NewNoOpMetricsRegistry(),
)
require.NoError(t, err)
require.Equal(t, uint64(4), lastProcessedId)

lastReadLogID, err := ledgerStore.GetNextLogID(context.Background())
require.NoError(t, err)
require.Equal(t, uint64(5), lastReadLogID)
}
2 changes: 1 addition & 1 deletion pkg/ledger/query/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Store interface {
UpdateNextLogID(ctx context.Context, u uint64) error
IsInitialized() bool
GetNextLogID(ctx context.Context) (uint64, error)
ReadLogsStartingFromID(ctx context.Context, id uint64) ([]core.Log, error)
ReadLogsRange(ctx context.Context, idMin, idMax uint64) ([]core.Log, error)
RunInTransaction(ctx context.Context, f func(ctx context.Context, tx Store) error) error
GetAccountWithVolumes(ctx context.Context, address string) (*core.AccountWithVolumes, error)
GetTransaction(ctx context.Context, id string) (*core.ExpandedTransaction, error)
Expand Down
11 changes: 10 additions & 1 deletion pkg/ledger/query/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

type mockStore struct {
nextLogID uint64
logs []core.Log
accounts map[string]*core.AccountWithVolumes
transactions []*core.ExpandedTransaction
}
Expand Down Expand Up @@ -93,7 +94,15 @@ func (m *mockStore) GetNextLogID(ctx context.Context) (uint64, error) {
return m.nextLogID, nil
}

func (m *mockStore) ReadLogsStartingFromID(ctx context.Context, id uint64) ([]core.Log, error) {
func (m *mockStore) ReadLogsRange(ctx context.Context, idMin, idMax uint64) ([]core.Log, error) {
if idMax > uint64(len(m.logs)) {
idMax = uint64(len(m.logs))
}

if idMin < uint64(len(m.logs)) {
return m.logs[idMin:idMax], nil
}

return []core.Log{}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type LedgerStore interface {

AppendLog(context.Context, *core.Log) error
GetNextLogID(ctx context.Context) (uint64, error)
ReadLogsStartingFromID(ctx context.Context, id uint64) ([]core.Log, error)
ReadLogsRange(ctx context.Context, idMin, idMax uint64) ([]core.Log, error)
UpdateNextLogID(ctx context.Context, id uint64) error
GetLogs(context.Context, LogsQuery) (*api.Cursor[core.Log], error)
GetLastLog(context.Context) (*core.Log, error)
Expand Down
Loading

0 comments on commit 46fe475

Please sign in to comment.