diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index e53c806ae50..13749781966 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -39,6 +39,7 @@ type Configuration struct { Username string Password string Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing + MaxNumSpans int // defines maximum number of spans to fetch from storage per query MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads NumShards int64 `yaml:"shards"` NumReplicas int64 `yaml:"replicas"` @@ -68,6 +69,7 @@ type ClientBuilder interface { GetNumShards() int64 GetNumReplicas() int64 GetMaxSpanAge() time.Duration + GetMaxNumSpans() int GetIndexPrefix() string GetTagsFilePath() string GetAllTagsAsFields() bool @@ -151,6 +153,9 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if c.MaxSpanAge == 0 { c.MaxSpanAge = source.MaxSpanAge } + if c.MaxNumSpans == 0 { + c.MaxNumSpans = source.MaxNumSpans + } if c.NumShards == 0 { c.NumShards = source.NumShards } @@ -186,6 +191,11 @@ func (c *Configuration) GetMaxSpanAge() time.Duration { return c.MaxSpanAge } +// GetMaxNumSpans returns max spans allowed per query from Configuration +func (c *Configuration) GetMaxNumSpans() int { + return c.MaxNumSpans +} + // GetIndexPrefix returns index prefix func (c *Configuration) GetIndexPrefix() string { return c.IndexPrefix diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 75ae4e6ccb8..dc1480b2527 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -83,6 +83,7 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { Logger: f.logger, MetricsFactory: f.metricsFactory, MaxSpanAge: cfg.GetMaxSpanAge(), + MaxNumSpans: cfg.GetMaxNumSpans(), IndexPrefix: cfg.GetIndexPrefix(), TagDotReplacement: cfg.GetTagDotReplacement(), }), nil diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 8d85ba0220e..df7b57330b3 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -30,6 +30,7 @@ const ( suffixSniffer = ".sniffer" suffixServerURLs = ".server-urls" suffixMaxSpanAge = ".max-span-age" + suffixMaxNumSpans = ".max-num-spans" suffixNumShards = ".num-shards" suffixNumReplicas = ".num-replicas" suffixBulkSize = ".bulk.size" @@ -78,6 +79,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { Password: "", Sniffer: false, MaxSpanAge: 72 * time.Hour, + MaxNumSpans: 10000, NumShards: 5, NumReplicas: 1, BulkSize: 5 * 1000 * 1000, @@ -132,6 +134,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixMaxSpanAge, nsConfig.MaxSpanAge, "The maximum lookback for spans in ElasticSearch") + flagSet.Int( + nsConfig.namespace+suffixMaxNumSpans, + nsConfig.MaxNumSpans, + "The maximum number of spans to fetch at a time per query in Elasticsearch") flagSet.Int64( nsConfig.namespace+suffixNumShards, nsConfig.NumShards, @@ -204,6 +210,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.Sniffer = v.GetBool(cfg.namespace + suffixSniffer) cfg.servers = v.GetString(cfg.namespace + suffixServerURLs) cfg.MaxSpanAge = v.GetDuration(cfg.namespace + suffixMaxSpanAge) + cfg.MaxNumSpans = v.GetInt(cfg.namespace + suffixMaxNumSpans) cfg.NumShards = v.GetInt64(cfg.namespace + suffixNumShards) cfg.NumReplicas = v.GetInt64(cfg.namespace + suffixNumReplicas) cfg.BulkSize = v.GetInt(cfg.namespace + suffixBulkSize) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index b42e97cef1f..aa3a229f125 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -92,6 +92,7 @@ type SpanReader struct { // The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day, // this will be rounded down to UTC 00:00 of that day. maxSpanAge time.Duration + maxNumSpans int serviceOperationStorage *ServiceOperationStorage spanIndexPrefix string serviceIndexPrefix string @@ -103,6 +104,7 @@ type SpanReaderParams struct { Client es.Client Logger *zap.Logger MaxSpanAge time.Duration + MaxNumSpans int MetricsFactory metrics.Factory serviceOperationStorage *ServiceOperationStorage IndexPrefix string @@ -124,6 +126,7 @@ func newSpanReader(p SpanReaderParams) *SpanReader { client: p.Client, logger: p.Logger, maxSpanAge: p.MaxSpanAge, + maxNumSpans: p.MaxNumSpans, serviceOperationStorage: NewServiceOperationStorage(ctx, p.Client, p.Logger, 0), // the decorator takes care of metrics spanIndexPrefix: p.IndexPrefix + spanIndex, serviceIndexPrefix: p.IndexPrefix + serviceIndex, @@ -270,7 +273,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime if val, ok := searchAfterTime[traceID]; ok { nextTime = val } - searchRequests[i] = elastic.NewSearchRequest().IgnoreUnavailable(true).Type(spanType).Source(elastic.NewSearchSource().Query(query).Size(defaultDocCount).Sort("startTime", true).SearchAfter(nextTime)) + searchRequests[i] = elastic.NewSearchRequest().IgnoreUnavailable(true).Type(spanType).Source(elastic.NewSearchSource().Query(query).Size(defaultDocCount).TerminateAfter(s.maxNumSpans).Sort("startTime", true).SearchAfter(nextTime)) } // set traceIDs to empty traceIDs = nil