Skip to content

Commit

Permalink
Make ES number of shards and replicas configurable (#303)
Browse files Browse the repository at this point in the history
  • Loading branch information
mh-park authored Aug 2, 2017
1 parent 77dc8bd commit 464471d
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 14 deletions.
8 changes: 7 additions & 1 deletion cmd/collector/app/builder/span_handler_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,13 @@ func (e *esSpanHandlerBuilder) BuildHandlers() (app.ZipkinSpansHandler, app.Jaeg
if err != nil {
return nil, nil, err
}
spanStore := esSpanstore.NewSpanWriter(client, e.logger, e.metricsFactory)
spanStore := esSpanstore.NewSpanWriter(
client,
e.logger,
e.metricsFactory,
e.configuration.NumShards,
e.configuration.NumReplicas,
)

return buildHandlers(spanStore, e.logger, e.metricsFactory)
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ import (
"github.com/uber/jaeger/pkg/es"
)

// Configuration describes the configuration properties needed to connect to a ElasticSearch cluster
// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string
Username string
Password string
Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
MaxSpanAge time.Duration // configures the maximum lookback on span reads
Servers []string
Username string
Password string
Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
}

// NewClient creates a new ElasticSearch client
Expand Down
2 changes: 2 additions & 0 deletions plugin/storage/es/spanstore/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import "fmt"
// TODO: resolve traceID concerns (may not require any changes here)
const mapping = `{
"settings":{
"index.number_of_shards": ${__NUMBER_OF_SHARDS__},
"index.number_of_replicas": ${__NUMBER_OF_REPLICAS__},
"index.mapping.nested_fields.limit":50,
"index.requests.cache.enable":true,
"index.mapper.dynamic":false
Expand Down
31 changes: 29 additions & 2 deletions plugin/storage/es/spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ package spanstore

import (
"context"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
Expand All @@ -39,6 +41,9 @@ import (
const (
spanType = "span"
serviceType = "service"

defaultNumShards = 5
defaultNumReplicas = 2
)

type spanWriterMetrics struct {
Expand All @@ -56,6 +61,8 @@ type SpanWriter struct {
writerMetrics spanWriterMetrics // TODO: build functions to wrap around each Do fn
indexCache cache.Cache
serviceWriter serviceWriter
numShards int64
numReplicas int64
}

// Service is the JSON struct for service:operation documents in ElasticSearch
Expand All @@ -65,8 +72,20 @@ type Service struct {
}

// NewSpanWriter creates a new SpanWriter for use
func NewSpanWriter(client es.Client, logger *zap.Logger, metricsFactory metrics.Factory) *SpanWriter {
func NewSpanWriter(
client es.Client,
logger *zap.Logger,
metricsFactory metrics.Factory,
numShards int64,
numReplicas int64,
) *SpanWriter {
ctx := context.Background()
if numShards == 0 {
numShards = defaultNumShards
}
if numReplicas == 0 {
numReplicas = defaultNumReplicas
}
// TODO: Configurable TTL
serviceOperationStorage := NewServiceOperationStorage(ctx, client, metricsFactory, logger, time.Hour*12)
return &SpanWriter{
Expand All @@ -84,6 +103,8 @@ func NewSpanWriter(client es.Client, logger *zap.Logger, metricsFactory metrics.
TTL: 48 * time.Hour,
},
),
numShards: numShards,
numReplicas: numReplicas,
}
}

Expand Down Expand Up @@ -121,7 +142,7 @@ func (s *SpanWriter) createIndex(indexName string, mapping string, jsonSpan *jMo
// if there are multiple collectors writing to the same elasticsearch host, if the collectors pass
// the exists check above and try to create the same index all at once, this might fail and
// drop a couple spans (~1 per collector). Creating indices ahead of time alleviates this issue.
_, err := s.client.CreateIndex(indexName).Body(mapping).Do(s.ctx)
_, err := s.client.CreateIndex(indexName).Body(s.fixMapping(mapping)).Do(s.ctx)
s.writerMetrics.indexCreate.Emit(err, time.Since(start))
if err != nil {
return s.logError(jsonSpan, err, "Failed to create index", s.logger)
Expand All @@ -140,6 +161,12 @@ func writeCache(key string, c cache.Cache) {
c.Put(key, key)
}

func (s *SpanWriter) fixMapping(mapping string) string {
mapping = strings.Replace(mapping, "${__NUMBER_OF_SHARDS__}", strconv.FormatInt(s.numShards, 10), 1)
mapping = strings.Replace(mapping, "${__NUMBER_OF_REPLICAS__}", strconv.FormatInt(s.numReplicas, 10), 1)
return mapping
}

func (s *SpanWriter) writeService(indexName string, jsonSpan *jModel.Span) error {
return s.serviceWriter(indexName, jsonSpan)
}
Expand Down
47 changes: 43 additions & 4 deletions plugin/storage/es/spanstore/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func withSpanWriter(fn func(w *spanWriterTest)) {
client: client,
logger: logger,
logBuffer: logBuffer,
writer: NewSpanWriter(client, logger, metricsFactory),
writer: NewSpanWriter(client, logger, metricsFactory, 0, 0),
}
fn(w)
}
Expand Down Expand Up @@ -178,11 +178,11 @@ func TestSpanWriter_WriteSpan(t *testing.T) {
spanExistsService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(testCase.spanIndexExists, nil)

serviceCreateService := &mocks.IndicesCreateService{}
serviceCreateService.On("Body", stringMatcher(serviceMapping)).Return(serviceCreateService)
serviceCreateService.On("Body", stringMatcher(w.writer.fixMapping(serviceMapping))).Return(serviceCreateService)
serviceCreateService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(nil, testCase.serviceIndexCreateError)

spanCreateService := &mocks.IndicesCreateService{}
spanCreateService.On("Body", stringMatcher(spanMapping)).Return(spanCreateService)
spanCreateService.On("Body", stringMatcher(w.writer.fixMapping(spanMapping))).Return(spanCreateService)
spanCreateService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(nil, testCase.spanIndexCreateError)

indexService := &mocks.IndexService{}
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestCheckAndCreateIndex(t *testing.T) {
existsService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(testCase.indexExists, testCase.indexExistsError)

createService := &mocks.IndicesCreateService{}
createService.On("Body", stringMatcher(spanMapping)).Return(createService)
createService.On("Body", stringMatcher(w.writer.fixMapping(spanMapping))).Return(createService)
createService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(testCase.createResult, testCase.createError)

indexName := "jaeger-1995-04-21"
Expand Down Expand Up @@ -306,6 +306,45 @@ func TestCheckAndCreateIndex(t *testing.T) {
}
}

func TestFixMapping(t *testing.T) {
withSpanWriter(func(w *spanWriterTest) {
testMapping := `{
"settings":{
"index.number_of_shards": ${__NUMBER_OF_SHARDS__},
"index.number_of_replicas": ${__NUMBER_OF_REPLICAS__},
"index.mapping.nested_fields.limit":50,
"index.requests.cache.enable":true,
"index.mapper.dynamic":false
},
"mappings":{
"_default_":{
"_all":{
"enabled":false
}
}
}
}`
expectedMapping := `{
"settings":{
"index.number_of_shards": 5,
"index.number_of_replicas": 2,
"index.mapping.nested_fields.limit":50,
"index.requests.cache.enable":true,
"index.mapper.dynamic":false
},
"mappings":{
"_default_":{
"_all":{
"enabled":false
}
}
}
}`

assert.Equal(t, expectedMapping, w.writer.fixMapping(testMapping))
})
}

func TestWriteSpanInternal(t *testing.T) {
withSpanWriter(func(w *spanWriterTest) {
indexService := &mocks.IndexService{}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/es_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s *ESStorageIntegration) esCleanUp() error {

func (s *ESStorageIntegration) initSpanstore() {
client := es.WrapESClient(s.client)
s.spanWriter = spanstore.NewSpanWriter(client, s.logger, metrics.NullFactory)
s.spanWriter = spanstore.NewSpanWriter(client, s.logger, metrics.NullFactory, 0, 0)
s.spanReader = spanstore.NewSpanReader(client, s.logger, 72*time.Hour, metrics.NullFactory)
}

Expand Down

0 comments on commit 464471d

Please sign in to comment.