Skip to content

Commit

Permalink
Add Elasticsearch storage support for adaptive sampling (#5158)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- #3305

## Description of the changes
- Implemented Elasticsearch storage for adaptive sampling

## How was this change tested?
- not tested yet

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Pushkar Mishra <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
Pushkarm029 and yurishkuro authored Feb 27, 2024
1 parent 7a04461 commit d72a9b0
Show file tree
Hide file tree
Showing 32 changed files with 1,204 additions and 93 deletions.
7 changes: 4 additions & 3 deletions cmd/es-index-cleaner/app/index_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ func (i *IndexFilter) filter(indices []client.Index) []client.Index {
// archive works only for rollover
reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-span-archive-\\d{6}", i.IndexPrefix))
case i.Rollover:
reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service|dependencies)-\\d{6}", i.IndexPrefix))
reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service|dependencies|sampling)-\\d{6}", i.IndexPrefix))
default:
reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service|dependencies)-\\d{4}%s\\d{2}%s\\d{2}", i.IndexPrefix, i.IndexDateSeparator, i.IndexDateSeparator))
reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service|dependencies|sampling)-\\d{4}%s\\d{2}%s\\d{2}", i.IndexPrefix, i.IndexDateSeparator, i.IndexDateSeparator))
}

var filtered []client.Index
Expand All @@ -62,7 +62,8 @@ func (i *IndexFilter) filter(indices []client.Index) []client.Index {
if in.Aliases[i.IndexPrefix+"jaeger-span-write"] ||
in.Aliases[i.IndexPrefix+"jaeger-service-write"] ||
in.Aliases[i.IndexPrefix+"jaeger-span-archive-write"] ||
in.Aliases[i.IndexPrefix+"jaeger-dependencies-write"] {
in.Aliases[i.IndexPrefix+"jaeger-dependencies-write"] ||
in.Aliases[i.IndexPrefix+"jaeger-sampling-write"] {
continue
}
filtered = append(filtered, in)
Expand Down
25 changes: 25 additions & 0 deletions cmd/es-index-cleaner/app/index_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ func testIndexFilter(t *testing.T, prefix string) {
CreationTime: time.Date(2020, time.August, 0o5, 15, 0, 0, 0, time.UTC),
Aliases: map[string]bool{},
},
{
Index: prefix + "jaeger-sampling-2020-08-06",
CreationTime: time.Date(2020, time.August, 0o6, 15, 0, 0, 0, time.UTC),
Aliases: map[string]bool{},
},
{
Index: prefix + "jaeger-sampling-2020-08-05",
CreationTime: time.Date(2020, time.August, 0o5, 15, 0, 0, 0, time.UTC),
Aliases: map[string]bool{},
},
{
Index: prefix + "jaeger-span-archive",
CreationTime: time.Date(2020, time.August, 0, 15, 0, 0, 0, time.UTC),
Expand Down Expand Up @@ -186,6 +196,11 @@ func testIndexFilter(t *testing.T, prefix string) {
CreationTime: time.Date(2020, time.August, 0o5, 15, 0, 0, 0, time.UTC),
Aliases: map[string]bool{},
},
{
Index: prefix + "jaeger-sampling-2020-08-05",
CreationTime: time.Date(2020, time.August, 0o5, 15, 0, 0, 0, time.UTC),
Aliases: map[string]bool{},
},
},
},
{
Expand Down Expand Up @@ -228,6 +243,16 @@ func testIndexFilter(t *testing.T, prefix string) {
CreationTime: time.Date(2020, time.August, 0o5, 15, 0, 0, 0, time.UTC),
Aliases: map[string]bool{},
},
{
Index: prefix + "jaeger-sampling-2020-08-06",
CreationTime: time.Date(2020, time.August, 0o6, 15, 0, 0, 0, time.UTC),
Aliases: map[string]bool{},
},
{
Index: prefix + "jaeger-sampling-2020-08-05",
CreationTime: time.Date(2020, time.August, 0o5, 15, 0, 0, 0, time.UTC),
Aliases: map[string]bool{},
},
},
},
{
Expand Down
4 changes: 4 additions & 0 deletions cmd/es-rollover/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
ilmPolicyName = "es.ilm-policy-name"
timeout = "timeout"
skipDependencies = "skip-dependencies"
adaptiveSampling = "adaptive-sampling"
)

// Config holds the global configurations for the es rollover, common to all actions
Expand All @@ -42,6 +43,7 @@ type Config struct {
UseILM bool
Timeout int
SkipDependencies bool
AdaptiveSampling bool
}

// AddFlags adds flags
Expand All @@ -54,6 +56,7 @@ func AddFlags(flags *flag.FlagSet) {
flags.String(ilmPolicyName, "jaeger-ilm-policy", "The name of the ILM policy to use if ILM is active")
flags.Int(timeout, 120, "Number of seconds to wait for master node response")
flags.Bool(skipDependencies, false, "Disable rollover for dependencies index")
flags.Bool(adaptiveSampling, false, "Enable rollover for adaptive sampling index")
}

// InitFromViper initializes config from viper.Viper.
Expand All @@ -69,4 +72,5 @@ func (c *Config) InitFromViper(v *viper.Viper) {
c.UseILM = v.GetBool(useILM)
c.Timeout = v.GetInt(timeout)
c.SkipDependencies = v.GetBool(skipDependencies)
c.AdaptiveSampling = v.GetBool(adaptiveSampling)
}
2 changes: 2 additions & 0 deletions cmd/es-rollover/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestBindFlags(t *testing.T) {
"--es.use-ilm=true",
"--es.ilm-policy-name=jaeger-ilm",
"--skip-dependencies=true",
"--adaptive-sampling=true",
})
require.NoError(t, err)

Expand All @@ -53,4 +54,5 @@ func TestBindFlags(t *testing.T) {
assert.Equal(t, "qwerty123", c.Password)
assert.Equal(t, "jaeger-ilm", c.ILMPolicyName)
assert.True(t, c.SkipDependencies)
assert.True(t, c.AdaptiveSampling)
}
10 changes: 9 additions & 1 deletion cmd/es-rollover/app/index_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type IndexOption struct {
}

// RolloverIndices return an array of indices to rollover
func RolloverIndices(archive bool, skipDependencies bool, prefix string) []IndexOption {
func RolloverIndices(archive bool, skipDependencies bool, adaptiveSampling bool, prefix string) []IndexOption {
if archive {
return []IndexOption{
{
Expand Down Expand Up @@ -65,6 +65,14 @@ func RolloverIndices(archive bool, skipDependencies bool, prefix string) []Index
})
}

if adaptiveSampling {
indexOptions = append(indexOptions, IndexOption{
prefix: prefix,
Mapping: "jaeger-sampling",
indexType: "jaeger-sampling",
})
}

return indexOptions
}

Expand Down
54 changes: 43 additions & 11 deletions cmd/es-rollover/app/index_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestRolloverIndices(t *testing.T) {
archive bool
prefix string
skipDependencies bool
adaptiveSampling bool
expected []expectedValues
}{
{
Expand Down Expand Up @@ -74,13 +75,6 @@ func TestRolloverIndices(t *testing.T) {
writeAliasName: "mytenant-jaeger-span-archive-write",
initialRolloverIndex: "mytenant-jaeger-span-archive-000001",
},
{
mapping: "jaeger-dependencies",
templateName: "mytenant-jaeger-dependencies",
readAliasName: "mytenant-jaeger-dependencies-read",
writeAliasName: "mytenant-jaeger-dependencies-write",
initialRolloverIndex: "mytenant-jaeger-dependencies-000001",
},
},
},
{
Expand All @@ -97,8 +91,9 @@ func TestRolloverIndices(t *testing.T) {
},
},
{
name: "with prefix",
prefix: "mytenant",
name: "with prefix",
prefix: "mytenant",
adaptiveSampling: true,
expected: []expectedValues{
{
mapping: "jaeger-span",
Expand All @@ -121,12 +116,41 @@ func TestRolloverIndices(t *testing.T) {
writeAliasName: "mytenant-jaeger-dependencies-write",
initialRolloverIndex: "mytenant-jaeger-dependencies-000001",
},
{
mapping: "jaeger-sampling",
templateName: "mytenant-jaeger-sampling",
readAliasName: "mytenant-jaeger-sampling-read",
writeAliasName: "mytenant-jaeger-sampling-write",
initialRolloverIndex: "mytenant-jaeger-sampling-000001",
},
},
},
{
name: "skip-dependency enable",
prefix: "mytenant",
skipDependencies: true,
expected: []expectedValues{
{
mapping: "jaeger-span",
templateName: "mytenant-jaeger-span",
readAliasName: "mytenant-jaeger-span-read",
writeAliasName: "mytenant-jaeger-span-write",
initialRolloverIndex: "mytenant-jaeger-span-000001",
},
{
mapping: "jaeger-service",
templateName: "mytenant-jaeger-service",
readAliasName: "mytenant-jaeger-service-read",
writeAliasName: "mytenant-jaeger-service-write",
initialRolloverIndex: "mytenant-jaeger-service-000001",
},
},
},
{
name: "dependency enable",
name: "adaptive sampling enable",
prefix: "mytenant",
skipDependencies: true,
adaptiveSampling: true,
expected: []expectedValues{
{
mapping: "jaeger-span",
Expand All @@ -142,6 +166,13 @@ func TestRolloverIndices(t *testing.T) {
writeAliasName: "mytenant-jaeger-service-write",
initialRolloverIndex: "mytenant-jaeger-service-000001",
},
{
mapping: "jaeger-sampling",
templateName: "mytenant-jaeger-sampling",
readAliasName: "mytenant-jaeger-sampling-read",
writeAliasName: "mytenant-jaeger-sampling-write",
initialRolloverIndex: "mytenant-jaeger-sampling-000001",
},
},
},
}
Expand All @@ -151,7 +182,8 @@ func TestRolloverIndices(t *testing.T) {
if test.prefix != "" {
test.prefix += "-"
}
result := RolloverIndices(test.archive, test.skipDependencies, test.prefix)
result := RolloverIndices(test.archive, test.skipDependencies, test.adaptiveSampling, test.prefix)
assert.Equal(t, len(test.expected), len(result))
for i, r := range result {
assert.Equal(t, test.expected[i].templateName, r.TemplateName())
assert.Equal(t, test.expected[i].mapping, r.Mapping)
Expand Down
3 changes: 2 additions & 1 deletion cmd/es-rollover/app/init/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (c Action) getMapping(version uint, templateName string) (string, error) {
PrioritySpanTemplate: int64(c.Config.PrioritySpanTemplate),
PriorityServiceTemplate: int64(c.Config.PriorityServiceTemplate),
PriorityDependenciesTemplate: int64(c.Config.PriorityDependenciesTemplate),
PrioritySamplingTemplate: int64(c.Config.PrioritySamplingTemplate),
Shards: int64(c.Config.Shards),
Replicas: int64(c.Config.Replicas),
IndexPrefix: c.Config.IndexPrefix,
Expand Down Expand Up @@ -73,7 +74,7 @@ func (c Action) Do() error {
return fmt.Errorf("ILM is supported only for ES version 7+")
}
}
rolloverIndices := app.RolloverIndices(c.Config.Archive, c.Config.SkipDependencies, c.Config.IndexPrefix)
rolloverIndices := app.RolloverIndices(c.Config.Archive, c.Config.SkipDependencies, c.Config.AdaptiveSampling, c.Config.IndexPrefix)
for _, indexName := range rolloverIndices {
if err := c.init(version, indexName); err != nil {
return err
Expand Down
6 changes: 5 additions & 1 deletion cmd/es-rollover/app/init/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
prioritySpanTemplate = "priority-span-template"
priorityServiceTemplate = "priority-service-template"
priorityDependenciesTemplate = "priority-dependencies-template"
prioritySamplingTemplate = "priority-sampling-template"
)

// Config holds configuration for index cleaner binary.
Expand All @@ -38,6 +39,7 @@ type Config struct {
PrioritySpanTemplate int
PriorityServiceTemplate int
PriorityDependenciesTemplate int
PrioritySamplingTemplate int
}

// AddFlags adds flags for TLS to the FlagSet.
Expand All @@ -46,7 +48,8 @@ func (c *Config) AddFlags(flags *flag.FlagSet) {
flags.Int(replicas, 1, "Number of replicas")
flags.Int(prioritySpanTemplate, 0, "Priority of jaeger-span index template (ESv8 only)")
flags.Int(priorityServiceTemplate, 0, "Priority of jaeger-service index template (ESv8 only)")
flags.Int(priorityDependenciesTemplate, 0, "Priority of jaeger-dependecies index template (ESv8 only)")
flags.Int(priorityDependenciesTemplate, 0, "Priority of jaeger-dependencies index template (ESv8 only)")
flags.Int(prioritySamplingTemplate, 0, "Priority of jaeger-sampling index template (ESv8 only)")
}

// InitFromViper initializes config from viper.Viper.
Expand All @@ -56,4 +59,5 @@ func (c *Config) InitFromViper(v *viper.Viper) {
c.PrioritySpanTemplate = v.GetInt(prioritySpanTemplate)
c.PriorityServiceTemplate = v.GetInt(priorityServiceTemplate)
c.PriorityDependenciesTemplate = v.GetInt(priorityDependenciesTemplate)
c.PrioritySamplingTemplate = v.GetInt(prioritySamplingTemplate)
}
2 changes: 2 additions & 0 deletions cmd/es-rollover/app/init/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestBindFlags(t *testing.T) {
"--priority-span-template=300",
"--priority-service-template=301",
"--priority-dependencies-template=302",
"--priority-sampling-template=303",
})
require.NoError(t, err)

Expand All @@ -48,4 +49,5 @@ func TestBindFlags(t *testing.T) {
assert.Equal(t, 300, c.PrioritySpanTemplate)
assert.Equal(t, 301, c.PriorityServiceTemplate)
assert.Equal(t, 302, c.PriorityDependenciesTemplate)
assert.Equal(t, 303, c.PrioritySamplingTemplate)
}
2 changes: 1 addition & 1 deletion cmd/es-rollover/app/lookback/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Action struct {

// Do the lookback action
func (a *Action) Do() error {
rolloverIndices := app.RolloverIndices(a.Config.Archive, a.Config.SkipDependencies, a.Config.IndexPrefix)
rolloverIndices := app.RolloverIndices(a.Config.Archive, a.Config.SkipDependencies, a.Config.AdaptiveSampling, a.Config.IndexPrefix)
for _, indexName := range rolloverIndices {
if err := a.lookback(indexName); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion cmd/es-rollover/app/rollover/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Action struct {

// Do the rollover action
func (a *Action) Do() error {
rolloverIndices := app.RolloverIndices(a.Config.Archive, a.Config.SkipDependencies, a.Config.IndexPrefix)
rolloverIndices := app.RolloverIndices(a.Config.Archive, a.Config.SkipDependencies, a.Config.AdaptiveSampling, a.Config.IndexPrefix)
for _, indexName := range rolloverIndices {
if err := a.rollover(indexName); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions cmd/esmapping-generator/app/renderer/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var supportedMappings = map[string]struct{}{
"jaeger-span": {},
"jaeger-service": {},
"jaeger-dependencies": {},
"jaeger-sampling": {},
}

// GetMappingAsString returns rendered index templates as string
Expand Down
23 changes: 18 additions & 5 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,12 @@ type Configuration struct {
IndexPrefix string `mapstructure:"index_prefix"`
IndexDateLayoutSpans string `mapstructure:"-"`
IndexDateLayoutServices string `mapstructure:"-"`
IndexDateLayoutSampling string `mapstructure:"-"`
IndexDateLayoutDependencies string `mapstructure:"-"`
IndexRolloverFrequencySpans string `mapstructure:"-"`
IndexRolloverFrequencyServices string `mapstructure:"-"`
IndexRolloverFrequencySampling string `mapstructure:"-"`
AdaptiveSamplingLookback time.Duration `mapstructure:"-"`
Tags TagsAsFields `mapstructure:"tags_as_fields"`
Enabled bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
Expand Down Expand Up @@ -231,6 +234,9 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.MaxSpanAge == 0 {
c.MaxSpanAge = source.MaxSpanAge
}
if c.AdaptiveSamplingLookback == 0 {
c.AdaptiveSamplingLookback = source.AdaptiveSamplingLookback
}
if c.NumShards == 0 {
c.NumShards = source.NumShards
}
Expand Down Expand Up @@ -286,15 +292,22 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {

// GetIndexRolloverFrequencySpansDuration returns jaeger-span index rollover frequency duration
func (c *Configuration) GetIndexRolloverFrequencySpansDuration() time.Duration {
if c.IndexRolloverFrequencySpans == "hour" {
return -1 * time.Hour
}
return -24 * time.Hour
return getIndexRolloverFrequencyDuration(c.IndexRolloverFrequencySpans)
}

// GetIndexRolloverFrequencyServicesDuration returns jaeger-service index rollover frequency duration
func (c *Configuration) GetIndexRolloverFrequencyServicesDuration() time.Duration {
if c.IndexRolloverFrequencyServices == "hour" {
return getIndexRolloverFrequencyDuration(c.IndexRolloverFrequencyServices)
}

// GetIndexRolloverFrequencySamplingDuration returns jaeger-sampling index rollover frequency duration
func (c *Configuration) GetIndexRolloverFrequencySamplingDuration() time.Duration {
return getIndexRolloverFrequencyDuration(c.IndexRolloverFrequencySampling)
}

// GetIndexRolloverFrequencyDuration returns the index rollover frequency duration for the given frequency string
func getIndexRolloverFrequencyDuration(frequency string) time.Duration {
if frequency == "hour" {
return -1 * time.Hour
}
return -24 * time.Hour
Expand Down
Loading

0 comments on commit d72a9b0

Please sign in to comment.