From 1498f313455f2a7ae9fde7d9ffdf71cde5895510 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Mon, 9 Dec 2019 13:34:06 +0100 Subject: [PATCH 1/5] Use wildcard in Elasticsearch indices in reader Signed-off-by: Pavol Loffay --- plugin/storage/es/dependencystore/storage.go | 21 +++++----- .../es/dependencystore/storage_test.go | 14 +++---- plugin/storage/es/spanstore/reader.go | 42 +++++++------------ plugin/storage/es/spanstore/reader_test.go | 31 ++++---------- .../storage/es/spanstore/service_operation.go | 8 ++-- 5 files changed, 44 insertions(+), 72 deletions(-) diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index 90d0eea1901..5bf633446ed 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -36,10 +36,10 @@ const ( // DependencyStore handles all queries and insertions to ElasticSearch dependencies type DependencyStore struct { - ctx context.Context - client es.Client - logger *zap.Logger - indexPrefix string + ctx context.Context + client es.Client + logger *zap.Logger + index string } // NewDependencyStore returns a DependencyStore @@ -49,16 +49,16 @@ func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string prefix = indexPrefix + "-" } return &DependencyStore{ - ctx: context.Background(), - client: client, - logger: logger, - indexPrefix: prefix + dependencyIndex, + ctx: context.Background(), + client: client, + logger: logger, + index: prefix + dependencyIndex + "*", } } // WriteDependencies implements dependencystore.Writer#WriteDependencies. func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error { - indexName := indexWithDate(s.indexPrefix, ts) + indexName := indexWithDate(s.index, ts) if err := s.createIndex(indexName); err != nil { return err } @@ -83,8 +83,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe // GetDependencies returns all interservice dependencies func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { - indices := getIndices(s.indexPrefix, endTs, lookback) - searchResult, err := s.client.Search(indices...). + searchResult, err := s.client.Search(s.index). Size(10000). // the default elasticsearch allowed limit Query(buildTSQuery(endTs, lookback)). IgnoreUnavailable(true). diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go index 91b71e9cc02..b6467b23b98 100644 --- a/plugin/storage/es/dependencystore/storage_test.go +++ b/plugin/storage/es/dependencystore/storage_test.go @@ -67,7 +67,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) { for _, testCase := range testCases { client := &mocks.Client{} r := NewDependencyStore(client, zap.NewNop(), testCase.prefix) - assert.Equal(t, testCase.expected+dependencyIndex, r.indexPrefix) + assert.Equal(t, testCase.expected+dependencyIndex+"*", r.index) } } @@ -142,7 +142,7 @@ func TestGetDependencies(t *testing.T) { expectedError string expectedOutput []model.DependencyLink indexPrefix string - indices []interface{} + index string }{ { searchResult: createSearchResult(goodDependencies), @@ -153,23 +153,23 @@ func TestGetDependencies(t *testing.T) { CallCount: 12, }, }, - indices: []interface{}{"jaeger-dependencies-1995-04-21", "jaeger-dependencies-1995-04-20"}, + index: "jaeger-dependencies-*", }, { searchResult: createSearchResult(badDependencies), expectedError: "Unmarshalling ElasticSearch documents failed", - indices: []interface{}{"jaeger-dependencies-1995-04-21", "jaeger-dependencies-1995-04-20"}, + index: "jaeger-dependencies-*", }, { searchError: errors.New("search failure"), expectedError: "Failed to search for dependencies: search failure", - indices: []interface{}{"jaeger-dependencies-1995-04-21", "jaeger-dependencies-1995-04-20"}, + index: "jaeger-dependencies-*", }, { searchError: errors.New("search failure"), expectedError: "Failed to search for dependencies: search failure", indexPrefix: "foo", - indices: []interface{}{"foo-jaeger-dependencies-1995-04-21", "foo-jaeger-dependencies-1995-04-20"}, + index: "foo-jaeger-dependencies-*", }, } for _, testCase := range testCases { @@ -177,7 +177,7 @@ func TestGetDependencies(t *testing.T) { fixedTime := time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC) searchService := &mocks.SearchService{} - r.client.On("Search", testCase.indices...).Return(searchService) + r.client.On("Search", testCase.index).Return(searchService) searchService.On("Size", mock.Anything).Return(searchService) searchService.On("Query", mock.Anything).Return(searchService) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 29124eda749..93f2a29430b 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -131,7 +131,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { } } -type timeRangeIndexFn func(indexName string, startTime time.Time, endTime time.Time) []string +type timeRangeIndexFn func(indexName string) string type sourceFn func(query elastic.Query, nextTime uint64) *elastic.SearchSource @@ -143,16 +143,18 @@ func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn { } else { archivePrefix = archiveIndexSuffix } - return func(indexName string, startTime time.Time, endTime time.Time) []string { - return []string{archiveIndex(indexName, archivePrefix)} + return func(indexName string) string { + return archiveIndex(indexName, archivePrefix) } } if useReadWriteAliases { - return func(indices string, startTime time.Time, endTime time.Time) []string { - return []string{indices + "read"} + return func(index string) string { + return index + "read" } } - return timeRangeIndices + return func(indexName string) string { + return indexName + "*" + } } func getSourceFn(archive bool, maxNumSpans int) sourceFn { @@ -169,20 +171,6 @@ func getSourceFn(archive bool, maxNumSpans int) sourceFn { } } -// timeRangeIndices returns the array of indices that we need to query, based on query params -func timeRangeIndices(indexName string, startTime time.Time, endTime time.Time) []string { - var indices []string - firstIndex := indexWithDate(indexName, startTime) - currentIndex := indexWithDate(indexName, endTime) - for currentIndex != firstIndex { - indices = append(indices, currentIndex) - endTime = endTime.Add(-24 * time.Hour) - currentIndex = indexWithDate(indexName, endTime) - } - indices = append(indices, firstIndex) - return indices -} - func indexNames(prefix, index string) string { if prefix != "" { return prefix + indexPrefixSeparator + index @@ -239,8 +227,7 @@ func (s *SpanReader) unmarshalJSONSpan(esSpanRaw *elastic.SearchHit) (*dbmodel.S func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "GetServices") defer span.Finish() - currentTime := time.Now() - jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) + jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix) return s.serviceOperationStorage.getServices(ctx, jaegerIndices) } @@ -251,8 +238,7 @@ func (s *SpanReader) GetOperations( ) ([]spanstore.Operation, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "GetOperations") defer span.Finish() - currentTime := time.Now() - jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) + jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix) operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName) if err != nil { return nil, err @@ -325,7 +311,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st // Add an hour in both directions so that traces that straddle two indexes are retrieved. // i.e starts in one and ends in another. - indices := s.timeRangeIndices(s.spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour)) + index := s.timeRangeIndices(s.spanIndexPrefix) nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour)) searchAfterTime := make(map[model.TraceID]uint64) @@ -350,7 +336,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st } // set traceIDs to empty traceIDs = nil - results, err := s.client.MultiSearch().Add(searchRequests...).Index(indices...).Do(ctx) + results, err := s.client.MultiSearch().Add(searchRequests...).Index(index).Do(ctx) if err != nil { logErrorToSpan(childSpan, err) @@ -511,9 +497,9 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra // } aggregation := s.buildTraceIDAggregation(traceQuery.NumTraces) boolQuery := s.buildFindTraceIDsQuery(traceQuery) - jaegerIndices := s.timeRangeIndices(s.spanIndexPrefix, traceQuery.StartTimeMin, traceQuery.StartTimeMax) + index := s.timeRangeIndices(s.spanIndexPrefix) - searchService := s.client.Search(jaegerIndices...). + searchService := s.client.Search(index). Size(0). // set to 0 because we don't want actual documents. Aggregation(traceIDAggregation, aggregation). IgnoreUnavailable(true). diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 62f7e63eb58..c8dfadad52f 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -143,21 +143,19 @@ func TestSpanReaderIndices(t *testing.T) { client := &mocks.Client{} logger, _ := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) - date := time.Date(2019, 10, 10, 5, 0, 0, 0, time.UTC) - dateFormat := date.UTC().Format("2006-01-02") testCases := []struct { index string params SpanReaderParams }{ {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: false}, - index: spanIndex + dateFormat}, + index: spanIndex + "*"}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", UseReadWriteAliases: true}, index: spanIndex + "read"}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: false}, - index: "foo:" + indexPrefixSeparator + spanIndex + dateFormat}, + index: "foo:" + indexPrefixSeparator + spanIndex + "*"}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", UseReadWriteAliases: true}, index: "foo:-" + spanIndex + "read"}, @@ -173,8 +171,8 @@ func TestSpanReaderIndices(t *testing.T) { } for _, testCase := range testCases { r := NewSpanReader(testCase.params) - actual := r.timeRangeIndices(r.spanIndexPrefix, date, date) - assert.Equal(t, []string{testCase.index}, actual) + actual := r.timeRangeIndices(r.spanIndexPrefix) + assert.Equal(t, testCase.index, actual) } } @@ -430,42 +428,31 @@ func TestSpanReader_esJSONtoJSONSpanModelError(t *testing.T) { func TestSpanReaderFindIndices(t *testing.T) { today := time.Date(1995, time.April, 21, 4, 12, 19, 95, time.UTC) - yesterday := today.AddDate(0, 0, -1) - twoDaysAgo := today.AddDate(0, 0, -2) testCases := []struct { startTime time.Time endTime time.Time - expected []string + expected string }{ { startTime: today.Add(-time.Millisecond), endTime: today, - expected: []string{ - indexWithDate(spanIndex, today), - }, + expected: spanIndex + "*", }, { startTime: today.Add(-13 * time.Hour), endTime: today, - expected: []string{ - indexWithDate(spanIndex, today), - indexWithDate(spanIndex, yesterday), - }, + expected: spanIndex + "*", }, { startTime: today.Add(-48 * time.Hour), endTime: today, - expected: []string{ - indexWithDate(spanIndex, today), - indexWithDate(spanIndex, yesterday), - indexWithDate(spanIndex, twoDaysAgo), - }, + expected: spanIndex + "*", }, } withSpanReader(func(r *spanReaderTest) { for _, testCase := range testCases { - actual := r.reader.timeRangeIndices(spanIndex, testCase.startTime, testCase.endTime) + actual := r.reader.timeRangeIndices(spanIndex) assert.EqualValues(t, testCase.expected, actual) } }) diff --git a/plugin/storage/es/spanstore/service_operation.go b/plugin/storage/es/spanstore/service_operation.go index e394a0a20eb..63c6fa4458a 100644 --- a/plugin/storage/es/spanstore/service_operation.go +++ b/plugin/storage/es/spanstore/service_operation.go @@ -77,10 +77,10 @@ func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *dbmodel.Span } } -func (s *ServiceOperationStorage) getServices(context context.Context, indices []string) ([]string, error) { +func (s *ServiceOperationStorage) getServices(context context.Context, index string) ([]string, error) { serviceAggregation := getServicesAggregation() - searchService := s.client.Search(indices...). + searchService := s.client.Search(index). Size(0). // set to 0 because we don't want actual documents. IgnoreUnavailable(true). Aggregation(servicesAggregation, serviceAggregation) @@ -106,11 +106,11 @@ func getServicesAggregation() elastic.Query { Size(defaultDocCount) // Must set to some large number. ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 } -func (s *ServiceOperationStorage) getOperations(context context.Context, indices []string, service string) ([]string, error) { +func (s *ServiceOperationStorage) getOperations(context context.Context, index string, service string) ([]string, error) { serviceQuery := elastic.NewTermQuery(serviceName, service) serviceFilter := getOperationsAggregation() - searchService := s.client.Search(indices...). + searchService := s.client.Search(index). Size(0). Query(serviceQuery). IgnoreUnavailable(true). From 239a4f57941b25a78952c51a1a094d1242016ff8 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Mon, 9 Dec 2019 14:13:28 +0100 Subject: [PATCH 2/5] Remove other references to lookback Signed-off-by: Pavol Loffay --- pkg/es/config/config.go | 10 ------ plugin/storage/es/dependencystore/storage.go | 12 ------- .../es/dependencystore/storage_test.go | 33 ------------------- plugin/storage/es/factory.go | 1 - plugin/storage/es/options.go | 9 ++--- plugin/storage/es/options_test.go | 5 --- plugin/storage/es/spanstore/reader.go | 10 +++--- plugin/storage/es/spanstore/reader_test.go | 5 +-- 8 files changed, 8 insertions(+), 77 deletions(-) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 9db6be5051c..6ce332bf87d 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -47,7 +47,6 @@ type Configuration struct { AllowTokenFromContext bool Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing MaxNumSpans int // defines maximum number of spans to fetch from storage per query - MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads NumShards int64 `yaml:"shards"` NumReplicas int64 `yaml:"replicas"` Timeout time.Duration `validate:"min=500"` @@ -80,7 +79,6 @@ type ClientBuilder interface { NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) GetNumShards() int64 GetNumReplicas() int64 - GetMaxSpanAge() time.Duration GetMaxNumSpans() int GetIndexPrefix() string GetTagsFilePath() string @@ -187,9 +185,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if !c.Sniffer { c.Sniffer = source.Sniffer } - if c.MaxSpanAge == 0 { - c.MaxSpanAge = source.MaxSpanAge - } if c.MaxNumSpans == 0 { c.MaxNumSpans = source.MaxNumSpans } @@ -223,11 +218,6 @@ func (c *Configuration) GetNumReplicas() int64 { return c.NumReplicas } -// GetMaxSpanAge returns max span age from Configuration -func (c *Configuration) GetMaxSpanAge() time.Duration { - return c.MaxSpanAge -} - // GetMaxNumSpans returns max spans allowed per query from Configuration func (c *Configuration) GetMaxNumSpans() int { return c.MaxNumSpans diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index 5bf633446ed..29973cdc9a4 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -109,18 +109,6 @@ func buildTSQuery(endTs time.Time, lookback time.Duration) elastic.Query { return elastic.NewRangeQuery("timestamp").Gte(endTs.Add(-lookback)).Lte(endTs) } -func getIndices(prefix string, ts time.Time, lookback time.Duration) []string { - var indices []string - firstIndex := indexWithDate(prefix, ts.Add(-lookback)) - currentIndex := indexWithDate(prefix, ts) - for currentIndex != firstIndex { - indices = append(indices, currentIndex) - ts = ts.Add(-24 * time.Hour) - currentIndex = indexWithDate(prefix, ts) - } - return append(indices, firstIndex) -} - func indexWithDate(indexNamePrefix string, date time.Time) string { return indexNamePrefix + date.UTC().Format("2006-01-02") } diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go index b6467b23b98..c2facfd6621 100644 --- a/plugin/storage/es/dependencystore/storage_test.go +++ b/plugin/storage/es/dependencystore/storage_test.go @@ -206,39 +206,6 @@ func createSearchResult(dependencyLink string) *elastic.SearchResult { return searchResult } -func TestGetIndices(t *testing.T) { - fixedTime := time.Date(1995, time.April, 21, 4, 12, 19, 95, time.UTC) - testCases := []struct { - expected []string - lookback time.Duration - prefix string - }{ - { - expected: []string{indexWithDate("", fixedTime), indexWithDate("", fixedTime.Add(-24*time.Hour))}, - lookback: 23 * time.Hour, - prefix: "", - }, - { - expected: []string{indexWithDate("", fixedTime), indexWithDate("", fixedTime.Add(-24*time.Hour))}, - lookback: 13 * time.Hour, - prefix: "", - }, - { - expected: []string{indexWithDate("foo:", fixedTime)}, - lookback: 1 * time.Hour, - prefix: "foo:", - }, - { - expected: []string{indexWithDate("foo-", fixedTime)}, - lookback: 0, - prefix: "foo-", - }, - } - for _, testCase := range testCases { - assert.EqualValues(t, testCase.expected, getIndices(testCase.prefix, fixedTime, testCase.lookback)) - } -} - // stringMatcher can match a string argument when it contains a specific substring q func stringMatcher(q string) interface{} { matchFunc := func(s string) bool { diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 7960470ce6f..2003a1d8004 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -153,7 +153,6 @@ func createSpanReader( Logger: logger, MetricsFactory: mFactory, MaxNumSpans: cfg.GetMaxNumSpans(), - MaxSpanAge: cfg.GetMaxSpanAge(), IndexPrefix: cfg.GetIndexPrefix(), TagDotReplacement: cfg.GetTagDotReplacement(), UseReadWriteAliases: cfg.GetUseReadWriteAliases(), diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 9b6b52c32e6..3e467624051 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -86,7 +86,6 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { Username: "", Password: "", Sniffer: false, - MaxSpanAge: 72 * time.Hour, MaxNumSpans: 10000, NumShards: 5, NumReplicas: 1, @@ -147,8 +146,8 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { "Timeout used for queries. A Timeout of zero means no timeout") flagSet.Duration( nsConfig.namespace+suffixMaxSpanAge, - nsConfig.MaxSpanAge, - "The maximum lookback for spans in Elasticsearch") + time.Hour*72, + "(deprecated) The maximum lookback for spans in Elasticsearch. Now all indices are searched.") flagSet.Int( nsConfig.namespace+suffixMaxNumSpans, nsConfig.MaxNumSpans, @@ -217,8 +216,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixReadAlias, nsConfig.UseReadWriteAliases, "(experimental) Use read and write aliases for indices. Use this option with Elasticsearch rollover "+ - "API. It requires an external component to create aliases before startup and then performing its management. "+ - "Note that "+nsConfig.namespace+suffixMaxSpanAge+" is not taken into the account and has to be substituted by external component managing read alias.") + "API. It requires an external component to create aliases before startup and then performing its management.") flagSet.Bool( nsConfig.namespace+suffixCreateIndexTemplate, nsConfig.CreateIndexTemplates, @@ -249,7 +247,6 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.TokenFilePath = v.GetString(cfg.namespace + suffixTokenPath) cfg.Sniffer = v.GetBool(cfg.namespace + suffixSniffer) cfg.servers = stripWhiteSpace(v.GetString(cfg.namespace + suffixServerURLs)) - cfg.MaxSpanAge = v.GetDuration(cfg.namespace + suffixMaxSpanAge) cfg.MaxNumSpans = v.GetInt(cfg.namespace + suffixMaxNumSpans) cfg.NumShards = v.GetInt64(cfg.namespace + suffixNumShards) cfg.NumReplicas = v.GetInt64(cfg.namespace + suffixNumReplicas) diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index 10ca2e2e01e..f4a55ad2774 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -17,7 +17,6 @@ package es import ( "testing" - "time" "github.com/stretchr/testify/assert" @@ -32,7 +31,6 @@ func TestOptions(t *testing.T) { assert.NotEmpty(t, primary.Servers) assert.Equal(t, int64(5), primary.NumShards) assert.Equal(t, int64(1), primary.NumReplicas) - assert.Equal(t, 72*time.Hour, primary.MaxSpanAge) assert.False(t, primary.Sniffer) aux := opts.Get("archive") @@ -50,7 +48,6 @@ func TestOptionsWithFlags(t *testing.T) { "--es.password=world", "--es.token-file=/foo/bar", "--es.sniffer=true", - "--es.max-span-age=48h", "--es.num-shards=20", "--es.num-replicas=10", // a couple overrides @@ -66,7 +63,6 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "hello", primary.Username) assert.Equal(t, "/foo/bar", primary.TokenFilePath) assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers) - assert.Equal(t, 48*time.Hour, primary.MaxSpanAge) assert.True(t, primary.Sniffer) assert.Equal(t, true, primary.TLS.Enabled) assert.Equal(t, true, primary.TLS.SkipHostVerify) @@ -77,7 +73,6 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "world", aux.Password) assert.Equal(t, int64(20), aux.NumShards) assert.Equal(t, int64(10), aux.NumReplicas) - assert.Equal(t, 24*time.Hour, aux.MaxSpanAge) assert.True(t, aux.Sniffer) } diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 93f2a29430b..d488ff3ba68 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -94,7 +94,6 @@ type SpanReader struct { logger *zap.Logger // The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day, // this will be rounded down to UTC 00:00 of that day. - maxSpanAge time.Duration serviceOperationStorage *ServiceOperationStorage spanIndexPrefix string serviceIndexPrefix string @@ -107,7 +106,6 @@ type SpanReader struct { type SpanReaderParams struct { Client es.Client Logger *zap.Logger - MaxSpanAge time.Duration MaxNumSpans int MetricsFactory metrics.Factory IndexPrefix string @@ -121,7 +119,6 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { return &SpanReader{ client: p.Client, logger: p.Logger, - maxSpanAge: p.MaxSpanAge, serviceOperationStorage: NewServiceOperationStorage(p.Client, p.Logger, 0), // the decorator takes care of metrics spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex), serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), @@ -183,7 +180,7 @@ func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*mode span, ctx := opentracing.StartSpanFromContext(ctx, "GetTrace") defer span.Finish() currentTime := time.Now() - traces, err := s.multiRead(ctx, []model.TraceID{traceID}, currentTime.Add(-s.maxSpanAge), currentTime) + traces, err := s.multiRead(ctx, []model.TraceID{traceID}, currentTime.Add(-time.Hour*24*900)) if err != nil { return nil, err } @@ -276,7 +273,7 @@ func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.Trace if err != nil { return nil, err } - return s.multiRead(ctx, uniqueTraceIDs, traceQuery.StartTimeMin, traceQuery.StartTimeMax) + return s.multiRead(ctx, uniqueTraceIDs, traceQuery.StartTimeMin) } // FindTraceIDs retrieves traces IDs that match the traceQuery @@ -299,7 +296,7 @@ func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.Tra return convertTraceIDsStringsToModels(esTraceIDs) } -func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, startTime, endTime time.Time) ([]*model.Trace, error) { +func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, startTime time.Time) ([]*model.Trace, error) { childSpan, _ := opentracing.StartSpanFromContext(ctx, "multiRead") childSpan.LogFields(otlog.Object("trace_ids", traceIDs)) @@ -309,6 +306,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st return []*model.Trace{}, nil } + // TODO // Add an hour in both directions so that traces that straddle two indexes are retrieved. // i.e starts in one and ends in another. index := s.timeRangeIndices(s.spanIndexPrefix) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index c8dfadad52f..cb08dd9e966 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -98,7 +98,6 @@ func withSpanReader(fn func(r *spanReaderTest)) { reader: NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), - MaxSpanAge: 0, IndexPrefix: "", TagDotReplacement: "@", }), @@ -116,7 +115,6 @@ func withArchiveSpanReader(readAlias bool, fn func(r *spanReaderTest)) { reader: NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), - MaxSpanAge: 0, IndexPrefix: "", TagDotReplacement: "@", Archive: true, @@ -133,7 +131,6 @@ func TestNewSpanReader(t *testing.T) { reader := NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), - MaxSpanAge: 0, MetricsFactory: metrics.NullFactory, IndexPrefix: ""}) assert.NotNil(t, reader) @@ -266,7 +263,7 @@ func TestSpanReader_multiRead_followUp_query(t *testing.T) { }, }, nil) - traces, err := r.reader.multiRead(context.Background(), []model.TraceID{{High: 0, Low: 1}, {High: 0, Low: 2}}, date, date) + traces, err := r.reader.multiRead(context.Background(), []model.TraceID{{High: 0, Low: 1}, {High: 0, Low: 2}}, date) require.NoError(t, err) require.NotNil(t, traces) require.Len(t, traces, 2) From d809da5cf0b178106723a6fd5b2d334f9533a829 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Mon, 9 Dec 2019 14:26:51 +0100 Subject: [PATCH 3/5] Fix dependencies index Signed-off-by: Pavol Loffay --- plugin/storage/es/dependencystore/storage.go | 4 ++-- plugin/storage/integration/elasticsearch_test.go | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index 29973cdc9a4..74dada43aa7 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -52,7 +52,7 @@ func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string ctx: context.Background(), client: client, logger: logger, - index: prefix + dependencyIndex + "*", + index: prefix + dependencyIndex, } } @@ -83,7 +83,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe // GetDependencies returns all interservice dependencies func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { - searchResult, err := s.client.Search(s.index). + searchResult, err := s.client.Search(s.index + "*"). Size(10000). // the default elasticsearch allowed limit Query(buildTSQuery(endTs, lookback)). IgnoreUnavailable(true). diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index de138819330..2a4a50c4d3c 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -45,7 +45,6 @@ const ( queryURL = "http://" + queryHostPort indexPrefix = "integration-test" tagKeyDeDotChar = "@" - maxSpanAge = time.Hour * 72 ) type ESStorageIntegration struct { @@ -135,7 +134,6 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro Logger: s.logger, MetricsFactory: metrics.NullFactory, IndexPrefix: indexPrefix, - MaxSpanAge: maxSpanAge, TagDotReplacement: tagKeyDeDotChar, Archive: archive, }) @@ -195,7 +193,7 @@ func (s *StorageIntegration) testArchiveTrace(t *testing.T) { tID := model.NewTraceID(uint64(11), uint64(22)) expected := &model.Span{ OperationName: "archive_span", - StartTime: time.Now().Add(-maxSpanAge * 5), + StartTime: time.Now(), TraceID: tID, SpanID: model.NewSpanID(55), References: []model.SpanRef{}, From e1080aa2c31f42a7c69863c46c27b16731c78a43 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Mon, 9 Dec 2019 14:53:58 +0100 Subject: [PATCH 4/5] Use now instead of date in the past Signed-off-by: Pavol Loffay --- plugin/storage/es/spanstore/reader.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index d488ff3ba68..e022477f86e 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -179,8 +179,7 @@ func indexNames(prefix, index string) string { func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "GetTrace") defer span.Finish() - currentTime := time.Now() - traces, err := s.multiRead(ctx, []model.TraceID{traceID}, currentTime.Add(-time.Hour*24*900)) + traces, err := s.multiRead(ctx, []model.TraceID{traceID}, time.Now()) if err != nil { return nil, err } @@ -306,7 +305,6 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st return []*model.Trace{}, nil } - // TODO // Add an hour in both directions so that traces that straddle two indexes are retrieved. // i.e starts in one and ends in another. index := s.timeRangeIndices(s.spanIndexPrefix) From 8797957713acd277ae966e27fc046fecfd786726 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Mon, 9 Dec 2019 15:16:51 +0100 Subject: [PATCH 5/5] fix test Signed-off-by: Pavol Loffay --- plugin/storage/es/dependencystore/storage_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go index c2facfd6621..2cb0c553cd7 100644 --- a/plugin/storage/es/dependencystore/storage_test.go +++ b/plugin/storage/es/dependencystore/storage_test.go @@ -67,7 +67,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) { for _, testCase := range testCases { client := &mocks.Client{} r := NewDependencyStore(client, zap.NewNop(), testCase.prefix) - assert.Equal(t, testCase.expected+dependencyIndex+"*", r.index) + assert.Equal(t, testCase.expected+dependencyIndex, r.index) } }