From d72a9b02132cd9169b0a81b8fceedef33d36becf Mon Sep 17 00:00:00 2001 From: Pushkar Mishra Date: Tue, 27 Feb 2024 07:59:46 +0530 Subject: [PATCH] Add Elasticsearch storage support for adaptive sampling (#5158) ## Which problem is this PR solving? - #3305 ## Description of the changes - Implemented Elasticsearch storage for adaptive sampling ## How was this change tested? - not tested yet ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Pushkar Mishra Co-authored-by: Yuri Shkuro --- cmd/es-index-cleaner/app/index_filter.go | 7 +- cmd/es-index-cleaner/app/index_filter_test.go | 25 + cmd/es-rollover/app/flags.go | 4 + cmd/es-rollover/app/flags_test.go | 2 + cmd/es-rollover/app/index_options.go | 10 +- cmd/es-rollover/app/index_options_test.go | 54 +- cmd/es-rollover/app/init/action.go | 3 +- cmd/es-rollover/app/init/flags.go | 6 +- cmd/es-rollover/app/init/flags_test.go | 2 + cmd/es-rollover/app/lookback/action.go | 2 +- cmd/es-rollover/app/rollover/action.go | 2 +- .../app/renderer/render.go | 1 + pkg/es/config/config.go | 23 +- plugin/storage/es/factory.go | 15 + plugin/storage/es/factory_test.go | 4 + .../mappings/fixtures/jaeger-sampling-7.json | 17 + .../mappings/fixtures/jaeger-sampling-8.json | 20 + .../es/mappings/fixtures/jaeger-sampling.json | 10 + .../es/mappings/jaeger-sampling-7.json | 21 + .../es/mappings/jaeger-sampling-8.json | 24 + .../storage/es/mappings/jaeger-sampling.json | 10 + plugin/storage/es/mappings/mapping.go | 6 + plugin/storage/es/mappings/mapping_test.go | 13 + plugin/storage/es/options.go | 17 +- .../es/samplingstore/dbmodel/converter.go | 51 ++ .../samplingstore/dbmodel/converter_test.go | 53 ++ .../storage/es/samplingstore/dbmodel/model.go | 42 ++ plugin/storage/es/samplingstore/storage.go | 216 ++++++++ .../storage/es/samplingstore/storage_test.go | 461 ++++++++++++++++++ .../storage/integration/elasticsearch_test.go | 93 ++-- .../integration/es_index_cleaner_test.go | 57 ++- .../integration/es_index_rollover_test.go | 26 +- 32 files changed, 1204 insertions(+), 93 deletions(-) create mode 100644 plugin/storage/es/mappings/fixtures/jaeger-sampling-7.json create mode 100644 plugin/storage/es/mappings/fixtures/jaeger-sampling-8.json create mode 100644 plugin/storage/es/mappings/fixtures/jaeger-sampling.json create mode 100644 plugin/storage/es/mappings/jaeger-sampling-7.json create mode 100644 plugin/storage/es/mappings/jaeger-sampling-8.json create mode 100644 plugin/storage/es/mappings/jaeger-sampling.json create mode 100644 plugin/storage/es/samplingstore/dbmodel/converter.go create mode 100644 plugin/storage/es/samplingstore/dbmodel/converter_test.go create mode 100644 plugin/storage/es/samplingstore/dbmodel/model.go create mode 100644 plugin/storage/es/samplingstore/storage.go create mode 100644 plugin/storage/es/samplingstore/storage_test.go diff --git a/cmd/es-index-cleaner/app/index_filter.go b/cmd/es-index-cleaner/app/index_filter.go index 3f77dc18f54..60c28970b64 100644 --- a/cmd/es-index-cleaner/app/index_filter.go +++ b/cmd/es-index-cleaner/app/index_filter.go @@ -50,9 +50,9 @@ func (i *IndexFilter) filter(indices []client.Index) []client.Index { // archive works only for rollover reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-span-archive-\\d{6}", i.IndexPrefix)) case i.Rollover: - reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service|dependencies)-\\d{6}", i.IndexPrefix)) + reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service|dependencies|sampling)-\\d{6}", i.IndexPrefix)) default: - reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service|dependencies)-\\d{4}%s\\d{2}%s\\d{2}", i.IndexPrefix, i.IndexDateSeparator, i.IndexDateSeparator)) + reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service|dependencies|sampling)-\\d{4}%s\\d{2}%s\\d{2}", i.IndexPrefix, i.IndexDateSeparator, i.IndexDateSeparator)) } var filtered []client.Index @@ -62,7 +62,8 @@ func (i *IndexFilter) filter(indices []client.Index) []client.Index { if in.Aliases[i.IndexPrefix+"jaeger-span-write"] || in.Aliases[i.IndexPrefix+"jaeger-service-write"] || in.Aliases[i.IndexPrefix+"jaeger-span-archive-write"] || - in.Aliases[i.IndexPrefix+"jaeger-dependencies-write"] { + in.Aliases[i.IndexPrefix+"jaeger-dependencies-write"] || + in.Aliases[i.IndexPrefix+"jaeger-sampling-write"] { continue } filtered = append(filtered, in) diff --git a/cmd/es-index-cleaner/app/index_filter_test.go b/cmd/es-index-cleaner/app/index_filter_test.go index ec03e2376ec..6a72970c359 100644 --- a/cmd/es-index-cleaner/app/index_filter_test.go +++ b/cmd/es-index-cleaner/app/index_filter_test.go @@ -64,6 +64,16 @@ func testIndexFilter(t *testing.T, prefix string) { CreationTime: time.Date(2020, time.August, 0o5, 15, 0, 0, 0, time.UTC), Aliases: map[string]bool{}, }, + { + Index: prefix + "jaeger-sampling-2020-08-06", + CreationTime: time.Date(2020, time.August, 0o6, 15, 0, 0, 0, time.UTC), + Aliases: map[string]bool{}, + }, + { + Index: prefix + "jaeger-sampling-2020-08-05", + CreationTime: time.Date(2020, time.August, 0o5, 15, 0, 0, 0, time.UTC), + Aliases: map[string]bool{}, + }, { Index: prefix + "jaeger-span-archive", CreationTime: time.Date(2020, time.August, 0, 15, 0, 0, 0, time.UTC), @@ -186,6 +196,11 @@ func testIndexFilter(t *testing.T, prefix string) { CreationTime: time.Date(2020, time.August, 0o5, 15, 0, 0, 0, time.UTC), Aliases: map[string]bool{}, }, + { + Index: prefix + "jaeger-sampling-2020-08-05", + CreationTime: time.Date(2020, time.August, 0o5, 15, 0, 0, 0, time.UTC), + Aliases: map[string]bool{}, + }, }, }, { @@ -228,6 +243,16 @@ func testIndexFilter(t *testing.T, prefix string) { CreationTime: time.Date(2020, time.August, 0o5, 15, 0, 0, 0, time.UTC), Aliases: map[string]bool{}, }, + { + Index: prefix + "jaeger-sampling-2020-08-06", + CreationTime: time.Date(2020, time.August, 0o6, 15, 0, 0, 0, time.UTC), + Aliases: map[string]bool{}, + }, + { + Index: prefix + "jaeger-sampling-2020-08-05", + CreationTime: time.Date(2020, time.August, 0o5, 15, 0, 0, 0, time.UTC), + Aliases: map[string]bool{}, + }, }, }, { diff --git a/cmd/es-rollover/app/flags.go b/cmd/es-rollover/app/flags.go index 582fe08c81e..56f3acce644 100644 --- a/cmd/es-rollover/app/flags.go +++ b/cmd/es-rollover/app/flags.go @@ -29,6 +29,7 @@ const ( ilmPolicyName = "es.ilm-policy-name" timeout = "timeout" skipDependencies = "skip-dependencies" + adaptiveSampling = "adaptive-sampling" ) // Config holds the global configurations for the es rollover, common to all actions @@ -42,6 +43,7 @@ type Config struct { UseILM bool Timeout int SkipDependencies bool + AdaptiveSampling bool } // AddFlags adds flags @@ -54,6 +56,7 @@ func AddFlags(flags *flag.FlagSet) { flags.String(ilmPolicyName, "jaeger-ilm-policy", "The name of the ILM policy to use if ILM is active") flags.Int(timeout, 120, "Number of seconds to wait for master node response") flags.Bool(skipDependencies, false, "Disable rollover for dependencies index") + flags.Bool(adaptiveSampling, false, "Enable rollover for adaptive sampling index") } // InitFromViper initializes config from viper.Viper. @@ -69,4 +72,5 @@ func (c *Config) InitFromViper(v *viper.Viper) { c.UseILM = v.GetBool(useILM) c.Timeout = v.GetInt(timeout) c.SkipDependencies = v.GetBool(skipDependencies) + c.AdaptiveSampling = v.GetBool(adaptiveSampling) } diff --git a/cmd/es-rollover/app/flags_test.go b/cmd/es-rollover/app/flags_test.go index 3c0e174c033..6bb8f31720e 100644 --- a/cmd/es-rollover/app/flags_test.go +++ b/cmd/es-rollover/app/flags_test.go @@ -42,6 +42,7 @@ func TestBindFlags(t *testing.T) { "--es.use-ilm=true", "--es.ilm-policy-name=jaeger-ilm", "--skip-dependencies=true", + "--adaptive-sampling=true", }) require.NoError(t, err) @@ -53,4 +54,5 @@ func TestBindFlags(t *testing.T) { assert.Equal(t, "qwerty123", c.Password) assert.Equal(t, "jaeger-ilm", c.ILMPolicyName) assert.True(t, c.SkipDependencies) + assert.True(t, c.AdaptiveSampling) } diff --git a/cmd/es-rollover/app/index_options.go b/cmd/es-rollover/app/index_options.go index d34c20187c5..f732ba22101 100644 --- a/cmd/es-rollover/app/index_options.go +++ b/cmd/es-rollover/app/index_options.go @@ -33,7 +33,7 @@ type IndexOption struct { } // RolloverIndices return an array of indices to rollover -func RolloverIndices(archive bool, skipDependencies bool, prefix string) []IndexOption { +func RolloverIndices(archive bool, skipDependencies bool, adaptiveSampling bool, prefix string) []IndexOption { if archive { return []IndexOption{ { @@ -65,6 +65,14 @@ func RolloverIndices(archive bool, skipDependencies bool, prefix string) []Index }) } + if adaptiveSampling { + indexOptions = append(indexOptions, IndexOption{ + prefix: prefix, + Mapping: "jaeger-sampling", + indexType: "jaeger-sampling", + }) + } + return indexOptions } diff --git a/cmd/es-rollover/app/index_options_test.go b/cmd/es-rollover/app/index_options_test.go index 0579062b6ec..f16f6607a9c 100644 --- a/cmd/es-rollover/app/index_options_test.go +++ b/cmd/es-rollover/app/index_options_test.go @@ -34,6 +34,7 @@ func TestRolloverIndices(t *testing.T) { archive bool prefix string skipDependencies bool + adaptiveSampling bool expected []expectedValues }{ { @@ -74,13 +75,6 @@ func TestRolloverIndices(t *testing.T) { writeAliasName: "mytenant-jaeger-span-archive-write", initialRolloverIndex: "mytenant-jaeger-span-archive-000001", }, - { - mapping: "jaeger-dependencies", - templateName: "mytenant-jaeger-dependencies", - readAliasName: "mytenant-jaeger-dependencies-read", - writeAliasName: "mytenant-jaeger-dependencies-write", - initialRolloverIndex: "mytenant-jaeger-dependencies-000001", - }, }, }, { @@ -97,8 +91,9 @@ func TestRolloverIndices(t *testing.T) { }, }, { - name: "with prefix", - prefix: "mytenant", + name: "with prefix", + prefix: "mytenant", + adaptiveSampling: true, expected: []expectedValues{ { mapping: "jaeger-span", @@ -121,12 +116,41 @@ func TestRolloverIndices(t *testing.T) { writeAliasName: "mytenant-jaeger-dependencies-write", initialRolloverIndex: "mytenant-jaeger-dependencies-000001", }, + { + mapping: "jaeger-sampling", + templateName: "mytenant-jaeger-sampling", + readAliasName: "mytenant-jaeger-sampling-read", + writeAliasName: "mytenant-jaeger-sampling-write", + initialRolloverIndex: "mytenant-jaeger-sampling-000001", + }, + }, + }, + { + name: "skip-dependency enable", + prefix: "mytenant", + skipDependencies: true, + expected: []expectedValues{ + { + mapping: "jaeger-span", + templateName: "mytenant-jaeger-span", + readAliasName: "mytenant-jaeger-span-read", + writeAliasName: "mytenant-jaeger-span-write", + initialRolloverIndex: "mytenant-jaeger-span-000001", + }, + { + mapping: "jaeger-service", + templateName: "mytenant-jaeger-service", + readAliasName: "mytenant-jaeger-service-read", + writeAliasName: "mytenant-jaeger-service-write", + initialRolloverIndex: "mytenant-jaeger-service-000001", + }, }, }, { - name: "dependency enable", + name: "adaptive sampling enable", prefix: "mytenant", skipDependencies: true, + adaptiveSampling: true, expected: []expectedValues{ { mapping: "jaeger-span", @@ -142,6 +166,13 @@ func TestRolloverIndices(t *testing.T) { writeAliasName: "mytenant-jaeger-service-write", initialRolloverIndex: "mytenant-jaeger-service-000001", }, + { + mapping: "jaeger-sampling", + templateName: "mytenant-jaeger-sampling", + readAliasName: "mytenant-jaeger-sampling-read", + writeAliasName: "mytenant-jaeger-sampling-write", + initialRolloverIndex: "mytenant-jaeger-sampling-000001", + }, }, }, } @@ -151,7 +182,8 @@ func TestRolloverIndices(t *testing.T) { if test.prefix != "" { test.prefix += "-" } - result := RolloverIndices(test.archive, test.skipDependencies, test.prefix) + result := RolloverIndices(test.archive, test.skipDependencies, test.adaptiveSampling, test.prefix) + assert.Equal(t, len(test.expected), len(result)) for i, r := range result { assert.Equal(t, test.expected[i].templateName, r.TemplateName()) assert.Equal(t, test.expected[i].mapping, r.Mapping) diff --git a/cmd/es-rollover/app/init/action.go b/cmd/es-rollover/app/init/action.go index 99134eb61f7..28ee6dea2d5 100644 --- a/cmd/es-rollover/app/init/action.go +++ b/cmd/es-rollover/app/init/action.go @@ -44,6 +44,7 @@ func (c Action) getMapping(version uint, templateName string) (string, error) { PrioritySpanTemplate: int64(c.Config.PrioritySpanTemplate), PriorityServiceTemplate: int64(c.Config.PriorityServiceTemplate), PriorityDependenciesTemplate: int64(c.Config.PriorityDependenciesTemplate), + PrioritySamplingTemplate: int64(c.Config.PrioritySamplingTemplate), Shards: int64(c.Config.Shards), Replicas: int64(c.Config.Replicas), IndexPrefix: c.Config.IndexPrefix, @@ -73,7 +74,7 @@ func (c Action) Do() error { return fmt.Errorf("ILM is supported only for ES version 7+") } } - rolloverIndices := app.RolloverIndices(c.Config.Archive, c.Config.SkipDependencies, c.Config.IndexPrefix) + rolloverIndices := app.RolloverIndices(c.Config.Archive, c.Config.SkipDependencies, c.Config.AdaptiveSampling, c.Config.IndexPrefix) for _, indexName := range rolloverIndices { if err := c.init(version, indexName); err != nil { return err diff --git a/cmd/es-rollover/app/init/flags.go b/cmd/es-rollover/app/init/flags.go index 27fe910e569..6b51e1ebce5 100644 --- a/cmd/es-rollover/app/init/flags.go +++ b/cmd/es-rollover/app/init/flags.go @@ -28,6 +28,7 @@ const ( prioritySpanTemplate = "priority-span-template" priorityServiceTemplate = "priority-service-template" priorityDependenciesTemplate = "priority-dependencies-template" + prioritySamplingTemplate = "priority-sampling-template" ) // Config holds configuration for index cleaner binary. @@ -38,6 +39,7 @@ type Config struct { PrioritySpanTemplate int PriorityServiceTemplate int PriorityDependenciesTemplate int + PrioritySamplingTemplate int } // AddFlags adds flags for TLS to the FlagSet. @@ -46,7 +48,8 @@ func (c *Config) AddFlags(flags *flag.FlagSet) { flags.Int(replicas, 1, "Number of replicas") flags.Int(prioritySpanTemplate, 0, "Priority of jaeger-span index template (ESv8 only)") flags.Int(priorityServiceTemplate, 0, "Priority of jaeger-service index template (ESv8 only)") - flags.Int(priorityDependenciesTemplate, 0, "Priority of jaeger-dependecies index template (ESv8 only)") + flags.Int(priorityDependenciesTemplate, 0, "Priority of jaeger-dependencies index template (ESv8 only)") + flags.Int(prioritySamplingTemplate, 0, "Priority of jaeger-sampling index template (ESv8 only)") } // InitFromViper initializes config from viper.Viper. @@ -56,4 +59,5 @@ func (c *Config) InitFromViper(v *viper.Viper) { c.PrioritySpanTemplate = v.GetInt(prioritySpanTemplate) c.PriorityServiceTemplate = v.GetInt(priorityServiceTemplate) c.PriorityDependenciesTemplate = v.GetInt(priorityDependenciesTemplate) + c.PrioritySamplingTemplate = v.GetInt(prioritySamplingTemplate) } diff --git a/cmd/es-rollover/app/init/flags_test.go b/cmd/es-rollover/app/init/flags_test.go index 4104f71e5ff..8856ca6f549 100644 --- a/cmd/es-rollover/app/init/flags_test.go +++ b/cmd/es-rollover/app/init/flags_test.go @@ -39,6 +39,7 @@ func TestBindFlags(t *testing.T) { "--priority-span-template=300", "--priority-service-template=301", "--priority-dependencies-template=302", + "--priority-sampling-template=303", }) require.NoError(t, err) @@ -48,4 +49,5 @@ func TestBindFlags(t *testing.T) { assert.Equal(t, 300, c.PrioritySpanTemplate) assert.Equal(t, 301, c.PriorityServiceTemplate) assert.Equal(t, 302, c.PriorityDependenciesTemplate) + assert.Equal(t, 303, c.PrioritySamplingTemplate) } diff --git a/cmd/es-rollover/app/lookback/action.go b/cmd/es-rollover/app/lookback/action.go index f944ff59523..6d2e5dcf4c7 100644 --- a/cmd/es-rollover/app/lookback/action.go +++ b/cmd/es-rollover/app/lookback/action.go @@ -35,7 +35,7 @@ type Action struct { // Do the lookback action func (a *Action) Do() error { - rolloverIndices := app.RolloverIndices(a.Config.Archive, a.Config.SkipDependencies, a.Config.IndexPrefix) + rolloverIndices := app.RolloverIndices(a.Config.Archive, a.Config.SkipDependencies, a.Config.AdaptiveSampling, a.Config.IndexPrefix) for _, indexName := range rolloverIndices { if err := a.lookback(indexName); err != nil { return err diff --git a/cmd/es-rollover/app/rollover/action.go b/cmd/es-rollover/app/rollover/action.go index 2ed612ee6ee..594a488f0f3 100644 --- a/cmd/es-rollover/app/rollover/action.go +++ b/cmd/es-rollover/app/rollover/action.go @@ -30,7 +30,7 @@ type Action struct { // Do the rollover action func (a *Action) Do() error { - rolloverIndices := app.RolloverIndices(a.Config.Archive, a.Config.SkipDependencies, a.Config.IndexPrefix) + rolloverIndices := app.RolloverIndices(a.Config.Archive, a.Config.SkipDependencies, a.Config.AdaptiveSampling, a.Config.IndexPrefix) for _, indexName := range rolloverIndices { if err := a.rollover(indexName); err != nil { return err diff --git a/cmd/esmapping-generator/app/renderer/render.go b/cmd/esmapping-generator/app/renderer/render.go index e9d1ed134eb..74bdfa3cb66 100644 --- a/cmd/esmapping-generator/app/renderer/render.go +++ b/cmd/esmapping-generator/app/renderer/render.go @@ -26,6 +26,7 @@ var supportedMappings = map[string]struct{}{ "jaeger-span": {}, "jaeger-service": {}, "jaeger-dependencies": {}, + "jaeger-sampling": {}, } // GetMappingAsString returns rendered index templates as string diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 6fc76db5615..3e0f44673ab 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -70,9 +70,12 @@ type Configuration struct { IndexPrefix string `mapstructure:"index_prefix"` IndexDateLayoutSpans string `mapstructure:"-"` IndexDateLayoutServices string `mapstructure:"-"` + IndexDateLayoutSampling string `mapstructure:"-"` IndexDateLayoutDependencies string `mapstructure:"-"` IndexRolloverFrequencySpans string `mapstructure:"-"` IndexRolloverFrequencyServices string `mapstructure:"-"` + IndexRolloverFrequencySampling string `mapstructure:"-"` + AdaptiveSamplingLookback time.Duration `mapstructure:"-"` Tags TagsAsFields `mapstructure:"tags_as_fields"` Enabled bool `mapstructure:"-"` TLS tlscfg.Options `mapstructure:"tls"` @@ -231,6 +234,9 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if c.MaxSpanAge == 0 { c.MaxSpanAge = source.MaxSpanAge } + if c.AdaptiveSamplingLookback == 0 { + c.AdaptiveSamplingLookback = source.AdaptiveSamplingLookback + } if c.NumShards == 0 { c.NumShards = source.NumShards } @@ -286,15 +292,22 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { // GetIndexRolloverFrequencySpansDuration returns jaeger-span index rollover frequency duration func (c *Configuration) GetIndexRolloverFrequencySpansDuration() time.Duration { - if c.IndexRolloverFrequencySpans == "hour" { - return -1 * time.Hour - } - return -24 * time.Hour + return getIndexRolloverFrequencyDuration(c.IndexRolloverFrequencySpans) } // GetIndexRolloverFrequencyServicesDuration returns jaeger-service index rollover frequency duration func (c *Configuration) GetIndexRolloverFrequencyServicesDuration() time.Duration { - if c.IndexRolloverFrequencyServices == "hour" { + return getIndexRolloverFrequencyDuration(c.IndexRolloverFrequencyServices) +} + +// GetIndexRolloverFrequencySamplingDuration returns jaeger-sampling index rollover frequency duration +func (c *Configuration) GetIndexRolloverFrequencySamplingDuration() time.Duration { + return getIndexRolloverFrequencyDuration(c.IndexRolloverFrequencySampling) +} + +// GetIndexRolloverFrequencyDuration returns the index rollover frequency duration for the given frequency string +func getIndexRolloverFrequencyDuration(frequency string) time.Duration { + if frequency == "hour" { return -1 * time.Hour } return -24 * time.Hour diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 5187ac5db7a..8a6866c74ae 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -37,9 +37,11 @@ import ( "github.com/jaegertracing/jaeger/plugin" esDepStore "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore" "github.com/jaegertracing/jaeger/plugin/storage/es/mappings" + esSampleStore "github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore" esSpanStore "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore" "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -305,6 +307,19 @@ func createSpanWriter( return writer, nil } +func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) { + store := esSampleStore.NewSamplingStore(esSampleStore.SamplingStoreParams{ + Client: f.getPrimaryClient, + Logger: f.logger, + IndexPrefix: f.primaryConfig.IndexPrefix, + IndexDateLayout: f.primaryConfig.IndexDateLayoutSampling, + IndexRolloverFrequency: f.primaryConfig.GetIndexRolloverFrequencySamplingDuration(), + Lookback: f.primaryConfig.AdaptiveSamplingLookback, + MaxDocCount: f.primaryConfig.MaxDocCount, + }) + return store, nil +} + func createDependencyReader( clientFn func() es.Client, cfg *config.Configuration, diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 039552c574b..5d3359622ff 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -106,6 +106,10 @@ func TestElasticsearchFactory(t *testing.T) { _, err = f.CreateArchiveSpanWriter() require.NoError(t, err) + + _, err = f.CreateSamplingStore(1) + require.NoError(t, err) + require.NoError(t, f.Close()) } diff --git a/plugin/storage/es/mappings/fixtures/jaeger-sampling-7.json b/plugin/storage/es/mappings/fixtures/jaeger-sampling-7.json new file mode 100644 index 00000000000..910a6ca8675 --- /dev/null +++ b/plugin/storage/es/mappings/fixtures/jaeger-sampling-7.json @@ -0,0 +1,17 @@ +{ + "index_patterns": "*jaeger-sampling-*", + "aliases": { + "test-jaeger-sampling-read" : {} + }, + "settings":{ + "index.number_of_shards": 3, + "index.number_of_replicas": 3, + "index.mapping.nested_fields.limit":50, + "index.requests.cache.enable":true + ,"lifecycle": { + "name": "jaeger-test-policy", + "rollover_alias": "test-jaeger-sampling-write" + } + }, + "mappings":{} +} diff --git a/plugin/storage/es/mappings/fixtures/jaeger-sampling-8.json b/plugin/storage/es/mappings/fixtures/jaeger-sampling-8.json new file mode 100644 index 00000000000..e8fbbbf4082 --- /dev/null +++ b/plugin/storage/es/mappings/fixtures/jaeger-sampling-8.json @@ -0,0 +1,20 @@ +{ + "priority": 503, + "index_patterns": "test-jaeger-sampling-*", + "template": { + "aliases": { + "test-jaeger-sampling-read": {} + }, + "settings": { + "index.number_of_shards": 3, + "index.number_of_replicas": 3, + "index.mapping.nested_fields.limit": 50, + "index.requests.cache.enable": true, + "lifecycle": { + "name": "jaeger-test-policy", + "rollover_alias": "test-jaeger-sampling-write" + } + }, + "mappings": {} + } +} diff --git a/plugin/storage/es/mappings/fixtures/jaeger-sampling.json b/plugin/storage/es/mappings/fixtures/jaeger-sampling.json new file mode 100644 index 00000000000..87304011017 --- /dev/null +++ b/plugin/storage/es/mappings/fixtures/jaeger-sampling.json @@ -0,0 +1,10 @@ +{ + "template": "*jaeger-sampling-*", + "settings":{ + "index.number_of_shards": 3, + "index.number_of_replicas": 3, + "index.mapping.nested_fields.limit":50, + "index.requests.cache.enable":true + }, + "mappings":{} +} diff --git a/plugin/storage/es/mappings/jaeger-sampling-7.json b/plugin/storage/es/mappings/jaeger-sampling-7.json new file mode 100644 index 00000000000..167c1d47928 --- /dev/null +++ b/plugin/storage/es/mappings/jaeger-sampling-7.json @@ -0,0 +1,21 @@ +{ + "index_patterns": "*jaeger-sampling-*", + {{- if .UseILM }} + "aliases": { + "{{ .IndexPrefix }}jaeger-sampling-read" : {} + }, + {{- end }} + "settings":{ + "index.number_of_shards": {{ .Shards }}, + "index.number_of_replicas": {{ .Replicas }}, + "index.mapping.nested_fields.limit":50, + "index.requests.cache.enable":false + {{- if .UseILM }} + ,"lifecycle": { + "name": "{{ .ILMPolicyName }}", + "rollover_alias": "{{ .IndexPrefix }}jaeger-sampling-write" + } + {{- end }} + }, + "mappings":{} +} diff --git a/plugin/storage/es/mappings/jaeger-sampling-8.json b/plugin/storage/es/mappings/jaeger-sampling-8.json new file mode 100644 index 00000000000..0667520803a --- /dev/null +++ b/plugin/storage/es/mappings/jaeger-sampling-8.json @@ -0,0 +1,24 @@ +{ + "priority": {{ .PrioritySamplingTemplate }}, + "index_patterns": "{{ .IndexPrefix }}jaeger-sampling-*", + "template": { + {{- if .UseILM }} + "aliases": { + "{{ .IndexPrefix }}jaeger-sampling-read": {} + }, + {{- end }} + "settings": { + "index.number_of_shards": {{ .Shards }}, + "index.number_of_replicas": {{ .Replicas }}, + "index.mapping.nested_fields.limit": 50, + "index.requests.cache.enable": false + {{- if .UseILM }}, + "lifecycle": { + "name": "{{ .ILMPolicyName }}", + "rollover_alias": "{{ .IndexPrefix }}jaeger-sampling-write" + } + {{- end }} + }, + "mappings": {} + } +} diff --git a/plugin/storage/es/mappings/jaeger-sampling.json b/plugin/storage/es/mappings/jaeger-sampling.json new file mode 100644 index 00000000000..458d490a357 --- /dev/null +++ b/plugin/storage/es/mappings/jaeger-sampling.json @@ -0,0 +1,10 @@ +{ + "template": "*jaeger-sampling-*", + "settings":{ + "index.number_of_shards": {{ .Shards }}, + "index.number_of_replicas": {{ .Replicas }}, + "index.mapping.nested_fields.limit":50, + "index.requests.cache.enable":false + }, + "mappings":{} +} diff --git a/plugin/storage/es/mappings/mapping.go b/plugin/storage/es/mappings/mapping.go index 9ee624120db..5da255f8838 100644 --- a/plugin/storage/es/mappings/mapping.go +++ b/plugin/storage/es/mappings/mapping.go @@ -35,6 +35,7 @@ type MappingBuilder struct { PrioritySpanTemplate int64 PriorityServiceTemplate int64 PriorityDependenciesTemplate int64 + PrioritySamplingTemplate int64 EsVersion uint IndexPrefix string UseILM bool @@ -69,6 +70,11 @@ func (mb *MappingBuilder) GetDependenciesMappings() (string, error) { return mb.GetMapping("jaeger-dependencies") } +// GetSamplingMappings returns sampling mappings +func (mb *MappingBuilder) GetSamplingMappings() (string, error) { + return mb.GetMapping("jaeger-sampling") +} + func loadMapping(name string) string { s, _ := MAPPINGS.ReadFile(name) return string(s) diff --git a/plugin/storage/es/mappings/mapping_test.go b/plugin/storage/es/mappings/mapping_test.go index 562c8400f83..167c6893d1b 100644 --- a/plugin/storage/es/mappings/mapping_test.go +++ b/plugin/storage/es/mappings/mapping_test.go @@ -332,6 +332,19 @@ func TestMappingBuilder_GetDependenciesMappings(t *testing.T) { require.EqualError(t, err, "template load error") } +func TestMappingBuilder_GetSamplingMappings(t *testing.T) { + tb := mocks.TemplateBuilder{} + ta := mocks.TemplateApplier{} + ta.On("Execute", mock.Anything, mock.Anything).Return(errors.New("template load error")) + tb.On("Parse", mock.Anything).Return(&ta, nil) + + mappingBuilder := MappingBuilder{ + TemplateBuilder: &tb, + } + _, err := mappingBuilder.GetSamplingMappings() + require.EqualError(t, err, "template load error") +} + func TestMain(m *testing.M) { testutils.VerifyGoLeaks(m) } diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 406da2c313e..efdd1c6fd9b 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -38,6 +38,7 @@ const ( suffixServerURLs = ".server-urls" suffixRemoteReadClusters = ".remote-read-clusters" suffixMaxSpanAge = ".max-span-age" + suffixAdaptiveSamplingLookback = ".adaptive-sampling.lookback" suffixNumShards = ".num-shards" suffixNumReplicas = ".num-replicas" suffixPrioritySpanTemplate = ".prioirity-span-template" @@ -52,6 +53,7 @@ const ( suffixIndexDateSeparator = ".index-date-separator" suffixIndexRolloverFrequencySpans = ".index-rollover-frequency-spans" suffixIndexRolloverFrequencyServices = ".index-rollover-frequency-services" + suffixIndexRolloverFrequencySampling = ".index-rollover-frequency-adaptive-sampling" suffixTagsAsFields = ".tags-as-fields" suffixTagsAsFieldsAll = suffixTagsAsFields + ".all" suffixTagsAsFieldsInclude = suffixTagsAsFields + ".include" @@ -101,6 +103,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { Password: "", Sniffer: false, MaxSpanAge: 72 * time.Hour, + AdaptiveSamplingLookback: 72 * time.Hour, NumShards: 5, NumReplicas: 1, PrioritySpanTemplate: 0, @@ -244,6 +247,11 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { defaultIndexRolloverFrequency, "Rotates jaeger-service indices over the given period. For example \"day\" creates \"jaeger-service-yyyy-MM-dd\" every day after UTC 12AM. Valid options: [hour, day]. "+ "This does not delete old indices. For details on complete index management solutions supported by Jaeger, refer to: https://www.jaegertracing.io/docs/deployment/#elasticsearch-rollover") + flagSet.String( + nsConfig.namespace+suffixIndexRolloverFrequencySampling, + defaultIndexRolloverFrequency, + "Rotates jaeger-sampling indices over the given period. For example \"day\" creates \"jaeger-sampling-yyyy-MM-dd\" every day after UTC 12AM. Valid options: [hour, day]. "+ + "This does not delete old indices. For details on complete index management solutions supported by Jaeger, refer to: https://www.jaegertracing.io/docs/deployment/#elasticsearch-rollover") flagSet.Bool( nsConfig.namespace+suffixTagsAsFieldsAll, nsConfig.Tags.AllAsFields, @@ -296,7 +304,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixSendGetBodyAs, nsConfig.SendGetBodyAs, "HTTP verb for requests that contain a body [GET, POST].") - + flagSet.Duration( + nsConfig.namespace+suffixAdaptiveSamplingLookback, + nsConfig.AdaptiveSamplingLookback, + "How far back to look for the latest adaptive sampling probabilities") if nsConfig.namespace == archiveNamespace { flagSet.Bool( nsConfig.namespace+suffixEnabled, @@ -330,6 +341,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.SnifferTLSEnabled = v.GetBool(cfg.namespace + suffixSnifferTLSEnabled) cfg.Servers = strings.Split(stripWhiteSpace(v.GetString(cfg.namespace+suffixServerURLs)), ",") cfg.MaxSpanAge = v.GetDuration(cfg.namespace + suffixMaxSpanAge) + cfg.AdaptiveSamplingLookback = v.GetDuration(cfg.namespace + suffixAdaptiveSamplingLookback) cfg.NumShards = v.GetInt64(cfg.namespace + suffixNumShards) cfg.NumReplicas = v.GetInt64(cfg.namespace + suffixNumReplicas) cfg.PrioritySpanTemplate = v.GetInt64(cfg.namespace + suffixPrioritySpanTemplate) @@ -365,14 +377,15 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.IndexRolloverFrequencySpans = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencySpans)) cfg.IndexRolloverFrequencyServices = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencyServices)) + cfg.IndexRolloverFrequencySampling = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencySampling)) separator := v.GetString(cfg.namespace + suffixIndexDateSeparator) cfg.IndexDateLayoutSpans = initDateLayout(cfg.IndexRolloverFrequencySpans, separator) cfg.IndexDateLayoutServices = initDateLayout(cfg.IndexRolloverFrequencyServices, separator) + cfg.IndexDateLayoutSampling = initDateLayout(cfg.IndexRolloverFrequencySampling, separator) // Dependencies calculation should be daily, and this index size is very small cfg.IndexDateLayoutDependencies = initDateLayout(defaultIndexRolloverFrequency, separator) - var err error cfg.TLS, err = cfg.getTLSFlagsConfig().InitFromViper(v) if err != nil { diff --git a/plugin/storage/es/samplingstore/dbmodel/converter.go b/plugin/storage/es/samplingstore/dbmodel/converter.go new file mode 100644 index 00000000000..259bc5cbc36 --- /dev/null +++ b/plugin/storage/es/samplingstore/dbmodel/converter.go @@ -0,0 +1,51 @@ +// Copyright (c) 2024 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dbmodel + +import ( + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" +) + +func FromThroughputs(throughputs []*model.Throughput) []Throughput { + if throughputs == nil { + return nil + } + ret := make([]Throughput, len(throughputs)) + for i, d := range throughputs { + ret[i] = Throughput{ + Service: d.Service, + Operation: d.Operation, + Count: d.Count, + Probabilities: d.Probabilities, + } + } + return ret +} + +func ToThroughputs(throughputs []Throughput) []*model.Throughput { + if throughputs == nil { + return nil + } + ret := make([]*model.Throughput, len(throughputs)) + for i, d := range throughputs { + ret[i] = &model.Throughput{ + Service: d.Service, + Operation: d.Operation, + Count: d.Count, + Probabilities: d.Probabilities, + } + } + return ret +} diff --git a/plugin/storage/es/samplingstore/dbmodel/converter_test.go b/plugin/storage/es/samplingstore/dbmodel/converter_test.go new file mode 100644 index 00000000000..9ac02f1eac8 --- /dev/null +++ b/plugin/storage/es/samplingstore/dbmodel/converter_test.go @@ -0,0 +1,53 @@ +// Copyright (c) 2024 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dbmodel + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestConvertDependencies(t *testing.T) { + tests := []struct { + throughputs []*model.Throughput + }{ + { + throughputs: []*model.Throughput{{Service: "service1", Operation: "operation1", Count: 10, Probabilities: map[string]struct{}{"new-srv": {}}}}, + }, + { + throughputs: []*model.Throughput{}, + }, + { + throughputs: nil, + }, + } + + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + got := FromThroughputs(test.throughputs) + a := ToThroughputs(got) + assert.Equal(t, test.throughputs, a) + }) + } +} + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/plugin/storage/es/samplingstore/dbmodel/model.go b/plugin/storage/es/samplingstore/dbmodel/model.go new file mode 100644 index 00000000000..8205dc6415b --- /dev/null +++ b/plugin/storage/es/samplingstore/dbmodel/model.go @@ -0,0 +1,42 @@ +// Copyright (c) 2024 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dbmodel + +import ( + "time" +) + +type Throughput struct { + Service string + Operation string + Count int64 + Probabilities map[string]struct{} +} + +type TimeThroughput struct { + Timestamp time.Time `json:"timestamp"` + Throughput Throughput `json:"throughputs"` +} + +type ProbabilitiesAndQPS struct { + Hostname string + Probabilities map[string]map[string]float64 + QPS map[string]map[string]float64 +} + +type TimeProbabilitiesAndQPS struct { + Timestamp time.Time `json:"timestamp"` + ProbabilitiesAndQPS ProbabilitiesAndQPS `json:"probabilitiesandqps"` +} diff --git a/plugin/storage/es/samplingstore/storage.go b/plugin/storage/es/samplingstore/storage.go new file mode 100644 index 00000000000..c27eb34b126 --- /dev/null +++ b/plugin/storage/es/samplingstore/storage.go @@ -0,0 +1,216 @@ +// Copyright (c) 2024 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package samplingstore + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/olivere/elastic" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" + "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore/dbmodel" +) + +const ( + samplingIndex = "jaeger-sampling-" + throughputType = "throughput-sampling" + probabilitiesType = "probabilities-sampling" + indexPrefixSeparator = "-" +) + +type SamplingStore struct { + client func() es.Client + logger *zap.Logger + samplingIndexPrefix string + indexDateLayout string + maxDocCount int + indexRolloverFrequency time.Duration + lookback time.Duration +} + +type SamplingStoreParams struct { + Client func() es.Client + Logger *zap.Logger + IndexPrefix string + IndexDateLayout string + IndexRolloverFrequency time.Duration + Lookback time.Duration + MaxDocCount int +} + +func NewSamplingStore(p SamplingStoreParams) *SamplingStore { + return &SamplingStore{ + client: p.Client, + logger: p.Logger, + samplingIndexPrefix: p.prefixIndexName(), + indexDateLayout: p.IndexDateLayout, + maxDocCount: p.MaxDocCount, + indexRolloverFrequency: p.IndexRolloverFrequency, + lookback: p.Lookback, + } +} + +func (s *SamplingStore) InsertThroughput(throughput []*model.Throughput) error { + ts := time.Now() + indexName := indexWithDate(s.samplingIndexPrefix, s.indexDateLayout, ts) + for _, eachThroughput := range dbmodel.FromThroughputs(throughput) { + s.client().Index().Index(indexName).Type(throughputType). + BodyJson(&dbmodel.TimeThroughput{ + Timestamp: ts, + Throughput: eachThroughput, + }).Add() + } + return nil +} + +func (s *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput, error) { + ctx := context.Background() + indices := getReadIndices(s.samplingIndexPrefix, s.indexDateLayout, start, end, s.indexRolloverFrequency) + searchResult, err := s.client().Search(indices...). + Size(s.maxDocCount). + Query(buildTSQuery(start, end)). + IgnoreUnavailable(true). + Do(ctx) + if err != nil { + return nil, fmt.Errorf("failed to search for throughputs: %w", err) + } + output := make([]dbmodel.TimeThroughput, len(searchResult.Hits.Hits)) + for i, hit := range searchResult.Hits.Hits { + if err := json.Unmarshal(*hit.Source, &output[i]); err != nil { + return nil, fmt.Errorf("unmarshalling documents failed: %w", err) + } + } + outThroughputs := make([]dbmodel.Throughput, len(output)) + for i, out := range output { + outThroughputs[i] = out.Throughput + } + return dbmodel.ToThroughputs(outThroughputs), nil +} + +func (s *SamplingStore) InsertProbabilitiesAndQPS(hostname string, + probabilities model.ServiceOperationProbabilities, + qps model.ServiceOperationQPS, +) error { + ts := time.Now() + writeIndexName := indexWithDate(s.samplingIndexPrefix, s.indexDateLayout, ts) + val := dbmodel.ProbabilitiesAndQPS{ + Hostname: hostname, + Probabilities: probabilities, + QPS: qps, + } + s.writeProbabilitiesAndQPS(writeIndexName, ts, val) + return nil +} + +func (s *SamplingStore) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) { + ctx := context.Background() + clientFn := s.client() + indices, err := getLatestIndices(s.samplingIndexPrefix, s.indexDateLayout, clientFn, s.indexRolloverFrequency, s.lookback) + if err != nil { + return nil, fmt.Errorf("failed to get latest indices: %w", err) + } + searchResult, err := clientFn.Search(indices...). + Size(s.maxDocCount). + IgnoreUnavailable(true). + Do(ctx) + if err != nil { + return nil, fmt.Errorf("failed to search for Latest Probabilities: %w", err) + } + lengthOfSearchResult := len(searchResult.Hits.Hits) + if lengthOfSearchResult == 0 { + return nil, nil + } + + var latestProbabilities dbmodel.TimeProbabilitiesAndQPS + latestTime := time.Time{} + for _, hit := range searchResult.Hits.Hits { + var data dbmodel.TimeProbabilitiesAndQPS + if err = json.Unmarshal(*hit.Source, &data); err != nil { + return nil, fmt.Errorf("unmarshalling documents failed: %w", err) + } + if data.Timestamp.After(latestTime) { + latestTime = data.Timestamp + latestProbabilities = data + } + } + return latestProbabilities.ProbabilitiesAndQPS.Probabilities, nil +} + +func (s *SamplingStore) writeProbabilitiesAndQPS(indexName string, ts time.Time, pandqps dbmodel.ProbabilitiesAndQPS) { + s.client().Index().Index(indexName).Type(probabilitiesType). + BodyJson(&dbmodel.TimeProbabilitiesAndQPS{ + Timestamp: ts, + ProbabilitiesAndQPS: pandqps, + }).Add() +} + +func (s *SamplingStore) CreateTemplates(samplingTemplate string) error { + _, err := s.client().CreateTemplate("jaeger-sampling").Body(samplingTemplate).Do(context.Background()) + return err +} + +func getLatestIndices(indexPrefix, indexDateLayout string, clientFn es.Client, rollover time.Duration, maxDuration time.Duration) ([]string, error) { + ctx := context.Background() + now := time.Now().UTC() + earliest := now.Add(-maxDuration) + earliestIndex := indexWithDate(indexPrefix, indexDateLayout, earliest) + for { + currentIndex := indexWithDate(indexPrefix, indexDateLayout, now) + exists, err := clientFn.IndexExists(currentIndex).Do(ctx) + if err != nil { + return nil, fmt.Errorf("failed to check index existence: %w", err) + } + if exists { + return []string{currentIndex}, nil + } + if currentIndex == earliestIndex { + return nil, fmt.Errorf("falied to find latest index") + } + now = now.Add(rollover) // rollover is negative + } +} + +func getReadIndices(indexName, indexDateLayout string, startTime time.Time, endTime time.Time, rollover time.Duration) []string { + var indices []string + firstIndex := indexWithDate(indexName, indexDateLayout, startTime) + currentIndex := indexWithDate(indexName, indexDateLayout, endTime) + for currentIndex != firstIndex { + indices = append(indices, currentIndex) + endTime = endTime.Add(rollover) // rollover is negative + currentIndex = indexWithDate(indexName, indexDateLayout, endTime) + } + indices = append(indices, firstIndex) + return indices +} + +func (p *SamplingStoreParams) prefixIndexName() string { + if p.IndexPrefix != "" { + return p.IndexPrefix + indexPrefixSeparator + samplingIndex + } + return samplingIndex +} + +func buildTSQuery(start, end time.Time) elastic.Query { + return elastic.NewRangeQuery("timestamp").Gte(start).Lte(end) +} + +func indexWithDate(indexNamePrefix, indexDateLayout string, date time.Time) string { + return indexNamePrefix + date.UTC().Format(indexDateLayout) +} diff --git a/plugin/storage/es/samplingstore/storage_test.go b/plugin/storage/es/samplingstore/storage_test.go new file mode 100644 index 00000000000..7950cfde7dc --- /dev/null +++ b/plugin/storage/es/samplingstore/storage_test.go @@ -0,0 +1,461 @@ +// Copyright (c) 2024 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package samplingstore + +import ( + "encoding/json" + "errors" + "strings" + "testing" + "time" + + "github.com/olivere/elastic" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + samplemodel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" + "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/mocks" + "github.com/jaegertracing/jaeger/pkg/testutils" + "github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore/dbmodel" +) + +const defaultMaxDocCount = 10_000 + +type samplingStorageTest struct { + client *mocks.Client + logger *zap.Logger + logBuffer *testutils.Buffer + storage *SamplingStore +} + +func withEsSampling(indexPrefix, indexDateLayout string, maxDocCount int, fn func(w *samplingStorageTest)) { + client := &mocks.Client{} + logger, logBuffer := testutils.NewLogger() + w := &samplingStorageTest{ + client: client, + logger: logger, + logBuffer: logBuffer, + storage: NewSamplingStore(SamplingStoreParams{ + Client: func() es.Client { return client }, + Logger: logger, + IndexPrefix: indexPrefix, + IndexDateLayout: indexDateLayout, + MaxDocCount: maxDocCount, + }), + } + fn(w) +} + +func TestNewIndexPrefix(t *testing.T) { + tests := []struct { + name string + prefix string + expected string + }{ + { + name: "without prefix", + prefix: "", + expected: "", + }, + { + name: "with prefix", + prefix: "foo", + expected: "foo-", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + client := &mocks.Client{} + r := NewSamplingStore(SamplingStoreParams{ + Client: func() es.Client { return client }, + Logger: zap.NewNop(), + IndexPrefix: test.prefix, + IndexDateLayout: "2006-01-02", + MaxDocCount: defaultMaxDocCount, + }) + assert.Equal(t, test.expected+samplingIndex, r.samplingIndexPrefix) + }) + } +} + +func TestGetReadIndices(t *testing.T) { + test := struct { + name string + start time.Time + end time.Time + }{ + name: "", + start: time.Date(2024, time.February, 10, 0, 0, 0, 0, time.UTC), + end: time.Date(2024, time.February, 12, 0, 0, 0, 0, time.UTC), + } + t.Run(test.name, func(t *testing.T) { + expectedIndices := []string{ + "prefix-jaeger-sampling-2024-02-12", + "prefix-jaeger-sampling-2024-02-11", + "prefix-jaeger-sampling-2024-02-10", + } + rollover := -time.Hour * 24 + indices := getReadIndices("prefix-jaeger-sampling-", "2006-01-02", test.start, test.end, rollover) + assert.Equal(t, expectedIndices, indices) + }) +} + +func TestGetLatestIndices(t *testing.T) { + tests := []struct { + name string + indexDateLayout string + maxDuration time.Duration + expectedIndices []string + expectedError string + IndexExistError error + indexExist bool + }{ + { + name: "with index", + indexDateLayout: "2006-01-02", + maxDuration: 24 * time.Hour, + expectedIndices: []string{indexWithDate("", "2006-01-02", time.Now().UTC())}, + expectedError: "", + indexExist: true, + }, + { + name: "without index", + indexDateLayout: "2006-01-02", + maxDuration: 72 * time.Hour, + expectedError: "falied to find latest index", + indexExist: false, + }, + { + name: "check index existence", + indexDateLayout: "2006-01-02", + maxDuration: 24 * time.Hour, + expectedError: "failed to check index existence: fail", + indexExist: true, + IndexExistError: errors.New("fail"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + withEsSampling("", test.indexDateLayout, defaultMaxDocCount, func(w *samplingStorageTest) { + indexService := &mocks.IndicesExistsService{} + w.client.On("IndexExists", mock.Anything).Return(indexService) + indexService.On("Do", mock.Anything).Return(test.indexExist, test.IndexExistError) + clientFnMock := w.storage.client() + actualIndices, err := getLatestIndices("", test.indexDateLayout, clientFnMock, -24*time.Hour, test.maxDuration) + if test.expectedError != "" { + require.EqualError(t, err, test.expectedError) + assert.Nil(t, actualIndices) + } else { + require.NoError(t, err) + require.Equal(t, test.expectedIndices, actualIndices) + } + }) + }) + } +} + +func TestInsertThroughput(t *testing.T) { + test := struct { + name string + expectedError string + }{ + name: "insert throughput", + expectedError: "", + } + + t.Run(test.name, func(t *testing.T) { + withEsSampling("", "2006-01-02", defaultMaxDocCount, func(w *samplingStorageTest) { + throughputs := []*samplemodel.Throughput{ + {Service: "my-svc", Operation: "op"}, + {Service: "our-svc", Operation: "op2"}, + } + fixedTime := time.Now() + indexName := indexWithDate("", "2006-01-02", fixedTime) + writeService := &mocks.IndexService{} + w.client.On("Index").Return(writeService) + writeService.On("Index", stringMatcher(indexName)).Return(writeService) + writeService.On("Type", stringMatcher(throughputType)).Return(writeService) + writeService.On("BodyJson", mock.Anything).Return(writeService) + writeService.On("Add", mock.Anything) + err := w.storage.InsertThroughput(throughputs) + if test.expectedError != "" { + require.EqualError(t, err, test.expectedError) + } else { + require.NoError(t, err) + } + }) + }) +} + +func TestInsertProbabilitiesAndQPS(t *testing.T) { + test := struct { + name string + expectedError string + }{ + name: "insert probabilities and qps", + expectedError: "", + } + + t.Run(test.name, func(t *testing.T) { + withEsSampling("", "2006-01-02", defaultMaxDocCount, func(w *samplingStorageTest) { + pAQ := dbmodel.ProbabilitiesAndQPS{ + Hostname: "dell11eg843d", + Probabilities: samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, + QPS: samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}}, + } + fixedTime := time.Now() + indexName := indexWithDate("", "2006-01-02", fixedTime) + writeService := &mocks.IndexService{} + w.client.On("Index").Return(writeService) + writeService.On("Index", stringMatcher(indexName)).Return(writeService) + writeService.On("Type", stringMatcher(probabilitiesType)).Return(writeService) + writeService.On("BodyJson", mock.Anything).Return(writeService) + writeService.On("Add", mock.Anything) + err := w.storage.InsertProbabilitiesAndQPS(pAQ.Hostname, pAQ.Probabilities, pAQ.QPS) + if test.expectedError != "" { + require.EqualError(t, err, test.expectedError) + } else { + require.NoError(t, err) + } + }) + }) +} + +func TestGetThroughput(t *testing.T) { + mockIndex := "jaeger-sampling-" + time.Now().UTC().Format("2006-01-02") + goodThroughputs := `{ + "timestamp": "2024-02-08T12:00:00Z", + "throughputs": { + "Service": "my-svc", + "Operation": "op", + "Count": 10 + } + }` + tests := []struct { + name string + searchResult *elastic.SearchResult + searchError error + expectedError string + expectedOutput []*samplemodel.Throughput + indexPrefix string + maxDocCount int + index string + }{ + { + name: "good throughputs without prefix", + searchResult: createSearchResult(goodThroughputs), + expectedOutput: []*samplemodel.Throughput{ + { + Service: "my-svc", + Operation: "op", + Count: 10, + }, + }, + index: mockIndex, + maxDocCount: 1000, + }, + { + name: "good throughputs without prefix", + searchResult: createSearchResult(goodThroughputs), + expectedOutput: []*samplemodel.Throughput{ + { + Service: "my-svc", + Operation: "op", + Count: 10, + }, + }, + index: mockIndex, + maxDocCount: 1000, + }, + { + name: "good throughputs with prefix", + searchResult: createSearchResult(goodThroughputs), + expectedOutput: []*samplemodel.Throughput{ + { + Service: "my-svc", + Operation: "op", + Count: 10, + }, + }, + index: mockIndex, + indexPrefix: "foo", + maxDocCount: 1000, + }, + { + name: "bad throughputs", + searchResult: createSearchResult(`badJson{hello}world`), + expectedError: "unmarshalling documents failed: invalid character 'b' looking for beginning of value", + index: mockIndex, + }, + { + name: "search fails", + searchError: errors.New("search failure"), + expectedError: "failed to search for throughputs: search failure", + index: mockIndex, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + withEsSampling(test.indexPrefix, "2006-01-02", defaultMaxDocCount, func(w *samplingStorageTest) { + searchService := &mocks.SearchService{} + if test.indexPrefix != "" { + test.indexPrefix += "-" + } + index := test.indexPrefix + test.index + w.client.On("Search", index).Return(searchService) + searchService.On("Size", mock.Anything).Return(searchService) + searchService.On("Query", mock.Anything).Return(searchService) + searchService.On("IgnoreUnavailable", true).Return(searchService) + searchService.On("Do", mock.Anything).Return(test.searchResult, test.searchError) + + actual, err := w.storage.GetThroughput(time.Now().Add(-time.Minute), time.Now()) + if test.expectedError != "" { + require.EqualError(t, err, test.expectedError) + assert.Nil(t, actual) + } else { + require.NoError(t, err) + assert.EqualValues(t, test.expectedOutput, actual) + } + }) + }) + } +} + +func TestGetLatestProbabilities(t *testing.T) { + mockIndex := "jaeger-sampling-" + time.Now().UTC().Format("2006-01-02") + goodProbabilities := `{ + "timestamp": "2024-02-08T12:00:00Z", + "probabilitiesandqps": { + "Hostname": "dell11eg843d", + "Probabilities": { + "new-srv": {"op": 0.1} + }, + "QPS": { + "new-srv": {"op": 4} + } + } + }` + tests := []struct { + name string + searchResult *elastic.SearchResult + searchError error + expectedOutput samplemodel.ServiceOperationProbabilities + expectedError string + maxDocCount int + index string + indexPresent bool + indexError error + indexPrefix string + }{ + { + name: "good probabilities without prefix", + searchResult: createSearchResult(goodProbabilities), + expectedOutput: samplemodel.ServiceOperationProbabilities{ + "new-srv": { + "op": 0.1, + }, + }, + index: mockIndex, + maxDocCount: 1000, + indexPresent: true, + }, + { + name: "good probabilities with prefix", + searchResult: createSearchResult(goodProbabilities), + expectedOutput: samplemodel.ServiceOperationProbabilities{ + "new-srv": { + "op": 0.1, + }, + }, + index: mockIndex, + maxDocCount: 1000, + indexPresent: true, + indexPrefix: "foo", + }, + { + name: "bad probabilities", + searchResult: createSearchResult(`badJson{hello}world`), + expectedError: "unmarshalling documents failed: invalid character 'b' looking for beginning of value", + index: mockIndex, + indexPresent: true, + }, + { + name: "search fail", + searchError: errors.New("search failure"), + expectedError: "failed to search for Latest Probabilities: search failure", + index: mockIndex, + indexPresent: true, + }, + { + name: "index check fail", + indexError: errors.New("index check failure"), + expectedError: "failed to get latest indices: failed to check index existence: index check failure", + index: mockIndex, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + withEsSampling(test.indexPrefix, "2006-01-02", defaultMaxDocCount, func(w *samplingStorageTest) { + searchService := &mocks.SearchService{} + if test.indexPrefix != "" { + test.indexPrefix += "-" + } + index := test.indexPrefix + test.index + w.client.On("Search", index).Return(searchService) + searchService.On("Size", mock.Anything).Return(searchService) + searchService.On("IgnoreUnavailable", true).Return(searchService) + searchService.On("Do", mock.Anything).Return(test.searchResult, test.searchError) + + indicesexistsservice := &mocks.IndicesExistsService{} + w.client.On("IndexExists", index).Return(indicesexistsservice) + indicesexistsservice.On("Do", mock.Anything).Return(test.indexPresent, test.indexError) + + actual, err := w.storage.GetLatestProbabilities() + if test.expectedError != "" { + require.EqualError(t, err, test.expectedError) + assert.Nil(t, actual) + } else { + require.NoError(t, err) + assert.EqualValues(t, test.expectedOutput, actual) + } + }) + }) + } +} + +func createSearchResult(rawJsonStr string) *elastic.SearchResult { + rawJsonArr := []byte(rawJsonStr) + hits := make([]*elastic.SearchHit, 1) + hits[0] = &elastic.SearchHit{ + Source: (*json.RawMessage)(&rawJsonArr), + } + searchResult := &elastic.SearchResult{Hits: &elastic.SearchHits{Hits: hits}} + return searchResult +} + +func stringMatcher(q string) interface{} { + matchFunc := func(s string) bool { + return strings.Contains(s, q) + } + return mock.MatchedBy(matchFunc) +} + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 8a687be23fe..7b67321a02b 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -41,6 +41,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore" "github.com/jaegertracing/jaeger/plugin/storage/es/mappings" + "github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore" ) @@ -100,13 +101,11 @@ func (s *ESStorageIntegration) getVersion() (uint, error) { return uint(esVersion), nil } -func (s *ESStorageIntegration) initializeES(allTagsAsFields, archive bool) error { +func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields, archive bool) error { rawClient, err := elastic.NewClient( elastic.SetURL(queryURL), elastic.SetSniff(false)) - if err != nil { - return err - } + require.NoError(t, err) s.logger, _ = testutils.NewLogger() s.client = rawClient @@ -114,39 +113,65 @@ func (s *ESStorageIntegration) initializeES(allTagsAsFields, archive bool) error Addresses: []string{queryURL}, DiscoverNodesOnStart: false, }) - if err != nil { - return err - } + require.NoError(t, err) + + s.initSpanstore(t, allTagsAsFields, archive) + s.initSamplingStore(t) - if err := s.initSpanstore(allTagsAsFields, archive); err != nil { - return err - } s.CleanUp = func() error { - return s.esCleanUp(allTagsAsFields, archive) + s.esCleanUp(t, allTagsAsFields, archive) + return nil } s.Refresh = s.esRefresh - s.esCleanUp(allTagsAsFields, archive) + s.esCleanUp(t, allTagsAsFields, archive) // TODO: remove this flag after ES support returning spanKind when get operations s.GetOperationsMissingSpanKind = true return nil } -func (s *ESStorageIntegration) esCleanUp(allTagsAsFields, archive bool) error { +func (s *ESStorageIntegration) esCleanUp(t *testing.T, allTagsAsFields, archive bool) { _, err := s.client.DeleteIndex("*").Do(context.Background()) - if err != nil { - return err + require.NoError(t, err) + s.initSpanstore(t, allTagsAsFields, archive) +} + +func (s *ESStorageIntegration) initSamplingStore(t *testing.T) { + client := s.getEsClient(t) + mappingBuilder := mappings.MappingBuilder{ + TemplateBuilder: estemplate.TextTemplateBuilder{}, + Shards: 5, + Replicas: 1, + EsVersion: client.GetVersion(), + IndexPrefix: indexPrefix, + UseILM: false, } - return s.initSpanstore(allTagsAsFields, archive) + clientFn := func() estemplate.Client { return client } + samplingstore := samplingstore.NewSamplingStore( + samplingstore.SamplingStoreParams{ + Client: clientFn, + Logger: s.logger, + IndexPrefix: indexPrefix, + IndexDateLayout: indexDateLayout, + MaxDocCount: defaultMaxDocCount, + }) + sampleMapping, err := mappingBuilder.GetSamplingMappings() + require.NoError(t, err) + err = samplingstore.CreateTemplates(sampleMapping) + require.NoError(t, err) + s.SamplingStore = samplingstore } -func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) error { - bp, _ := s.client.BulkProcessor().BulkActions(1).FlushInterval(time.Nanosecond).Do(context.Background()) +func (s *ESStorageIntegration) getEsClient(t *testing.T) eswrapper.ClientWrapper { + bp, err := s.client.BulkProcessor().BulkActions(1).FlushInterval(time.Nanosecond).Do(context.Background()) + require.NoError(t, err) s.bulkProcessor = bp esVersion, err := s.getVersion() - if err != nil { - return err - } - client := eswrapper.WrapESClient(s.client, bp, esVersion, s.v8Client) + require.NoError(t, err) + return eswrapper.WrapESClient(s.client, bp, esVersion, s.v8Client) +} + +func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields, archive bool) error { + client := s.getEsClient(t) mappingBuilder := mappings.MappingBuilder{ TemplateBuilder: estemplate.TextTemplateBuilder{}, Shards: 5, @@ -156,10 +181,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro UseILM: false, } spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings() - if err != nil { - return err - } - + require.NoError(t, err) clientFn := func() estemplate.Client { return client } w := spanstore.NewSpanWriter( @@ -173,9 +195,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro Archive: archive, }) err = w.CreateTemplates(spanMapping, serviceMapping, indexPrefix) - if err != nil { - return err - } + require.NoError(t, err) tracer, _, closer := s.tracerProvider() defer closer() s.SpanWriter = w @@ -199,13 +219,9 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro }) depMapping, err := mappingBuilder.GetDependenciesMappings() - if err != nil { - return err - } + require.NoError(t, err) err = dependencyStore.CreateTemplates(depMapping) - if err != nil { - return err - } + require.NoError(t, err) s.DependencyReader = dependencyStore s.DependencyWriter = dependencyStore return nil @@ -238,7 +254,7 @@ func testElasticsearchStorage(t *testing.T, allTagsAsFields, archive bool) { t.Fatal(err) } s := &ESStorageIntegration{} - require.NoError(t, s.initializeES(allTagsAsFields, archive)) + s.initializeES(t, allTagsAsFields, archive) s.Fixtures = LoadAndParseQueryTestCases(t, "fixtures/queries_es.json") @@ -269,7 +285,7 @@ func TestElasticsearchStorage_IndexTemplates(t *testing.T) { t.Fatal(err) } s := &ESStorageIntegration{} - require.NoError(t, s.initializeES(true, false)) + s.initializeES(t, true, false) esVersion, err := s.getVersion() require.NoError(t, err) // TODO abstract this into pkg/es/client.IndexManagementLifecycleAPI @@ -288,8 +304,7 @@ func TestElasticsearchStorage_IndexTemplates(t *testing.T) { require.NoError(t, err) assert.Equal(t, 200, spanTemplateExistsResponse.StatusCode) } - err = s.cleanESIndexTemplates(t, indexPrefix) - require.NoError(t, err) + s.cleanESIndexTemplates(t, indexPrefix) } func (s *ESStorageIntegration) testArchiveTrace(t *testing.T) { diff --git a/plugin/storage/integration/es_index_cleaner_test.go b/plugin/storage/integration/es_index_cleaner_test.go index 6805c99a684..df19d463a0f 100644 --- a/plugin/storage/integration/es_index_cleaner_test.go +++ b/plugin/storage/integration/es_index_cleaner_test.go @@ -34,6 +34,7 @@ import ( const ( archiveIndexName = "jaeger-span-archive" dependenciesIndexName = "jaeger-dependencies-2019-01-01" + samplingIndexName = "jaeger-sampling-2019-01-01" spanIndexName = "jaeger-span-2019-01-01" serviceIndexName = "jaeger-service-2019-01-01" indexCleanerImage = "jaegertracing/jaeger-es-index-cleaner:latest" @@ -73,7 +74,8 @@ func TestIndexCleaner_doNotFailOnFullStorage(t *testing.T) { for _, test := range tests { _, err = client.DeleteIndex("*").Do(context.Background()) require.NoError(t, err) - err := createAllIndices(client, "") + // Create Indices with adaptive sampling disabled (set to false). + err := createAllIndices(client, "", false) require.NoError(t, err) err = runEsCleaner(1500, test.envs) require.NoError(t, err) @@ -87,9 +89,10 @@ func TestIndexCleaner(t *testing.T) { require.NoError(t, err) tests := []struct { - name string - envVars []string - expectedIndices []string + name string + envVars []string + expectedIndices []string + adaptiveSampling bool }{ { name: "RemoveDailyIndices", @@ -99,42 +102,55 @@ func TestIndexCleaner(t *testing.T) { "jaeger-span-000001", "jaeger-service-000001", "jaeger-dependencies-000001", "jaeger-span-000002", "jaeger-service-000002", "jaeger-dependencies-000002", "jaeger-span-archive-000001", "jaeger-span-archive-000002", }, + adaptiveSampling: false, }, { name: "RemoveRolloverIndices", envVars: []string{"ROLLOVER=true"}, expectedIndices: []string{ - archiveIndexName, spanIndexName, serviceIndexName, dependenciesIndexName, + archiveIndexName, spanIndexName, serviceIndexName, dependenciesIndexName, samplingIndexName, "jaeger-span-000002", "jaeger-service-000002", "jaeger-dependencies-000002", "jaeger-span-archive-000001", "jaeger-span-archive-000002", }, + adaptiveSampling: false, }, { name: "RemoveArchiveIndices", envVars: []string{"ARCHIVE=true"}, expectedIndices: []string{ - archiveIndexName, spanIndexName, serviceIndexName, dependenciesIndexName, + archiveIndexName, spanIndexName, serviceIndexName, dependenciesIndexName, samplingIndexName, "jaeger-span-000001", "jaeger-service-000001", "jaeger-dependencies-000001", "jaeger-span-000002", "jaeger-service-000002", "jaeger-dependencies-000002", "jaeger-span-archive-000002", }, + adaptiveSampling: false, + }, + { + name: "RemoveDailyIndices with adaptiveSampling", + envVars: []string{}, + expectedIndices: []string{ + archiveIndexName, + "jaeger-span-000001", "jaeger-service-000001", "jaeger-dependencies-000001", "jaeger-span-000002", "jaeger-service-000002", "jaeger-dependencies-000002", + "jaeger-span-archive-000001", "jaeger-span-archive-000002", "jaeger-sampling-000001", "jaeger-sampling-000002", + }, + adaptiveSampling: true, }, } for _, test := range tests { t.Run(fmt.Sprintf("%s_no_prefix, %s", test.name, test.envVars), func(t *testing.T) { - runIndexCleanerTest(t, client, v8Client, "", test.expectedIndices, test.envVars) + runIndexCleanerTest(t, client, v8Client, "", test.expectedIndices, test.envVars, test.adaptiveSampling) }) t.Run(fmt.Sprintf("%s_prefix, %s", test.name, test.envVars), func(t *testing.T) { - runIndexCleanerTest(t, client, v8Client, indexPrefix, test.expectedIndices, append(test.envVars, "INDEX_PREFIX="+indexPrefix)) + runIndexCleanerTest(t, client, v8Client, indexPrefix, test.expectedIndices, append(test.envVars, "INDEX_PREFIX="+indexPrefix), test.adaptiveSampling) }) } } -func runIndexCleanerTest(t *testing.T, client *elastic.Client, v8Client *elasticsearch8.Client, prefix string, expectedIndices, envVars []string) { +func runIndexCleanerTest(t *testing.T, client *elastic.Client, v8Client *elasticsearch8.Client, prefix string, expectedIndices, envVars []string, adaptiveSampling bool) { // make sure ES is clean _, err := client.DeleteIndex("*").Do(context.Background()) require.NoError(t, err) defer cleanESIndexTemplates(t, client, v8Client, prefix) - err = createAllIndices(client, prefix) + err = createAllIndices(client, prefix, adaptiveSampling) require.NoError(t, err) err = runEsCleaner(0, envVars) require.NoError(t, err) @@ -150,34 +166,37 @@ func runIndexCleanerTest(t *testing.T, client *elastic.Client, v8Client *elastic assert.ElementsMatch(t, indices, expected, fmt.Sprintf("indices found: %v, expected: %v", indices, expected)) } -func createAllIndices(client *elastic.Client, prefix string) error { +func createAllIndices(client *elastic.Client, prefix string, adaptiveSampling bool) error { prefixWithSeparator := prefix if prefix != "" { prefixWithSeparator = prefixWithSeparator + "-" } // create daily indices and archive index err := createEsIndices(client, []string{ - prefixWithSeparator + spanIndexName, prefixWithSeparator + serviceIndexName, - prefixWithSeparator + dependenciesIndexName, prefixWithSeparator + archiveIndexName, + prefixWithSeparator + spanIndexName, + prefixWithSeparator + serviceIndexName, + prefixWithSeparator + dependenciesIndexName, + prefixWithSeparator + samplingIndexName, + prefixWithSeparator + archiveIndexName, }) if err != nil { return err } // create rollover archive index and roll alias to the new index - err = runEsRollover("init", []string{"ARCHIVE=true", "INDEX_PREFIX=" + prefix}) + err = runEsRollover("init", []string{"ARCHIVE=true", "INDEX_PREFIX=" + prefix}, adaptiveSampling) if err != nil { return err } - err = runEsRollover("rollover", []string{"ARCHIVE=true", "INDEX_PREFIX=" + prefix, rolloverNowEnvVar}) + err = runEsRollover("rollover", []string{"ARCHIVE=true", "INDEX_PREFIX=" + prefix, rolloverNowEnvVar}, adaptiveSampling) if err != nil { return err } // create rollover main indices and roll over to the new index - err = runEsRollover("init", []string{"ARCHIVE=false", "INDEX_PREFIX=" + prefix}) + err = runEsRollover("init", []string{"ARCHIVE=false", "INDEX_PREFIX=" + prefix}, adaptiveSampling) if err != nil { return err } - err = runEsRollover("rollover", []string{"ARCHIVE=false", "INDEX_PREFIX=" + prefix, rolloverNowEnvVar}) + err = runEsRollover("rollover", []string{"ARCHIVE=false", "INDEX_PREFIX=" + prefix, rolloverNowEnvVar}, adaptiveSampling) if err != nil { return err } @@ -205,12 +224,12 @@ func runEsCleaner(days int, envs []string) error { return err } -func runEsRollover(action string, envs []string) error { +func runEsRollover(action string, envs []string, adaptiveSampling bool) error { var dockerEnv string for _, e := range envs { dockerEnv += fmt.Sprintf(" -e %s", e) } - args := fmt.Sprintf("docker run %s --rm --net=host %s %s http://%s", dockerEnv, rolloverImage, action, queryHostPort) + args := fmt.Sprintf("docker run %s --rm --net=host %s %s --adaptive-sampling=%t http://%s", dockerEnv, rolloverImage, action, adaptiveSampling, queryHostPort) cmd := exec.Command("/bin/sh", "-c", args) out, err := cmd.CombinedOutput() fmt.Println(string(out)) diff --git a/plugin/storage/integration/es_index_rollover_test.go b/plugin/storage/integration/es_index_rollover_test.go index a3f1ef99f16..853482dfe9c 100644 --- a/plugin/storage/integration/es_index_rollover_test.go +++ b/plugin/storage/integration/es_index_rollover_test.go @@ -48,7 +48,8 @@ func TestIndexRollover_FailIfILMNotPresent(t *testing.T) { // make sure ES is clean cleanES(t, client, defaultILMPolicyName) envVars := []string{"ES_USE_ILM=true"} - err = runEsRollover("init", envVars) + // Run the ES rollover test with adaptive sampling disabled (set to false). + err = runEsRollover("init", envVars, false) require.EqualError(t, err, "exit status 1") indices, err := client.IndexNames() require.NoError(t, err) @@ -84,7 +85,8 @@ func runCreateIndicesWithILM(t *testing.T, ilmPolicyName string) { if esVersion < 7 { cleanES(t, client, "") - err := runEsRollover("init", envVars) + // Run the ES rollover test with adaptive sampling disabled (set to false). + err := runEsRollover("init", envVars, false) require.EqualError(t, err, "exit status 1") indices, err1 := client.IndexNames() require.NoError(t, err1) @@ -93,17 +95,23 @@ func runCreateIndicesWithILM(t *testing.T, ilmPolicyName string) { } else { expectedIndices := []string{"jaeger-span-000001", "jaeger-service-000001", "jaeger-dependencies-000001"} t.Run(fmt.Sprintf("NoPrefix"), func(t *testing.T) { - runIndexRolloverWithILMTest(t, client, "", expectedIndices, envVars, ilmPolicyName) + runIndexRolloverWithILMTest(t, client, "", expectedIndices, envVars, ilmPolicyName, false) }) t.Run(fmt.Sprintf("WithPrefix"), func(t *testing.T) { - runIndexRolloverWithILMTest(t, client, indexPrefix, expectedIndices, append(envVars, "INDEX_PREFIX="+indexPrefix), ilmPolicyName) + runIndexRolloverWithILMTest(t, client, indexPrefix, expectedIndices, append(envVars, "INDEX_PREFIX="+indexPrefix), ilmPolicyName, false) + }) + t.Run(fmt.Sprintf("WithAdaptiveSampling"), func(t *testing.T) { + runIndexRolloverWithILMTest(t, client, indexPrefix, expectedIndices, append(envVars, "INDEX_PREFIX="+indexPrefix), ilmPolicyName, true) }) } } -func runIndexRolloverWithILMTest(t *testing.T, client *elastic.Client, prefix string, expectedIndices, envVars []string, ilmPolicyName string) { +func runIndexRolloverWithILMTest(t *testing.T, client *elastic.Client, prefix string, expectedIndices, envVars []string, ilmPolicyName string, adaptiveSampling bool) { writeAliases := []string{"jaeger-service-write", "jaeger-span-write", "jaeger-dependencies-write"} - + if adaptiveSampling { + writeAliases = append(writeAliases, "jaeger-sampling-write") + expectedIndices = append(expectedIndices, "jaeger-sampling-000001") + } // make sure ES is cleaned before test cleanES(t, client, ilmPolicyName) v8Client, err := createESV8Client() @@ -126,7 +134,7 @@ func runIndexRolloverWithILMTest(t *testing.T, client *elastic.Client, prefix st } // Run rollover with given EnvVars - err = runEsRollover("init", envVars) + err = runEsRollover("init", envVars, adaptiveSampling) require.NoError(t, err) indices, err := client.IndexNames() @@ -159,12 +167,12 @@ func createESV8Client() (*elasticsearch8.Client, error) { }) } -func runEsRollover(action string, envs []string) error { +func runEsRollover(action string, envs []string, adaptiveSampling bool) error { var dockerEnv string for _, e := range envs { dockerEnv += fmt.Sprintf(" -e %s", e) } - args := fmt.Sprintf("docker run %s --rm --net=host %s %s http://%s", dockerEnv, rolloverImage, action, queryHostPort) + args := fmt.Sprintf("docker run %s --rm --net=host %s %s --adaptive-sampling=%t http://%s", dockerEnv, rolloverImage, action, adaptiveSampling, queryHostPort) cmd := exec.Command("/bin/sh", "-c", args) out, err := cmd.CombinedOutput() fmt.Println(string(out))