Skip to content

Commit

Permalink
feat: make numscript cache configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored and flemzord committed May 12, 2023
1 parent 535c96a commit 85c4822
Show file tree
Hide file tree
Showing 11 changed files with 50 additions and 22 deletions.
5 changes: 4 additions & 1 deletion cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ func resolveOptions(output io.Writer, userOptions ...fx.Option) []fx.Option {
internal.NewAnalyticsModule(v, Version),
ledger.Module(ledger.Configuration{
AllowPastTimestamp: v.GetString(commitPolicyFlag) == "allow-past-timestamps",
Cache: ledger.CacheConfiguration{
AccountsCache: ledger.AccountsCacheConfiguration{
EvictionRetainDelay: v.GetDuration(cacheEvictionRetainDelay),
EvictionPeriod: v.GetDuration(cacheEvictionPeriodFlag),
},
NumscriptCache: ledger.NumscriptCacheConfiguration{
MaxCount: v.GetInt(numscriptCacheMaxCount),
},
Query: ledger.QueryConfiguration{
LimitReadLogs: v.GetInt(queryLimitReadLogsFlag),
},
Expand Down
2 changes: 2 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
cacheEvictionRetainDelay = "cache-eviction-retain-delay"
queryLimitReadLogsFlag = "query-limit-read-logs"
ballastSizeInBytesFlag = "ballast-size"
numscriptCacheMaxCount = "numscript-cache-max-count"
)

func NewServe() *cobra.Command {
Expand All @@ -35,5 +36,6 @@ func NewServe() *cobra.Command {
cmd.Flags().Duration(cacheEvictionRetainDelay, cache.DefaultRetainDelay, "Cache retain delay")
cmd.Flags().Int(queryLimitReadLogsFlag, 10000, "Query limit read logs")
cmd.Flags().Uint(ballastSizeInBytesFlag, 0, "Ballast size in bytes, default to 0")
cmd.Flags().Int(numscriptCacheMaxCount, 1024, "Numscript cache max count")
return cmd
}
3 changes: 2 additions & 1 deletion pkg/ledger/command/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func New(
locker Locker,
logIngester LogIngester,
state *State,
compiler *Compiler,
metricsRegistry metrics.PerLedgerMetricsRegistry,
) *Commander {
return &Commander{
Expand All @@ -70,7 +71,7 @@ func New(
logIngester: logIngester,
metricsRegistry: metricsRegistry,
state: state,
compiler: NewCompiler(),
compiler: compiler,
}
}

Expand Down
11 changes: 7 additions & 4 deletions pkg/ledger/command/commander_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ func TestCreateTransaction(t *testing.T) {
store := newMockStore()
cache := newMockCache()

ledger := New(store, cache, NoOpLocker, NoOpIngester, Load(store, false), nil)
ledger := New(store, cache, NoOpLocker, NoOpIngester,
LoadState(store, false), NewCompiler(1024), nil)

if tc.setup != nil {
tc.setup(t, store)
Expand Down Expand Up @@ -366,7 +367,8 @@ func TestRevert(t *testing.T) {
},
}

ledger := New(store, cache, NoOpLocker, NoOpIngester, Load(store, false), nil)
ledger := New(store, cache, NoOpLocker, NoOpIngester,
LoadState(store, false), NewCompiler(1024), nil)
_, err = ledger.RevertTransaction(context.Background(), Parameters{}, txID)
require.NoError(t, err)
}
Expand All @@ -393,7 +395,8 @@ func TestRevertWithAlreadyReverted(t *testing.T) {
},
}

ledger := New(store, cache, NoOpLocker, NoOpIngester, Load(store, false), nil)
ledger := New(store, cache, NoOpLocker, NoOpIngester,
LoadState(store, false), NewCompiler(1024), nil)

_, err = ledger.RevertTransaction(context.Background(), Parameters{}, 0)
require.True(t, errors.Is(err, ErrAlreadyReverted))
Expand Down Expand Up @@ -431,7 +434,7 @@ func TestRevertWithRevertOccurring(t *testing.T) {
ingestedLog <- log
<-log.Ingested
return nil
}), Load(store, false), nil)
}), LoadState(store, false), NewCompiler(1024), nil)
go func() {
_, err := ledger.RevertTransaction(context.Background(), Parameters{}, uint64(0))
require.NoError(t, err)
Expand Down
6 changes: 4 additions & 2 deletions pkg/ledger/command/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ func (c *Compiler) Compile(ctx context.Context, script string) (*program.Program
return program, nil
}

func NewCompiler() *Compiler {
func NewCompiler(maxCacheCount int) *Compiler {
return &Compiler{
cache: gcache.New(1024).LFU().Build(), // TODO(gfyrag): Make configurable
cache: gcache.New(maxCacheCount).
LFU().
Build(),
}
}
2 changes: 1 addition & 1 deletion pkg/ledger/command/compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestCompiler(t *testing.T) {
destination = @bank
)`

compiler := NewCompiler()
compiler := NewCompiler(1024)
p1, err := compiler.Compile(context.Background(), script)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion pkg/ledger/command/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (s *State) GetNextTXID() uint64 {
return uint64(s.lastTXID.Add(1))
}

func Load(store Store, allowPastTimestamps bool) *State {
func LoadState(store Store, allowPastTimestamps bool) *State {
log, err := store.ReadLastLogWithType(context.Background(), core.NewTransactionLogType, core.RevertedTransactionLogType)
if err != nil && !storage.IsNotFoundError(err) {
panic(err)
Expand Down
10 changes: 5 additions & 5 deletions pkg/ledger/command/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func TestState(t *testing.T) {
state := Load(AlwaysEmptyStore, false)
state := LoadState(AlwaysEmptyStore, false)
reserve, _, err := state.Reserve(context.Background(), ReserveRequest{
Timestamp: core.Now(),
})
Expand All @@ -20,7 +20,7 @@ func TestState(t *testing.T) {
}

func TestStateInsertInPastWithNotAllowedPastTimestamp(t *testing.T) {
state := Load(AlwaysEmptyStore, false)
state := LoadState(AlwaysEmptyStore, false)
now := core.Now()
reserve1, _, err := state.Reserve(context.Background(), ReserveRequest{
Timestamp: now,
Expand All @@ -36,7 +36,7 @@ func TestStateInsertInPastWithNotAllowedPastTimestamp(t *testing.T) {
}

func TestStateInsertInPastWithAllowPastTimestamps(t *testing.T) {
state := Load(AlwaysEmptyStore, true)
state := LoadState(AlwaysEmptyStore, true)
now := core.Now()
reserve1, _, err := state.Reserve(context.Background(), ReserveRequest{
Timestamp: now,
Expand All @@ -52,7 +52,7 @@ func TestStateInsertInPastWithAllowPastTimestamps(t *testing.T) {
}

func TestStateWithError(t *testing.T) {
state := Load(AlwaysEmptyStore, false)
state := LoadState(AlwaysEmptyStore, false)
now := core.Now()

_, _, err := state.Reserve(context.Background(), ReserveRequest{
Expand All @@ -68,7 +68,7 @@ func TestStateWithError(t *testing.T) {
}

func BenchmarkState(b *testing.B) {
state := Load(AlwaysEmptyStore, false)
state := LoadState(AlwaysEmptyStore, false)
b.ResetTimer()
now := core.Now()
eg := errgroup.Group{}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ func New(
locker *command.DefaultLocker,
queryWorker *query.Worker,
state *command.State,
compiler *command.Compiler,
metricsRegistry metrics.PerLedgerMetricsRegistry,
) *Ledger {
return &Ledger{
Commander: command.New(store, cache, locker, queryWorker, state, metricsRegistry),
Commander: command.New(store, cache, locker, queryWorker, state, compiler, metricsRegistry),
store: store,
queryWorker: queryWorker,
locker: locker,
Expand Down
17 changes: 13 additions & 4 deletions pkg/ledger/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,31 @@ package ledger
import (
"time"

"github.com/formancehq/ledger/pkg/ledger/command"
"github.com/formancehq/ledger/pkg/ledger/monitor"
"github.com/formancehq/ledger/pkg/ledger/query"
"github.com/formancehq/ledger/pkg/opentelemetry/metrics"
"github.com/formancehq/ledger/pkg/storage"
"go.uber.org/fx"
)

type CacheConfiguration struct {
type AccountsCacheConfiguration struct {
EvictionRetainDelay time.Duration
EvictionPeriod time.Duration
}

type NumscriptCacheConfiguration struct {
MaxCount int
}

type QueryConfiguration struct {
LimitReadLogs int
}

type Configuration struct {
AllowPastTimestamp bool
Cache CacheConfiguration
AccountsCache AccountsCacheConfiguration
NumscriptCache NumscriptCacheConfiguration
Query QueryConfiguration
}

Expand All @@ -35,12 +41,15 @@ func Module(configuration Configuration) fx.Option {
options := []option{
WithMonitor(monitor),
WithMetricsRegistry(metricsRegistry),
WithCacheEvictionPeriod(configuration.Cache.EvictionPeriod),
WithCacheEvictionRetainDelay(configuration.Cache.EvictionRetainDelay),
WithCacheEvictionPeriod(configuration.AccountsCache.EvictionPeriod),
WithCacheEvictionRetainDelay(configuration.AccountsCache.EvictionRetainDelay),
}
if configuration.AllowPastTimestamp {
options = append(options, WithAllowPastTimestamps())
}
if configuration.NumscriptCache.MaxCount != 0 {
options = append(options, WithCompiler(command.NewCompiler(configuration.NumscriptCache.MaxCount)))
}
return NewResolver(storageDriver, options...)
}),
fx.Provide(fx.Annotate(monitor.NewNoOpMonitor, fx.As(new(monitor.Monitor)))),
Expand Down
11 changes: 9 additions & 2 deletions pkg/ledger/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,16 @@ func WithCacheEvictionPeriod(t time.Duration) option {
}
}

func WithCompiler(compiler *command.Compiler) option {
return func(r *Resolver) {
r.compiler = compiler
}
}

var defaultOptions = []option{
WithMetricsRegistry(metrics.NewNoOpMetricsRegistry()),
WithMonitor(monitor.NewNoOpMonitor()),
WithCompiler(command.NewCompiler(1024)),
}

type Resolver struct {
Expand All @@ -67,7 +74,6 @@ type Resolver struct {
func NewResolver(storageDriver storage.Driver, options ...option) *Resolver {
r := &Resolver{
storageDriver: storageDriver,
compiler: command.NewCompiler(),
ledgers: map[string]*Ledger{},
}
for _, opt := range append(defaultOptions, options...) {
Expand Down Expand Up @@ -131,7 +137,8 @@ func (r *Resolver) GetLedger(ctx context.Context, name string) (*Ledger, error)
queryWorker := query.NewWorker(query.DefaultWorkerConfig, query.NewDefaultStore(store), name, r.monitor, metricsRegistry)
runOrPanic(queryWorker.Run)

ledger = New(store, cache, locker, queryWorker, command.Load(store, r.allowPastTimestamps), metricsRegistry)
ledger = New(store, cache, locker, queryWorker,
command.LoadState(store, r.allowPastTimestamps), r.compiler, metricsRegistry)
r.ledgers[name] = ledger
r.metricsRegistry.ActiveLedgers().Add(ctx, +1)
}
Expand Down

0 comments on commit 85c4822

Please sign in to comment.