Skip to content

Commit

Permalink
fix: query worker init
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored and flemzord committed May 12, 2023
1 parent d9e59ac commit 2d126dc
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 130 deletions.
2 changes: 1 addition & 1 deletion components/ledger/pkg/ledger/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ func Module(configuration Configuration) fx.Option {
}),
fx.Provide(fx.Annotate(monitor.NewNoOpMonitor, fx.As(new(monitor.Monitor)))),
fx.Provide(fx.Annotate(metrics.NewNoOpMetricsRegistry, fx.As(new(metrics.GlobalMetricsRegistry)))),
query.QueryInitModule(),
query.InitModule(),
)
}
107 changes: 107 additions & 0 deletions components/ledger/pkg/ledger/query/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package query

import (
"context"

"github.com/formancehq/ledger/pkg/ledger/monitor"
"github.com/formancehq/ledger/pkg/opentelemetry/metrics"
"github.com/formancehq/ledger/pkg/storage"
"github.com/formancehq/stack/libs/go-libs/errorsutil"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

func initLedger(
ctx context.Context,
ledgerName string,
store Store,
monitor monitor.Monitor,
metricsRegistry metrics.PerLedgerMetricsRegistry,
) (uint64, error) {
if !store.IsInitialized() {
return 0, nil
}

nextLogIDToProcess, err := store.GetNextLogID(ctx)
if err != nil && !storage.IsNotFoundError(err) {
return 0, errorsutil.NewError(ErrStorage,
errors.Wrap(err, "reading last log"))
}

logs, err := store.ReadLogsStartingFromID(ctx, nextLogIDToProcess)
if err != nil {
return 0, errorsutil.NewError(ErrStorage,
errors.Wrap(err, "reading logs since last ID"))
}

if len(logs) == 0 {
return 0, nil
}

if err := processLogs(ctx, ledgerName, store, monitor, logs...); err != nil {
return 0, errors.Wrap(err, "processing logs")
}

metricsRegistry.QueryProcessedLogs().Add(ctx, int64(len(logs)))

if err := store.UpdateNextLogID(ctx, logs[len(logs)-1].ID+1); err != nil {
return 0, errorsutil.NewError(ErrStorage,
errors.Wrap(err, "updating last read log"))
}
lastProcessedLogID := logs[len(logs)-1].ID

return lastProcessedLogID, nil
}

type InitLedger struct {
driver storage.Driver
monitor monitor.Monitor
metricsRegistry metrics.PerLedgerMetricsRegistry
}

func (iq InitLedger) initLedgers(ctx context.Context) error {
ledgers, err := iq.driver.GetSystemStore().ListLedgers(ctx)
if err != nil {
return err
}

eg, ctxGroup := errgroup.WithContext(ctx)
for _, ledger := range ledgers {
_ledger := ledger
eg.Go(func() error {
store, _, err := iq.driver.GetLedgerStore(ctxGroup, _ledger, false)
if err != nil && !storage.IsNotFoundError(err) {
return err
}

if storage.IsNotFoundError(err) {
return nil
}

if !store.IsInitialized() {
return nil
}

if _, err := initLedger(
ctxGroup,
_ledger,
NewDefaultStore(store),
iq.monitor,
iq.metricsRegistry,
); err != nil {
return err
}

return nil
})
}

return eg.Wait()
}

func NewInitLedgers(driver storage.Driver, monitor monitor.Monitor) *InitLedger {
return &InitLedger{
driver: driver,
monitor: monitor,
}
}
63 changes: 3 additions & 60 deletions components/ledger/pkg/ledger/query/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,13 @@ package query
import (
"context"

"github.com/formancehq/ledger/pkg/ledger/monitor"
"github.com/formancehq/ledger/pkg/opentelemetry/metrics"
"github.com/formancehq/ledger/pkg/storage"
"go.uber.org/fx"
"golang.org/x/sync/errgroup"
)

func QueryInitModule() fx.Option {
func InitModule() fx.Option {
return fx.Options(
fx.Provide(NewInitQuery),
fx.Invoke(func(lc fx.Lifecycle, initQuery *InitQuery) {
fx.Provide(NewInitLedgers),
fx.Invoke(func(lc fx.Lifecycle, initQuery *InitLedger) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return initQuery.initLedgers(ctx)
Expand All @@ -22,56 +18,3 @@ func QueryInitModule() fx.Option {
}),
)
}

type InitQuery struct {
driver storage.Driver
monitor monitor.Monitor
metricsRegistry metrics.PerLedgerMetricsRegistry
}

func (iq InitQuery) initLedgers(ctx context.Context) error {
ledgers, err := iq.driver.GetSystemStore().ListLedgers(ctx)
if err != nil {
return err
}

eg, ctxGroup := errgroup.WithContext(ctx)
for _, ledger := range ledgers {
_ledger := ledger
eg.Go(func() error {
store, _, err := iq.driver.GetLedgerStore(ctxGroup, _ledger, false)
if err != nil && !storage.IsNotFoundError(err) {
return err
}

if storage.IsNotFoundError(err) {
return nil
}

if !store.IsInitialized() {
return nil
}

if _, err := initLedger(
ctxGroup,
_ledger,
NewDefaultStore(store),
iq.monitor,
iq.metricsRegistry,
); err != nil {
return err
}

return nil
})
}

return eg.Wait()
}

func NewInitQuery(driver storage.Driver, monitor monitor.Monitor) *InitQuery {
return &InitQuery{
driver: driver,
monitor: monitor,
}
}
59 changes: 2 additions & 57 deletions components/ledger/pkg/ledger/query/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/formancehq/ledger/pkg/ledger/aggregator"
"github.com/formancehq/ledger/pkg/ledger/monitor"
"github.com/formancehq/ledger/pkg/opentelemetry/metrics"
"github.com/formancehq/ledger/pkg/storage"
"github.com/formancehq/stack/libs/go-libs/errorsutil"
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/formancehq/stack/libs/go-libs/metadata"
Expand Down Expand Up @@ -50,8 +49,7 @@ type Worker struct {
monitor monitor.Monitor
metricsRegistry metrics.PerLedgerMetricsRegistry

lastProcessedLogID *uint64
ledgerName string
ledgerName string
}

func (w *Worker) Ready() chan struct{} {
Expand All @@ -62,13 +60,6 @@ func (w *Worker) Run(ctx context.Context) error {
logging.FromContext(ctx).Debugf("Start CQRS worker")

w.ctx = ctx
lastProcessedLogID, err := w.store.GetNextLogID(w.ctx)
if err != nil && !storage.IsNotFoundError(err) {
return errorsutil.NewError(ErrStorage,
errors.Wrap(err, "reading last log"))
}

w.lastProcessedLogID = &lastProcessedLogID

close(w.readyChan)

Expand Down Expand Up @@ -140,6 +131,7 @@ func (w *Worker) writeLoop(ctx context.Context) {
return
}

logging.FromContext(ctx).Infof("Ingested logs until: %d", logs[len(logs)-1].ID)
closeLogs(modelsHolder)
}
}
Expand Down Expand Up @@ -170,11 +162,6 @@ l:

// At this level, the job is writting some models, just accumulate models in a buffer
case wl := <-w.writeChannel:
if w.lastProcessedLogID != nil && wl.Log.ID <= *w.lastProcessedLogID {
close(wl.Ingested)
continue
}

w.pending = append(w.pending, wl)
w.metricsRegistry.QueryPendingMessages().Add(w.ctx, int64(len(w.pending)))

Expand Down Expand Up @@ -249,48 +236,6 @@ func (w *Worker) Stop(ctx context.Context) error {
return nil
}

func initLedger(
ctx context.Context,
ledgerName string,
store Store,
monitor monitor.Monitor,
metricsRegistry metrics.PerLedgerMetricsRegistry,
) (uint64, error) {
if !store.IsInitialized() {
return 0, nil
}

lastReadLogID, err := store.GetNextLogID(ctx)
if err != nil && !storage.IsNotFoundError(err) {
return 0, errorsutil.NewError(ErrStorage,
errors.Wrap(err, "reading last log"))
}

logs, err := store.ReadLogsStartingFromID(ctx, lastReadLogID)
if err != nil {
return 0, errorsutil.NewError(ErrStorage,
errors.Wrap(err, "reading logs since last ID"))
}

if len(logs) == 0 {
return 0, nil
}

if err := processLogs(ctx, ledgerName, store, monitor, logs...); err != nil {
return 0, errors.Wrap(err, "processing logs")
}

metricsRegistry.QueryProcessedLogs().Add(ctx, int64(len(logs)))

if err := store.UpdateNextLogID(ctx, logs[len(logs)-1].ID+1); err != nil {
return 0, errorsutil.NewError(ErrStorage,
errors.Wrap(err, "updating last read log"))
}
lastProcessedLogID := logs[len(logs)-1].ID

return lastProcessedLogID, nil
}

func processLogs(
ctx context.Context,
ledgerName string,
Expand Down
1 change: 1 addition & 0 deletions components/ledger/pkg/storage/sqlstorage/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (s *Store) RunInTransaction(ctx context.Context, f func(ctx context.Context
newStore.isInitialized = s.isInitialized

defer func() {
// TODO(gfyrag): add recover() to rollback txs
if err != nil {
_ = tx.Rollback()
}
Expand Down
19 changes: 15 additions & 4 deletions tests/integration/internal/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,37 @@ import (
"net/http/httputil"
"net/url"
"strings"

"github.com/onsi/ginkgo/v2"
)

type proxy struct {
reverse *httputil.ReverseProxy
url *url.URL
}

var (
gatewayServer *httptest.Server
proxies = map[string]*httputil.ReverseProxy{}
proxies = map[string]proxy{}
)

func registerService(s string, url *url.URL) {
proxies[s] = httputil.NewSingleHostReverseProxy(url)
proxies[s] = proxy{
reverse: httputil.NewSingleHostReverseProxy(url),
url: url,
}
}

func startFakeGateway() {
gatewayServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
for name, proxy := range proxies {
if strings.HasPrefix(r.URL.Path, "/api/"+name) {
http.StripPrefix("/api/"+name, proxy).ServeHTTP(w, r)
ginkgo.GinkgoWriter.Printf("Proxying %s: %s\r\n", name, proxy.url.String())
http.StripPrefix("/api/"+name, proxy.reverse).ServeHTTP(w, r)
return
}
}
w.WriteHeader(http.StatusNotFound)
w.WriteHeader(http.StatusServiceUnavailable)
}))
}

Expand Down
19 changes: 11 additions & 8 deletions tests/integration/suite/ledger-set-metadata-on-account.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package suite

import (
"reflect"
"time"

"github.com/formancehq/formance-sdk-go"
Expand Down Expand Up @@ -34,7 +35,7 @@ var _ = Given("some empty environment", func() {
AfterEach(func() {
cancelSubscription()
})
It("should eventually be available on api", func() {
It("should be available on api", func() {
accountResponse, _, err := Client().AccountsApi.
GetAccount(TestContext(), "default", "foo").
Execute()
Expand All @@ -51,21 +52,23 @@ var _ = Given("some empty environment", func() {
Expect(events.Check(msg.Data, "ledger", bus.EventTypeSavedMetadata)).Should(Succeed())
})
It("should pop an account with the correct metadata on search service", func() {
Eventually(func(g Gomega) bool {
Eventually(func() bool {
res, _, err := Client().SearchApi.Search(TestContext()).Query(formance.Query{
Target: formance.PtrString("ACCOUNT"),
}).Execute()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(res.Cursor.Data).To(HaveLen(1))
g.Expect(res.Cursor.Data[0]).To(Equal(map[string]any{
if err != nil {
return false
}
if len(res.Cursor.Data) != 1 {
return false
}
return reflect.DeepEqual(res.Cursor.Data[0], map[string]any{
"ledger": "default",
"metadata": map[string]any{
"clientType": "gold",
},
"address": "foo",
}))

return true
})
}).Should(BeTrue())
})
})
Expand Down

0 comments on commit 2d126dc

Please sign in to comment.