Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a single index with wildcard in Elasticsearch reader #1969

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type Configuration struct {
AllowTokenFromContext bool
Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
MaxNumSpans int // defines maximum number of spans to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
Timeout time.Duration `validate:"min=500"`
Expand Down Expand Up @@ -80,7 +79,6 @@ type ClientBuilder interface {
NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)
GetNumShards() int64
GetNumReplicas() int64
GetMaxSpanAge() time.Duration
GetMaxNumSpans() int
GetIndexPrefix() string
GetTagsFilePath() string
Expand Down Expand Up @@ -187,9 +185,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if !c.Sniffer {
c.Sniffer = source.Sniffer
}
if c.MaxSpanAge == 0 {
c.MaxSpanAge = source.MaxSpanAge
}
if c.MaxNumSpans == 0 {
c.MaxNumSpans = source.MaxNumSpans
}
Expand Down Expand Up @@ -223,11 +218,6 @@ func (c *Configuration) GetNumReplicas() int64 {
return c.NumReplicas
}

// GetMaxSpanAge returns max span age from Configuration
func (c *Configuration) GetMaxSpanAge() time.Duration {
return c.MaxSpanAge
}

// GetMaxNumSpans returns max spans allowed per query from Configuration
func (c *Configuration) GetMaxNumSpans() int {
return c.MaxNumSpans
Expand Down
33 changes: 10 additions & 23 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ const (

// DependencyStore handles all queries and insertions to ElasticSearch dependencies
type DependencyStore struct {
ctx context.Context
client es.Client
logger *zap.Logger
indexPrefix string
ctx context.Context
client es.Client
logger *zap.Logger
index string
}

// NewDependencyStore returns a DependencyStore
Expand All @@ -49,16 +49,16 @@ func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string
prefix = indexPrefix + "-"
}
return &DependencyStore{
ctx: context.Background(),
client: client,
logger: logger,
indexPrefix: prefix + dependencyIndex,
ctx: context.Background(),
client: client,
logger: logger,
index: prefix + dependencyIndex,
}
}

// WriteDependencies implements dependencystore.Writer#WriteDependencies.
func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error {
indexName := indexWithDate(s.indexPrefix, ts)
indexName := indexWithDate(s.index, ts)
if err := s.createIndex(indexName); err != nil {
return err
}
Expand All @@ -83,8 +83,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
indices := getIndices(s.indexPrefix, endTs, lookback)
searchResult, err := s.client.Search(indices...).
searchResult, err := s.client.Search(s.index + "*").
Size(10000). // the default elasticsearch allowed limit
Query(buildTSQuery(endTs, lookback)).
IgnoreUnavailable(true).
Expand All @@ -110,18 +109,6 @@ func buildTSQuery(endTs time.Time, lookback time.Duration) elastic.Query {
return elastic.NewRangeQuery("timestamp").Gte(endTs.Add(-lookback)).Lte(endTs)
}

func getIndices(prefix string, ts time.Time, lookback time.Duration) []string {
var indices []string
firstIndex := indexWithDate(prefix, ts.Add(-lookback))
currentIndex := indexWithDate(prefix, ts)
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
ts = ts.Add(-24 * time.Hour)
currentIndex = indexWithDate(prefix, ts)
}
return append(indices, firstIndex)
}

func indexWithDate(indexNamePrefix string, date time.Time) string {
return indexNamePrefix + date.UTC().Format("2006-01-02")
}
47 changes: 7 additions & 40 deletions plugin/storage/es/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) {
for _, testCase := range testCases {
client := &mocks.Client{}
r := NewDependencyStore(client, zap.NewNop(), testCase.prefix)
assert.Equal(t, testCase.expected+dependencyIndex, r.indexPrefix)
assert.Equal(t, testCase.expected+dependencyIndex, r.index)
}
}

Expand Down Expand Up @@ -142,7 +142,7 @@ func TestGetDependencies(t *testing.T) {
expectedError string
expectedOutput []model.DependencyLink
indexPrefix string
indices []interface{}
index string
}{
{
searchResult: createSearchResult(goodDependencies),
Expand All @@ -153,31 +153,31 @@ func TestGetDependencies(t *testing.T) {
CallCount: 12,
},
},
indices: []interface{}{"jaeger-dependencies-1995-04-21", "jaeger-dependencies-1995-04-20"},
index: "jaeger-dependencies-*",
},
{
searchResult: createSearchResult(badDependencies),
expectedError: "Unmarshalling ElasticSearch documents failed",
indices: []interface{}{"jaeger-dependencies-1995-04-21", "jaeger-dependencies-1995-04-20"},
index: "jaeger-dependencies-*",
},
{
searchError: errors.New("search failure"),
expectedError: "Failed to search for dependencies: search failure",
indices: []interface{}{"jaeger-dependencies-1995-04-21", "jaeger-dependencies-1995-04-20"},
index: "jaeger-dependencies-*",
},
{
searchError: errors.New("search failure"),
expectedError: "Failed to search for dependencies: search failure",
indexPrefix: "foo",
indices: []interface{}{"foo-jaeger-dependencies-1995-04-21", "foo-jaeger-dependencies-1995-04-20"},
index: "foo-jaeger-dependencies-*",
},
}
for _, testCase := range testCases {
withDepStorage(testCase.indexPrefix, func(r *depStorageTest) {
fixedTime := time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC)

searchService := &mocks.SearchService{}
r.client.On("Search", testCase.indices...).Return(searchService)
r.client.On("Search", testCase.index).Return(searchService)

searchService.On("Size", mock.Anything).Return(searchService)
searchService.On("Query", mock.Anything).Return(searchService)
Expand Down Expand Up @@ -206,39 +206,6 @@ func createSearchResult(dependencyLink string) *elastic.SearchResult {
return searchResult
}

func TestGetIndices(t *testing.T) {
fixedTime := time.Date(1995, time.April, 21, 4, 12, 19, 95, time.UTC)
testCases := []struct {
expected []string
lookback time.Duration
prefix string
}{
{
expected: []string{indexWithDate("", fixedTime), indexWithDate("", fixedTime.Add(-24*time.Hour))},
lookback: 23 * time.Hour,
prefix: "",
},
{
expected: []string{indexWithDate("", fixedTime), indexWithDate("", fixedTime.Add(-24*time.Hour))},
lookback: 13 * time.Hour,
prefix: "",
},
{
expected: []string{indexWithDate("foo:", fixedTime)},
lookback: 1 * time.Hour,
prefix: "foo:",
},
{
expected: []string{indexWithDate("foo-", fixedTime)},
lookback: 0,
prefix: "foo-",
},
}
for _, testCase := range testCases {
assert.EqualValues(t, testCase.expected, getIndices(testCase.prefix, fixedTime, testCase.lookback))
}
}

// stringMatcher can match a string argument when it contains a specific substring q
func stringMatcher(q string) interface{} {
matchFunc := func(s string) bool {
Expand Down
1 change: 0 additions & 1 deletion plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ func createSpanReader(
Logger: logger,
MetricsFactory: mFactory,
MaxNumSpans: cfg.GetMaxNumSpans(),
MaxSpanAge: cfg.GetMaxSpanAge(),
IndexPrefix: cfg.GetIndexPrefix(),
TagDotReplacement: cfg.GetTagDotReplacement(),
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
Expand Down
9 changes: 3 additions & 6 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
Username: "",
Password: "",
Sniffer: false,
MaxSpanAge: 72 * time.Hour,
MaxNumSpans: 10000,
NumShards: 5,
NumReplicas: 1,
Expand Down Expand Up @@ -147,8 +146,8 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
"Timeout used for queries. A Timeout of zero means no timeout")
flagSet.Duration(
nsConfig.namespace+suffixMaxSpanAge,
nsConfig.MaxSpanAge,
"The maximum lookback for spans in Elasticsearch")
time.Hour*72,
"(deprecated) The maximum lookback for spans in Elasticsearch. Now all indices are searched.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems confusing. Can we add the message Now all indices are searched in the parenthesis?

flagSet.Int(
nsConfig.namespace+suffixMaxNumSpans,
nsConfig.MaxNumSpans,
Expand Down Expand Up @@ -217,8 +216,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixReadAlias,
nsConfig.UseReadWriteAliases,
"(experimental) Use read and write aliases for indices. Use this option with Elasticsearch rollover "+
"API. It requires an external component to create aliases before startup and then performing its management. "+
"Note that "+nsConfig.namespace+suffixMaxSpanAge+" is not taken into the account and has to be substituted by external component managing read alias.")
"API. It requires an external component to create aliases before startup and then performing its management.")
flagSet.Bool(
nsConfig.namespace+suffixCreateIndexTemplate,
nsConfig.CreateIndexTemplates,
Expand Down Expand Up @@ -249,7 +247,6 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.TokenFilePath = v.GetString(cfg.namespace + suffixTokenPath)
cfg.Sniffer = v.GetBool(cfg.namespace + suffixSniffer)
cfg.servers = stripWhiteSpace(v.GetString(cfg.namespace + suffixServerURLs))
cfg.MaxSpanAge = v.GetDuration(cfg.namespace + suffixMaxSpanAge)
cfg.MaxNumSpans = v.GetInt(cfg.namespace + suffixMaxNumSpans)
cfg.NumShards = v.GetInt64(cfg.namespace + suffixNumShards)
cfg.NumReplicas = v.GetInt64(cfg.namespace + suffixNumReplicas)
Expand Down
5 changes: 0 additions & 5 deletions plugin/storage/es/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package es

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -32,7 +31,6 @@ func TestOptions(t *testing.T) {
assert.NotEmpty(t, primary.Servers)
assert.Equal(t, int64(5), primary.NumShards)
assert.Equal(t, int64(1), primary.NumReplicas)
assert.Equal(t, 72*time.Hour, primary.MaxSpanAge)
assert.False(t, primary.Sniffer)

aux := opts.Get("archive")
Expand All @@ -50,7 +48,6 @@ func TestOptionsWithFlags(t *testing.T) {
"--es.password=world",
"--es.token-file=/foo/bar",
"--es.sniffer=true",
"--es.max-span-age=48h",
"--es.num-shards=20",
"--es.num-replicas=10",
// a couple overrides
Expand All @@ -66,7 +63,6 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "hello", primary.Username)
assert.Equal(t, "/foo/bar", primary.TokenFilePath)
assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers)
assert.Equal(t, 48*time.Hour, primary.MaxSpanAge)
assert.True(t, primary.Sniffer)
assert.Equal(t, true, primary.TLS.Enabled)
assert.Equal(t, true, primary.TLS.SkipHostVerify)
Expand All @@ -77,7 +73,6 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "world", aux.Password)
assert.Equal(t, int64(20), aux.NumShards)
assert.Equal(t, int64(10), aux.NumReplicas)
assert.Equal(t, 24*time.Hour, aux.MaxSpanAge)
assert.True(t, aux.Sniffer)

}
Loading