From b6c45b86afbe3017c179cdb2171ac799e2fc120b Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sun, 16 Jun 2024 13:05:08 -0400 Subject: [PATCH] Remove numBuckets argument from CreateSamplingStore() API Signed-off-by: Yuri Shkuro --- cmd/collector/app/server/package_test.go | 24 +++++++++++ cmd/collector/app/server/test.go | 43 ------------------- .../storageexporter/exporter_test.go | 4 +- .../extension/jaegerstorage/config.go | 9 +--- .../extension/jaegerstorage/extension.go | 5 +-- .../extension/jaegerstorage/extension_test.go | 6 +-- pkg/memory/config/empty_test.go | 25 ----------- .../strategyprovider/adaptive/factory.go | 2 +- .../strategyprovider/adaptive/factory_test.go | 2 +- .../sampling/strategyprovider/factory_test.go | 2 +- plugin/storage/badger/factory.go | 2 +- plugin/storage/cassandra/factory.go | 2 +- plugin/storage/cassandra/factory_test.go | 2 +- plugin/storage/es/factory.go | 10 +++-- plugin/storage/es/factory_test.go | 4 +- .../storage/integration/badgerstore_test.go | 2 +- plugin/storage/integration/cassandra_test.go | 2 +- .../storage/integration/elasticsearch_test.go | 2 +- .../storage/memory}/config.go | 7 ++- plugin/storage/memory/factory.go | 14 +++--- plugin/storage/memory/factory_test.go | 12 +++--- plugin/storage/memory/memory.go | 13 +++--- plugin/storage/memory/memory_test.go | 3 +- plugin/storage/memory/options.go | 13 +++--- plugin/storage/memory/options_test.go | 8 +++- storage/factory.go | 2 +- 26 files changed, 91 insertions(+), 129 deletions(-) delete mode 100644 cmd/collector/app/server/test.go delete mode 100644 pkg/memory/config/empty_test.go rename {pkg/memory/config => plugin/storage/memory}/config.go (69%) diff --git a/cmd/collector/app/server/package_test.go b/cmd/collector/app/server/package_test.go index 2cf94b646ba..52a0d986764 100644 --- a/cmd/collector/app/server/package_test.go +++ b/cmd/collector/app/server/package_test.go @@ -15,11 +15,35 @@ package server import ( + "context" "testing" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" + "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/testutils" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) +type mockSamplingProvider struct{} + +func (mockSamplingProvider) GetSamplingStrategy(context.Context, string /* serviceName */) (*api_v2.SamplingStrategyResponse, error) { + return nil, nil +} + +func (mockSamplingProvider) Close() error { + return nil +} + +type mockSpanProcessor struct{} + +func (*mockSpanProcessor) Close() error { + return nil +} + +func (*mockSpanProcessor) ProcessSpans([]*model.Span, processor.SpansOptions) ([]bool, error) { + return []bool{}, nil +} + func TestMain(m *testing.M) { testutils.VerifyGoLeaks(m) } diff --git a/cmd/collector/app/server/test.go b/cmd/collector/app/server/test.go deleted file mode 100644 index f1acc24c7f9..00000000000 --- a/cmd/collector/app/server/test.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (c) 2020 The Jaeger Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -import ( - "context" - - "github.com/jaegertracing/jaeger/cmd/collector/app/processor" - "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/proto-gen/api_v2" -) - -type mockSamplingProvider struct{} - -func (mockSamplingProvider) GetSamplingStrategy(context.Context, string /* serviceName */) (*api_v2.SamplingStrategyResponse, error) { - return nil, nil -} - -func (mockSamplingProvider) Close() error { - return nil -} - -type mockSpanProcessor struct{} - -func (*mockSpanProcessor) Close() error { - return nil -} - -func (*mockSpanProcessor) ProcessSpans([]*model.Span, processor.SpansOptions) ([]bool, error) { - return []bool{}, nil -} diff --git a/cmd/jaeger/internal/exporters/storageexporter/exporter_test.go b/cmd/jaeger/internal/exporters/storageexporter/exporter_test.go index 1934231c943..81bb21a8424 100644 --- a/cmd/jaeger/internal/exporters/storageexporter/exporter_test.go +++ b/cmd/jaeger/internal/exporters/storageexporter/exporter_test.go @@ -31,7 +31,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" "github.com/jaegertracing/jaeger/model" - memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" + "github.com/jaegertracing/jaeger/plugin/storage/memory" ) type storageHost struct { @@ -143,7 +143,7 @@ func makeStorageExtension(t *testing.T, memstoreName string) storageHost { TracerProvider: nooptrace.NewTracerProvider(), }, }, - &jaegerstorage.Config{Memory: map[string]memoryCfg.Configuration{ + &jaegerstorage.Config{Memory: map[string]memory.Configuration{ memstoreName: {MaxTraces: 10000}, }}) require.NoError(t, err) diff --git a/cmd/jaeger/internal/extension/jaegerstorage/config.go b/cmd/jaeger/internal/extension/jaegerstorage/config.go index 88b43f9f56a..fede0c0f966 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/config.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/config.go @@ -8,15 +8,15 @@ import ( "reflect" esCfg "github.com/jaegertracing/jaeger/pkg/es/config" - memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger" "github.com/jaegertracing/jaeger/plugin/storage/cassandra" grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc" + "github.com/jaegertracing/jaeger/plugin/storage/memory" ) // Config has the configuration for jaeger-query, type Config struct { - Memory map[string]memoryCfg.Configuration `mapstructure:"memory"` + Memory map[string]memory.Configuration `mapstructure:"memory"` Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"` GRPC map[string]grpcCfg.ConfigV2 `mapstructure:"grpc"` Opensearch map[string]esCfg.Configuration `mapstructure:"opensearch"` @@ -27,11 +27,6 @@ type Config struct { // Option: instead of looking for specific name, check interface. } -type MemoryStorage struct { - Name string `mapstructure:"name"` - memoryCfg.Configuration -} - func (cfg *Config) Validate() error { emptyCfg := createDefaultConfig().(*Config) if reflect.DeepEqual(*cfg, *emptyCfg) { diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index 3bb1073ddf7..aa8cd484651 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -14,7 +14,6 @@ import ( "go.uber.org/zap" esCfg "github.com/jaegertracing/jaeger/pkg/es/config" - memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin/storage/badger" "github.com/jaegertracing/jaeger/plugin/storage/cassandra" @@ -96,13 +95,13 @@ func (s *starter[Config, Factory]) build(_ context.Context, _ component.Host) er } func (s *storageExt) Start(ctx context.Context, host component.Host) error { - memStarter := &starter[memoryCfg.Configuration, *memory.Factory]{ + memStarter := &starter[memory.Configuration, *memory.Factory]{ ext: s, storageKind: "memory", cfg: s.config.Memory, // memory factory does not return an error, so need to wrap it builder: func( - cfg memoryCfg.Configuration, + cfg memory.Configuration, metricsFactory metrics.Factory, logger *zap.Logger, ) (*memory.Factory, error) { diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go index d5a6c6c29bf..4cf1f3cc781 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go @@ -19,10 +19,10 @@ import ( "go.uber.org/zap" esCfg "github.com/jaegertracing/jaeger/pkg/es/config" - memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/testutils" badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger" + "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" @@ -80,7 +80,7 @@ func TestStorageExtensionConfigError(t *testing.T) { func TestStorageExtensionNameConflict(t *testing.T) { storageExtension := makeStorageExtenion(t, &Config{ - Memory: map[string]memoryCfg.Configuration{ + Memory: map[string]memory.Configuration{ "foo": {MaxTraces: 10000}, }, Badger: map[string]badgerCfg.NamespaceConfig{ @@ -219,7 +219,7 @@ func makeStorageExtenion(t *testing.T, config *Config) component.Component { func startStorageExtension(t *testing.T, memstoreName string) component.Component { config := &Config{ - Memory: map[string]memoryCfg.Configuration{ + Memory: map[string]memory.Configuration{ memstoreName: {MaxTraces: 10000}, }, } diff --git a/pkg/memory/config/empty_test.go b/pkg/memory/config/empty_test.go deleted file mode 100644 index 6557ace0cb9..00000000000 --- a/pkg/memory/config/empty_test.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (c) 2019 The Jaeger Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package config - -import ( - "testing" - - "github.com/jaegertracing/jaeger/pkg/testutils" -) - -func TestMain(m *testing.M) { - testutils.VerifyGoLeaks(m) -} diff --git a/plugin/sampling/strategyprovider/adaptive/factory.go b/plugin/sampling/strategyprovider/adaptive/factory.go index 232a007d198..589b6ed8354 100644 --- a/plugin/sampling/strategyprovider/adaptive/factory.go +++ b/plugin/sampling/strategyprovider/adaptive/factory.go @@ -78,7 +78,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S if err != nil { return err } - f.store, err = ssFactory.CreateSamplingStore(f.options.AggregationBuckets) + f.store, err = ssFactory.CreateSamplingStore() if err != nil { return err } diff --git a/plugin/sampling/strategyprovider/adaptive/factory_test.go b/plugin/sampling/strategyprovider/adaptive/factory_test.go index 68db90da9f8..d8bc1500b79 100644 --- a/plugin/sampling/strategyprovider/adaptive/factory_test.go +++ b/plugin/sampling/strategyprovider/adaptive/factory_test.go @@ -131,7 +131,7 @@ func (m *mockSamplingStoreFactory) CreateLock() (distributedlock.Lock, error) { return mockLock, nil } -func (m *mockSamplingStoreFactory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) { +func (m *mockSamplingStoreFactory) CreateSamplingStore() (samplingstore.Store, error) { if m.storeFailsWith != nil { return nil, m.storeFailsWith } diff --git a/plugin/sampling/strategyprovider/factory_test.go b/plugin/sampling/strategyprovider/factory_test.go index 1095390d010..67236defbd5 100644 --- a/plugin/sampling/strategyprovider/factory_test.go +++ b/plugin/sampling/strategyprovider/factory_test.go @@ -159,6 +159,6 @@ func (*mockSamplingStoreFactory) CreateLock() (distributedlock.Lock, error) { return nil, nil } -func (*mockSamplingStoreFactory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) { +func (*mockSamplingStoreFactory) CreateSamplingStore() (samplingstore.Store, error) { return nil, nil } diff --git a/plugin/storage/badger/factory.go b/plugin/storage/badger/factory.go index 264d8995776..525b641f36a 100644 --- a/plugin/storage/badger/factory.go +++ b/plugin/storage/badger/factory.go @@ -199,7 +199,7 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { } // CreateSamplingStore implements storage.SamplingStoreFactory -func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) { +func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) { return badgerSampling.NewSamplingStore(f.store), nil } diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index a3bcc895f95..924a11c013d 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -218,7 +218,7 @@ func (f *Factory) CreateLock() (distributedlock.Lock, error) { } // CreateSamplingStore implements storage.SamplingStoreFactory -func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) { +func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) { return cSamplingStore.New(f.primarySession, f.primaryMetricsFactory, f.logger), nil } diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 448a23eead9..b774650b020 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -103,7 +103,7 @@ func TestCassandraFactory(t *testing.T) { _, err = f.CreateLock() require.NoError(t, err) - _, err = f.CreateSamplingStore(0) + _, err = f.CreateSamplingStore() require.NoError(t, err) require.NoError(t, f.Close()) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 267254d260d..34750143c64 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -54,9 +54,11 @@ const ( var ( // interface comformance checks _ storage.Factory = (*Factory)(nil) _ storage.ArchiveFactory = (*Factory)(nil) - _ io.Closer = (*Factory)(nil) - _ plugin.Configurable = (*Factory)(nil) - _ storage.Purger = (*Factory)(nil) + // TODO does not implement CreateLock ! + // _ storage.SamplingStoreFactory = (*Factory)(nil) + _ io.Closer = (*Factory)(nil) + _ plugin.Configurable = (*Factory)(nil) + _ storage.Purger = (*Factory)(nil) ) // Factory implements storage.Factory for Elasticsearch backend. @@ -296,7 +298,7 @@ func createSpanWriter( return writer, nil } -func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) { +func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) { params := esSampleStore.Params{ Client: f.getPrimaryClient, Logger: f.logger, diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 1ea353fb685..8dc47c764f1 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -107,7 +107,7 @@ func TestElasticsearchFactory(t *testing.T) { _, err = f.CreateArchiveSpanWriter() require.NoError(t, err) - _, err = f.CreateSamplingStore(1) + _, err = f.CreateSamplingStore() require.NoError(t, err) require.NoError(t, f.Close()) @@ -217,7 +217,7 @@ func TestCreateTemplateError(t *testing.T) { assert.Nil(t, w) require.Error(t, err, "template-error") - s, err := f.CreateSamplingStore(1) + s, err := f.CreateSamplingStore() assert.Nil(t, s) require.Error(t, err, "template-error") } diff --git a/plugin/storage/integration/badgerstore_test.go b/plugin/storage/integration/badgerstore_test.go index 0ddb3a023c3..611fe53ab5d 100644 --- a/plugin/storage/integration/badgerstore_test.go +++ b/plugin/storage/integration/badgerstore_test.go @@ -48,7 +48,7 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) { s.SpanReader, err = s.factory.CreateSpanReader() require.NoError(t, err) - s.SamplingStore, err = s.factory.CreateSamplingStore(0) + s.SamplingStore, err = s.factory.CreateSamplingStore() require.NoError(t, err) } diff --git a/plugin/storage/integration/cassandra_test.go b/plugin/storage/integration/cassandra_test.go index f61af7d1f84..e07b2f7b045 100644 --- a/plugin/storage/integration/cassandra_test.go +++ b/plugin/storage/integration/cassandra_test.go @@ -77,7 +77,7 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) { require.NoError(t, err) s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() require.NoError(t, err) - s.SamplingStore, err = f.CreateSamplingStore(0) + s.SamplingStore, err = f.CreateSamplingStore() require.NoError(t, err) s.initializeDependencyReaderAndWriter(t, f) t.Cleanup(func() { diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 3f36334f7f9..541d8375c36 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -151,7 +151,7 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) require.NoError(t, err) s.DependencyWriter = s.DependencyReader.(dependencystore.Writer) - s.SamplingStore, err = f.CreateSamplingStore(1) + s.SamplingStore, err = f.CreateSamplingStore() require.NoError(t, err) } diff --git a/pkg/memory/config/config.go b/plugin/storage/memory/config.go similarity index 69% rename from pkg/memory/config/config.go rename to plugin/storage/memory/config.go index 0fb59c36359..d6b86df032b 100644 --- a/pkg/memory/config/config.go +++ b/plugin/storage/memory/config.go @@ -12,9 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -package config +package memory // Configuration describes the options to customize the storage behavior type Configuration struct { + // MaxTraces is the maximum number of traces that can be stored in memory in FIFO manner. MaxTraces int `mapstructure:"max_traces"` + + // SamplingAggregationBuckets is used with adaptive sampling to control how many buckets + // of trace throughput is stored in memory. + SamplingAggregationBuckets int `mapstructure:"sampling_aggregation_buckets"` } diff --git a/plugin/storage/memory/factory.go b/plugin/storage/memory/factory.go index 2c9fe0b5fda..acfc6ebf42e 100644 --- a/plugin/storage/memory/factory.go +++ b/plugin/storage/memory/factory.go @@ -23,7 +23,6 @@ import ( "github.com/jaegertracing/jaeger/internal/safeexpvar" "github.com/jaegertracing/jaeger/pkg/distributedlock" - "github.com/jaegertracing/jaeger/pkg/memory/config" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin" "github.com/jaegertracing/jaeger/storage" @@ -54,12 +53,12 @@ func NewFactory() *Factory { // NewFactoryWithConfig is used from jaeger(v2). func NewFactoryWithConfig( - cfg config.Configuration, + cfg Configuration, metricsFactory metrics.Factory, logger *zap.Logger, ) *Factory { f := NewFactory() - f.configureFromOptions(Options{Configuration: cfg}) + f.configureFromOptions(Options{Config: cfg}) _ = f.Initialize(metricsFactory, logger) return f } @@ -82,7 +81,7 @@ func (f *Factory) configureFromOptions(opts Options) { // Initialize implements storage.Factory func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { f.metricsFactory, f.logger = metricsFactory, logger - f.store = WithConfiguration(f.options.Configuration) + f.store = WithConfiguration(f.options.Config) logger.Info("Memory storage initialized", zap.Any("configuration", f.store.defaultConfig)) f.publishOpts() @@ -115,8 +114,8 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { } // CreateSamplingStore implements storage.SamplingStoreFactory -func (*Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) { - return NewSamplingStore(maxBuckets), nil +func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) { + return NewSamplingStore(f.options.Config.SamplingAggregationBuckets), nil } // CreateLock implements storage.SamplingStoreFactory @@ -125,5 +124,6 @@ func (*Factory) CreateLock() (distributedlock.Lock, error) { } func (f *Factory) publishOpts() { - safeexpvar.SetInt("jaeger_storage_memory_max_traces", int64(f.options.Configuration.MaxTraces)) + safeexpvar.SetInt("jaeger_storage_memory_max_traces", int64(f.options.Config.MaxTraces)) + safeexpvar.SetInt("jaeger_storage_memory_sampling_aggregation_buckets", int64(f.options.Config.MaxTraces)) } diff --git a/plugin/storage/memory/factory_test.go b/plugin/storage/memory/factory_test.go index 5c43799b704..68bb35a84a3 100644 --- a/plugin/storage/memory/factory_test.go +++ b/plugin/storage/memory/factory_test.go @@ -24,7 +24,6 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/config" - memCfg "github.com/jaegertracing/jaeger/pkg/memory/config" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/storage" ) @@ -44,9 +43,10 @@ func TestMemoryStorageFactory(t *testing.T) { depReader, err := f.CreateDependencyReader() require.NoError(t, err) assert.Equal(t, f.store, depReader) - samplingStore, err := f.CreateSamplingStore(2) + f.options.Config.SamplingAggregationBuckets = 123 + samplingStore, err := f.CreateSamplingStore() require.NoError(t, err) - assert.Equal(t, 2, samplingStore.(*SamplingStore).maxBuckets) + assert.Equal(t, 123, samplingStore.(*SamplingStore).maxBuckets) lock, err := f.CreateLock() require.NoError(t, err) assert.NotNil(t, lock) @@ -57,15 +57,15 @@ func TestWithConfiguration(t *testing.T) { v, command := config.Viperize(f.AddFlags) command.ParseFlags([]string{"--memory.max-traces=100"}) f.InitFromViper(v, zap.NewNop()) - assert.Equal(t, 100, f.options.Configuration.MaxTraces) + assert.Equal(t, 100, f.options.Config.MaxTraces) } func TestNewFactoryWithConfig(t *testing.T) { - cfg := memCfg.Configuration{ + cfg := Configuration{ MaxTraces: 42, } f := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) - assert.Equal(t, cfg, f.options.Configuration) + assert.Equal(t, cfg, f.options.Config) } func TestPublishOpts(t *testing.T) { diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go index e7c8c3fb0a0..2e89a6c00de 100644 --- a/plugin/storage/memory/memory.go +++ b/plugin/storage/memory/memory.go @@ -26,7 +26,6 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/model/adjuster" - "github.com/jaegertracing/jaeger/pkg/memory/config" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -36,7 +35,7 @@ type Store struct { sync.RWMutex // Each tenant gets a copy of default config. // In the future this can be extended to contain per-tenant configuration. - defaultConfig config.Configuration + defaultConfig Configuration perTenant map[string]*Tenant } @@ -48,24 +47,24 @@ type Tenant struct { services map[string]struct{} operations map[string]map[spanstore.Operation]struct{} deduper adjuster.Adjuster - config config.Configuration + config Configuration index int } // NewStore creates an unbounded in-memory store func NewStore() *Store { - return WithConfiguration(config.Configuration{MaxTraces: 0}) + return WithConfiguration(Configuration{MaxTraces: 0}) } // WithConfiguration creates a new in memory storage based on the given configuration -func WithConfiguration(configuration config.Configuration) *Store { +func WithConfiguration(config Configuration) *Store { return &Store{ - defaultConfig: configuration, + defaultConfig: config, perTenant: make(map[string]*Tenant), } } -func newTenant(cfg config.Configuration) *Tenant { +func newTenant(cfg Configuration) *Tenant { return &Tenant{ ids: make([]*model.TraceID, cfg.MaxTraces), traces: map[model.TraceID]*model.Trace{}, diff --git a/plugin/storage/memory/memory_test.go b/plugin/storage/memory/memory_test.go index a1bc0500909..72e62a2c980 100644 --- a/plugin/storage/memory/memory_test.go +++ b/plugin/storage/memory/memory_test.go @@ -24,7 +24,6 @@ import ( "github.com/stretchr/testify/require" "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/pkg/memory/config" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -170,7 +169,7 @@ func TestStoreWriteSpan(t *testing.T) { func TestStoreWithLimit(t *testing.T) { maxTraces := 100 - store := WithConfiguration(config.Configuration{MaxTraces: maxTraces}) + store := WithConfiguration(Configuration{MaxTraces: maxTraces}) for i := 0; i < maxTraces*2; i++ { id := model.NewTraceID(1, uint64(i)) diff --git a/plugin/storage/memory/options.go b/plugin/storage/memory/options.go index d2ec03d21b4..27ee79f40c6 100644 --- a/plugin/storage/memory/options.go +++ b/plugin/storage/memory/options.go @@ -18,23 +18,26 @@ import ( "flag" "github.com/spf13/viper" - - "github.com/jaegertracing/jaeger/pkg/memory/config" ) -const limit = "memory.max-traces" +const ( + limit = "memory.max-traces" + samplingAggregationBuckets = "memory.sampling.aggregation-buckets" +) // Options stores the configuration entries for this storage type Options struct { - Configuration config.Configuration `mapstructure:",squash"` + Config Configuration `mapstructure:",squash"` } // AddFlags from this storage to the CLI func AddFlags(flagSet *flag.FlagSet) { flagSet.Int(limit, 0, "The maximum amount of traces to store in memory. The default number of traces is unbounded.") + flagSet.Int(samplingAggregationBuckets, 20, "SamplingAggregationBuckets is used with adaptive sampling to control how many buckets of trace throughput is stored in memory. Should not be fewer than the number of buckets used in adaptive sampling.") } // InitFromViper initializes the options struct with values from Viper func (opt *Options) InitFromViper(v *viper.Viper) { - opt.Configuration.MaxTraces = v.GetInt(limit) + opt.Config.MaxTraces = v.GetInt(limit) + opt.Config.SamplingAggregationBuckets = v.GetInt(samplingAggregationBuckets) } diff --git a/plugin/storage/memory/options_test.go b/plugin/storage/memory/options_test.go index 4aede1048ed..377181f6450 100644 --- a/plugin/storage/memory/options_test.go +++ b/plugin/storage/memory/options_test.go @@ -24,9 +24,13 @@ import ( func TestOptionsWithFlags(t *testing.T) { v, command := config.Viperize(AddFlags) - command.ParseFlags([]string{"--memory.max-traces=100"}) + command.ParseFlags([]string{ + "--memory.max-traces=100", + "--memory.sampling.aggregation-buckets=42", + }) opts := Options{} opts.InitFromViper(v) - assert.Equal(t, 100, opts.Configuration.MaxTraces) + assert.Equal(t, 100, opts.Config.MaxTraces) + assert.Equal(t, 42, opts.Config.SamplingAggregationBuckets) } diff --git a/storage/factory.go b/storage/factory.go index 462a626b64f..8f963838c85 100644 --- a/storage/factory.go +++ b/storage/factory.go @@ -63,7 +63,7 @@ type SamplingStoreFactory interface { // CreateLock creates a distributed lock. CreateLock() (distributedlock.Lock, error) // CreateSamplingStore creates a sampling store. - CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) + CreateSamplingStore() (samplingstore.Store, error) } var (