Skip to content

Commit

Permalink
Make MaxNumSpans CLI configurable (#1283)
Browse files Browse the repository at this point in the history
Signed-off-by: Annanay <[email protected]>
  • Loading branch information
annanay25 authored and black-adder committed Jan 17, 2019
1 parent 48b034c commit d64cb70
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 1 deletion.
10 changes: 10 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Configuration struct {
Username string
Password string
Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
MaxNumSpans int // defines maximum number of spans to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
Expand Down Expand Up @@ -68,6 +69,7 @@ type ClientBuilder interface {
GetNumShards() int64
GetNumReplicas() int64
GetMaxSpanAge() time.Duration
GetMaxNumSpans() int
GetIndexPrefix() string
GetTagsFilePath() string
GetAllTagsAsFields() bool
Expand Down Expand Up @@ -151,6 +153,9 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.MaxSpanAge == 0 {
c.MaxSpanAge = source.MaxSpanAge
}
if c.MaxNumSpans == 0 {
c.MaxNumSpans = source.MaxNumSpans
}
if c.NumShards == 0 {
c.NumShards = source.NumShards
}
Expand Down Expand Up @@ -186,6 +191,11 @@ func (c *Configuration) GetMaxSpanAge() time.Duration {
return c.MaxSpanAge
}

// GetMaxNumSpans returns max spans allowed per query from Configuration
func (c *Configuration) GetMaxNumSpans() int {
return c.MaxNumSpans
}

// GetIndexPrefix returns index prefix
func (c *Configuration) GetIndexPrefix() string {
return c.IndexPrefix
Expand Down
1 change: 1 addition & 0 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
Logger: f.logger,
MetricsFactory: f.metricsFactory,
MaxSpanAge: cfg.GetMaxSpanAge(),
MaxNumSpans: cfg.GetMaxNumSpans(),
IndexPrefix: cfg.GetIndexPrefix(),
TagDotReplacement: cfg.GetTagDotReplacement(),
}), nil
Expand Down
7 changes: 7 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
suffixSniffer = ".sniffer"
suffixServerURLs = ".server-urls"
suffixMaxSpanAge = ".max-span-age"
suffixMaxNumSpans = ".max-num-spans"
suffixNumShards = ".num-shards"
suffixNumReplicas = ".num-replicas"
suffixBulkSize = ".bulk.size"
Expand Down Expand Up @@ -78,6 +79,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
Password: "",
Sniffer: false,
MaxSpanAge: 72 * time.Hour,
MaxNumSpans: 10000,
NumShards: 5,
NumReplicas: 1,
BulkSize: 5 * 1000 * 1000,
Expand Down Expand Up @@ -132,6 +134,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixMaxSpanAge,
nsConfig.MaxSpanAge,
"The maximum lookback for spans in ElasticSearch")
flagSet.Int(
nsConfig.namespace+suffixMaxNumSpans,
nsConfig.MaxNumSpans,
"The maximum number of spans to fetch at a time per query in Elasticsearch")
flagSet.Int64(
nsConfig.namespace+suffixNumShards,
nsConfig.NumShards,
Expand Down Expand Up @@ -204,6 +210,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.Sniffer = v.GetBool(cfg.namespace + suffixSniffer)
cfg.servers = v.GetString(cfg.namespace + suffixServerURLs)
cfg.MaxSpanAge = v.GetDuration(cfg.namespace + suffixMaxSpanAge)
cfg.MaxNumSpans = v.GetInt(cfg.namespace + suffixMaxNumSpans)
cfg.NumShards = v.GetInt64(cfg.namespace + suffixNumShards)
cfg.NumReplicas = v.GetInt64(cfg.namespace + suffixNumReplicas)
cfg.BulkSize = v.GetInt(cfg.namespace + suffixBulkSize)
Expand Down
5 changes: 4 additions & 1 deletion plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type SpanReader struct {
// The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day,
// this will be rounded down to UTC 00:00 of that day.
maxSpanAge time.Duration
maxNumSpans int
serviceOperationStorage *ServiceOperationStorage
spanIndexPrefix string
serviceIndexPrefix string
Expand All @@ -103,6 +104,7 @@ type SpanReaderParams struct {
Client es.Client
Logger *zap.Logger
MaxSpanAge time.Duration
MaxNumSpans int
MetricsFactory metrics.Factory
serviceOperationStorage *ServiceOperationStorage
IndexPrefix string
Expand All @@ -124,6 +126,7 @@ func newSpanReader(p SpanReaderParams) *SpanReader {
client: p.Client,
logger: p.Logger,
maxSpanAge: p.MaxSpanAge,
maxNumSpans: p.MaxNumSpans,
serviceOperationStorage: NewServiceOperationStorage(ctx, p.Client, p.Logger, 0), // the decorator takes care of metrics
spanIndexPrefix: p.IndexPrefix + spanIndex,
serviceIndexPrefix: p.IndexPrefix + serviceIndex,
Expand Down Expand Up @@ -270,7 +273,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime
if val, ok := searchAfterTime[traceID]; ok {
nextTime = val
}
searchRequests[i] = elastic.NewSearchRequest().IgnoreUnavailable(true).Type(spanType).Source(elastic.NewSearchSource().Query(query).Size(defaultDocCount).Sort("startTime", true).SearchAfter(nextTime))
searchRequests[i] = elastic.NewSearchRequest().IgnoreUnavailable(true).Type(spanType).Source(elastic.NewSearchSource().Query(query).Size(defaultDocCount).TerminateAfter(s.maxNumSpans).Sort("startTime", true).SearchAfter(nextTime))
}
// set traceIDs to empty
traceIDs = nil
Expand Down

0 comments on commit d64cb70

Please sign in to comment.