Skip to content

Commit

Permalink
Add Elasticsearch 7 support (#1690)
Browse files Browse the repository at this point in the history
* Make it possible to run es-integration-test against different versions of elasticsearch

A convenient way to run the elasticsearch integration test is to just run ./scripts/travis/es-integration-test.sh

This change allows you to specify different elasticsearch versions

eg

ES_VERSION=5.6.12 scripts/travis/es-integration-test.sh
ES_VERSION=6.8.1 scripts/travis/es-integration-test.sh
ES_VERSION=7.2.0 scripts/travis/es-integration-test.sh

The default version to use for the tests is currently 5.6.12

Signed-off-by: Greg Franklin <[email protected]>

* Update github.com/olivere/elastic to 6.2.21

Signed-off-by: Greg Franklin <[email protected]>

* Migrate to a single '_doc' elasticsearch document type

Signed-off-by: Greg Franklin <[email protected]>

* Add IncludeTypeName for compatability with elasticsearch 7.x

Signed-off-by: Greg Franklin <[email protected]>

* Remove deprecated __default__ field from es mappings

Signed-off-by: Greg Franklin <[email protected]>

* Rebase on master.  Fix for ES5

Signed-off-by: Greg Franklin <[email protected]>

* Update esRollover.py to use per-version templates

Signed-off-by: Greg Franklin <[email protected]>

* Update github.com/olivere/elastic to 6.2.22 and use RestTotalHitsAsInt against ES7

Signed-off-by: Greg Franklin <[email protected]>

* esRollover.py should set '?include_type_name=true' when creating templates for ES7

Signed-off-by: Greg Franklin <[email protected]>

* Check loading of all versions of the ES mappings

Signed-off-by: Greg Franklin <[email protected]>

* Run es-integration-test.sh against ES5, ES6 and ES7

Signed-off-by: Greg Franklin <[email protected]>

* Run 'make fmt' to add license text to mocks

Signed-off-by: Greg Franklin <[email protected]>

* Update elasticsearch versions used for integration tests (and add --rm to docker commands)

Signed-off-by: Greg Franklin <[email protected]>

* Do not use IncludeTypeName on ES7

Without IncludeTypeName, ES will use the type "_doc" all documents.

Type is removed from all search queries so that it queries all types
whether they be "_doc" or (eg) "span".

Type should not be used when indexing documents on ES7 (so that the
default "_doc" type is used).  It should, however, be used on ES5 and ES6
so that the type matches that described by the mapping.

Signed-off-by: Greg Franklin <[email protected]>

* Log elasticsearch version

Signed-off-by: Greg Franklin <[email protected]>

* Make sure we can get the elasticsearch version on servers other than localhost

Signed-off-by: Greg Franklin <[email protected]>

* Fix es dependency storage test

Signed-off-by: Greg Franklin <[email protected]>

* Fix token propagation test for elasticsearch version detection

The query service needs to detect the elasticsearch version to start up.

Elasticsearch must therefore be started before the query service and must
return a version number in response to a ping request

Signed-off-by: Greg Franklin <[email protected]>

* Fix lint failure

plugin/storage/es/dependencystore/schema.go:37:9: if block ends with a return statement, so drop this else and outdent its block

Signed-off-by: Greg Franklin <[email protected]>

* Use the same file for elasticsearch mappings on ES5 and ES6.  Only ES7 needs to be different

Signed-off-by: Greg Franklin <[email protected]>

* Change log message to use structured logging

Signed-off-by: Greg Franklin <[email protected]>
  • Loading branch information
gregoryfranklin authored and pavolloffay committed Aug 22, 2019
1 parent 0ef9a02 commit 4bf5e9b
Show file tree
Hide file tree
Showing 28 changed files with 450 additions and 110 deletions.
26 changes: 13 additions & 13 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ required = [
version = "=1.20.1"

[[constraint]]
name = "gopkg.in/olivere/elastic.v5"
version = "^5.0.53"
name = "github.com/olivere/elastic"
version = "^6.2.22"

[[constraint]]
name = "github.com/gocql/gocql"
Expand Down
4 changes: 2 additions & 2 deletions pkg/es/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"context"
"io"

"gopkg.in/olivere/elastic.v5"
"github.com/olivere/elastic"
)

// Client is an abstraction for elastic.Client
Expand All @@ -31,6 +31,7 @@ type Client interface {
Search(indices ...string) SearchService
MultiSearch() MultiSearchService
io.Closer
GetVersion() int
}

// IndicesExistsService is an abstraction for elastic.IndicesExistsService
Expand Down Expand Up @@ -61,7 +62,6 @@ type IndexService interface {

// SearchService is an abstraction for elastic.SearchService
type SearchService interface {
Type(typ string) SearchService
Size(size int) SearchService
Aggregation(name string, aggregation elastic.Aggregation) SearchService
IgnoreUnavailable(ignoreUnavailable bool) SearchService
Expand Down
17 changes: 15 additions & 2 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ import (
"io/ioutil"
"net/http"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

"github.com/olivere/elastic"
"github.com/pkg/errors"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"

"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/wrapper"
Expand Down Expand Up @@ -155,7 +156,19 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac
if err != nil {
return nil, err
}
return eswrapper.WrapESClient(rawClient, service), nil

// Determine ElasticSearch Version
pingResult, _, err := rawClient.Ping(c.Servers[0]).Do(context.Background())
if err != nil {
return nil, err
}
esVersion, err := strconv.Atoi(string(pingResult.Version.Number[0]))
if err != nil {
return nil, err
}
logger.Info("Elasticsearch detected", zap.Int("version", esVersion))

return eswrapper.WrapESClient(rawClient, service, esVersion), nil
}

// ApplyDefaults copies settings from source unless its own value is non-zero.
Expand Down
14 changes: 14 additions & 0 deletions pkg/es/mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/es/mocks/IndicesCreateService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/es/mocks/MultiSearchService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 1 addition & 17 deletions pkg/es/mocks/SearchService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/es/mocks/TemplateCreateService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 28 additions & 15 deletions pkg/es/wrapper/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package eswrapper
import (
"context"

"gopkg.in/olivere/elastic.v5"
"github.com/olivere/elastic"

"github.com/jaegertracing/jaeger/pkg/es"
)
Expand All @@ -29,11 +29,17 @@ import (
type ClientWrapper struct {
client *elastic.Client
bulkService *elastic.BulkProcessor
esVersion int
}

// GetVersion returns the ElasticSearch Version
func (c ClientWrapper) GetVersion() int {
return c.esVersion
}

// WrapESClient creates a ESClient out of *elastic.Client.
func WrapESClient(client *elastic.Client, s *elastic.BulkProcessor) ClientWrapper {
return ClientWrapper{client: client, bulkService: s}
func WrapESClient(client *elastic.Client, s *elastic.BulkProcessor, esVersion int) ClientWrapper {
return ClientWrapper{client: client, bulkService: s, esVersion: esVersion}
}

// IndexExists calls this function to internal client.
Expand All @@ -54,17 +60,25 @@ func (c ClientWrapper) CreateTemplate(ttype string) es.TemplateCreateService {
// Index calls this function to internal client.
func (c ClientWrapper) Index() es.IndexService {
r := elastic.NewBulkIndexRequest()
return WrapESIndexService(r, c.bulkService)
return WrapESIndexService(r, c.bulkService, c.esVersion)
}

// Search calls this function to internal client.
func (c ClientWrapper) Search(indices ...string) es.SearchService {
return WrapESSearchService(c.client.Search(indices...))
searchService := c.client.Search(indices...)
if c.esVersion == 7 {
searchService = searchService.RestTotalHitsAsInt(true)
}
return WrapESSearchService(searchService)
}

// MultiSearch calls this function to internal client.
func (c ClientWrapper) MultiSearch() es.MultiSearchService {
return WrapESMultiSearchService(c.client.MultiSearch())
multiSearchService := c.client.MultiSearch()
if c.esVersion == 7 {
multiSearchService = multiSearchService.RestTotalHitsAsInt(true)
}
return WrapESMultiSearchService(multiSearchService)
}

// Close closes ESClient and flushes all data to the storage.
Expand Down Expand Up @@ -138,21 +152,25 @@ func (c TemplateCreateServiceWrapper) Do(ctx context.Context) (*elastic.IndicesP
type IndexServiceWrapper struct {
bulkIndexReq *elastic.BulkIndexRequest
bulkService *elastic.BulkProcessor
esVersion int
}

// WrapESIndexService creates an ESIndexService out of *elastic.ESIndexService.
func WrapESIndexService(indexService *elastic.BulkIndexRequest, bulkService *elastic.BulkProcessor) IndexServiceWrapper {
return IndexServiceWrapper{bulkIndexReq: indexService, bulkService: bulkService}
func WrapESIndexService(indexService *elastic.BulkIndexRequest, bulkService *elastic.BulkProcessor, esVersion int) IndexServiceWrapper {
return IndexServiceWrapper{bulkIndexReq: indexService, bulkService: bulkService, esVersion: esVersion}
}

// Index calls this function to internal service.
func (i IndexServiceWrapper) Index(index string) es.IndexService {
return WrapESIndexService(i.bulkIndexReq.Index(index), i.bulkService)
return WrapESIndexService(i.bulkIndexReq.Index(index), i.bulkService, i.esVersion)
}

// Type calls this function to internal service.
func (i IndexServiceWrapper) Type(typ string) es.IndexService {
return WrapESIndexService(i.bulkIndexReq.Type(typ), i.bulkService)
if i.esVersion == 7 {
return WrapESIndexService(i.bulkIndexReq, i.bulkService, i.esVersion)
}
return WrapESIndexService(i.bulkIndexReq.Type(typ), i.bulkService, i.esVersion)
}

// Add adds the request to bulk service
Expand All @@ -172,11 +190,6 @@ func WrapESSearchService(searchService *elastic.SearchService) SearchServiceWrap
return SearchServiceWrapper{searchService: searchService}
}

// Type calls this function to internal service.
func (s SearchServiceWrapper) Type(typ string) es.SearchService {
return WrapESSearchService(s.searchService.Type(typ))
}

// Size calls this function to internal service.
func (s SearchServiceWrapper) Size(size int) es.SearchService {
return WrapESSearchService(s.searchService.Size(size))
Expand Down
4 changes: 2 additions & 2 deletions pkg/es/wrapper/wrapper_nolint.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import "github.com/jaegertracing/jaeger/pkg/es"

// Id calls this function to internal service.
func (i IndexServiceWrapper) Id(id string) es.IndexService {
return WrapESIndexService(i.bulkIndexReq.Id(id), i.bulkService)
return WrapESIndexService(i.bulkIndexReq.Id(id), i.bulkService, i.esVersion)
}

// BodyJson calls this function to internal service.
func (i IndexServiceWrapper) BodyJson(body interface{}) es.IndexService {
return WrapESIndexService(i.bulkIndexReq.Doc(body), i.bulkService)
return WrapESIndexService(i.bulkIndexReq.Doc(body), i.bulkService, i.esVersion)
}
14 changes: 14 additions & 0 deletions plugin/storage/es/dependencystore/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,17 @@ const dependenciesMapping = `{
"` + dependencyType + `":{}
}
}`

const dependenciesMapping7 = `{
"settings":{
"index.requests.cache.enable":true
},
"mappings":{}
}`

func getMapping(version int) string {
if version == 7 {
return dependenciesMapping7
}
return dependenciesMapping
}
5 changes: 2 additions & 3 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"encoding/json"
"time"

"github.com/olivere/elastic"
"github.com/pkg/errors"
"go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/es"
Expand Down Expand Up @@ -67,7 +67,7 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
}

func (s *DependencyStore) createIndex(indexName string) error {
_, err := s.client.CreateIndex(indexName).Body(dependenciesMapping).Do(s.ctx)
_, err := s.client.CreateIndex(indexName).Body(getMapping(s.client.GetVersion())).Do(s.ctx)
if err != nil {
return errors.Wrap(err, "Failed to create index")
}
Expand All @@ -85,7 +85,6 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
indices := getIndices(s.indexPrefix, endTs, lookback)
searchResult, err := s.client.Search(indices...).
Type(dependencyType).
Size(10000). // the default elasticsearch allowed limit
Query(buildTSQuery(endTs, lookback)).
IgnoreUnavailable(true).
Expand Down
Loading

0 comments on commit 4bf5e9b

Please sign in to comment.