Skip to content

Commit

Permalink
feat: centralize ledger lifecycle inside resolver. (#184)
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored and flemzord committed May 12, 2023
1 parent 2f444ea commit 2ca1cbb
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 256 deletions.
7 changes: 1 addition & 6 deletions pkg/api/internal/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ import (
"github.com/formancehq/ledger/pkg/api/routes"
"github.com/formancehq/ledger/pkg/core"
"github.com/formancehq/ledger/pkg/ledger"
"github.com/formancehq/ledger/pkg/ledger/cache"
"github.com/formancehq/ledger/pkg/ledger/lock"
"github.com/formancehq/ledger/pkg/ledger/query"
"github.com/formancehq/ledger/pkg/ledger/runner"
"github.com/formancehq/ledger/pkg/ledgertesting"
"github.com/formancehq/ledger/pkg/storage"
sharedapi "github.com/formancehq/stack/libs/go-libs/api"
Expand Down Expand Up @@ -208,15 +206,12 @@ func RunTest(t *testing.T, callback func(api chi.Router, storageDriver storage.D
storageDriver := ledgertesting.StorageDriver(t)
require.NoError(t, storageDriver.Initialize(context.Background()))

cacheManager := cache.NewManager(storageDriver)
lock := lock.NewInMemory()
runnerManager := runner.NewManager(storageDriver, lock, cacheManager, false)
queryWorker := query.NewWorker(query.DefaultWorkerConfig, storageDriver, query.NewNoOpMonitor())
go func() {
require.NoError(t, queryWorker.Run(context.Background()))
}()
resolver := ledger.NewResolver(storageDriver, lock, cacheManager, runnerManager, queryWorker)

resolver := ledger.NewResolver(storageDriver, lock.NewInMemory(), queryWorker, false)
router := routes.NewRouter(storageDriver, "latest", resolver,
logging.FromContext(context.Background()), &health.HealthController{})

Expand Down
39 changes: 0 additions & 39 deletions pkg/ledger/cache/manager.go

This file was deleted.

11 changes: 0 additions & 11 deletions pkg/ledger/cache/module.go

This file was deleted.

56 changes: 4 additions & 52 deletions pkg/ledger/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ import (
"testing"

"github.com/formancehq/ledger/pkg/core"
"github.com/formancehq/ledger/pkg/ledger/cache"
"github.com/formancehq/ledger/pkg/ledger/lock"
"github.com/formancehq/ledger/pkg/ledger/query"
"github.com/formancehq/ledger/pkg/ledger/runner"
"github.com/formancehq/ledger/pkg/ledgertesting"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -171,33 +166,12 @@ func TestVeryBigTransaction(t *testing.T) {
}

func BenchmarkSequentialWrites(b *testing.B) {
driver := ledgertesting.StorageDriver(b)
require.NoError(b, driver.Initialize(context.Background()))

ledgerName := uuid.NewString()
store, _, err := driver.GetLedgerStore(context.Background(), ledgerName, true)
require.NoError(b, err)

_, err = store.Initialize(context.Background())
require.NoError(b, err)

cacheManager := cache.NewManager(driver)
cache, err := cacheManager.ForLedger(context.Background(), ledgerName)
require.NoError(b, err)

locker := lock.NewInMemory()
resolver := newResolver(b)

runnerManager := runner.NewManager(driver, locker, cacheManager, false)
runner, err := runnerManager.ForLedger(context.Background(), ledgerName)
ledger, err := resolver.GetLedger(context.TODO(), ledgerName)
require.NoError(b, err)

queryWorker := query.NewWorker(query.DefaultWorkerConfig, driver, query.NewNoOpMonitor())
go func() {
require.NoError(b, queryWorker.Run(context.Background()))
}()

ledger := New(store, cache, runner, locker, queryWorker)

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := ledger.CreateTransaction(context.Background(), false, core.RunScript{
Expand All @@ -213,33 +187,11 @@ func BenchmarkSequentialWrites(b *testing.B) {
}

func BenchmarkParallelWrites(b *testing.B) {
driver := ledgertesting.StorageDriver(b)
require.NoError(b, driver.Initialize(context.Background()))
resolver := newResolver(b)

ledgerName := uuid.NewString()
store, _, err := driver.GetLedgerStore(context.Background(), ledgerName, true)
ledger, err := resolver.GetLedger(context.Background(), uuid.NewString())
require.NoError(b, err)

_, err = store.Initialize(context.Background())
require.NoError(b, err)

cacheManager := cache.NewManager(driver)
cache, err := cacheManager.ForLedger(context.Background(), ledgerName)
require.NoError(b, err)

locker := lock.NewInMemory()

runnerManager := runner.NewManager(driver, locker, cacheManager, false)
runner, err := runnerManager.ForLedger(context.Background(), ledgerName)
require.NoError(b, err)

queryWorker := query.NewWorker(query.DefaultWorkerConfig, driver, query.NewNoOpMonitor())
go func() {
require.NoError(b, queryWorker.Run(context.Background()))
}()

ledger := New(store, cache, runner, locker, queryWorker)

b.ResetTimer()
wg := sync.WaitGroup{}
wg.Add(b.N)
Expand Down
41 changes: 14 additions & 27 deletions pkg/ledger/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@ import (
"os"
"testing"

"github.com/formancehq/ledger/pkg/ledger/cache"
"github.com/formancehq/ledger/pkg/ledger/lock"
"github.com/formancehq/ledger/pkg/ledger/numscript"
"github.com/formancehq/ledger/pkg/ledger/query"
"github.com/formancehq/ledger/pkg/ledger/runner"
"github.com/formancehq/ledger/pkg/ledgertesting"
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/formancehq/stack/libs/go-libs/pgtesting"
Expand All @@ -29,38 +26,28 @@ func TestMain(t *testing.M) {
os.Exit(code)
}

func runOnLedger(t interface {
pgtesting.TestingT
Parallel()
}, f func(l *Ledger)) {

t.Parallel()

func newResolver(t interface{ pgtesting.TestingT }) *Resolver {
storageDriver := ledgertesting.StorageDriver(t)
require.NoError(t, storageDriver.Initialize(context.Background()))

name := uuid.New()
store, _, err := storageDriver.GetLedgerStore(context.Background(), name, true)
require.NoError(t, err)

_, err = store.Initialize(context.Background())
require.NoError(t, err)

cacheManager := cache.NewManager(storageDriver)
ledgerCache, err := cacheManager.ForLedger(context.Background(), name)
require.NoError(t, err)

compiler := numscript.NewCompiler()

runner, err := runner.New(store, lock.NewInMemory(), ledgerCache, compiler, false)
require.NoError(t, err)

queryWorker := query.NewWorker(query.DefaultWorkerConfig, storageDriver, query.NewNoOpMonitor())
go func() {
require.NoError(t, queryWorker.Run(context.Background()))
}()

l := New(store, ledgerCache, runner, lock.NewInMemory(), queryWorker)
return NewResolver(storageDriver, lock.NewInMemory(), queryWorker, false)
}

func runOnLedger(t interface {
pgtesting.TestingT
Parallel()
}, f func(l *Ledger)) {
t.Parallel()

ledgerName := uuid.New()
resolver := newResolver(t)
l, err := resolver.GetLedger(context.Background(), ledgerName)
require.NoError(t, err)
defer l.Close(context.Background())

f(l)
Expand Down
22 changes: 22 additions & 0 deletions pkg/ledger/module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ledger

import (
"github.com/formancehq/ledger/pkg/ledger/lock"
"github.com/formancehq/ledger/pkg/ledger/query"
"github.com/formancehq/ledger/pkg/storage"
"go.uber.org/fx"
)

func Module(allowPastTimestamp bool) fx.Option {
return fx.Options(
lock.Module(),
fx.Provide(func(
storageDriver storage.Driver,
locker lock.Locker,
queryWorker *query.Worker,
) *Resolver {
return NewResolver(storageDriver, locker, queryWorker, allowPastTimestamp)
}),
query.Module(),
)
}
78 changes: 32 additions & 46 deletions pkg/ledger/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,82 +6,68 @@ import (

"github.com/formancehq/ledger/pkg/ledger/cache"
"github.com/formancehq/ledger/pkg/ledger/lock"
"github.com/formancehq/ledger/pkg/ledger/numscript"
"github.com/formancehq/ledger/pkg/ledger/query"
"github.com/formancehq/ledger/pkg/ledger/runner"
"github.com/formancehq/ledger/pkg/storage"
"github.com/pkg/errors"
"go.uber.org/fx"
)

type Resolver struct {
storageDriver storage.Driver
lock sync.RWMutex
initializedStores map[string]struct{}
locker lock.Locker
cacheManager *cache.Manager
runnerManager *runner.Manager
queryWorker *query.Worker
storageDriver storage.Driver
lock sync.RWMutex
locker lock.Locker
queryWorker *query.Worker
//TODO(gfyrag): add a routine to clean old ledger
ledgers map[string]*Ledger
compiler *numscript.Compiler
allowPastTimestamps bool
}

func NewResolver(
storageDriver storage.Driver,
locker lock.Locker,
cacheManager *cache.Manager,
runnerManager *runner.Manager,
queryWorker *query.Worker,
allowPastTimestamps bool,
) *Resolver {
return &Resolver{
storageDriver: storageDriver,
cacheManager: cacheManager,
runnerManager: runnerManager,
initializedStores: map[string]struct{}{},
locker: locker,
queryWorker: queryWorker,
storageDriver: storageDriver,
locker: locker,
queryWorker: queryWorker,
compiler: numscript.NewCompiler(),
ledgers: map[string]*Ledger{},
allowPastTimestamps: allowPastTimestamps,
}
}

func (r *Resolver) GetLedger(ctx context.Context, name string) (*Ledger, error) {
store, _, err := r.storageDriver.GetLedgerStore(ctx, name, true)
if err != nil {
return nil, errors.Wrap(err, "retrieving ledger store")
}

r.lock.RLock()
_, ok := r.initializedStores[name]
ledger, ok := r.ledgers[name]
r.lock.RUnlock()
if !ok {
r.lock.Lock()
defer r.lock.Unlock()

if _, ok = r.initializedStores[name]; !ok {
_, err = store.Initialize(ctx)
if err != nil {
return nil, errors.Wrap(err, "initializing ledger store")
store, _, err := r.storageDriver.GetLedgerStore(ctx, name, true)
if err != nil {
return nil, errors.Wrap(err, "retrieving ledger store")
}
if !store.IsInitialized() {
if _, err := store.Initialize(ctx); err != nil {
return nil, err
}
r.initializedStores[name] = struct{}{}
}
}

cache, err := r.cacheManager.ForLedger(ctx, name)
if err != nil {
return nil, err
}
cache := cache.New(store)
runner, err := runner.New(store, r.locker, cache, r.compiler, r.allowPastTimestamps)
if err != nil {
return nil, err
}

runner, err := r.runnerManager.ForLedger(ctx, name)
if err != nil {
return nil, err
ledger = New(store, cache, runner, r.locker, r.queryWorker)
r.ledgers[name] = ledger
}

return New(store, cache, runner, r.locker, r.queryWorker), nil
}

func Module(allowPastTimestamp bool) fx.Option {
return fx.Options(
fx.Provide(NewResolver),
lock.Module(),
cache.Module(),
query.Module(),
// TODO: Maybe handle this by request ?
runner.Module(allowPastTimestamp),
)
return ledger, nil
}
Loading

0 comments on commit 2ca1cbb

Please sign in to comment.