From f12ab5d4f5c75c4a0a5b9c5a61d34e24e4b63ab3 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Sat, 12 Oct 2024 16:58:09 +0200 Subject: [PATCH] feat: remove global tracer usage --- internal/api/common/middleware_resolver.go | 8 ++-- .../api/common/middleware_resolver_test.go | 3 +- internal/api/module.go | 3 ++ internal/api/router.go | 41 +++++++++++++++- internal/api/v1/routes.go | 27 ++++++++++- internal/api/v2/controllers_bulk.go | 16 +++---- internal/api/v2/routes.go | 25 +++++++++- .../controller/ledger/controller_default.go | 47 ++++++++++++------- ...ontroller_with_too_many_client_handling.go | 32 +++++++++---- ...ller_with_too_many_client_handling_test.go | 5 +- .../ledger/controller_with_traces.go | 45 +++++++++--------- internal/controller/system/controller.go | 46 +++++++++++------- internal/controller/system/module.go | 3 ++ internal/storage/bucket/bucket.go | 5 +- internal/storage/bucket/bucket_test.go | 3 +- internal/storage/bucket/migrations.go | 7 ++- internal/storage/driver/driver.go | 38 +++++++++++---- internal/storage/driver/module.go | 12 ++++- internal/storage/ledger/accounts.go | 6 +++ internal/storage/ledger/balances.go | 1 + internal/storage/ledger/logs.go | 3 ++ internal/storage/ledger/main_test.go | 5 +- internal/storage/ledger/migrations.go | 7 ++- internal/storage/ledger/moves.go | 1 + internal/storage/ledger/store.go | 38 +++++++++++---- internal/storage/ledger/transactions.go | 44 ++++++++++------- internal/storage/ledger/volumes.go | 2 + internal/tracing/{tracer.go => tracing.go} | 28 +++++++---- internal/tracing/utils.go | 20 -------- 29 files changed, 362 insertions(+), 159 deletions(-) rename internal/tracing/{tracer.go => tracing.go} (60%) delete mode 100644 internal/tracing/utils.go diff --git a/internal/api/common/middleware_resolver.go b/internal/api/common/middleware_resolver.go index e713844b9..57a4c9600 100644 --- a/internal/api/common/middleware_resolver.go +++ b/internal/api/common/middleware_resolver.go @@ -1,14 +1,13 @@ package common import ( + "go.opentelemetry.io/otel/trace" "net/http" "strings" - "github.com/formancehq/ledger/internal/controller/system" - "github.com/formancehq/ledger/internal/tracing" - "github.com/formancehq/go-libs/api" "github.com/formancehq/go-libs/platform/postgres" + "github.com/formancehq/ledger/internal/controller/system" "errors" ) @@ -20,6 +19,7 @@ const ( func LedgerMiddleware( backend system.Controller, resolver func(*http.Request) string, + tracer trace.Tracer, excludePathFromSchemaCheck ...string, ) func(handler http.Handler) http.Handler { return func(handler http.Handler) http.Handler { @@ -30,7 +30,7 @@ func LedgerMiddleware( return } - ctx, span := tracing.Start(r.Context(), "OpenLedger") + ctx, span := tracer.Start(r.Context(), "OpenLedger") defer span.End() var err error diff --git a/internal/api/common/middleware_resolver_test.go b/internal/api/common/middleware_resolver_test.go index 1f63e4942..2e708652a 100644 --- a/internal/api/common/middleware_resolver_test.go +++ b/internal/api/common/middleware_resolver_test.go @@ -2,6 +2,7 @@ package common import ( "encoding/json" + nooptracer "go.opentelemetry.io/otel/trace/noop" "net/http" "net/http/httptest" "testing" @@ -89,7 +90,7 @@ func TestResolverMiddleware(t *testing.T) { m := LedgerMiddleware(systemController, func(*http.Request) string { return ledger - }) + }, nooptracer.Tracer{}) h := m(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusNoContent) })) diff --git a/internal/api/module.go b/internal/api/module.go index 1c4a68d49..265a9fd45 100644 --- a/internal/api/module.go +++ b/internal/api/module.go @@ -5,6 +5,7 @@ import ( "github.com/formancehq/go-libs/logging" "github.com/formancehq/ledger/internal/controller/system" "github.com/go-chi/chi/v5" + "go.opentelemetry.io/otel/trace" "github.com/formancehq/go-libs/auth" "github.com/formancehq/go-libs/health" @@ -23,6 +24,7 @@ func Module(cfg Config) fx.Option { healthController *health.HealthController, authenticator auth.Authenticator, logger logging.Logger, + tracer trace.TracerProvider, ) chi.Router { return NewRouter( backend, @@ -31,6 +33,7 @@ func Module(cfg Config) fx.Option { logger, "develop", cfg.Debug, + WithTracer(tracer.Tracer("api")), ) }), health.Module(), diff --git a/internal/api/router.go b/internal/api/router.go index 439fbe623..ee4f44ca7 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -1,6 +1,8 @@ package api import ( + "go.opentelemetry.io/otel/trace" + nooptracer "go.opentelemetry.io/otel/trace/noop" "net/http" "github.com/formancehq/ledger/internal/controller/system" @@ -25,7 +27,14 @@ func NewRouter( logger logging.Logger, version string, debug bool, + opts ...RouterOption, ) chi.Router { + + routerOptions := routerOptions{} + for _, opt := range append(defaultRouterOptions, opts...) { + opt(&routerOptions) + } + mux := chi.NewRouter() mux.Use( middleware.Recoverer, @@ -47,12 +56,40 @@ func NewRouter( ) mux.Get("/_healthcheck", healthController.Check) - v2Router := v2.NewRouter(systemController, authenticator, version, debug) + v2Router := v2.NewRouter( + systemController, + authenticator, + version, + debug, + v2.WithTracer(routerOptions.tracer), + ) mux.Handle("/v2*", http.StripPrefix("/v2", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { chi.RouteContext(r.Context()).Reset() v2Router.ServeHTTP(w, r) }))) - mux.Handle("/*", v1.NewRouter(systemController, authenticator, version, debug)) + mux.Handle("/*", v1.NewRouter( + systemController, + authenticator, + version, + debug, + v1.WithTracer(routerOptions.tracer), + )) return mux } + +type routerOptions struct { + tracer trace.Tracer +} + +type RouterOption func(ro *routerOptions) + +func WithTracer(tracer trace.Tracer) RouterOption { + return func(ro *routerOptions) { + ro.tracer = tracer + } +} + +var defaultRouterOptions = []RouterOption{ + WithTracer(nooptracer.Tracer{}), +} \ No newline at end of file diff --git a/internal/api/v1/routes.go b/internal/api/v1/routes.go index d2c57c45e..d9499281a 100644 --- a/internal/api/v1/routes.go +++ b/internal/api/v1/routes.go @@ -1,6 +1,8 @@ package v1 import ( + "go.opentelemetry.io/otel/trace" + nooptracer "go.opentelemetry.io/otel/trace/noop" "net/http" "github.com/formancehq/ledger/internal/controller/system" @@ -20,7 +22,14 @@ func NewRouter( authenticator auth.Authenticator, version string, debug bool, + opts ...RouterOption, ) chi.Router { + + routerOptions := &routerOptions{} + for _, opt := range append(defaultRouterOptions, opts...) { + opt(routerOptions) + } + router := chi.NewMux() router.Get("/_info", getInfo(systemController, version)) @@ -39,7 +48,7 @@ func NewRouter( router.Use(autoCreateMiddleware(systemController)) router.Use(common.LedgerMiddleware(systemController, func(r *http.Request) string { return chi.URLParam(r, "ledger") - }, "/_info")) + }, routerOptions.tracer, "/_info")) // LedgerController router.Get("/_info", getLedgerInfo) @@ -74,3 +83,19 @@ func NewRouter( return router } + +type routerOptions struct { + tracer trace.Tracer +} + +type RouterOption func(ro *routerOptions) + +func WithTracer(tracer trace.Tracer) RouterOption { + return func(ro *routerOptions) { + ro.tracer = tracer + } +} + +var defaultRouterOptions = []RouterOption{ + WithTracer(nooptracer.Tracer{}), +} \ No newline at end of file diff --git a/internal/api/v2/controllers_bulk.go b/internal/api/v2/controllers_bulk.go index b98f1d536..32c932019 100644 --- a/internal/api/v2/controllers_bulk.go +++ b/internal/api/v2/controllers_bulk.go @@ -7,13 +7,11 @@ import ( "net/http" "errors" + "github.com/formancehq/go-libs/api" "github.com/formancehq/go-libs/metadata" ledger "github.com/formancehq/ledger/internal" - ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" - "github.com/formancehq/ledger/internal/tracing" - - "github.com/formancehq/go-libs/api" "github.com/formancehq/ledger/internal/api/common" + ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" ) func bulkHandler(w http.ResponseWriter, r *http.Request) { @@ -60,10 +58,12 @@ type Result struct { ResponseType string `json:"responseType"` // Added for sdk generation (discriminator in oneOf) } -func ProcessBulk(ctx context.Context, l ledgercontroller.Controller, bulk Bulk, continueOnFailure bool) ([]Result, bool, error) { - - ctx, span := tracing.Start(ctx, "Bulk") - defer span.End() +func ProcessBulk( + ctx context.Context, + l ledgercontroller.Controller, + bulk Bulk, + continueOnFailure bool, +) ([]Result, bool, error) { ret := make([]Result, 0, len(bulk)) diff --git a/internal/api/v2/routes.go b/internal/api/v2/routes.go index df47eb0a7..934cff74d 100644 --- a/internal/api/v2/routes.go +++ b/internal/api/v2/routes.go @@ -1,6 +1,7 @@ package v2 import ( + nooptracer "go.opentelemetry.io/otel/trace/noop" "net/http" "github.com/formancehq/ledger/internal/controller/system" @@ -23,7 +24,13 @@ func NewRouter( authenticator auth.Authenticator, version string, debug bool, + opts ...RouterOption, ) chi.Router { + routerOptions := routerOptions{} + for _, opt := range append(defaultRouterOptions, opts...) { + opt(&routerOptions) + } + router := chi.NewMux() router.Get("/_info", getInfo(version)) @@ -50,7 +57,7 @@ func NewRouter( router.With(common.LedgerMiddleware(systemController, func(r *http.Request) string { return chi.URLParam(r, "ledger") - }, "/_info")).Group(func(router chi.Router) { + }, routerOptions.tracer, "/_info")).Group(func(router chi.Router) { router.Post("/_bulk", bulkHandler) // LedgerController @@ -87,3 +94,19 @@ func NewRouter( return router } + +type routerOptions struct { + tracer trace.Tracer +} + +type RouterOption func(ro *routerOptions) + +func WithTracer(tracer trace.Tracer) RouterOption { + return func(ro *routerOptions) { + ro.tracer = tracer + } +} + +var defaultRouterOptions = []RouterOption{ + WithTracer(nooptracer.Tracer{}), +} \ No newline at end of file diff --git a/internal/controller/ledger/controller_default.go b/internal/controller/ledger/controller_default.go index 6429b062e..56d07c306 100644 --- a/internal/controller/ledger/controller_default.go +++ b/internal/controller/ledger/controller_default.go @@ -5,7 +5,9 @@ import ( "database/sql" "fmt" "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" + noopmetrics "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + nooptracer "go.opentelemetry.io/otel/trace/noop" "math/big" "reflect" @@ -27,23 +29,12 @@ import ( ledger "github.com/formancehq/ledger/internal" ) -type DefaultControllerOption func(controller *DefaultController) - -var defaultOptions []DefaultControllerOption = []DefaultControllerOption{ - WithMeter(noop.Meter{}), -} - -func WithMeter(meter metric.Meter) DefaultControllerOption { - return func(controller *DefaultController) { - controller.meter = meter - } -} - type DefaultController struct { store Store machineFactory MachineFactory ledger ledger.Ledger + tracer trace.Tracer meter metric.Meter executeMachineHistogram metric.Int64Histogram } @@ -246,9 +237,15 @@ func (ctrl *DefaultController) CreateTransaction(ctx context.Context, parameters } output, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input RunScript) (*ledger.CreatedTransaction, error) { - result, err := tracing.TraceWithMetric(ctx, "ExecuteMachine", ctrl.executeMachineHistogram, func(ctx context.Context) (*MachineResult, error) { - return m.Execute(ctx, sqlTX, input.Vars) - }) + result, err := tracing.TraceWithMetric( + ctx, + "ExecuteMachine", + ctrl.tracer, + ctrl.executeMachineHistogram, + func(ctx context.Context) (*MachineResult, error) { + return m.Execute(ctx, sqlTX, input.Vars) + }, + ) if err != nil { return nil, fmt.Errorf("failed to execute program: %w", err) } @@ -433,3 +430,21 @@ func (ctrl *DefaultController) DeleteAccountMetadata(ctx context.Context, parame } var _ Controller = (*DefaultController)(nil) + +type DefaultControllerOption func(controller *DefaultController) + +var defaultOptions = []DefaultControllerOption{ + WithMeter(noopmetrics.Meter{}), + WithTracer(nooptracer.Tracer{}), +} + +func WithMeter(meter metric.Meter) DefaultControllerOption { + return func(controller *DefaultController) { + controller.meter = meter + } +} +func WithTracer(tracer trace.Tracer) DefaultControllerOption { + return func(controller *DefaultController) { + controller.tracer = tracer + } +} \ No newline at end of file diff --git a/internal/controller/ledger/controller_with_too_many_client_handling.go b/internal/controller/ledger/controller_with_too_many_client_handling.go index aa963f4c2..3f676c9c1 100644 --- a/internal/controller/ledger/controller_with_too_many_client_handling.go +++ b/internal/controller/ledger/controller_with_too_many_client_handling.go @@ -5,8 +5,8 @@ import ( "errors" "github.com/formancehq/go-libs/platform/postgres" ledger "github.com/formancehq/ledger/internal" - "github.com/formancehq/ledger/internal/tracing" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "time" ) @@ -23,46 +23,52 @@ func (fn DelayCalculatorFn) Next(iteration int) time.Duration { type ControllerWithTooManyClientHandling struct { Controller delayCalculator DelayCalculator + tracer trace.Tracer } -func NewControllerWithTooManyClientHandling(underlying Controller, delayCalculator DelayCalculator) *ControllerWithTooManyClientHandling { +func NewControllerWithTooManyClientHandling( + underlying Controller, + tracer trace.Tracer, + delayCalculator DelayCalculator, +) *ControllerWithTooManyClientHandling { return &ControllerWithTooManyClientHandling{ Controller: underlying, delayCalculator: delayCalculator, + tracer: tracer, } } func (ctrl *ControllerWithTooManyClientHandling) CreateTransaction(ctx context.Context, parameters Parameters[RunScript]) (*ledger.CreatedTransaction, error) { - return handleRetry(ctx, ctrl.delayCalculator, parameters, ctrl.Controller.CreateTransaction) + return handleRetry(ctx, ctrl.tracer, ctrl.delayCalculator, parameters, ctrl.Controller.CreateTransaction) } func (ctrl *ControllerWithTooManyClientHandling) RevertTransaction(ctx context.Context, parameters Parameters[RevertTransaction]) (*ledger.RevertedTransaction, error) { - return handleRetry(ctx, ctrl.delayCalculator, parameters, ctrl.Controller.RevertTransaction) + return handleRetry(ctx, ctrl.tracer, ctrl.delayCalculator, parameters, ctrl.Controller.RevertTransaction) } func (ctrl *ControllerWithTooManyClientHandling) SaveTransactionMetadata(ctx context.Context, parameters Parameters[SaveTransactionMetadata]) error { - _, err := handleRetry(ctx, ctrl.delayCalculator, parameters, func(ctx context.Context, parameters Parameters[SaveTransactionMetadata]) (*struct{}, error) { + _, err := handleRetry(ctx, ctrl.tracer, ctrl.delayCalculator, parameters, func(ctx context.Context, parameters Parameters[SaveTransactionMetadata]) (*struct{}, error) { return nil, ctrl.Controller.SaveTransactionMetadata(ctx, parameters) }) return err } func (ctrl *ControllerWithTooManyClientHandling) SaveAccountMetadata(ctx context.Context, parameters Parameters[SaveAccountMetadata]) error { - _, err := handleRetry(ctx, ctrl.delayCalculator, parameters, func(ctx context.Context, parameters Parameters[SaveAccountMetadata]) (*struct{}, error) { + _, err := handleRetry(ctx, ctrl.tracer, ctrl.delayCalculator, parameters, func(ctx context.Context, parameters Parameters[SaveAccountMetadata]) (*struct{}, error) { return nil, ctrl.Controller.SaveAccountMetadata(ctx, parameters) }) return err } func (ctrl *ControllerWithTooManyClientHandling) DeleteTransactionMetadata(ctx context.Context, parameters Parameters[DeleteTransactionMetadata]) error { - _, err := handleRetry(ctx, ctrl.delayCalculator, parameters, func(ctx context.Context, parameters Parameters[DeleteTransactionMetadata]) (*struct{}, error) { + _, err := handleRetry(ctx, ctrl.tracer, ctrl.delayCalculator, parameters, func(ctx context.Context, parameters Parameters[DeleteTransactionMetadata]) (*struct{}, error) { return nil, ctrl.Controller.DeleteTransactionMetadata(ctx, parameters) }) return err } func (ctrl *ControllerWithTooManyClientHandling) DeleteAccountMetadata(ctx context.Context, parameters Parameters[DeleteAccountMetadata]) error { - _, err := handleRetry(ctx, ctrl.delayCalculator, parameters, func(ctx context.Context, parameters Parameters[DeleteAccountMetadata]) (*struct{}, error) { + _, err := handleRetry(ctx, ctrl.tracer, ctrl.delayCalculator, parameters, func(ctx context.Context, parameters Parameters[DeleteAccountMetadata]) (*struct{}, error) { return nil, ctrl.Controller.DeleteAccountMetadata(ctx, parameters) }) return err @@ -70,9 +76,15 @@ func (ctrl *ControllerWithTooManyClientHandling) DeleteAccountMetadata(ctx conte var _ Controller = (*ControllerWithTooManyClientHandling)(nil) -func handleRetry[INPUT, OUTPUT any](ctx context.Context, delayCalculator DelayCalculator, parameters Parameters[INPUT], fn func(ctx context.Context, parameters Parameters[INPUT]) (*OUTPUT, error)) (*OUTPUT, error) { +func handleRetry[INPUT, OUTPUT any]( + ctx context.Context, + tracer trace.Tracer, + delayCalculator DelayCalculator, + parameters Parameters[INPUT], + fn func(ctx context.Context, parameters Parameters[INPUT]) (*OUTPUT, error), +) (*OUTPUT, error) { - ctx, span := tracing.Start(ctx, "Retrier") + ctx, span := tracer.Start(ctx, "Retrier") defer span.End() count := 0 diff --git a/internal/controller/ledger/controller_with_too_many_client_handling_test.go b/internal/controller/ledger/controller_with_too_many_client_handling_test.go index 07dc6661b..ea8c0b228 100644 --- a/internal/controller/ledger/controller_with_too_many_client_handling_test.go +++ b/internal/controller/ledger/controller_with_too_many_client_handling_test.go @@ -7,6 +7,7 @@ import ( "github.com/formancehq/go-libs/time" ledger "github.com/formancehq/ledger/internal" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace/noop" "go.uber.org/mock/gomock" "testing" ) @@ -43,7 +44,7 @@ func TestNewControllerWithTooManyClientHandling(t *testing.T) { Next(1). Return(10 * time.Millisecond) - ledgerController := NewControllerWithTooManyClientHandling(underlyingLedgerController, delayCalculator) + ledgerController := NewControllerWithTooManyClientHandling(underlyingLedgerController, noop.Tracer{}, delayCalculator) _, err := ledgerController.CreateTransaction(ctx, parameters) require.NoError(t, err) }) @@ -71,7 +72,7 @@ func TestNewControllerWithTooManyClientHandling(t *testing.T) { Next(1). Return(time.Duration(0)) - ledgerController := NewControllerWithTooManyClientHandling(underlyingLedgerController, delayCalculator) + ledgerController := NewControllerWithTooManyClientHandling(underlyingLedgerController, noop.Tracer{}, delayCalculator) _, err := ledgerController.CreateTransaction(ctx, parameters) require.Error(t, err) require.True(t, errors.Is(err, postgres.ErrTooManyClient{})) diff --git a/internal/controller/ledger/controller_with_traces.go b/internal/controller/ledger/controller_with_traces.go index f809cba39..d8ef4bc56 100644 --- a/internal/controller/ledger/controller_with_traces.go +++ b/internal/controller/ledger/controller_with_traces.go @@ -4,18 +4,21 @@ import ( "context" "github.com/formancehq/go-libs/migrations" "github.com/formancehq/ledger/internal/tracing" + "go.opentelemetry.io/otel/trace" "github.com/formancehq/go-libs/bun/bunpaginate" ledger "github.com/formancehq/ledger/internal" ) type ControllerWithTraces struct { - underlying Controller + underlying Controller + tracer trace.Tracer } -func NewControllerWithTraces(underlying Controller) *ControllerWithTraces { +func NewControllerWithTraces(underlying Controller, tracer trace.Tracer) *ControllerWithTraces { return &ControllerWithTraces{ underlying: underlying, + tracer: tracer, } } @@ -24,115 +27,115 @@ func (ctrl *ControllerWithTraces) GetMigrationsInfo(ctx context.Context) ([]migr } func (ctrl *ControllerWithTraces) ListTransactions(ctx context.Context, q ListTransactionsQuery) (*bunpaginate.Cursor[ledger.Transaction], error) { - return tracing.Trace(ctx, "ListTransactions", func(ctx context.Context) (*bunpaginate.Cursor[ledger.Transaction], error) { + return tracing.Trace(ctx, ctrl.tracer, "ListTransactions", func(ctx context.Context) (*bunpaginate.Cursor[ledger.Transaction], error) { return ctrl.underlying.ListTransactions(ctx, q) }) } func (ctrl *ControllerWithTraces) CountTransactions(ctx context.Context, q ListTransactionsQuery) (int, error) { - return tracing.Trace(ctx, "CountTransactions", func(ctx context.Context) (int, error) { + return tracing.Trace(ctx, ctrl.tracer, "CountTransactions", func(ctx context.Context) (int, error) { return ctrl.underlying.CountTransactions(ctx, q) }) } func (ctrl *ControllerWithTraces) GetTransaction(ctx context.Context, query GetTransactionQuery) (*ledger.Transaction, error) { - return tracing.Trace(ctx, "GetTransaction", func(ctx context.Context) (*ledger.Transaction, error) { + return tracing.Trace(ctx, ctrl.tracer, "GetTransaction", func(ctx context.Context) (*ledger.Transaction, error) { return ctrl.underlying.GetTransaction(ctx, query) }) } func (ctrl *ControllerWithTraces) CountAccounts(ctx context.Context, a ListAccountsQuery) (int, error) { - return tracing.Trace(ctx, "CountAccounts", func(ctx context.Context) (int, error) { + return tracing.Trace(ctx, ctrl.tracer, "CountAccounts", func(ctx context.Context) (int, error) { return ctrl.underlying.CountAccounts(ctx, a) }) } func (ctrl *ControllerWithTraces) ListAccounts(ctx context.Context, a ListAccountsQuery) (*bunpaginate.Cursor[ledger.Account], error) { - return tracing.Trace(ctx, "ListAccounts", func(ctx context.Context) (*bunpaginate.Cursor[ledger.Account], error) { + return tracing.Trace(ctx, ctrl.tracer, "ListAccounts", func(ctx context.Context) (*bunpaginate.Cursor[ledger.Account], error) { return ctrl.underlying.ListAccounts(ctx, a) }) } func (ctrl *ControllerWithTraces) GetAccount(ctx context.Context, q GetAccountQuery) (*ledger.Account, error) { - return tracing.Trace(ctx, "GetAccount", func(ctx context.Context) (*ledger.Account, error) { + return tracing.Trace(ctx, ctrl.tracer, "GetAccount", func(ctx context.Context) (*ledger.Account, error) { return ctrl.underlying.GetAccount(ctx, q) }) } func (ctrl *ControllerWithTraces) GetAggregatedBalances(ctx context.Context, q GetAggregatedBalanceQuery) (ledger.BalancesByAssets, error) { - return tracing.Trace(ctx, "GetAggregatedBalances", func(ctx context.Context) (ledger.BalancesByAssets, error) { + return tracing.Trace(ctx, ctrl.tracer, "GetAggregatedBalances", func(ctx context.Context) (ledger.BalancesByAssets, error) { return ctrl.underlying.GetAggregatedBalances(ctx, q) }) } func (ctrl *ControllerWithTraces) ListLogs(ctx context.Context, q GetLogsQuery) (*bunpaginate.Cursor[ledger.Log], error) { - return tracing.Trace(ctx, "ListLogs", func(ctx context.Context) (*bunpaginate.Cursor[ledger.Log], error) { + return tracing.Trace(ctx, ctrl.tracer, "ListLogs", func(ctx context.Context) (*bunpaginate.Cursor[ledger.Log], error) { return ctrl.underlying.ListLogs(ctx, q) }) } func (ctrl *ControllerWithTraces) Import(ctx context.Context, stream chan ledger.Log) error { - return tracing.SkipResult(tracing.Trace(ctx, "Import", tracing.NoResult(func(ctx context.Context) error { + return tracing.SkipResult(tracing.Trace(ctx, ctrl.tracer, "Import", tracing.NoResult(func(ctx context.Context) error { return ctrl.underlying.Import(ctx, stream) }))) } func (ctrl *ControllerWithTraces) Export(ctx context.Context, w ExportWriter) error { - return tracing.SkipResult(tracing.Trace(ctx, "Export", tracing.NoResult(func(ctx context.Context) error { + return tracing.SkipResult(tracing.Trace(ctx, ctrl.tracer, "Export", tracing.NoResult(func(ctx context.Context) error { return ctrl.underlying.Export(ctx, w) }))) } func (ctrl *ControllerWithTraces) IsDatabaseUpToDate(ctx context.Context) (bool, error) { - return tracing.Trace(ctx, "IsDatabaseUpToDate", func(ctx context.Context) (bool, error) { + return tracing.Trace(ctx, ctrl.tracer, "IsDatabaseUpToDate", func(ctx context.Context) (bool, error) { return ctrl.underlying.IsDatabaseUpToDate(ctx) }) } func (ctrl *ControllerWithTraces) GetVolumesWithBalances(ctx context.Context, q GetVolumesWithBalancesQuery) (*bunpaginate.Cursor[ledger.VolumesWithBalanceByAssetByAccount], error) { - return tracing.Trace(ctx, "GetVolumesWithBalances", func(ctx context.Context) (*bunpaginate.Cursor[ledger.VolumesWithBalanceByAssetByAccount], error) { + return tracing.Trace(ctx, ctrl.tracer, "GetVolumesWithBalances", func(ctx context.Context) (*bunpaginate.Cursor[ledger.VolumesWithBalanceByAssetByAccount], error) { return ctrl.underlying.GetVolumesWithBalances(ctx, q) }) } func (ctrl *ControllerWithTraces) CreateTransaction(ctx context.Context, parameters Parameters[RunScript]) (*ledger.CreatedTransaction, error) { - return tracing.Trace(ctx, "CreateTransaction", func(ctx context.Context) (*ledger.CreatedTransaction, error) { + return tracing.Trace(ctx, ctrl.tracer, "CreateTransaction", func(ctx context.Context) (*ledger.CreatedTransaction, error) { return ctrl.underlying.CreateTransaction(ctx, parameters) }) } func (ctrl *ControllerWithTraces) RevertTransaction(ctx context.Context, parameters Parameters[RevertTransaction]) (*ledger.RevertedTransaction, error) { - return tracing.Trace(ctx, "RevertTransaction", func(ctx context.Context) (*ledger.RevertedTransaction, error) { + return tracing.Trace(ctx, ctrl.tracer, "RevertTransaction", func(ctx context.Context) (*ledger.RevertedTransaction, error) { return ctrl.underlying.RevertTransaction(ctx, parameters) }) } func (ctrl *ControllerWithTraces) SaveTransactionMetadata(ctx context.Context, parameters Parameters[SaveTransactionMetadata]) error { - return tracing.SkipResult(tracing.Trace(ctx, "SaveTransactionMetadata", tracing.NoResult(func(ctx context.Context) error { + return tracing.SkipResult(tracing.Trace(ctx, ctrl.tracer, "SaveTransactionMetadata", tracing.NoResult(func(ctx context.Context) error { return ctrl.underlying.SaveTransactionMetadata(ctx, parameters) }))) } func (ctrl *ControllerWithTraces) SaveAccountMetadata(ctx context.Context, parameters Parameters[SaveAccountMetadata]) error { - return tracing.SkipResult(tracing.Trace(ctx, "SaveAccountMetadata", tracing.NoResult(func(ctx context.Context) error { + return tracing.SkipResult(tracing.Trace(ctx, ctrl.tracer, "SaveAccountMetadata", tracing.NoResult(func(ctx context.Context) error { return ctrl.underlying.SaveAccountMetadata(ctx, parameters) }))) } func (ctrl *ControllerWithTraces) DeleteTransactionMetadata(ctx context.Context, parameters Parameters[DeleteTransactionMetadata]) error { - return tracing.SkipResult(tracing.Trace(ctx, "DeleteTransactionMetadata", tracing.NoResult(func(ctx context.Context) error { + return tracing.SkipResult(tracing.Trace(ctx, ctrl.tracer, "DeleteTransactionMetadata", tracing.NoResult(func(ctx context.Context) error { return ctrl.underlying.DeleteTransactionMetadata(ctx, parameters) }))) } func (ctrl *ControllerWithTraces) DeleteAccountMetadata(ctx context.Context, parameters Parameters[DeleteAccountMetadata]) error { - return tracing.SkipResult(tracing.Trace(ctx, "DeleteAccountMetadata", tracing.NoResult(func(ctx context.Context) error { + return tracing.SkipResult(tracing.Trace(ctx, ctrl.tracer, "DeleteAccountMetadata", tracing.NoResult(func(ctx context.Context) error { return ctrl.underlying.DeleteAccountMetadata(ctx, parameters) }))) } func (ctrl *ControllerWithTraces) GetStats(ctx context.Context) (Stats, error) { - return tracing.Trace(ctx, "GetStats", func(ctx context.Context) (Stats, error) { + return tracing.Trace(ctx, ctrl.tracer, "GetStats", func(ctx context.Context) (Stats, error) { return ctrl.underlying.GetStats(ctx) }) } diff --git a/internal/controller/system/controller.go b/internal/controller/system/controller.go index 734f497c2..f26d12c10 100644 --- a/internal/controller/system/controller.go +++ b/internal/controller/system/controller.go @@ -3,7 +3,9 @@ package system import ( "context" "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" + noopmetrics "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + nooptracer "go.opentelemetry.io/otel/trace/noop" "time" "github.com/formancehq/ledger/internal/tracing" @@ -33,11 +35,12 @@ type DefaultController struct { registry *ledgercontroller.StateRegistry databaseRetryConfiguration DatabaseRetryConfiguration + tracer trace.Tracer meter metric.Meter } func (ctrl *DefaultController) GetLedgerController(ctx context.Context, name string) (ledgercontroller.Controller, error) { - return tracing.Trace(ctx, "GetLedgerController", func(ctx context.Context) (ledgercontroller.Controller, error) { + return tracing.Trace(ctx, ctrl.tracer, "GetLedgerController", func(ctx context.Context) (ledgercontroller.Controller, error) { store, l, err := ctrl.store.OpenLedger(ctx, name) if err != nil { return nil, err @@ -51,19 +54,23 @@ func (ctrl *DefaultController) GetLedgerController(ctx context.Context, name str ) // Add too many client error handling - ledgerController = ledgercontroller.NewControllerWithTooManyClientHandling(ledgerController, ledgercontroller.DelayCalculatorFn(func(i int) time.Duration { - if i < ctrl.databaseRetryConfiguration.MaxRetry { - return time.Duration(i+1)*ctrl.databaseRetryConfiguration.Delay - } - - return 0 - })) + ledgerController = ledgercontroller.NewControllerWithTooManyClientHandling( + ledgerController, + ctrl.tracer, + ledgercontroller.DelayCalculatorFn(func(i int) time.Duration { + if i < ctrl.databaseRetryConfiguration.MaxRetry { + return time.Duration(i+1)*ctrl.databaseRetryConfiguration.Delay + } + + return 0 + }), + ) // Add cache regarding database state ledgerController = ledgercontroller.NewControllerWithCache(*l, ledgerController, ctrl.registry) // Add traces - ledgerController = ledgercontroller.NewControllerWithTraces(ledgerController) + ledgerController = ledgercontroller.NewControllerWithTraces(ledgerController, ctrl.tracer) // Add events listener if ctrl.listener != nil { @@ -75,7 +82,7 @@ func (ctrl *DefaultController) GetLedgerController(ctx context.Context, name str } func (ctrl *DefaultController) CreateLedger(ctx context.Context, name string, configuration ledger.Configuration) error { - return tracing.SkipResult(tracing.Trace(ctx, "CreateLedger", tracing.NoResult(func(ctx context.Context) error { + return tracing.SkipResult(tracing.Trace(ctx, ctrl.tracer, "CreateLedger", tracing.NoResult(func(ctx context.Context) error { configuration.SetDefaults() l, err := ledger.New(name, configuration) if err != nil { @@ -87,25 +94,25 @@ func (ctrl *DefaultController) CreateLedger(ctx context.Context, name string, co } func (ctrl *DefaultController) GetLedger(ctx context.Context, name string) (*ledger.Ledger, error) { - return tracing.Trace(ctx, "GetLedger", func(ctx context.Context) (*ledger.Ledger, error) { + return tracing.Trace(ctx, ctrl.tracer, "GetLedger", func(ctx context.Context) (*ledger.Ledger, error) { return ctrl.store.GetLedger(ctx, name) }) } func (ctrl *DefaultController) ListLedgers(ctx context.Context, query ledgercontroller.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error) { - return tracing.Trace(ctx, "ListLedgers", func(ctx context.Context) (*bunpaginate.Cursor[ledger.Ledger], error) { + return tracing.Trace(ctx, ctrl.tracer, "ListLedgers", func(ctx context.Context) (*bunpaginate.Cursor[ledger.Ledger], error) { return ctrl.store.ListLedgers(ctx, query) }) } func (ctrl *DefaultController) UpdateLedgerMetadata(ctx context.Context, name string, m map[string]string) error { - return tracing.SkipResult(tracing.Trace(ctx, "UpdateLedgerMetadata", tracing.NoResult(func(ctx context.Context) error { + return tracing.SkipResult(tracing.Trace(ctx, ctrl.tracer, "UpdateLedgerMetadata", tracing.NoResult(func(ctx context.Context) error { return ctrl.store.UpdateLedgerMetadata(ctx, name, m) }))) } func (ctrl *DefaultController) DeleteLedgerMetadata(ctx context.Context, param string, key string) error { - return tracing.SkipResult(tracing.Trace(ctx, "DeleteLedgerMetadata", tracing.NoResult(func(ctx context.Context) error { + return tracing.SkipResult(tracing.Trace(ctx, ctrl.tracer, "DeleteLedgerMetadata", tracing.NoResult(func(ctx context.Context) error { return ctrl.store.DeleteLedgerMetadata(ctx, param, key) }))) } @@ -142,7 +149,14 @@ func WithMeter(m metric.Meter) Option { } } +func WithTracer(t trace.Tracer) Option { + return func(ctrl *DefaultController) { + ctrl.tracer = t + } +} + var defaultOptions = []Option{ WithCompiler(ledgercontroller.NewDefaultCompiler()), - WithMeter(noop.Meter{}), + WithMeter(noopmetrics.Meter{}), + WithTracer(nooptracer.Tracer{}), } diff --git a/internal/controller/system/module.go b/internal/controller/system/module.go index 18bfeab6b..286a9c15c 100644 --- a/internal/controller/system/module.go +++ b/internal/controller/system/module.go @@ -3,6 +3,7 @@ package system import ( ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" "go.uber.org/fx" "time" ) @@ -26,6 +27,7 @@ func NewFXModule(configuration ModuleConfiguration) fx.Option { store Store, listener ledgercontroller.Listener, meterProvider metric.MeterProvider, + tracerProvider trace.TracerProvider, ) *DefaultController { options := make([]Option, 0) if configuration.NSCacheConfiguration.MaxCount != 0 { @@ -41,6 +43,7 @@ func NewFXModule(configuration ModuleConfiguration) fx.Option { append(options, WithDatabaseRetryConfiguration(configuration.DatabaseRetryConfiguration), WithMeter(meterProvider.Meter("core")), + WithTracer(tracerProvider.Tracer("core")), )..., ) }), diff --git a/internal/storage/bucket/bucket.go b/internal/storage/bucket/bucket.go index c8ec81aa7..0bb0a64af 100644 --- a/internal/storage/bucket/bucket.go +++ b/internal/storage/bucket/bucket.go @@ -3,6 +3,7 @@ package bucket import ( "context" _ "embed" + "go.opentelemetry.io/otel/trace" "errors" "github.com/formancehq/go-libs/migrations" @@ -14,8 +15,8 @@ type Bucket struct { db bun.IDB } -func (b *Bucket) Migrate(ctx context.Context) error { - return Migrate(ctx, b.db, b.name) +func (b *Bucket) Migrate(ctx context.Context, tracer trace.Tracer) error { + return Migrate(ctx, tracer, b.db, b.name) } func (b *Bucket) IsUpToDate(ctx context.Context) (bool, error) { diff --git a/internal/storage/bucket/bucket_test.go b/internal/storage/bucket/bucket_test.go index 294de941e..09be235e5 100644 --- a/internal/storage/bucket/bucket_test.go +++ b/internal/storage/bucket/bucket_test.go @@ -5,6 +5,7 @@ package bucket_test import ( "github.com/formancehq/ledger/internal/storage/bucket" "github.com/formancehq/ledger/internal/storage/driver" + "go.opentelemetry.io/otel/trace/noop" "testing" "github.com/formancehq/go-libs/bun/bunconnect" @@ -25,5 +26,5 @@ func TestBuckets(t *testing.T) { require.NoError(t, driver.Migrate(ctx, db)) b := bucket.New(db, name) - require.NoError(t, b.Migrate(ctx)) + require.NoError(t, b.Migrate(ctx, noop.Tracer{})) } diff --git a/internal/storage/bucket/migrations.go b/internal/storage/bucket/migrations.go index 8fb0f69e7..536fb5d2c 100644 --- a/internal/storage/bucket/migrations.go +++ b/internal/storage/bucket/migrations.go @@ -4,10 +4,9 @@ import ( "bytes" "context" "embed" + "go.opentelemetry.io/otel/trace" "text/template" - "github.com/formancehq/ledger/internal/tracing" - "github.com/formancehq/go-libs/migrations" "github.com/uptrace/bun" ) @@ -34,8 +33,8 @@ func GetMigrator(name string) *migrations.Migrator { return migrator } -func Migrate(ctx context.Context, db bun.IDB, name string) error { - ctx, span := tracing.Start(ctx, "Migrate bucket") +func Migrate(ctx context.Context, tracer trace.Tracer, db bun.IDB, name string) error { + ctx, span := tracer.Start(ctx, "Migrate bucket") defer span.End() return GetMigrator(name).Up(ctx, db) diff --git a/internal/storage/driver/driver.go b/internal/storage/driver/driver.go index 02361a8c6..debf90daf 100644 --- a/internal/storage/driver/driver.go +++ b/internal/storage/driver/driver.go @@ -8,7 +8,9 @@ import ( "github.com/formancehq/go-libs/metadata" "github.com/formancehq/go-libs/platform/postgres" "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" + noopmetrics "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + nooptracer "go.opentelemetry.io/otel/trace/noop" systemcontroller "github.com/formancehq/ledger/internal/controller/system" @@ -29,6 +31,7 @@ const ( type Driver struct { db *bun.DB + tracer trace.Tracer meter metric.Meter } @@ -40,11 +43,11 @@ func (d *Driver) createLedgerStore(ctx context.Context, db bun.IDB, ledger ledge } b := bucket.New(tx, ledger.Bucket) - if err := b.Migrate(ctx); err != nil { + if err := b.Migrate(ctx, d.tracer); err != nil { return nil, fmt.Errorf("migrating bucket: %w", err) } - if err := ledgerstore.Migrate(ctx, tx, ledger); err != nil { + if err := ledgerstore.Migrate(ctx, d.tracer, tx, ledger); err != nil { return nil, fmt.Errorf("failed to migrate ledger store: %w", err) } @@ -52,7 +55,12 @@ func (d *Driver) createLedgerStore(ctx context.Context, db bun.IDB, ledger ledge return nil, fmt.Errorf("committing sql transaction to create ledger and schemas: %w", err) } - return ledgerstore.New(d.db, ledger, ledgerstore.WithMeter(d.meter)), nil + return ledgerstore.New( + d.db, + ledger, + ledgerstore.WithMeter(d.meter), + ledgerstore.WithTracer(d.tracer), + ), nil } func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgerstore.Store, error) { @@ -109,7 +117,12 @@ func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Stor return nil, nil, postgres.ResolveError(err) } - return ledgerstore.New(d.db, *ret, ledgerstore.WithMeter(d.meter)), ret, nil + return ledgerstore.New( + d.db, + *ret, + ledgerstore.WithMeter(d.meter), + ledgerstore.WithTracer(d.tracer), + ), ret, nil } func (d *Driver) Initialize(ctx context.Context) error { @@ -166,7 +179,7 @@ func (d *Driver) GetLedger(ctx context.Context, name string) (*ledger.Ledger, er } func (d *Driver) UpgradeBucket(ctx context.Context, name string) error { - return bucket.New(d.db, name).Migrate(ctx) + return bucket.New(d.db, name).Migrate(ctx, d.tracer) } func (d *Driver) UpgradeAllBuckets(ctx context.Context) error { @@ -190,7 +203,7 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context) error { b := bucket.New(d.db, bucketName) logging.FromContext(ctx).Infof("Upgrading bucket '%s'", bucketName) - if err := b.Migrate(ctx); err != nil { + if err := b.Migrate(ctx, d.tracer); err != nil { return err } } @@ -205,7 +218,7 @@ func (d *Driver) UpgradeAllLedgers(ctx context.Context) error { }, func(cursor *bunpaginate.Cursor[ledger.Ledger]) error { for _, ledger := range cursor.Data { - if err := ledgerstore.Migrate(ctx, d.db, ledger); err != nil { + if err := ledgerstore.Migrate(ctx, d.tracer, d.db, ledger); err != nil { return err } } @@ -236,6 +249,13 @@ func WithMeter(m metric.Meter) Option { } } +func WithTracer(tracer trace.Tracer) Option { + return func(d *Driver) { + d.tracer = tracer + } +} + var defaultOptions = []Option { - WithMeter(noop.Meter{}), + WithMeter(noopmetrics.Meter{}), + WithTracer(nooptracer.Tracer{}), } \ No newline at end of file diff --git a/internal/storage/driver/module.go b/internal/storage/driver/module.go index ada621dd3..1ef19f997 100644 --- a/internal/storage/driver/module.go +++ b/internal/storage/driver/module.go @@ -3,6 +3,7 @@ package driver import ( "context" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" systemcontroller "github.com/formancehq/ledger/internal/controller/system" @@ -21,8 +22,15 @@ type ModuleConfiguration struct { func NewFXModule(autoUpgrade bool) fx.Option { return fx.Options( - fx.Provide(func(db *bun.DB, meterProvider metric.MeterProvider) (*Driver, error) { - return New(db, WithMeter(meterProvider.Meter("store"))), nil + fx.Provide(func( + db *bun.DB, + tracerProvider trace.TracerProvider, + meterProvider metric.MeterProvider, + ) (*Driver, error) { + return New(db, + WithMeter(meterProvider.Meter("store")), + WithTracer(tracerProvider.Tracer("store")), + ), nil }), fx.Provide(fx.Annotate(NewControllerStorageDriverAdapter, fx.As(new(systemcontroller.Store)))), fx.Invoke(func(driver *Driver, lifecycle fx.Lifecycle, logger logging.Logger) error { diff --git a/internal/storage/ledger/accounts.go b/internal/storage/ledger/accounts.go index 486399bb2..bf7e20665 100644 --- a/internal/storage/ledger/accounts.go +++ b/internal/storage/ledger/accounts.go @@ -213,6 +213,7 @@ func (s *Store) ListAccounts(ctx context.Context, q ledgercontroller.ListAccount return tracing.TraceWithMetric( ctx, "ListAccounts", + s.tracer, s.listAccountsHistogram, func(ctx context.Context) (*Cursor[ledger.Account], error) { ret, err := UsingOffset[ledgercontroller.PaginatedQueryOptions[ledgercontroller.PITFilterWithVolumes], ledger.Account]( @@ -239,6 +240,7 @@ func (s *Store) GetAccount(ctx context.Context, q ledgercontroller.GetAccountQue return tracing.TraceWithMetric( ctx, "GetAccount", + s.tracer, s.getAccountHistogram, func(ctx context.Context) (*ledger.Account, error) { ret := &ledger.Account{} @@ -259,6 +261,7 @@ func (s *Store) CountAccounts(ctx context.Context, q ledgercontroller.ListAccoun return tracing.TraceWithMetric( ctx, "CountAccounts", + s.tracer, s.countAccountsHistogram, func(ctx context.Context) (int, error) { return s.db.NewSelect(). @@ -277,6 +280,7 @@ func (s *Store) UpdateAccountsMetadata(ctx context.Context, m map[string]metadat _, err := tracing.TraceWithMetric( ctx, "UpdateAccountsMetadata", + s.tracer, s.updateAccountsMetadataHistogram, tracing.NoResult(func(ctx context.Context) error { type AccountWithLedger struct { @@ -312,6 +316,7 @@ func (s *Store) DeleteAccountMetadata(ctx context.Context, account, key string) _, err := tracing.TraceWithMetric( ctx, "DeleteAccountMetadata", + s.tracer, s.deleteAccountMetadataHistogram, tracing.NoResult(func(ctx context.Context) error { _, err := s.db.NewUpdate(). @@ -330,6 +335,7 @@ func (s *Store) UpsertAccount(ctx context.Context, account *ledger.Account) (boo return tracing.TraceWithMetric( ctx, "UpsertAccount", + s.tracer, s.upsertAccountHistogram, func(ctx context.Context) (bool, error) { upserted := false diff --git a/internal/storage/ledger/balances.go b/internal/storage/ledger/balances.go index 1b519a42f..a71836d21 100644 --- a/internal/storage/ledger/balances.go +++ b/internal/storage/ledger/balances.go @@ -191,6 +191,7 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ return tracing.TraceWithMetric( ctx, "GetBalances", + s.tracer, s.getBalancesHistogram, func(ctx context.Context) (ledgercontroller.Balances, error) { conditions := make([]string, 0) diff --git a/internal/storage/ledger/logs.go b/internal/storage/ledger/logs.go index 4d2cf9573..6d2ab944f 100644 --- a/internal/storage/ledger/logs.go +++ b/internal/storage/ledger/logs.go @@ -58,6 +58,7 @@ func (s *Store) InsertLog(ctx context.Context, log *ledger.Log) error { _, err := tracing.TraceWithMetric( ctx, "InsertLog", + s.tracer, s.insertLogHistogram, tracing.NoResult(func(ctx context.Context) error { @@ -111,6 +112,7 @@ func (s *Store) ListLogs(ctx context.Context, q ledgercontroller.GetLogsQuery) ( return tracing.TraceWithMetric( ctx, "ListLogs", + s.tracer, s.listLogsHistogram, func(ctx context.Context) (*bunpaginate.Cursor[ledger.Log], error) { selectQuery := s.db.NewSelect(). @@ -147,6 +149,7 @@ func (s *Store) ReadLogWithIdempotencyKey(ctx context.Context, key string) (*led return tracing.TraceWithMetric( ctx, "ReadLogWithIdempotencyKey", + s.tracer, s.readLogWithIdempotencyKeyHistogram, func(ctx context.Context) (*ledger.Log, error) { ret := &Log{} diff --git a/internal/storage/ledger/main_test.go b/internal/storage/ledger/main_test.go index ffc399e29..aff532f25 100644 --- a/internal/storage/ledger/main_test.go +++ b/internal/storage/ledger/main_test.go @@ -6,6 +6,7 @@ import ( "database/sql" systemstore "github.com/formancehq/ledger/internal/storage/driver" ledgerstore "github.com/formancehq/ledger/internal/storage/ledger" + "go.opentelemetry.io/otel/trace/noop" "math/big" "testing" @@ -85,8 +86,8 @@ func newLedgerStore(t T) *ledgerstore.Store { l.Bucket = ledgerName b := bucket.New(db, ledgerName) - require.NoError(t, b.Migrate(ctx)) - require.NoError(t, ledgerstore.Migrate(ctx, db, l)) + require.NoError(t, b.Migrate(ctx, noop.Tracer{})) + require.NoError(t, ledgerstore.Migrate(ctx, noop.Tracer{}, db, l)) return ledgerstore.New(db, l) } diff --git a/internal/storage/ledger/migrations.go b/internal/storage/ledger/migrations.go index d93db531d..b61c809fb 100644 --- a/internal/storage/ledger/migrations.go +++ b/internal/storage/ledger/migrations.go @@ -5,10 +5,9 @@ import ( "context" "embed" "fmt" + "go.opentelemetry.io/otel/trace" "text/template" - "github.com/formancehq/ledger/internal/tracing" - "github.com/formancehq/go-libs/migrations" ledger "github.com/formancehq/ledger/internal" "github.com/uptrace/bun" @@ -36,8 +35,8 @@ func getMigrator(ledger ledger.Ledger) *migrations.Migrator { return migrator } -func Migrate(ctx context.Context, db bun.IDB, ledger ledger.Ledger) error { - ctx, span := tracing.Start(ctx, "Migrate ledger") +func Migrate(ctx context.Context, tracer trace.Tracer, db bun.IDB, ledger ledger.Ledger) error { + ctx, span := tracer.Start(ctx, "Migrate ledger") defer span.End() return getMigrator(ledger).Up(ctx, db) diff --git a/internal/storage/ledger/moves.go b/internal/storage/ledger/moves.go index 231818a47..06a559b8f 100644 --- a/internal/storage/ledger/moves.go +++ b/internal/storage/ledger/moves.go @@ -64,6 +64,7 @@ func (s *Store) InsertMoves(ctx context.Context, moves ...*ledger.Move) error { _, err := tracing.TraceWithMetric( ctx, "InsertMoves", + s.tracer, s.insertMovesHistogram, tracing.NoResult(func(ctx context.Context) error { _, err := s.db.NewInsert(). diff --git a/internal/storage/ledger/store.go b/internal/storage/ledger/store.go index f35e56c1c..1363c1e95 100644 --- a/internal/storage/ledger/store.go +++ b/internal/storage/ledger/store.go @@ -5,7 +5,9 @@ import ( "fmt" "github.com/formancehq/go-libs/platform/postgres" "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" + noopmetrics "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + nooptracer "go.opentelemetry.io/otel/trace/noop" "github.com/formancehq/ledger/internal/tracing" @@ -20,6 +22,7 @@ type Store struct { db bun.IDB ledger ledger.Ledger + tracer trace.Tracer meter metric.Meter listAccountsHistogram metric.Int64Histogram checkBucketSchemaHistogram metric.Int64Histogram @@ -66,9 +69,15 @@ func (s *Store) GetMigrationsInfo(ctx context.Context) ([]migrations.Info, error } func (s *Store) IsUpToDate(ctx context.Context) (bool, error) { - bucketUpToDate, err := tracing.TraceWithMetric(ctx, "CheckBucketSchema", s.checkBucketSchemaHistogram, func(ctx context.Context) (bool, error) { - return bucket.New(s.db, s.ledger.Bucket).IsUpToDate(ctx) - }) + bucketUpToDate, err := tracing.TraceWithMetric( + ctx, + "CheckBucketSchema", + s.tracer, + s.checkBucketSchemaHistogram, + func(ctx context.Context) (bool, error) { + return bucket.New(s.db, s.ledger.Bucket).IsUpToDate(ctx) + }, + ) if err != nil { return false, fmt.Errorf("failed to check if bucket is up to date: %w", err) } @@ -76,9 +85,15 @@ func (s *Store) IsUpToDate(ctx context.Context) (bool, error) { return false, nil } - ret, err := tracing.TraceWithMetric(ctx, "CheckLedgerSchema", s.checkLedgerSchemaHistogram, func(ctx context.Context) (bool, error) { - return getMigrator(s.ledger).IsUpToDate(ctx, s.db) - }) + ret, err := tracing.TraceWithMetric( + ctx, + "CheckLedgerSchema", + s.tracer, + s.checkLedgerSchemaHistogram, + func(ctx context.Context) (bool, error) { + return getMigrator(s.ledger).IsUpToDate(ctx, s.db) + }, + ) if err != nil && errors.Is(err, migrations.ErrMissingVersionTable) { return false, nil } @@ -214,6 +229,13 @@ func WithMeter(meter metric.Meter) Option { } } +func WithTracer(tracer trace.Tracer) Option { + return func(s *Store) { + s.tracer = tracer + } +} + var defaultOptions = []Option{ - WithMeter(noop.Meter{}), + WithMeter(noopmetrics.Meter{}), + WithTracer(nooptracer.Tracer{}), } diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index 5718e1375..66e651a6c 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -300,29 +300,36 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e } func (s *Store) ListTransactions(ctx context.Context, q ledgercontroller.ListTransactionsQuery) (*bunpaginate.Cursor[ledger.Transaction], error) { - return tracing.TraceWithMetric(ctx, "ListTransactions", s.listTransactionsHistogram, func(ctx context.Context) (*bunpaginate.Cursor[ledger.Transaction], error) { - cursor, err := bunpaginate.UsingColumn[ledgercontroller.PaginatedQueryOptions[ledgercontroller.PITFilterWithVolumes], ledger.Transaction]( - ctx, - s.selectTransactions( - q.Options.Options.PIT, - q.Options.Options.ExpandVolumes, - q.Options.Options.ExpandEffectiveVolumes, - q.Options.QueryBuilder, - ), - bunpaginate.ColumnPaginatedQuery[ledgercontroller.PaginatedQueryOptions[ledgercontroller.PITFilterWithVolumes]](q), - ) - if err != nil { - return nil, err - } + return tracing.TraceWithMetric( + ctx, + "ListTransactions", + s.tracer, + s.listTransactionsHistogram, + func(ctx context.Context) (*bunpaginate.Cursor[ledger.Transaction], error) { + cursor, err := bunpaginate.UsingColumn[ledgercontroller.PaginatedQueryOptions[ledgercontroller.PITFilterWithVolumes], ledger.Transaction]( + ctx, + s.selectTransactions( + q.Options.Options.PIT, + q.Options.Options.ExpandVolumes, + q.Options.Options.ExpandEffectiveVolumes, + q.Options.QueryBuilder, + ), + bunpaginate.ColumnPaginatedQuery[ledgercontroller.PaginatedQueryOptions[ledgercontroller.PITFilterWithVolumes]](q), + ) + if err != nil { + return nil, err + } - return cursor, nil - }) + return cursor, nil + }, + ) } func (s *Store) CountTransactions(ctx context.Context, q ledgercontroller.ListTransactionsQuery) (int, error) { return tracing.TraceWithMetric( ctx, "CountTransactions", + s.tracer, s.countTransactionsHistogram, func(ctx context.Context) (int, error) { return s.db.NewSelect(). @@ -341,6 +348,7 @@ func (s *Store) GetTransaction(ctx context.Context, filter ledgercontroller.GetT return tracing.TraceWithMetric( ctx, "GetTransaction", + s.tracer, s.getTransactionHistogram, func(ctx context.Context) (*ledger.Transaction, error) { @@ -367,6 +375,7 @@ func (s *Store) InsertTransaction(ctx context.Context, tx *ledger.Transaction) e _, err := tracing.TraceWithMetric( ctx, "InsertTransaction", + s.tracer, s.insertTransactionHistogram, func(ctx context.Context) (*ledger.Transaction, error) { _, err := s.db.NewInsert(). @@ -439,6 +448,7 @@ func (s *Store) RevertTransaction(ctx context.Context, id int) (tx *ledger.Trans _, err = tracing.TraceWithMetric( ctx, "RevertTransaction", + s.tracer, s.revertTransactionHistogram, func(ctx context.Context) (*ledger.Transaction, error) { tx, modified, err = s.updateTxWithRetrieve( @@ -467,6 +477,7 @@ func (s *Store) UpdateTransactionMetadata(ctx context.Context, id int, m metadat _, err = tracing.TraceWithMetric( ctx, "UpdateTransactionMetadata", + s.tracer, s.updateTransactionMetadataHistogram, func(ctx context.Context) (*ledger.Transaction, error) { tx, modified, err = s.updateTxWithRetrieve( @@ -495,6 +506,7 @@ func (s *Store) DeleteTransactionMetadata(ctx context.Context, id int, key strin _, err = tracing.TraceWithMetric( ctx, "DeleteTransactionMetadata", + s.tracer, s.deleteTransactionMetadataHistogram, func(ctx context.Context) (*ledger.Transaction, error) { tx, modified, err = s.updateTxWithRetrieve( diff --git a/internal/storage/ledger/volumes.go b/internal/storage/ledger/volumes.go index 1b07c6138..621a088b4 100644 --- a/internal/storage/ledger/volumes.go +++ b/internal/storage/ledger/volumes.go @@ -20,6 +20,7 @@ func (s *Store) UpdateVolumes(ctx context.Context, accountVolumes ...ledger.Acco return tracing.TraceWithMetric( ctx, "UpdateBalances", + s.tracer, s.updateBalancesHistogram, func(ctx context.Context) (ledger.PostCommitVolumes, error) { @@ -222,6 +223,7 @@ func (s *Store) GetVolumesWithBalances(ctx context.Context, q ledgercontroller.G return tracing.TraceWithMetric( ctx, "GetVolumesWithBalances", + s.tracer, s.getVolumesWithBalancesHistogram, func(ctx context.Context) (*bunpaginate.Cursor[ledger.VolumesWithBalanceByAssetByAccount], error) { return bunpaginate.UsingOffset[ledgercontroller.PaginatedQueryOptions[ledgercontroller.FiltersForVolumes], ledger.VolumesWithBalanceByAssetByAccount]( diff --git a/internal/tracing/tracer.go b/internal/tracing/tracing.go similarity index 60% rename from internal/tracing/tracer.go rename to internal/tracing/tracing.go index f8c377a88..40a7a9b8f 100644 --- a/internal/tracing/tracer.go +++ b/internal/tracing/tracing.go @@ -6,27 +6,20 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" ) -// todo: remove global -var Tracer = otel.Tracer("com.formance.ledger") - -func Start(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { - return Tracer.Start(ctx, name, opts...) -} - func TraceWithMetric[RET any]( ctx context.Context, operationName string, + tracer trace.Tracer, histogram metric.Int64Histogram, fn func(ctx context.Context) (RET, error), finalizers ...func(ctx context.Context, ret RET), ) (RET, error) { var zeroRet RET - return Trace(ctx, operationName, func(ctx context.Context) (RET, error) { + return Trace(ctx, tracer, operationName, func(ctx context.Context) (RET, error) { now := time.Now() ret, err := fn(ctx) if err != nil { @@ -45,3 +38,20 @@ func TraceWithMetric[RET any]( return ret, nil }) } + +func Trace[RET any](ctx context.Context, tracer trace.Tracer, name string, fn func(ctx context.Context) (RET, error)) (RET, error) { + ctx, trace := tracer.Start(ctx, name) + defer trace.End() + + return fn(ctx) +} + +func NoResult(fn func(ctx context.Context) error) func(ctx context.Context) (any, error) { + return func(ctx context.Context) (any, error) { + return nil, fn(ctx) + } +} + +func SkipResult[RET any](_ RET, err error) error { + return err +} diff --git a/internal/tracing/utils.go b/internal/tracing/utils.go deleted file mode 100644 index 02fe5ade3..000000000 --- a/internal/tracing/utils.go +++ /dev/null @@ -1,20 +0,0 @@ -package tracing - -import "context" - -func Trace[RET any](ctx context.Context, name string, fn func(ctx context.Context) (RET, error)) (RET, error) { - ctx, trace := Start(ctx, name) - defer trace.End() - - return fn(ctx) -} - -func NoResult(fn func(ctx context.Context) error) func(ctx context.Context) (any, error) { - return func(ctx context.Context) (any, error) { - return nil, fn(ctx) - } -} - -func SkipResult[RET any](_ RET, err error) error { - return err -}