Skip to content

Commit

Permalink
feat: clean db usage from driver
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Nov 22, 2024
1 parent 2a30286 commit 821ff22
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 47 deletions.
18 changes: 10 additions & 8 deletions cmd/buckets_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"github.com/formancehq/go-libs/v2/service"
"github.com/formancehq/ledger/internal/storage/bucket"
"github.com/formancehq/ledger/internal/storage/driver"
"github.com/formancehq/ledger/internal/storage/ledger"
systemstore "github.com/formancehq/ledger/internal/storage/system"
"github.com/spf13/cobra"
"github.com/uptrace/bun"
)

func NewBucketUpgrade() *cobra.Command {
Expand All @@ -19,12 +21,12 @@ func NewBucketUpgrade() *cobra.Command {
logger := logging.NewDefaultLogger(cmd.OutOrStdout(), service.IsDebug(cmd), false, false)
cmd.SetContext(logging.ContextWithLogger(cmd.Context(), logger))

driver, err := getDriver(cmd)
driver, db, err := getDriver(cmd)
if err != nil {
return err
}
defer func() {
_ = driver.GetDB().Close()
_ = db.Close()
}()

if args[0] == "*" {
Expand All @@ -41,26 +43,26 @@ func NewBucketUpgrade() *cobra.Command {
return cmd
}

func getDriver(cmd *cobra.Command) (*driver.Driver, error) {
func getDriver(cmd *cobra.Command) (*driver.Driver, *bun.DB, error) {

connectionOptions, err := bunconnect.ConnectionOptionsFromFlags(cmd)
if err != nil {
return nil, err
return nil, nil, err
}

db, err := bunconnect.OpenSQLDB(cmd.Context(), *connectionOptions)
if err != nil {
return nil, err
return nil, nil, err
}

driver := driver.New(
db,
ledger.NewFactory(db),
systemstore.New(db),
bucket.NewDefaultFactory(db),
)
if err := driver.Initialize(cmd.Context()); err != nil {
return nil, err
return nil, nil, err
}

return driver, nil
return driver, db, nil
}
4 changes: 2 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ func NewRootCommand() *cobra.Command {
root.AddCommand(version)
root.AddCommand(bunmigrate.NewDefaultCommand(func(cmd *cobra.Command, _ []string, _ *bun.DB) error {
// todo: use provided db ...
driver, err := getDriver(cmd)
driver, db, err := getDriver(cmd)
if err != nil {
return err
}
defer func() {
_ = driver.GetDB().Close()
_ = db.Close()
}()

return driver.UpgradeAllBuckets(cmd.Context(), make(chan struct{}))
Expand Down
5 changes: 5 additions & 0 deletions internal/storage/bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Bucket interface {

type Factory interface {
Create(name string) Bucket
GetMigrator(b string) *migrations.Migrator
}

type DefaultFactory struct {
Expand All @@ -29,6 +30,10 @@ func (f *DefaultFactory) Create(name string) Bucket {
return NewDefault(f.db, f.tracer, name)
}

func (f *DefaultFactory) GetMigrator(b string) *migrations.Migrator {
return GetMigrator(f.db, b)
}

func NewDefaultFactory(db *bun.DB, options ...DefaultFactoryOption) *DefaultFactory {
ret := &DefaultFactory{
db: db,
Expand Down
47 changes: 18 additions & 29 deletions internal/storage/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@ import (
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"

"github.com/formancehq/go-libs/v2/bun/bunpaginate"
"github.com/formancehq/go-libs/v2/logging"
ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/internal/storage/bucket"
ledgerstore "github.com/formancehq/ledger/internal/storage/ledger"
"github.com/uptrace/bun"

"github.com/formancehq/go-libs/v2/logging"
)

type Driver struct {
db *bun.DB
ledgerStoreFactory ledgerstore.Factory
systemStore systemstore.Store
bucketFactory bucket.Factory
tracer trace.Tracer
Expand Down Expand Up @@ -62,13 +60,7 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto
return nil, fmt.Errorf("adding ledger to bucket: %w", err)
}

return ledgerstore.New(
d.db,
b,
*l,
ledgerstore.WithMeter(d.meter),
ledgerstore.WithTracer(d.tracer),
), nil
return d.ledgerStoreFactory.Create(b, *l), nil
}

func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) {
Expand All @@ -77,13 +69,9 @@ func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Stor
return nil, nil, err
}

return ledgerstore.New(
d.db,
d.bucketFactory.Create(ret.Bucket),
*ret,
ledgerstore.WithMeter(d.meter),
ledgerstore.WithTracer(d.tracer),
), ret, nil
store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret)

return store, ret, err
}

func (d *Driver) Initialize(ctx context.Context) error {
Expand All @@ -93,7 +81,7 @@ func (d *Driver) Initialize(ctx context.Context) error {
return fmt.Errorf("detecting rollbacks: %w", err)
}

err = systemstore.Migrate(ctx, d.db, migrations.WithLockRetryInterval(d.migratorLockRetryInterval))
err = d.systemStore.Migrate(ctx, migrations.WithLockRetryInterval(d.migratorLockRetryInterval))
if err != nil {
constraintsFailed := postgres.ErrConstraintsFailed{}
if errors.As(err, &constraintsFailed) &&
Expand All @@ -112,7 +100,7 @@ func (d *Driver) Initialize(ctx context.Context) error {
func (d *Driver) detectRollbacks(ctx context.Context) error {

logging.FromContext(ctx).Debugf("Checking for downgrades on system schema")
if err := detectDowngrades(systemstore.GetMigrator(d.db), ctx); err != nil {
if err := detectDowngrades(d.systemStore.GetMigrator(), ctx); err != nil {
return fmt.Errorf("detecting rollbacks of system schema: %w", err)
}

Expand All @@ -126,7 +114,7 @@ func (d *Driver) detectRollbacks(ctx context.Context) error {

for _, b := range buckets {
logging.FromContext(ctx).Debugf("Checking for downgrades on bucket '%s'", b)
if err := detectDowngrades(bucket.GetMigrator(d.db, b), ctx); err != nil {
if err := detectDowngrades(d.bucketFactory.GetMigrator(b), ctx); err != nil {
return fmt.Errorf("detecting rollbacks on bucket '%s': %w", b, err)
}
}
Expand Down Expand Up @@ -210,15 +198,16 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context, minimalVersionReached ch
return grp.Wait()
}

func (d *Driver) GetDB() *bun.DB {
return d.db
}

func New(db *bun.DB, systemStore systemstore.Store, bucketFactory bucket.Factory, opts ...Option) *Driver {
func New(
ledgerStoreFactory ledgerstore.Factory,
systemStore systemstore.Store,
bucketFactory bucket.Factory,
opts ...Option,
) *Driver {
ret := &Driver{
db: db,
bucketFactory: bucketFactory,
systemStore: systemStore,
ledgerStoreFactory: ledgerStoreFactory,
bucketFactory: bucketFactory,
systemStore: systemStore,
}
for _, opt := range append(defaultOptions, opts...) {
opt(ret)
Expand Down
3 changes: 2 additions & 1 deletion internal/storage/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"github.com/formancehq/ledger/internal/storage/bucket"
"github.com/formancehq/ledger/internal/storage/driver"
ledgerstore "github.com/formancehq/ledger/internal/storage/ledger"
systemstore "github.com/formancehq/ledger/internal/storage/system"
"github.com/google/uuid"
"github.com/uptrace/bun"
Expand Down Expand Up @@ -184,7 +185,7 @@ func newStorageDriver(t docker.T, driverOptions ...driver.Option) *driver.Driver
require.NoError(t, err)

d := driver.New(
db,
ledgerstore.NewFactory(db),
systemstore.New(db),
bucket.NewDefaultFactory(db),
driverOptions...,
Expand Down
14 changes: 13 additions & 1 deletion internal/storage/driver/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package driver
import (
"context"
"github.com/formancehq/ledger/internal/storage/bucket"
ledgerstore "github.com/formancehq/ledger/internal/storage/ledger"
systemstore "github.com/formancehq/ledger/internal/storage/system"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -32,12 +33,23 @@ func NewFXModule() fx.Option {
}),
fx.Provide(func(
db *bun.DB,
tracerProvider trace.TracerProvider,
meterProvider metric.MeterProvider,
) ledgerstore.Factory {
return ledgerstore.NewFactory(db,
ledgerstore.WithMeter(meterProvider.Meter("store")),
ledgerstore.WithTracer(tracerProvider.Tracer("store")),
)
}),
fx.Provide(func(
bucketFactory bucket.Factory,
ledgerStoreFactory ledgerstore.Factory,
systemStore systemstore.Store,
tracerProvider trace.TracerProvider,
meterProvider metric.MeterProvider,
) (*Driver, error) {
return New(db,
return New(
ledgerStoreFactory,
systemStore,
bucketFactory,
WithMeter(meterProvider.Meter("store")),
Expand Down
27 changes: 27 additions & 0 deletions internal/storage/ledger/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ledger

import (
ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/internal/storage/bucket"
"github.com/uptrace/bun"
)

type Factory interface {
Create(bucket.Bucket, ledger.Ledger) *Store
}

type DefaultFactory struct {
db *bun.DB
options []Option
}

func NewFactory(db *bun.DB, options ...Option) *DefaultFactory {
return &DefaultFactory{
db: db,
options: options,
}
}

func (d *DefaultFactory) Create(b bucket.Bucket, l ledger.Ledger) *Store {
return New(d.db, b, l, d.options...)
}
6 changes: 5 additions & 1 deletion internal/storage/ledger/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ func TestMain(m *testing.M) {
bunDB.SetMaxOpenConns(100)

require.NoError(t, systemstore.Migrate(logging.TestingContext(), bunDB))
defaultDriver.SetValue(driver.New(bunDB, systemstore.New(bunDB), bucket.NewDefaultFactory(bunDB)))
defaultDriver.SetValue(driver.New(
ledgerstore.NewFactory(bunDB),
systemstore.New(bunDB),
bucket.NewDefaultFactory(bunDB),
))

return bunDB
})
Expand Down
12 changes: 8 additions & 4 deletions internal/storage/ledger/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,24 +622,28 @@ func TestTransactionsInsert(t *testing.T) {
bunDB, err := bunconnect.OpenSQLDB(ctx, db.ConnectionOptions())
require.NoError(t, err)

driver := driver.New(bunDB, systemstore.New(bunDB), bucket.NewDefaultFactory(bunDB))
driver := driver.New(
ledgerstore.NewFactory(bunDB),
systemstore.New(bunDB),
bucket.NewDefaultFactory(bunDB),
)
require.NoError(t, driver.Initialize(ctx))

ledgerName := uuid.NewString()[:8]

l := ledger.MustNewWithDefault(ledgerName)
l.Bucket = ledgerName

migrator := bucket.GetMigrator(driver.GetDB(), ledgerName)
migrator := bucket.GetMigrator(bunDB, ledgerName)
for i := 0; i < bucket.MinimalSchemaVersion; i++ {
require.NoError(t, migrator.UpByOne(ctx))
}

b := bucket.NewDefault(driver.GetDB(), noop.Tracer{}, ledgerName)
b := bucket.NewDefault(bunDB, noop.Tracer{}, ledgerName)
err = b.AddLedger(ctx, l)
require.NoError(t, err)

store := ledgerstore.New(driver.GetDB(), b, l)
store := ledgerstore.New(bunDB, b, l)

const nbTry = 100

Expand Down
12 changes: 12 additions & 0 deletions internal/storage/system/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"github.com/formancehq/go-libs/v2/metadata"
"github.com/formancehq/go-libs/v2/migrations"
"github.com/formancehq/go-libs/v2/platform/postgres"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
systemcontroller "github.com/formancehq/ledger/internal/controller/system"
Expand All @@ -23,6 +24,9 @@ type Store interface {
ListLedgers(ctx context.Context, q ledgercontroller.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error)
GetLedger(ctx context.Context, name string) (*ledger.Ledger, error)
GetDistinctBuckets(ctx context.Context) ([]string, error)

Migrate(ctx context.Context, options ...migrations.Option) error
GetMigrator(options ...migrations.Option) *migrations.Migrator
}

const (
Expand Down Expand Up @@ -113,6 +117,14 @@ func (d *DefaultStore) GetLedger(ctx context.Context, name string) (*ledger.Ledg
return ret, nil
}

func (d *DefaultStore) Migrate(ctx context.Context, options ...migrations.Option) error {
return d.GetMigrator(options...).Up(ctx)
}

func (d *DefaultStore) GetMigrator(options ...migrations.Option) *migrations.Migrator {
return GetMigrator(d.db, options...)
}

func (d *DefaultStore) GetDB() *bun.DB {
return d.db
}
Expand Down
7 changes: 6 additions & 1 deletion test/migrations/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/formancehq/go-libs/v2/testing/platform/pgtesting"
"github.com/formancehq/ledger/internal/storage/bucket"
"github.com/formancehq/ledger/internal/storage/driver"
"github.com/formancehq/ledger/internal/storage/ledger"
systemstore "github.com/formancehq/ledger/internal/storage/system"
"github.com/ory/dockertest/v3"
dockerlib "github.com/ory/dockertest/v3/docker"
Expand Down Expand Up @@ -53,7 +54,11 @@ func TestMigrations(t *testing.T) {
require.NoError(t, err)

// Migrate database
driver := driver.New(db, systemstore.New(db), bucket.NewDefaultFactory(db))
driver := driver.New(
ledger.NewFactory(db),
systemstore.New(db),
bucket.NewDefaultFactory(db),
)
require.NoError(t, driver.Initialize(ctx))
require.NoError(t, driver.UpgradeAllBuckets(ctx, make(chan struct{})))
}
Expand Down

0 comments on commit 821ff22

Please sign in to comment.