From b264d2c357ecbeb94c999cf24df4ee11dbce2aec Mon Sep 17 00:00:00 2001 From: Mahad Zaryab <43658574+mahadzaryab1@users.noreply.github.com> Date: Sat, 7 Dec 2024 18:13:01 -0500 Subject: [PATCH] [v2][storage] Add dependency store to v2 storage interface (#6297) ## Which problem is this PR solving? - Towards #5079 ## Description of the changes - This PR creates a new factory in the `depstore` package for the DependencyReader and exposes a `CreateDependencyReader` function in the `factoryadapter` - This PR also changes some interface return types to structs because some structs implement multiple interfaces. - The new factory was integrated into the query service and the callsites were changed to use the v2 factory instead of the v1 factory to initialize the dependency reader. ## How was this change tested? - CI / Unit Tests ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Mahad Zaryab Signed-off-by: Mahad Zaryab <43658574+mahadzaryab1@users.noreply.github.com> --- .mockery.yaml | 3 + cmd/all-in-one/main.go | 6 +- cmd/anonymizer/app/query/query_test.go | 2 +- .../exporters/storageexporter/exporter.go | 2 +- .../internal/extension/jaegerquery/server.go | 20 +++--- .../extension/jaegerquery/server_test.go | 2 +- .../extension/jaegerstorage/extension.go | 2 +- .../extension/jaegerstorage/extension_test.go | 4 +- cmd/query/app/apiv3/grpc_handler_test.go | 2 +- cmd/query/app/apiv3/http_gateway_test.go | 2 +- cmd/query/app/grpc_handler_test.go | 19 ++++-- cmd/query/app/handler_deps_test.go | 14 ++-- cmd/query/app/http_handler_test.go | 2 +- cmd/query/app/querysvc/query_service.go | 11 ++-- cmd/query/app/querysvc/query_service_test.go | 9 ++- cmd/query/app/server_test.go | 2 +- cmd/query/main.go | 2 +- storage_v2/depstore/factory.go | 8 +++ storage_v2/depstore/mocks/Factory.go | 62 +++++++++++++++++ storage_v2/depstore/mocks/Reader.go | 66 +++++++++++++++++++ storage_v2/depstore/package_test.go | 14 ++++ storage_v2/depstore/reader.go | 22 +++++++ storage_v2/factory.go | 21 ------ storage_v2/factoryadapter/factory.go | 12 +++- storage_v2/factoryadapter/factory_test.go | 22 +++++++ storage_v2/factoryadapter/reader.go | 20 ++++++ storage_v2/factoryadapter/reader_test.go | 21 ++++++ storage_v2/tracestore/factory.go | 6 -- storage_v2/tracestore/mocks/Factory.go | 38 ----------- 29 files changed, 307 insertions(+), 109 deletions(-) create mode 100644 storage_v2/depstore/factory.go create mode 100644 storage_v2/depstore/mocks/Factory.go create mode 100644 storage_v2/depstore/mocks/Reader.go create mode 100644 storage_v2/depstore/package_test.go create mode 100644 storage_v2/depstore/reader.go delete mode 100644 storage_v2/factory.go diff --git a/.mockery.yaml b/.mockery.yaml index a0f879a5a35..513908625a9 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -76,3 +76,6 @@ packages: github.com/jaegertracing/jaeger/storage_v2/tracestore: config: all: true + github.com/jaegertracing/jaeger/storage_v2/depstore: + config: + all: true diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index c9382ac80a7..a34a95d0ea2 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -38,7 +38,7 @@ import ( ss "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider" "github.com/jaegertracing/jaeger/plugin/storage" "github.com/jaegertracing/jaeger/ports" - "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage_v2/depstore" "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) @@ -115,7 +115,7 @@ by default uses only in-memory database.`, if err != nil { logger.Fatal("Failed to create span writer", zap.Error(err)) } - dependencyReader, err := storageFactory.CreateDependencyReader() + dependencyReader, err := v2Factory.CreateDependencyReader() if err != nil { logger.Fatal("Failed to create dependency reader", zap.Error(err)) } @@ -219,7 +219,7 @@ func startQuery( qOpts *queryApp.QueryOptions, queryOpts *querysvc.QueryServiceOptions, traceReader tracestore.Reader, - depReader dependencystore.Reader, + depReader depstore.Reader, metricsQueryService querysvc.MetricsQueryService, tm *tenancy.Manager, telset telemetry.Settings, diff --git a/cmd/anonymizer/app/query/query_test.go b/cmd/anonymizer/app/query/query_test.go index 24352622e54..768dbc3c1fb 100644 --- a/cmd/anonymizer/app/query/query_test.go +++ b/cmd/anonymizer/app/query/query_test.go @@ -18,9 +18,9 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/plugin/metricstore/disabled" "github.com/jaegertracing/jaeger/proto-gen/api_v2" - dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" "github.com/jaegertracing/jaeger/storage/spanstore" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" + dependencyStoreMocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks" "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) diff --git a/cmd/jaeger/internal/exporters/storageexporter/exporter.go b/cmd/jaeger/internal/exporters/storageexporter/exporter.go index 3fbeab79b43..4b2dee9e0fc 100644 --- a/cmd/jaeger/internal/exporters/storageexporter/exporter.go +++ b/cmd/jaeger/internal/exporters/storageexporter/exporter.go @@ -34,7 +34,7 @@ func newExporter(config *Config, otel component.TelemetrySettings) *storageExpor } func (exp *storageExporter) start(_ context.Context, host component.Host) error { - f, err := jaegerstorage.GetStorageFactoryV2(exp.config.TraceStorage, host) + f, err := jaegerstorage.GetTraceStoreFactory(exp.config.TraceStorage, host) if err != nil { return fmt.Errorf("cannot find storage factory: %w", err) } diff --git a/cmd/jaeger/internal/extension/jaegerquery/server.go b/cmd/jaeger/internal/extension/jaegerquery/server.go index 29f836e9c89..5f578c3f2de 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server.go @@ -21,6 +21,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/metricstore/disabled" "github.com/jaegertracing/jaeger/storage/metricstore" + "github.com/jaegertracing/jaeger/storage_v2/depstore" ) var ( @@ -71,23 +72,20 @@ func (s *server) Start(ctx context.Context, host component.Host) error { telset.Metrics = telset.Metrics. Namespace(metrics.NSOptions{Name: "jaeger"}). Namespace(metrics.NSOptions{Name: "query"}) - - // TODO currently v1 is still needed because of dependency storage - v1Factory, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesPrimary, host) + tf, err := jaegerstorage.GetTraceStoreFactory(s.config.Storage.TracesPrimary, host) if err != nil { - return fmt.Errorf("cannot find v1 factory for primary storage %s: %w", s.config.Storage.TracesPrimary, err) + return fmt.Errorf("cannot find factory for trace storage %s: %w", s.config.Storage.TracesPrimary, err) } - f, err := jaegerstorage.GetStorageFactoryV2(s.config.Storage.TracesPrimary, host) - if err != nil { - return fmt.Errorf("cannot find v2 factory for primary storage %s: %w", s.config.Storage.TracesPrimary, err) - } - - traceReader, err := f.CreateTraceReader() + traceReader, err := tf.CreateTraceReader() if err != nil { return fmt.Errorf("cannot create trace reader: %w", err) } - depReader, err := v1Factory.CreateDependencyReader() + df, ok := tf.(depstore.Factory) + if !ok { + return fmt.Errorf("cannot find factory for dependency storage %s: %w", s.config.Storage.TracesPrimary, err) + } + depReader, err := df.CreateDependencyReader() if err != nil { return fmt.Errorf("cannot create dependencies reader: %w", err) } diff --git a/cmd/jaeger/internal/extension/jaegerquery/server_test.go b/cmd/jaeger/internal/extension/jaegerquery/server_test.go index 70e60d78768..696b691c938 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server_test.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server_test.go @@ -150,7 +150,7 @@ func TestServerStart(t *testing.T) { TracesPrimary: "need-factory-error", }, }, - expectedErr: "cannot find v1 factory for primary storage", + expectedErr: "cannot find factory for trace storage", }, { name: "span reader error", diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index 7ac99e15a29..c1dd68b01b7 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -74,7 +74,7 @@ func GetMetricStorageFactory(name string, host component.Host) (storage.MetricSt return mf, nil } -func GetStorageFactoryV2(name string, host component.Host) (tracestore.Factory, error) { +func GetTraceStoreFactory(name string, host component.Host) (tracestore.Factory, error) { f, err := GetStorageFactory(name, host) if err != nil { return nil, err diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go index 6213c912765..8623703ad83 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go @@ -100,7 +100,7 @@ func TestStorageFactoryBadShutdownError(t *testing.T) { func TestGetFactoryV2Error(t *testing.T) { host := componenttest.NewNopHost() - _, err := GetStorageFactoryV2("something", host) + _, err := GetTraceStoreFactory("something", host) require.ErrorContains(t, err, "cannot find extension") } @@ -112,7 +112,7 @@ func TestGetFactory(t *testing.T) { require.NoError(t, err) require.NotNil(t, f) - f2, err := GetStorageFactoryV2(name, host) + f2, err := GetTraceStoreFactory(name, host) require.NoError(t, err) require.NotNil(t, f2) diff --git a/cmd/query/app/apiv3/grpc_handler_test.go b/cmd/query/app/apiv3/grpc_handler_test.go index b14f1c01abf..6534df23fe6 100644 --- a/cmd/query/app/apiv3/grpc_handler_test.go +++ b/cmd/query/app/apiv3/grpc_handler_test.go @@ -20,9 +20,9 @@ import ( "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" "github.com/jaegertracing/jaeger/model" _ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration - dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" "github.com/jaegertracing/jaeger/storage/spanstore" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" + dependencyStoreMocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks" "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) diff --git a/cmd/query/app/apiv3/http_gateway_test.go b/cmd/query/app/apiv3/http_gateway_test.go index 5f742a0522f..f0abbeb3307 100644 --- a/cmd/query/app/apiv3/http_gateway_test.go +++ b/cmd/query/app/apiv3/http_gateway_test.go @@ -22,9 +22,9 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/testutils" - dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" "github.com/jaegertracing/jaeger/storage/spanstore" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" + dependencyStoreMocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks" "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) diff --git a/cmd/query/app/grpc_handler_test.go b/cmd/query/app/grpc_handler_test.go index 273f2a2f712..a5c59638031 100644 --- a/cmd/query/app/grpc_handler_test.go +++ b/cmd/query/app/grpc_handler_test.go @@ -28,11 +28,12 @@ import ( "github.com/jaegertracing/jaeger/plugin/metricstore/disabled" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" - depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" "github.com/jaegertracing/jaeger/storage/metricstore" metricsmocks "github.com/jaegertracing/jaeger/storage/metricstore/mocks" "github.com/jaegertracing/jaeger/storage/spanstore" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" + "github.com/jaegertracing/jaeger/storage_v2/depstore" + depsmocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks" "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) @@ -513,8 +514,10 @@ func TestGetDependenciesSuccessGRPC(t *testing.T) { endTs := time.Now().UTC() server.depReader.On("GetDependencies", mock.Anything, // context.Context - endTs, - defaultDependencyLookbackDuration, + depstore.QueryParameters{ + StartTime: endTs.Add(-defaultDependencyLookbackDuration), + EndTime: endTs, + }, ).Return(expectedDependencies, nil).Times(1) res, err := client.GetDependencies(context.Background(), &api_v2.GetDependenciesRequest{ @@ -529,11 +532,13 @@ func TestGetDependenciesSuccessGRPC(t *testing.T) { func TestGetDependenciesFailureGRPC(t *testing.T) { withServerAndClient(t, func(server *grpcServer, client *grpcClient) { endTs := time.Now().UTC() - server.depReader.On( - "GetDependencies", + server.depReader.On("GetDependencies", mock.Anything, // context.Context - endTs, - defaultDependencyLookbackDuration).Return(nil, errStorageGRPC).Times(1) + depstore.QueryParameters{ + StartTime: endTs.Add(-defaultDependencyLookbackDuration), + EndTime: endTs, + }, + ).Return(nil, errStorageGRPC).Times(1) _, err := client.GetDependencies(context.Background(), &api_v2.GetDependenciesRequest{ StartTime: endTs.Add(time.Duration(-1) * defaultDependencyLookbackDuration), diff --git a/cmd/query/app/handler_deps_test.go b/cmd/query/app/handler_deps_test.go index ff71efe62cf..e24799c00ce 100644 --- a/cmd/query/app/handler_deps_test.go +++ b/cmd/query/app/handler_deps_test.go @@ -15,6 +15,7 @@ import ( "github.com/jaegertracing/jaeger/model" ui "github.com/jaegertracing/jaeger/model/json" + "github.com/jaegertracing/jaeger/storage_v2/depstore" ) func TestDeduplicateDependencies(t *testing.T) { @@ -304,8 +305,10 @@ func TestGetDependenciesSuccess(t *testing.T) { endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier) ts.dependencyReader.On("GetDependencies", mock.Anything, // context - endTs, - defaultDependencyLookbackDuration, + depstore.QueryParameters{ + StartTime: endTs.Add(-defaultDependencyLookbackDuration), + EndTime: endTs, + }, ).Return(expectedDependencies, nil).Times(1) var response structuredResponse @@ -324,8 +327,11 @@ func TestGetDependenciesCassandraFailure(t *testing.T) { endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier) ts.dependencyReader.On("GetDependencies", mock.Anything, // context - endTs, - defaultDependencyLookbackDuration).Return(nil, errStorage).Times(1) + depstore.QueryParameters{ + StartTime: endTs.Add(-defaultDependencyLookbackDuration), + EndTime: endTs, + }, + ).Return(nil, errStorage).Times(1) var response structuredResponse err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=testing", &response) diff --git a/cmd/query/app/http_handler_test.go b/cmd/query/app/http_handler_test.go index dfa54f6701a..511495193c3 100644 --- a/cmd/query/app/http_handler_test.go +++ b/cmd/query/app/http_handler_test.go @@ -38,10 +38,10 @@ import ( "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/metricstore/disabled" "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" - depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" metricsmocks "github.com/jaegertracing/jaeger/storage/metricstore/mocks" "github.com/jaegertracing/jaeger/storage/spanstore" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" + depsmocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks" "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) diff --git a/cmd/query/app/querysvc/query_service.go b/cmd/query/app/querysvc/query_service.go index f627c3c8344..1a34cd1e408 100644 --- a/cmd/query/app/querysvc/query_service.go +++ b/cmd/query/app/querysvc/query_service.go @@ -13,8 +13,8 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/model/adjuster" "github.com/jaegertracing/jaeger/storage" - "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/depstore" "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) @@ -43,12 +43,12 @@ type StorageCapabilities struct { // QueryService contains span utils required by the query-service. type QueryService struct { traceReader tracestore.Reader - dependencyReader dependencystore.Reader + dependencyReader depstore.Reader options QueryServiceOptions } // NewQueryService returns a new QueryService. -func NewQueryService(traceReader tracestore.Reader, dependencyReader dependencystore.Reader, options QueryServiceOptions) *QueryService { +func NewQueryService(traceReader tracestore.Reader, dependencyReader depstore.Reader, options QueryServiceOptions) *QueryService { qsvc := &QueryService{ traceReader: traceReader, dependencyReader: dependencyReader, @@ -134,7 +134,10 @@ func (qs QueryService) Adjust(trace *model.Trace) (*model.Trace, error) { // GetDependencies implements dependencystore.Reader.GetDependencies func (qs QueryService) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { - return qs.dependencyReader.GetDependencies(ctx, endTs, lookback) + return qs.dependencyReader.GetDependencies(ctx, depstore.QueryParameters{ + StartTime: endTs.Add(-lookback), + EndTime: endTs, + }) } // GetCapabilities returns the features supported by the query service. diff --git a/cmd/query/app/querysvc/query_service_test.go b/cmd/query/app/querysvc/query_service_test.go index 3a9f906d2b5..9cfd8ec6212 100644 --- a/cmd/query/app/querysvc/query_service_test.go +++ b/cmd/query/app/querysvc/query_service_test.go @@ -22,9 +22,10 @@ import ( "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/dependencystore" - depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" "github.com/jaegertracing/jaeger/storage/spanstore" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" + "github.com/jaegertracing/jaeger/storage_v2/depstore" + depsmocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks" "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) @@ -345,8 +346,10 @@ func TestGetDependencies(t *testing.T) { tqs.depsReader.On( "GetDependencies", mock.Anything, // context.Context - endTs, - defaultDependencyLookbackDuration).Return(expectedDependencies, nil).Times(1) + depstore.QueryParameters{ + StartTime: endTs.Add(-defaultDependencyLookbackDuration), + EndTime: endTs, + }).Return(expectedDependencies, nil).Times(1) actualDependencies, err := tqs.queryService.GetDependencies(context.Background(), time.Unix(0, 1476374248550*millisToNanosMultiplier), defaultDependencyLookbackDuration) require.NoError(t, err) diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 51db8aeb239..fc2c0d298f0 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -39,8 +39,8 @@ import ( "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/ports" "github.com/jaegertracing/jaeger/proto-gen/api_v2" - depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" + depsmocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks" "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) diff --git a/cmd/query/main.go b/cmd/query/main.go index 4868cb28428..1b8b353a7ae 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -97,7 +97,7 @@ func main() { if err != nil { logger.Fatal("Failed to create trace reader", zap.Error(err)) } - dependencyReader, err := storageFactory.CreateDependencyReader() + dependencyReader, err := v2Factory.CreateDependencyReader() if err != nil { logger.Fatal("Failed to create dependency reader", zap.Error(err)) } diff --git a/storage_v2/depstore/factory.go b/storage_v2/depstore/factory.go new file mode 100644 index 00000000000..b6818a0adfb --- /dev/null +++ b/storage_v2/depstore/factory.go @@ -0,0 +1,8 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package depstore + +type Factory interface { + CreateDependencyReader() (Reader, error) +} diff --git a/storage_v2/depstore/mocks/Factory.go b/storage_v2/depstore/mocks/Factory.go new file mode 100644 index 00000000000..7fed1659c69 --- /dev/null +++ b/storage_v2/depstore/mocks/Factory.go @@ -0,0 +1,62 @@ +// Copyright (c) The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 +// +// Run 'make generate-mocks' to regenerate. + +// Code generated by mockery. DO NOT EDIT. + +package mocks + +import ( + depstore "github.com/jaegertracing/jaeger/storage_v2/depstore" + mock "github.com/stretchr/testify/mock" +) + +// Factory is an autogenerated mock type for the Factory type +type Factory struct { + mock.Mock +} + +// CreateDependencyReader provides a mock function with no fields +func (_m *Factory) CreateDependencyReader() (depstore.Reader, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for CreateDependencyReader") + } + + var r0 depstore.Reader + var r1 error + if rf, ok := ret.Get(0).(func() (depstore.Reader, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() depstore.Reader); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(depstore.Reader) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewFactory creates a new instance of Factory. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewFactory(t interface { + mock.TestingT + Cleanup(func()) +}) *Factory { + mock := &Factory{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage_v2/depstore/mocks/Reader.go b/storage_v2/depstore/mocks/Reader.go new file mode 100644 index 00000000000..b997a3d3036 --- /dev/null +++ b/storage_v2/depstore/mocks/Reader.go @@ -0,0 +1,66 @@ +// Copyright (c) The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 +// +// Run 'make generate-mocks' to regenerate. + +// Code generated by mockery. DO NOT EDIT. + +package mocks + +import ( + context "context" + + depstore "github.com/jaegertracing/jaeger/storage_v2/depstore" + mock "github.com/stretchr/testify/mock" + + model "github.com/jaegertracing/jaeger/model" +) + +// Reader is an autogenerated mock type for the Reader type +type Reader struct { + mock.Mock +} + +// GetDependencies provides a mock function with given fields: ctx, query +func (_m *Reader) GetDependencies(ctx context.Context, query depstore.QueryParameters) ([]model.DependencyLink, error) { + ret := _m.Called(ctx, query) + + if len(ret) == 0 { + panic("no return value specified for GetDependencies") + } + + var r0 []model.DependencyLink + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, depstore.QueryParameters) ([]model.DependencyLink, error)); ok { + return rf(ctx, query) + } + if rf, ok := ret.Get(0).(func(context.Context, depstore.QueryParameters) []model.DependencyLink); ok { + r0 = rf(ctx, query) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]model.DependencyLink) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, depstore.QueryParameters) error); ok { + r1 = rf(ctx, query) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewReader creates a new instance of Reader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewReader(t interface { + mock.TestingT + Cleanup(func()) +}) *Reader { + mock := &Reader{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage_v2/depstore/package_test.go b/storage_v2/depstore/package_test.go new file mode 100644 index 00000000000..bb0d70c2b01 --- /dev/null +++ b/storage_v2/depstore/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package depstore + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/storage_v2/depstore/reader.go b/storage_v2/depstore/reader.go new file mode 100644 index 00000000000..436dfb62187 --- /dev/null +++ b/storage_v2/depstore/reader.go @@ -0,0 +1,22 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package depstore + +import ( + "context" + "time" + + "github.com/jaegertracing/jaeger/model" +) + +// QueryParameters contains the parameters that can be used to query dependencies. +type QueryParameters struct { + StartTime time.Time + EndTime time.Time +} + +// Reader can load service dependencies from storage. +type Reader interface { + GetDependencies(ctx context.Context, query QueryParameters) ([]model.DependencyLink, error) +} diff --git a/storage_v2/factory.go b/storage_v2/factory.go deleted file mode 100644 index e7515549e05..00000000000 --- a/storage_v2/factory.go +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package storage_v2 - -import ( - "context" -) - -// Factory is a general factory interface to be reused across different storage factories. -// It lives within the OTEL collector extension component's lifecycle. -// The Initialize and Close functions supposed to be called from the -// OTEL component's Start and Shutdown functions. -type FactoryBase interface { - // Initialize performs internal initialization of the factory, - // such as opening connections to the backend store. - Initialize(ctx context.Context) error - - // Close closes the resources held by the factory - Close(ctx context.Context) error -} diff --git a/storage_v2/factoryadapter/factory.go b/storage_v2/factoryadapter/factory.go index 56356a70262..1eaaea146ed 100644 --- a/storage_v2/factoryadapter/factory.go +++ b/storage_v2/factoryadapter/factory.go @@ -8,6 +8,7 @@ import ( "io" storage_v1 "github.com/jaegertracing/jaeger/storage" + "github.com/jaegertracing/jaeger/storage_v2/depstore" "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) @@ -15,7 +16,7 @@ type Factory struct { ss storage_v1.Factory } -func NewFactory(ss storage_v1.Factory) tracestore.Factory { +func NewFactory(ss storage_v1.Factory) *Factory { return &Factory{ ss: ss, } @@ -51,3 +52,12 @@ func (f *Factory) CreateTraceWriter() (tracestore.Writer, error) { } return NewTraceWriter(spanWriter), nil } + +// CreateDependencyReader implements depstore.Factory. +func (f *Factory) CreateDependencyReader() (depstore.Reader, error) { + dr, err := f.ss.CreateDependencyReader() + if err != nil { + return nil, err + } + return NewDependencyReader(dr), nil +} diff --git a/storage_v2/factoryadapter/factory_test.go b/storage_v2/factoryadapter/factory_test.go index a8e2819ce0f..d8794adf327 100644 --- a/storage_v2/factoryadapter/factory_test.go +++ b/storage_v2/factoryadapter/factory_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/jaegertracing/jaeger/plugin/storage/grpc" + dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" factoryMocks "github.com/jaegertracing/jaeger/storage/mocks" spanstoreMocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" ) @@ -71,3 +72,24 @@ func TestAdapterCreateTraceWriter(t *testing.T) { _, err := f.CreateTraceWriter() require.NoError(t, err) } + +func TestAdapterCreateDependencyReader(t *testing.T) { + f1 := new(factoryMocks.Factory) + f1.On("CreateDependencyReader").Return(new(dependencyStoreMocks.Reader), nil) + + f := NewFactory(f1) + r, err := f.CreateDependencyReader() + require.NoError(t, err) + require.NotNil(t, r) +} + +func TestAdapterCreateDependencyReaderError(t *testing.T) { + f1 := new(factoryMocks.Factory) + testErr := errors.New("test error") + f1.On("CreateDependencyReader").Return(nil, testErr) + + f := NewFactory(f1) + r, err := f.CreateDependencyReader() + require.ErrorIs(t, err, testErr) + require.Nil(t, r) +} diff --git a/storage_v2/factoryadapter/reader.go b/storage_v2/factoryadapter/reader.go index 2bc5e0b5936..021870f03a6 100644 --- a/storage_v2/factoryadapter/reader.go +++ b/storage_v2/factoryadapter/reader.go @@ -10,7 +10,10 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/depstore" "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) @@ -52,3 +55,20 @@ func (*TraceReader) FindTraces(_ context.Context, _ tracestore.TraceQueryParamet func (*TraceReader) FindTraceIDs(_ context.Context, _ tracestore.TraceQueryParameters) ([]pcommon.TraceID, error) { panic("not implemented") } + +type DependencyReader struct { + reader dependencystore.Reader +} + +func NewDependencyReader(reader dependencystore.Reader) *DependencyReader { + return &DependencyReader{ + reader: reader, + } +} + +func (dr *DependencyReader) GetDependencies( + ctx context.Context, + query depstore.QueryParameters, +) ([]model.DependencyLink, error) { + return dr.reader.GetDependencies(ctx, query.EndTime, query.EndTime.Sub(query.StartTime)) +} diff --git a/storage_v2/factoryadapter/reader_test.go b/storage_v2/factoryadapter/reader_test.go index 85c245f7a70..7148edf40d7 100644 --- a/storage_v2/factoryadapter/reader_test.go +++ b/storage_v2/factoryadapter/reader_test.go @@ -6,12 +6,17 @@ package factoryadapter import ( "context" "testing" + "time" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/plugin/storage/memory" + dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" + "github.com/jaegertracing/jaeger/storage_v2/depstore" "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) @@ -101,3 +106,19 @@ func TestTraceReader_FindTraceIDsPanics(t *testing.T) { func() { traceReader.FindTraceIDs(context.Background(), tracestore.TraceQueryParameters{}) }, ) } + +func TestDependencyReader_GetDependencies(t *testing.T) { + end := time.Now() + start := end.Add(-1 * time.Minute) + query := depstore.QueryParameters{ + StartTime: start, + EndTime: end, + } + expectedDeps := []model.DependencyLink{{Parent: "parent", Child: "child", CallCount: 12}} + mr := new(dependencyStoreMocks.Reader) + mr.On("GetDependencies", mock.Anything, end, time.Minute).Return(expectedDeps, nil) + dr := NewDependencyReader(mr) + deps, err := dr.GetDependencies(context.Background(), query) + require.NoError(t, err) + require.Equal(t, expectedDeps, deps) +} diff --git a/storage_v2/tracestore/factory.go b/storage_v2/tracestore/factory.go index 0f670946562..bf081f0fb45 100644 --- a/storage_v2/tracestore/factory.go +++ b/storage_v2/tracestore/factory.go @@ -3,15 +3,9 @@ package tracestore -import ( - "github.com/jaegertracing/jaeger/storage_v2" -) - // Factory defines an interface for a factory that can create implementations of // different span storage components. type Factory interface { - storage_v2.FactoryBase - // CreateTraceReader creates a spanstore.Reader. CreateTraceReader() (Reader, error) diff --git a/storage_v2/tracestore/mocks/Factory.go b/storage_v2/tracestore/mocks/Factory.go index 922d57ae2c4..095cac10d0f 100644 --- a/storage_v2/tracestore/mocks/Factory.go +++ b/storage_v2/tracestore/mocks/Factory.go @@ -8,8 +8,6 @@ package mocks import ( - context "context" - tracestore "github.com/jaegertracing/jaeger/storage_v2/tracestore" mock "github.com/stretchr/testify/mock" ) @@ -19,24 +17,6 @@ type Factory struct { mock.Mock } -// Close provides a mock function with given fields: ctx -func (_m *Factory) Close(ctx context.Context) error { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for Close") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context) error); ok { - r0 = rf(ctx) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // CreateTraceReader provides a mock function with no fields func (_m *Factory) CreateTraceReader() (tracestore.Reader, error) { ret := _m.Called() @@ -97,24 +77,6 @@ func (_m *Factory) CreateTraceWriter() (tracestore.Writer, error) { return r0, r1 } -// Initialize provides a mock function with given fields: ctx -func (_m *Factory) Initialize(ctx context.Context) error { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for Initialize") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context) error); ok { - r0 = rf(ctx) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // NewFactory creates a new instance of Factory. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewFactory(t interface {