Skip to content

Commit

Permalink
chore: merge some packages
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 23, 2024
1 parent 2f2b812 commit 4bb42be
Show file tree
Hide file tree
Showing 28 changed files with 255 additions and 380 deletions.
4 changes: 2 additions & 2 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
"github.com/formancehq/ledger/internal/storage/driver"
"net/http"

"github.com/formancehq/ledger/internal/bus"
Expand All @@ -23,7 +24,6 @@ import (

"github.com/formancehq/go-libs/ballast"
"github.com/formancehq/go-libs/service"
systemstore "github.com/formancehq/ledger/internal/storage/system"
"github.com/spf13/cobra"
"go.uber.org/fx"

Expand Down Expand Up @@ -113,7 +113,7 @@ func NewServeCommand() *cobra.Command {
otlptraces.AddFlags(cmd.Flags())
auth.AddFlags(cmd.Flags())
publish.AddFlags(ServiceName, cmd.Flags(), func(cd *publish.ConfigDefault) {
cd.PublisherCircuitBreakerSchema = systemstore.Schema
cd.PublisherCircuitBreakerSchema = driver.SchemaSystem
})
iam.AddFlags(cmd.Flags())

Expand Down
1 change: 0 additions & 1 deletion internal/controller/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type TX interface {
UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error)
DeleteAccountMetadata(ctx context.Context, address, key string) error
InsertLog(ctx context.Context, log *ledger.Log) error
SwitchLedgerState(ctx context.Context, name string, state string) error

LockLedger(ctx context.Context) error
ListLogs(ctx context.Context, q GetLogsQuery) (*bunpaginate.Cursor[ledger.Log], error)
Expand Down
5 changes: 5 additions & 0 deletions internal/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func (l Ledger) HasFeature(feature, value string) bool {
return l.Features[feature] == value
}

func (l Ledger) WithMetadata(m metadata.Metadata) Ledger {
l.Metadata = m
return l
}

func New(name string, configuration Configuration) (*Ledger, error) {

if err := configuration.Validate(); err != nil {
Expand Down
12 changes: 6 additions & 6 deletions internal/storage/bucket/bucket_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//go:build it

package bucket
package bucket_test

import (
"github.com/formancehq/ledger/internal/storage/bucket"
"github.com/formancehq/ledger/internal/storage/driver"
"testing"

systemstore "github.com/formancehq/ledger/internal/storage/system"

"github.com/formancehq/go-libs/bun/bunconnect"

"github.com/formancehq/go-libs/logging"
Expand All @@ -24,8 +24,8 @@ func TestBuckets(t *testing.T) {
db, err := bunconnect.OpenSQLDB(ctx, pgDatabase.ConnectionOptions())
require.NoError(t, err)

require.NoError(t, systemstore.Migrate(ctx, db))
require.NoError(t, driver.Migrate(ctx, db))

bucket := New(db, name)
require.NoError(t, bucket.Migrate(ctx))
b := bucket.New(db, name)
require.NoError(t, b.Migrate(ctx))
}
2 changes: 1 addition & 1 deletion internal/storage/bucket/main_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//go:build it

package bucket
package bucket_test

import (
"testing"
Expand Down
30 changes: 5 additions & 25 deletions internal/storage/driver/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,18 @@ package driver
import (
"context"

"github.com/formancehq/go-libs/bun/bunpaginate"
"github.com/formancehq/go-libs/metadata"
systemcontroller "github.com/formancehq/ledger/internal/controller/system"
systemstore "github.com/formancehq/ledger/internal/storage/system"

ledger "github.com/formancehq/ledger/internal"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
systemcontroller "github.com/formancehq/ledger/internal/controller/system"
ledgerstore "github.com/formancehq/ledger/internal/storage/ledger"
)

type DefaultStorageDriverAdapter struct {
d *Driver
}

func (d *DefaultStorageDriverAdapter) GetLedger(ctx context.Context, name string) (*ledger.Ledger, error) {
return systemstore.New(d.d.db).GetLedger(ctx, name)
}

func (d *DefaultStorageDriverAdapter) ListLedgers(ctx context.Context, query ledgercontroller.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error) {
return systemstore.New(d.d.db).ListLedgers(ctx, query)
}

func (d *DefaultStorageDriverAdapter) UpdateLedgerMetadata(ctx context.Context, name string, m metadata.Metadata) error {
return systemstore.New(d.d.db).UpdateLedgerMetadata(ctx, name, m)
}

func (d *DefaultStorageDriverAdapter) DeleteLedgerMetadata(ctx context.Context, param string, key string) error {
return systemstore.New(d.d.db).DeleteLedgerMetadata(ctx, param, key)
*Driver
}

func (d *DefaultStorageDriverAdapter) OpenLedger(ctx context.Context, name string) (ledgercontroller.Store, *ledger.Ledger, error) {
store, l, err := d.d.OpenLedger(ctx, name)
store, l, err := d.Driver.OpenLedger(ctx, name)
if err != nil {
return nil, nil, err
}
Expand All @@ -43,12 +23,12 @@ func (d *DefaultStorageDriverAdapter) OpenLedger(ctx context.Context, name strin
}

func (d *DefaultStorageDriverAdapter) CreateLedger(ctx context.Context, l *ledger.Ledger) error {
_, err := d.d.CreateLedger(ctx, l)
_, err := d.Driver.CreateLedger(ctx, l)
return err
}

func NewControllerStorageDriverAdapter(d *Driver) *DefaultStorageDriverAdapter {
return &DefaultStorageDriverAdapter{d: d}
return &DefaultStorageDriverAdapter{Driver: d}
}

var _ systemcontroller.Store = (*DefaultStorageDriverAdapter)(nil)
109 changes: 91 additions & 18 deletions internal/storage/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,28 @@ package driver
import (
"context"
"database/sql"
. "github.com/formancehq/go-libs/collectionutils"
"github.com/formancehq/go-libs/metadata"
"github.com/formancehq/go-libs/platform/postgres"

systemcontroller "github.com/formancehq/ledger/internal/controller/system"

ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"

"github.com/formancehq/go-libs/bun/bunpaginate"
"github.com/formancehq/go-libs/collectionutils"
ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/internal/storage/bucket"
ledgerstore "github.com/formancehq/ledger/internal/storage/ledger"
"github.com/formancehq/ledger/internal/storage/system"
"github.com/pkg/errors"
"github.com/uptrace/bun"

"github.com/formancehq/go-libs/logging"
)

const (
SchemaSystem = "_system"
)

type Driver struct {
db *bun.DB
}
Expand Down Expand Up @@ -94,11 +99,24 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto
_ = tx.Rollback()
}()

created, err := system.New(tx).CreateLedger(ctx, l)
if l.Metadata == nil {
l.Metadata = metadata.Metadata{}
}

ret, err := d.db.NewInsert().
Model(l).
Ignore().
Returning("id").
Exec(ctx)
if err != nil {
return nil, postgres.ResolveError(err)
}

affected, err := ret.RowsAffected()
if err != nil {
return nil, errors.Wrap(err, "creating ledger")
}
if !created {
if affected == 0 {
return nil, systemcontroller.ErrLedgerAlreadyExists
}

Expand All @@ -115,27 +133,86 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto
}

func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) {
l, err := system.New(d.db).GetLedger(ctx, name)
if err != nil {
return nil, nil, errors.Wrap(err, "opening ledger")
ret := &ledger.Ledger{}
if err := d.db.NewSelect().
Model(ret).
Column("*").
Where("name = ?", name).
Scan(ctx); err != nil {
return nil, nil, postgres.ResolveError(err)
}

return ledgerstore.New(d.db, *l), l, nil
return ledgerstore.New(d.db, *ret), ret, nil
}

func (d *Driver) Initialize(ctx context.Context) error {
logging.FromContext(ctx).Debugf("Initialize driver")
return errors.Wrap(system.Migrate(ctx, d.db), "migrating system store")
return errors.Wrap(Migrate(ctx, d.db), "migrating system store")
}

func (d *Driver) UpgradeAllBuckets(ctx context.Context) error {
func (d *Driver) UpdateLedgerMetadata(ctx context.Context, name string, m metadata.Metadata) error {
_, err := d.db.NewUpdate().
Model(&ledger.Ledger{}).
Set("metadata = metadata || ?", m).
Where("name = ?", name).
Exec(ctx)
return err
}

func (d *Driver) UpdateLedgerState(ctx context.Context, name string, state string) error {
_, err := d.db.NewUpdate().
Model(&ledger.Ledger{}).
Set("state = ?", state).
Where("name = ?", name).
Exec(ctx)
return err
}

systemStore := system.New(d.db)
func (d *Driver) DeleteLedgerMetadata(ctx context.Context, name string, key string) error {
_, err := d.db.NewUpdate().
Model(&ledger.Ledger{}).
Set("metadata = metadata - ?", key).
Where("name = ?", name).
Exec(ctx)
return err
}

func (d *Driver) ListLedgers(ctx context.Context, q ledgercontroller.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error) {
query := d.db.NewSelect().
Model(&ledger.Ledger{}).
Column("*").
Order("addedat asc")

return bunpaginate.UsingOffset[ledgercontroller.PaginatedQueryOptions[struct{}], ledger.Ledger](
ctx,
query,
bunpaginate.OffsetPaginatedQuery[ledgercontroller.PaginatedQueryOptions[struct{}]](q),
)
}

bucketsNames := collectionutils.Set[string]{}
func (d *Driver) GetLedger(ctx context.Context, name string) (*ledger.Ledger, error) {
ret := &ledger.Ledger{}
if err := d.db.NewSelect().
Model(ret).
Column("*").
Where("name = ?", name).
Scan(ctx); err != nil {
return nil, postgres.ResolveError(err)
}

return ret, nil
}

func (d *Driver) UpgradeBucket(ctx context.Context, name string) error {
return bucket.New(d.db, name).Migrate(ctx)
}

func (d *Driver) UpgradeAllBuckets(ctx context.Context) error {

bucketsNames := Set[string]{}
err := bunpaginate.Iterate(ctx, ledgercontroller.NewListLedgersQuery(10),
func(ctx context.Context, q ledgercontroller.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error) {
return systemStore.ListLedgers(ctx, q)
return d.ListLedgers(ctx, q)
},
func(cursor *bunpaginate.Cursor[ledger.Ledger]) error {
for _, name := range cursor.Data {
Expand All @@ -147,7 +224,7 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context) error {
return err
}

for _, bucketName := range collectionutils.Keys(bucketsNames) {
for _, bucketName := range Keys(bucketsNames) {
b := bucket.New(d.db, bucketName)

logging.FromContext(ctx).Infof("Upgrading bucket '%s'", bucketName)
Expand All @@ -159,10 +236,6 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context) error {
return nil
}

func (d *Driver) UpgradeBucket(ctx context.Context, name string) error {
return bucket.New(d.db, name).Migrate(ctx)
}

func New(db *bun.DB) *Driver {
return &Driver{
db: db,
Expand Down
Loading

0 comments on commit 4bb42be

Please sign in to comment.