Skip to content

Commit

Permalink
feat: add system store
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Nov 22, 2024
1 parent d6ea95e commit 2a30286
Show file tree
Hide file tree
Showing 23 changed files with 462 additions and 180 deletions.
8 changes: 7 additions & 1 deletion cmd/buckets_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"github.com/formancehq/go-libs/v2/bun/bunconnect"
"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/go-libs/v2/service"
"github.com/formancehq/ledger/internal/storage/bucket"
"github.com/formancehq/ledger/internal/storage/driver"
systemstore "github.com/formancehq/ledger/internal/storage/system"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -51,7 +53,11 @@ func getDriver(cmd *cobra.Command) (*driver.Driver, error) {
return nil, err
}

driver := driver.New(db)
driver := driver.New(
db,
systemstore.New(db),
bucket.NewDefaultFactory(db),
)
if err := driver.Initialize(cmd.Context()); err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"github.com/formancehq/go-libs/v2/logging"
systemstore "github.com/formancehq/ledger/internal/storage/system"
"net/http"
"net/http/pprof"
"time"
Expand All @@ -10,7 +11,6 @@ import (
"github.com/formancehq/go-libs/v2/health"
"github.com/formancehq/go-libs/v2/httpserver"
"github.com/formancehq/go-libs/v2/otlp"
"github.com/formancehq/ledger/internal/storage/driver"
"github.com/go-chi/chi/v5"
"go.opentelemetry.io/otel/sdk/metric"

Expand Down Expand Up @@ -137,7 +137,7 @@ func NewServeCommand() *cobra.Command {
otlptraces.AddFlags(cmd.Flags())
auth.AddFlags(cmd.Flags())
publish.AddFlags(ServiceName, cmd.Flags(), func(cd *publish.ConfigDefault) {
cd.PublisherCircuitBreakerSchema = driver.SchemaSystem
cd.PublisherCircuitBreakerSchema = systemstore.SchemaSystem
})
iam.AddFlags(cmd.Flags())

Expand Down
40 changes: 31 additions & 9 deletions internal/storage/bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,47 @@ import (
ledger "github.com/formancehq/ledger/internal"
"github.com/uptrace/bun"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
)

type Bucket interface {
Migrate(ctx context.Context, tracer trace.Tracer, minimalVersionReached chan struct{}, opts ...migrations.Option) error
AddLedger(ctx context.Context, ledger ledger.Ledger, db bun.IDB) error
Migrate(ctx context.Context, minimalVersionReached chan struct{}, opts ...migrations.Option) error
AddLedger(ctx context.Context, ledger ledger.Ledger) error
HasMinimalVersion(ctx context.Context) (bool, error)
GetMigrationsInfo(ctx context.Context) ([]migrations.Info, error)
}

type Factory interface {
Create(db *bun.DB, name string) Bucket
Create(name string) Bucket
}

type DefaultFactory struct {}
type DefaultFactory struct {
tracer trace.Tracer
db *bun.DB
}

func (f *DefaultFactory) Create(name string) Bucket {
return NewDefault(f.db, f.tracer, name)
}

func (f *DefaultFactory) Create(db *bun.DB, name string) Bucket {
return NewDefault(db, name)
func NewDefaultFactory(db *bun.DB, options ...DefaultFactoryOption) *DefaultFactory {
ret := &DefaultFactory{
db: db,
}
for _, option := range append(defaultOptions, options...) {
option(ret)
}
return ret
}

func NewDefaultFactory() *DefaultFactory {
return &DefaultFactory{}
}
type DefaultFactoryOption func(factory *DefaultFactory)

func WithTracer(tracer trace.Tracer) DefaultFactoryOption {
return func(factory *DefaultFactory) {
factory.tracer = tracer
}
}

var defaultOptions = []DefaultFactoryOption{
WithTracer(noop.Tracer{}),
}
12 changes: 7 additions & 5 deletions internal/storage/bucket/default_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ const MinimalSchemaVersion = 12
type DefaultBucket struct {
name string
db *bun.DB
tracer trace.Tracer
}

func (b *DefaultBucket) Migrate(ctx context.Context, tracer trace.Tracer, minimalVersionReached chan struct{}, options ...migrations.Option) error {
return migrate(ctx, tracer, b.db, b.name, minimalVersionReached, options...)
func (b *DefaultBucket) Migrate(ctx context.Context, minimalVersionReached chan struct{}, options ...migrations.Option) error {
return migrate(ctx, b.tracer, b.db, b.name, minimalVersionReached, options...)
}

func (b *DefaultBucket) HasMinimalVersion(ctx context.Context) (bool, error) {
Expand All @@ -39,7 +40,7 @@ func (b *DefaultBucket) GetMigrationsInfo(ctx context.Context) ([]migrations.Inf
return GetMigrator(b.db, b.name).GetMigrations(ctx)
}

func (b *DefaultBucket) AddLedger(ctx context.Context, l ledger.Ledger, db bun.IDB) error {
func (b *DefaultBucket) AddLedger(ctx context.Context, l ledger.Ledger) error {

for _, setup := range ledgerSetups {
if l.Features.Match(setup.requireFeatures) {
Expand All @@ -49,7 +50,7 @@ func (b *DefaultBucket) AddLedger(ctx context.Context, l ledger.Ledger, db bun.I
return fmt.Errorf("executing template: %w", err)
}

_, err := db.ExecContext(ctx, buf.String())
_, err := b.db.ExecContext(ctx, buf.String())
if err != nil {
return fmt.Errorf("executing sql: %w", err)
}
Expand All @@ -59,10 +60,11 @@ func (b *DefaultBucket) AddLedger(ctx context.Context, l ledger.Ledger, db bun.I
return nil
}

func NewDefault(db *bun.DB, name string) *DefaultBucket {
func NewDefault(db *bun.DB, tracer trace.Tracer, name string) *DefaultBucket {
return &DefaultBucket{
db: db,
name: name,
tracer: tracer,
}
}

Expand Down
6 changes: 3 additions & 3 deletions internal/storage/bucket/default_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package bucket_test
import (
"github.com/formancehq/go-libs/v2/bun/bundebug"
"github.com/formancehq/ledger/internal/storage/bucket"
"github.com/formancehq/ledger/internal/storage/driver"
"github.com/formancehq/ledger/internal/storage/system"
"go.opentelemetry.io/otel/trace/noop"
"testing"

Expand All @@ -30,6 +30,6 @@ func TestBuckets(t *testing.T) {

require.NoError(t, driver.Migrate(ctx, db))

b := bucket.NewDefault(db, name)
require.NoError(t, b.Migrate(ctx, noop.Tracer{}, make(chan struct{})))
b := bucket.NewDefault(db, noop.Tracer{}, name)
require.NoError(t, b.Migrate(ctx, make(chan struct{})))
}
2 changes: 1 addition & 1 deletion internal/storage/bucket/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/go-libs/v2/migrations"
"github.com/formancehq/ledger/internal/storage/bucket"
"github.com/formancehq/ledger/internal/storage/driver"
"github.com/formancehq/ledger/internal/storage/system"
"github.com/google/uuid"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/stretchr/testify/require"
Expand Down
140 changes: 140 additions & 0 deletions internal/storage/driver/bucket_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 2a30286

Please sign in to comment.