From 2ca1cbb390d27f16f3392e788e0c51da27054c70 Mon Sep 17 00:00:00 2001 From: Ragot Geoffrey Date: Tue, 28 Mar 2023 11:59:34 +0200 Subject: [PATCH] feat: centralize ledger lifecycle inside resolver. (#184) --- pkg/api/internal/testing.go | 7 +--- pkg/ledger/cache/manager.go | 39 ------------------ pkg/ledger/cache/module.go | 11 ----- pkg/ledger/ledger_test.go | 56 ++------------------------ pkg/ledger/main_test.go | 41 +++++++------------ pkg/ledger/module.go | 22 ++++++++++ pkg/ledger/resolver.go | 78 +++++++++++++++--------------------- pkg/ledger/runner/manager.go | 59 --------------------------- pkg/ledger/runner/module.go | 16 -------- 9 files changed, 73 insertions(+), 256 deletions(-) delete mode 100644 pkg/ledger/cache/manager.go delete mode 100644 pkg/ledger/cache/module.go create mode 100644 pkg/ledger/module.go delete mode 100644 pkg/ledger/runner/manager.go delete mode 100644 pkg/ledger/runner/module.go diff --git a/pkg/api/internal/testing.go b/pkg/api/internal/testing.go index 458ebd7d4..392e4fbbe 100644 --- a/pkg/api/internal/testing.go +++ b/pkg/api/internal/testing.go @@ -15,10 +15,8 @@ import ( "github.com/formancehq/ledger/pkg/api/routes" "github.com/formancehq/ledger/pkg/core" "github.com/formancehq/ledger/pkg/ledger" - "github.com/formancehq/ledger/pkg/ledger/cache" "github.com/formancehq/ledger/pkg/ledger/lock" "github.com/formancehq/ledger/pkg/ledger/query" - "github.com/formancehq/ledger/pkg/ledger/runner" "github.com/formancehq/ledger/pkg/ledgertesting" "github.com/formancehq/ledger/pkg/storage" sharedapi "github.com/formancehq/stack/libs/go-libs/api" @@ -208,15 +206,12 @@ func RunTest(t *testing.T, callback func(api chi.Router, storageDriver storage.D storageDriver := ledgertesting.StorageDriver(t) require.NoError(t, storageDriver.Initialize(context.Background())) - cacheManager := cache.NewManager(storageDriver) - lock := lock.NewInMemory() - runnerManager := runner.NewManager(storageDriver, lock, cacheManager, false) queryWorker := query.NewWorker(query.DefaultWorkerConfig, storageDriver, query.NewNoOpMonitor()) go func() { require.NoError(t, queryWorker.Run(context.Background())) }() - resolver := ledger.NewResolver(storageDriver, lock, cacheManager, runnerManager, queryWorker) + resolver := ledger.NewResolver(storageDriver, lock.NewInMemory(), queryWorker, false) router := routes.NewRouter(storageDriver, "latest", resolver, logging.FromContext(context.Background()), &health.HealthController{}) diff --git a/pkg/ledger/cache/manager.go b/pkg/ledger/cache/manager.go deleted file mode 100644 index 13dc453b0..000000000 --- a/pkg/ledger/cache/manager.go +++ /dev/null @@ -1,39 +0,0 @@ -package cache - -import ( - "context" - "sync" - - "github.com/formancehq/ledger/pkg/storage" -) - -type Manager struct { - mu sync.Mutex - storageDriver storage.Driver - // TODO(gfyrag): In a future release, we should be able to clear old cache from memory - ledgers map[string]*Cache -} - -func (m *Manager) ForLedger(ctx context.Context, ledger string) (*Cache, error) { - m.mu.Lock() - defer m.mu.Unlock() - - cache, ok := m.ledgers[ledger] - if !ok { - store, _, err := m.storageDriver.GetLedgerStore(ctx, ledger, true) - if err != nil { - return nil, err - } - cache = New(store) - m.ledgers[ledger] = cache - } - return cache, nil -} - -func NewManager(storageDriver storage.Driver) *Manager { - return &Manager{ - mu: sync.Mutex{}, - storageDriver: storageDriver, - ledgers: map[string]*Cache{}, - } -} diff --git a/pkg/ledger/cache/module.go b/pkg/ledger/cache/module.go deleted file mode 100644 index f61dc838b..000000000 --- a/pkg/ledger/cache/module.go +++ /dev/null @@ -1,11 +0,0 @@ -package cache - -import ( - "go.uber.org/fx" -) - -func Module() fx.Option { - return fx.Options( - fx.Provide(NewManager), - ) -} diff --git a/pkg/ledger/ledger_test.go b/pkg/ledger/ledger_test.go index 71039cece..3322bf67d 100644 --- a/pkg/ledger/ledger_test.go +++ b/pkg/ledger/ledger_test.go @@ -8,11 +8,6 @@ import ( "testing" "github.com/formancehq/ledger/pkg/core" - "github.com/formancehq/ledger/pkg/ledger/cache" - "github.com/formancehq/ledger/pkg/ledger/lock" - "github.com/formancehq/ledger/pkg/ledger/query" - "github.com/formancehq/ledger/pkg/ledger/runner" - "github.com/formancehq/ledger/pkg/ledgertesting" "github.com/google/uuid" "github.com/stretchr/testify/require" ) @@ -171,33 +166,12 @@ func TestVeryBigTransaction(t *testing.T) { } func BenchmarkSequentialWrites(b *testing.B) { - driver := ledgertesting.StorageDriver(b) - require.NoError(b, driver.Initialize(context.Background())) - ledgerName := uuid.NewString() - store, _, err := driver.GetLedgerStore(context.Background(), ledgerName, true) - require.NoError(b, err) - - _, err = store.Initialize(context.Background()) - require.NoError(b, err) - - cacheManager := cache.NewManager(driver) - cache, err := cacheManager.ForLedger(context.Background(), ledgerName) - require.NoError(b, err) - - locker := lock.NewInMemory() + resolver := newResolver(b) - runnerManager := runner.NewManager(driver, locker, cacheManager, false) - runner, err := runnerManager.ForLedger(context.Background(), ledgerName) + ledger, err := resolver.GetLedger(context.TODO(), ledgerName) require.NoError(b, err) - queryWorker := query.NewWorker(query.DefaultWorkerConfig, driver, query.NewNoOpMonitor()) - go func() { - require.NoError(b, queryWorker.Run(context.Background())) - }() - - ledger := New(store, cache, runner, locker, queryWorker) - b.ResetTimer() for i := 0; i < b.N; i++ { _, err := ledger.CreateTransaction(context.Background(), false, core.RunScript{ @@ -213,33 +187,11 @@ func BenchmarkSequentialWrites(b *testing.B) { } func BenchmarkParallelWrites(b *testing.B) { - driver := ledgertesting.StorageDriver(b) - require.NoError(b, driver.Initialize(context.Background())) + resolver := newResolver(b) - ledgerName := uuid.NewString() - store, _, err := driver.GetLedgerStore(context.Background(), ledgerName, true) + ledger, err := resolver.GetLedger(context.Background(), uuid.NewString()) require.NoError(b, err) - _, err = store.Initialize(context.Background()) - require.NoError(b, err) - - cacheManager := cache.NewManager(driver) - cache, err := cacheManager.ForLedger(context.Background(), ledgerName) - require.NoError(b, err) - - locker := lock.NewInMemory() - - runnerManager := runner.NewManager(driver, locker, cacheManager, false) - runner, err := runnerManager.ForLedger(context.Background(), ledgerName) - require.NoError(b, err) - - queryWorker := query.NewWorker(query.DefaultWorkerConfig, driver, query.NewNoOpMonitor()) - go func() { - require.NoError(b, queryWorker.Run(context.Background())) - }() - - ledger := New(store, cache, runner, locker, queryWorker) - b.ResetTimer() wg := sync.WaitGroup{} wg.Add(b.N) diff --git a/pkg/ledger/main_test.go b/pkg/ledger/main_test.go index 0b79d6996..9ba61308c 100644 --- a/pkg/ledger/main_test.go +++ b/pkg/ledger/main_test.go @@ -5,11 +5,8 @@ import ( "os" "testing" - "github.com/formancehq/ledger/pkg/ledger/cache" "github.com/formancehq/ledger/pkg/ledger/lock" - "github.com/formancehq/ledger/pkg/ledger/numscript" "github.com/formancehq/ledger/pkg/ledger/query" - "github.com/formancehq/ledger/pkg/ledger/runner" "github.com/formancehq/ledger/pkg/ledgertesting" "github.com/formancehq/stack/libs/go-libs/logging" "github.com/formancehq/stack/libs/go-libs/pgtesting" @@ -29,38 +26,28 @@ func TestMain(t *testing.M) { os.Exit(code) } -func runOnLedger(t interface { - pgtesting.TestingT - Parallel() -}, f func(l *Ledger)) { - - t.Parallel() - +func newResolver(t interface{ pgtesting.TestingT }) *Resolver { storageDriver := ledgertesting.StorageDriver(t) require.NoError(t, storageDriver.Initialize(context.Background())) - name := uuid.New() - store, _, err := storageDriver.GetLedgerStore(context.Background(), name, true) - require.NoError(t, err) - - _, err = store.Initialize(context.Background()) - require.NoError(t, err) - - cacheManager := cache.NewManager(storageDriver) - ledgerCache, err := cacheManager.ForLedger(context.Background(), name) - require.NoError(t, err) - - compiler := numscript.NewCompiler() - - runner, err := runner.New(store, lock.NewInMemory(), ledgerCache, compiler, false) - require.NoError(t, err) - queryWorker := query.NewWorker(query.DefaultWorkerConfig, storageDriver, query.NewNoOpMonitor()) go func() { require.NoError(t, queryWorker.Run(context.Background())) }() - l := New(store, ledgerCache, runner, lock.NewInMemory(), queryWorker) + return NewResolver(storageDriver, lock.NewInMemory(), queryWorker, false) +} + +func runOnLedger(t interface { + pgtesting.TestingT + Parallel() +}, f func(l *Ledger)) { + t.Parallel() + + ledgerName := uuid.New() + resolver := newResolver(t) + l, err := resolver.GetLedger(context.Background(), ledgerName) + require.NoError(t, err) defer l.Close(context.Background()) f(l) diff --git a/pkg/ledger/module.go b/pkg/ledger/module.go new file mode 100644 index 000000000..702e7b1db --- /dev/null +++ b/pkg/ledger/module.go @@ -0,0 +1,22 @@ +package ledger + +import ( + "github.com/formancehq/ledger/pkg/ledger/lock" + "github.com/formancehq/ledger/pkg/ledger/query" + "github.com/formancehq/ledger/pkg/storage" + "go.uber.org/fx" +) + +func Module(allowPastTimestamp bool) fx.Option { + return fx.Options( + lock.Module(), + fx.Provide(func( + storageDriver storage.Driver, + locker lock.Locker, + queryWorker *query.Worker, + ) *Resolver { + return NewResolver(storageDriver, locker, queryWorker, allowPastTimestamp) + }), + query.Module(), + ) +} diff --git a/pkg/ledger/resolver.go b/pkg/ledger/resolver.go index 43dcf1f77..3a29d0fac 100644 --- a/pkg/ledger/resolver.go +++ b/pkg/ledger/resolver.go @@ -6,82 +6,68 @@ import ( "github.com/formancehq/ledger/pkg/ledger/cache" "github.com/formancehq/ledger/pkg/ledger/lock" + "github.com/formancehq/ledger/pkg/ledger/numscript" "github.com/formancehq/ledger/pkg/ledger/query" "github.com/formancehq/ledger/pkg/ledger/runner" "github.com/formancehq/ledger/pkg/storage" "github.com/pkg/errors" - "go.uber.org/fx" ) type Resolver struct { - storageDriver storage.Driver - lock sync.RWMutex - initializedStores map[string]struct{} - locker lock.Locker - cacheManager *cache.Manager - runnerManager *runner.Manager - queryWorker *query.Worker + storageDriver storage.Driver + lock sync.RWMutex + locker lock.Locker + queryWorker *query.Worker + //TODO(gfyrag): add a routine to clean old ledger + ledgers map[string]*Ledger + compiler *numscript.Compiler + allowPastTimestamps bool } func NewResolver( storageDriver storage.Driver, locker lock.Locker, - cacheManager *cache.Manager, - runnerManager *runner.Manager, queryWorker *query.Worker, + allowPastTimestamps bool, ) *Resolver { return &Resolver{ - storageDriver: storageDriver, - cacheManager: cacheManager, - runnerManager: runnerManager, - initializedStores: map[string]struct{}{}, - locker: locker, - queryWorker: queryWorker, + storageDriver: storageDriver, + locker: locker, + queryWorker: queryWorker, + compiler: numscript.NewCompiler(), + ledgers: map[string]*Ledger{}, + allowPastTimestamps: allowPastTimestamps, } } func (r *Resolver) GetLedger(ctx context.Context, name string) (*Ledger, error) { - store, _, err := r.storageDriver.GetLedgerStore(ctx, name, true) - if err != nil { - return nil, errors.Wrap(err, "retrieving ledger store") - } r.lock.RLock() - _, ok := r.initializedStores[name] + ledger, ok := r.ledgers[name] r.lock.RUnlock() if !ok { r.lock.Lock() defer r.lock.Unlock() - if _, ok = r.initializedStores[name]; !ok { - _, err = store.Initialize(ctx) - if err != nil { - return nil, errors.Wrap(err, "initializing ledger store") + store, _, err := r.storageDriver.GetLedgerStore(ctx, name, true) + if err != nil { + return nil, errors.Wrap(err, "retrieving ledger store") + } + if !store.IsInitialized() { + if _, err := store.Initialize(ctx); err != nil { + return nil, err } - r.initializedStores[name] = struct{}{} } - } - cache, err := r.cacheManager.ForLedger(ctx, name) - if err != nil { - return nil, err - } + cache := cache.New(store) + runner, err := runner.New(store, r.locker, cache, r.compiler, r.allowPastTimestamps) + if err != nil { + return nil, err + } - runner, err := r.runnerManager.ForLedger(ctx, name) - if err != nil { - return nil, err + ledger = New(store, cache, runner, r.locker, r.queryWorker) + r.ledgers[name] = ledger } - return New(store, cache, runner, r.locker, r.queryWorker), nil -} - -func Module(allowPastTimestamp bool) fx.Option { - return fx.Options( - fx.Provide(NewResolver), - lock.Module(), - cache.Module(), - query.Module(), - // TODO: Maybe handle this by request ? - runner.Module(allowPastTimestamp), - ) + return ledger, nil } diff --git a/pkg/ledger/runner/manager.go b/pkg/ledger/runner/manager.go deleted file mode 100644 index 85d0e5143..000000000 --- a/pkg/ledger/runner/manager.go +++ /dev/null @@ -1,59 +0,0 @@ -package runner - -import ( - "context" - "sync" - - "github.com/formancehq/ledger/pkg/ledger/cache" - "github.com/formancehq/ledger/pkg/ledger/lock" - "github.com/formancehq/ledger/pkg/ledger/numscript" - "github.com/formancehq/ledger/pkg/storage" -) - -// TODO(gfyrag): In a future release, we should be able to clear old runners from memory -type Manager struct { - mu sync.Mutex - storageDriver storage.Driver - lock lock.Locker - allowPastTimestamps bool - cacheManager *cache.Manager - compiler *numscript.Compiler - // ledgers store the script runner for each ledger - ledgers map[string]*Runner -} - -func (m *Manager) ForLedger(ctx context.Context, ledger string) (*Runner, error) { - m.mu.Lock() - defer m.mu.Unlock() - - runner, ok := m.ledgers[ledger] - if !ok { - store, _, err := m.storageDriver.GetLedgerStore(ctx, ledger, true) - if err != nil { - return nil, err - } - - cache, err := m.cacheManager.ForLedger(ctx, ledger) - if err != nil { - return nil, err - } - - runner, err = New(store, m.lock, cache, m.compiler, m.allowPastTimestamps) - if err != nil { - return nil, err - } - m.ledgers[ledger] = runner - } - return runner, nil -} - -func NewManager(storageDriver storage.Driver, lock lock.Locker, cacheManager *cache.Manager, allowPastTimestamps bool) *Manager { - return &Manager{ - storageDriver: storageDriver, - lock: lock, - allowPastTimestamps: allowPastTimestamps, - cacheManager: cacheManager, - ledgers: map[string]*Runner{}, - compiler: numscript.NewCompiler(), - } -} diff --git a/pkg/ledger/runner/module.go b/pkg/ledger/runner/module.go deleted file mode 100644 index 6157bf5ad..000000000 --- a/pkg/ledger/runner/module.go +++ /dev/null @@ -1,16 +0,0 @@ -package runner - -import ( - "github.com/formancehq/ledger/pkg/ledger/cache" - "github.com/formancehq/ledger/pkg/ledger/lock" - "github.com/formancehq/ledger/pkg/storage" - "go.uber.org/fx" -) - -func Module(allowPastTimestamp bool) fx.Option { - return fx.Options( - fx.Provide(func(storageDriver storage.Driver, lock lock.Locker, cacheManager *cache.Manager) *Manager { - return NewManager(storageDriver, lock, cacheManager, allowPastTimestamp) - }), - ) -}