Skip to content

Commit

Permalink
feat: optimize queries package (#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored and flemzord committed May 12, 2023
1 parent 5f387df commit 1dbd425
Show file tree
Hide file tree
Showing 24 changed files with 846 additions and 222 deletions.
14 changes: 8 additions & 6 deletions pkg/api/internal/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/formancehq/ledger/pkg/core"
"github.com/formancehq/ledger/pkg/ledger"
"github.com/formancehq/ledger/pkg/ledger/lock"
"github.com/formancehq/ledger/pkg/ledger/query"
"github.com/formancehq/ledger/pkg/ledger/monitor"
"github.com/formancehq/ledger/pkg/ledgertesting"
"github.com/formancehq/ledger/pkg/storage"
sharedapi "github.com/formancehq/stack/libs/go-libs/api"
Expand Down Expand Up @@ -206,12 +206,14 @@ func RunTest(t *testing.T, callback func(api chi.Router, storageDriver storage.D
storageDriver := ledgertesting.StorageDriver(t)
require.NoError(t, storageDriver.Initialize(context.Background()))

queryWorker := query.NewWorker(query.DefaultWorkerConfig, storageDriver, query.NewNoOpMonitor())
go func() {
require.NoError(t, queryWorker.Run(context.Background()))
}()
ledgerStore, _, err := storageDriver.GetLedgerStore(context.Background(), uuid.New(), true)
require.NoError(t, err)

modified, err := ledgerStore.Initialize(context.Background())
require.NoError(t, err)
require.True(t, modified)

resolver := ledger.NewResolver(storageDriver, lock.NewInMemory(), queryWorker, false)
resolver := ledger.NewResolver(storageDriver, monitor.NewNoOpMonitor(), lock.NewInMemory(), false)
router := routes.NewRouter(storageDriver, "latest", resolver,
logging.FromContext(context.Background()), &health.HealthController{})

Expand Down
4 changes: 2 additions & 2 deletions pkg/api/middlewares/ledger_middleware.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package middlewares

import (
"context"
"net/http"

"github.com/formancehq/ledger/pkg/api/apierrors"
Expand Down Expand Up @@ -34,7 +33,8 @@ func LedgerMiddleware(resolver *ledger.Resolver) func(handler http.Handler) http
apierrors.ResponseError(w, r, err)
return
}
defer l.Close(context.Background())
// TODO(polo/gfyrag): close ledger if not used for x minutes
// defer l.Close(context.Background())

r = r.WithContext(controllers.ContextWithLedger(r.Context(), l))

Expand Down
4 changes: 2 additions & 2 deletions pkg/bus/module.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package bus

import (
"github.com/formancehq/ledger/pkg/ledger/query"
"github.com/formancehq/ledger/pkg/ledger/monitor"
"go.uber.org/fx"
)

func LedgerMonitorModule() fx.Option {
return fx.Decorate(fx.Annotate(newLedgerMonitor, fx.As(new(query.Monitor))))
return fx.Decorate(fx.Annotate(newLedgerMonitor, fx.As(new(monitor.Monitor))))
}
4 changes: 2 additions & 2 deletions pkg/bus/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/ThreeDotsLabs/watermill/message"
"github.com/formancehq/ledger/pkg/core"
"github.com/formancehq/ledger/pkg/ledger/query"
"github.com/formancehq/ledger/pkg/ledger/monitor"
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/formancehq/stack/libs/go-libs/publish"
)
Expand All @@ -14,7 +14,7 @@ type ledgerMonitor struct {
publisher message.Publisher
}

var _ query.Monitor = &ledgerMonitor{}
var _ monitor.Monitor = &ledgerMonitor{}

func newLedgerMonitor(publisher message.Publisher) *ledgerMonitor {
m := &ledgerMonitor{
Expand Down
5 changes: 5 additions & 0 deletions pkg/core/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ type Transaction struct {
ID uint64 `json:"txid"`
}

type TransactionWithMetadata struct {
ID uint64
Metadata Metadata
}

func (t Transaction) WithPostings(postings ...Posting) Transaction {
t.TransactionData = t.TransactionData.WithPostings(postings...)
return t
Expand Down
5 changes: 5 additions & 0 deletions pkg/ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ func (l *Ledger) Close(ctx context.Context) error {
if err := l.store.Close(ctx); err != nil {
return errors.Wrap(err, "closing store")
}

if err := l.queryWorker.Stop(ctx); err != nil {
return errors.Wrap(err, "stopping query worker")
}

return nil
}

Expand Down
14 changes: 8 additions & 6 deletions pkg/ledger/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"testing"

"github.com/formancehq/ledger/pkg/ledger/lock"
"github.com/formancehq/ledger/pkg/ledger/query"
"github.com/formancehq/ledger/pkg/ledger/monitor"
"github.com/formancehq/ledger/pkg/ledgertesting"
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/formancehq/stack/libs/go-libs/pgtesting"
Expand All @@ -30,12 +30,14 @@ func newResolver(t interface{ pgtesting.TestingT }) *Resolver {
storageDriver := ledgertesting.StorageDriver(t)
require.NoError(t, storageDriver.Initialize(context.Background()))

queryWorker := query.NewWorker(query.DefaultWorkerConfig, storageDriver, query.NewNoOpMonitor())
go func() {
require.NoError(t, queryWorker.Run(context.Background()))
}()
ledgerStore, _, err := storageDriver.GetLedgerStore(context.Background(), uuid.New(), true)
require.NoError(t, err)

modified, err := ledgerStore.Initialize(context.Background())
require.NoError(t, err)
require.True(t, modified)

return NewResolver(storageDriver, lock.NewInMemory(), queryWorker, false)
return NewResolver(storageDriver, monitor.NewNoOpMonitor(), lock.NewInMemory(), false)
}

func runOnLedger(t interface {
Expand Down
8 changes: 4 additions & 4 deletions pkg/ledger/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ledger

import (
"github.com/formancehq/ledger/pkg/ledger/lock"
"github.com/formancehq/ledger/pkg/ledger/query"
"github.com/formancehq/ledger/pkg/ledger/monitor"
"github.com/formancehq/ledger/pkg/storage"
"go.uber.org/fx"
)
Expand All @@ -12,11 +12,11 @@ func Module(allowPastTimestamp bool) fx.Option {
lock.Module(),
fx.Provide(func(
storageDriver storage.Driver,
monitor monitor.Monitor,
locker lock.Locker,
queryWorker *query.Worker,
) *Resolver {
return NewResolver(storageDriver, locker, queryWorker, allowPastTimestamp)
return NewResolver(storageDriver, monitor, locker, allowPastTimestamp)
}),
query.Module(),
fx.Provide(fx.Annotate(monitor.NewNoOpMonitor, fx.As(new(monitor.Monitor)))),
)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package query
package monitor

import (
"context"
Expand Down
35 changes: 0 additions & 35 deletions pkg/ledger/query/module.go

This file was deleted.

Loading

0 comments on commit 1dbd425

Please sign in to comment.