Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Elasticsearch 7 support #1690

Merged
merged 21 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
2cc7221
Make it possible to run es-integration-test against different version…
gregoryfranklin Jul 19, 2019
b93a996
Update github.com/olivere/elastic to 6.2.21
gregoryfranklin Jul 22, 2019
3e26403
Migrate to a single '_doc' elasticsearch document type
gregoryfranklin Jul 22, 2019
6760141
Add IncludeTypeName for compatability with elasticsearch 7.x
gregoryfranklin Jul 22, 2019
013fe07
Remove deprecated __default__ field from es mappings
gregoryfranklin Jul 25, 2019
add36a7
Rebase on master. Fix for ES5
gregoryfranklin Aug 5, 2019
a432be9
Update esRollover.py to use per-version templates
gregoryfranklin Aug 5, 2019
7732370
Update github.com/olivere/elastic to 6.2.22 and use RestTotalHitsAsIn…
gregoryfranklin Aug 5, 2019
c26ce15
esRollover.py should set '?include_type_name=true' when creating temp…
gregoryfranklin Aug 5, 2019
84d45e4
Check loading of all versions of the ES mappings
gregoryfranklin Aug 5, 2019
c23b18b
Run es-integration-test.sh against ES5, ES6 and ES7
gregoryfranklin Aug 5, 2019
3a224ba
Run 'make fmt' to add license text to mocks
gregoryfranklin Aug 5, 2019
2564823
Update elasticsearch versions used for integration tests (and add --r…
gregoryfranklin Aug 13, 2019
b6c3f04
Do not use IncludeTypeName on ES7
gregoryfranklin Aug 14, 2019
b362dcf
Log elasticsearch version
gregoryfranklin Aug 14, 2019
a68f510
Make sure we can get the elasticsearch version on servers other than …
gregoryfranklin Aug 14, 2019
a157718
Fix es dependency storage test
gregoryfranklin Aug 14, 2019
c878afd
Fix token propagation test for elasticsearch version detection
gregoryfranklin Aug 19, 2019
ad227f4
Fix lint failure
gregoryfranklin Aug 19, 2019
8dcdc22
Use the same file for elasticsearch mappings on ES5 and ES6. Only ES…
gregoryfranklin Aug 19, 2019
22a2332
Change log message to use structured logging
gregoryfranklin Aug 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ required = [
version = "=1.20.1"

[[constraint]]
name = "gopkg.in/olivere/elastic.v5"
version = "^5.0.53"
name = "github.com/olivere/elastic"
version = "^6.2.22"

[[constraint]]
name = "github.com/gocql/gocql"
Expand Down Expand Up @@ -174,4 +174,4 @@ required = [

[[override]]
name = "github.com/Shopify/sarama"
revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9"
revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9"
4 changes: 2 additions & 2 deletions pkg/es/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"context"
"io"

"gopkg.in/olivere/elastic.v5"
"github.com/olivere/elastic"
)

// Client is an abstraction for elastic.Client
Expand All @@ -30,6 +30,7 @@ type Client interface {
Search(indices ...string) SearchService
MultiSearch() MultiSearchService
io.Closer
GetVersion() int
}

// IndicesExistsService is an abstraction for elastic.IndicesExistsService
Expand Down Expand Up @@ -60,7 +61,6 @@ type IndexService interface {

// SearchService is an abstraction for elastic.SearchService
type SearchService interface {
Type(typ string) SearchService
Size(size int) SearchService
Aggregation(name string, aggregation elastic.Aggregation) SearchService
IgnoreUnavailable(ignoreUnavailable bool) SearchService
Expand Down
17 changes: 15 additions & 2 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"io/ioutil"
"net/http"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

"github.com/olivere/elastic"
"github.com/pkg/errors"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"

"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/wrapper"
Expand Down Expand Up @@ -154,7 +155,19 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac
if err != nil {
return nil, err
}
return eswrapper.WrapESClient(rawClient, service), nil

// Determine ElasticSearch Version
pingResult, _, err := rawClient.Ping(c.Servers[0]).Do(context.Background())
if err != nil {
return nil, err
}
esVersion, err := strconv.Atoi(string(pingResult.Version.Number[0]))
if err != nil {
return nil, err
}
logger.Info("Elasticsearch detected", zap.Int("version", esVersion))

gregoryfranklin marked this conversation as resolved.
Show resolved Hide resolved
return eswrapper.WrapESClient(rawClient, service, esVersion), nil
}

// ApplyDefaults copies settings from source unless its own value is non-zero.
Expand Down
14 changes: 14 additions & 0 deletions pkg/es/mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/es/mocks/IndicesCreateService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/es/mocks/MultiSearchService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 1 addition & 17 deletions pkg/es/mocks/SearchService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/es/mocks/TemplateCreateService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 28 additions & 15 deletions pkg/es/wrapper/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package eswrapper
import (
"context"

"gopkg.in/olivere/elastic.v5"
"github.com/olivere/elastic"

"github.com/jaegertracing/jaeger/pkg/es"
)
Expand All @@ -28,11 +28,17 @@ import (
type ClientWrapper struct {
client *elastic.Client
bulkService *elastic.BulkProcessor
esVersion int
}

// GetVersion returns the ElasticSearch Version
func (c ClientWrapper) GetVersion() int {
return c.esVersion
}

// WrapESClient creates a ESClient out of *elastic.Client.
func WrapESClient(client *elastic.Client, s *elastic.BulkProcessor) ClientWrapper {
return ClientWrapper{client: client, bulkService: s}
func WrapESClient(client *elastic.Client, s *elastic.BulkProcessor, esVersion int) ClientWrapper {
return ClientWrapper{client: client, bulkService: s, esVersion: esVersion}
}

// IndexExists calls this function to internal client.
Expand All @@ -53,17 +59,25 @@ func (c ClientWrapper) CreateTemplate(ttype string) es.TemplateCreateService {
// Index calls this function to internal client.
func (c ClientWrapper) Index() es.IndexService {
r := elastic.NewBulkIndexRequest()
return WrapESIndexService(r, c.bulkService)
return WrapESIndexService(r, c.bulkService, c.esVersion)
}

// Search calls this function to internal client.
func (c ClientWrapper) Search(indices ...string) es.SearchService {
return WrapESSearchService(c.client.Search(indices...))
searchService := c.client.Search(indices...)
if c.esVersion == 7 {
searchService = searchService.RestTotalHitsAsInt(true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this?

rest_total_hits_as_int or restTotalHitsAsInt | boolean - Indicates whether hits.total should be rendered as an integer or an object in the rest search response

https://www.elastic.co/guide/en/elasticsearch/reference/current/breaking-changes-7.0.html#hits-total-now-object-search-response

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, rest_total_hits_as_int is required whilst using the v6 branch of github.com/olivere/elastic because the result from elasticsearch needs to be unmarshalled into a go struct.
The go struct that comes with github.com/olivere/elastic v6 matches the response from ES5 and ES6 but only matches ES7 if rest_total_hits_as_int is used.

}
return WrapESSearchService(searchService)
}

// MultiSearch calls this function to internal client.
func (c ClientWrapper) MultiSearch() es.MultiSearchService {
return WrapESMultiSearchService(c.client.MultiSearch())
multiSearchService := c.client.MultiSearch()
if c.esVersion == 7 {
multiSearchService = multiSearchService.RestTotalHitsAsInt(true)
}
return WrapESMultiSearchService(multiSearchService)
}

// Close closes ESClient and flushes all data to the storage.
Expand Down Expand Up @@ -137,21 +151,25 @@ func (c TemplateCreateServiceWrapper) Do(ctx context.Context) (*elastic.IndicesP
type IndexServiceWrapper struct {
bulkIndexReq *elastic.BulkIndexRequest
bulkService *elastic.BulkProcessor
esVersion int
}

// WrapESIndexService creates an ESIndexService out of *elastic.ESIndexService.
func WrapESIndexService(indexService *elastic.BulkIndexRequest, bulkService *elastic.BulkProcessor) IndexServiceWrapper {
return IndexServiceWrapper{bulkIndexReq: indexService, bulkService: bulkService}
func WrapESIndexService(indexService *elastic.BulkIndexRequest, bulkService *elastic.BulkProcessor, esVersion int) IndexServiceWrapper {
return IndexServiceWrapper{bulkIndexReq: indexService, bulkService: bulkService, esVersion: esVersion}
}

// Index calls this function to internal service.
func (i IndexServiceWrapper) Index(index string) es.IndexService {
return WrapESIndexService(i.bulkIndexReq.Index(index), i.bulkService)
return WrapESIndexService(i.bulkIndexReq.Index(index), i.bulkService, i.esVersion)
}

// Type calls this function to internal service.
func (i IndexServiceWrapper) Type(typ string) es.IndexService {
return WrapESIndexService(i.bulkIndexReq.Type(typ), i.bulkService)
if i.esVersion == 7 {
return WrapESIndexService(i.bulkIndexReq, i.bulkService, i.esVersion)
}
return WrapESIndexService(i.bulkIndexReq.Type(typ), i.bulkService, i.esVersion)
}

// Add adds the request to bulk service
Expand All @@ -171,11 +189,6 @@ func WrapESSearchService(searchService *elastic.SearchService) SearchServiceWrap
return SearchServiceWrapper{searchService: searchService}
}

// Type calls this function to internal service.
func (s SearchServiceWrapper) Type(typ string) es.SearchService {
return WrapESSearchService(s.searchService.Type(typ))
}

// Size calls this function to internal service.
func (s SearchServiceWrapper) Size(size int) es.SearchService {
return WrapESSearchService(s.searchService.Size(size))
Expand Down
4 changes: 2 additions & 2 deletions pkg/es/wrapper/wrapper_nolint.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import "github.com/jaegertracing/jaeger/pkg/es"

// Id calls this function to internal service.
func (i IndexServiceWrapper) Id(id string) es.IndexService {
return WrapESIndexService(i.bulkIndexReq.Id(id), i.bulkService)
return WrapESIndexService(i.bulkIndexReq.Id(id), i.bulkService, i.esVersion)
}

// BodyJson calls this function to internal service.
func (i IndexServiceWrapper) BodyJson(body interface{}) es.IndexService {
return WrapESIndexService(i.bulkIndexReq.Doc(body), i.bulkService)
return WrapESIndexService(i.bulkIndexReq.Doc(body), i.bulkService, i.esVersion)
}
14 changes: 14 additions & 0 deletions plugin/storage/es/dependencystore/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,17 @@ const dependenciesMapping = `{
"` + dependencyType + `":{}
}
}`

const dependenciesMapping7 = `{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this? The legacy mapping should work with the ES7. I think you added this after I tested this and it worked without this change.

If it works please revert this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The legacy mapping does not work with ES7 because the legacy mapping includes the type name. Type names are not permitted in ES7 unless you use the include_type_name parameter, which we removed because it is deprecated.

"settings":{
"index.requests.cache.enable":true
},
"mappings":{}
}`

func getMapping(version int) string {
if version == 7 {
return dependenciesMapping7
}
return dependenciesMapping
}
5 changes: 2 additions & 3 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
"encoding/json"
"time"

"github.com/olivere/elastic"
"github.com/pkg/errors"
"go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/es"
Expand Down Expand Up @@ -66,7 +66,7 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
}

func (s *DependencyStore) createIndex(indexName string) error {
_, err := s.client.CreateIndex(indexName).Body(dependenciesMapping).Do(s.ctx)
_, err := s.client.CreateIndex(indexName).Body(getMapping(s.client.GetVersion())).Do(s.ctx)
if err != nil {
return errors.Wrap(err, "Failed to create index")
}
Expand All @@ -84,7 +84,6 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
indices := getIndices(s.indexPrefix, endTs, lookback)
searchResult, err := s.client.Search(indices...).
Type(dependencyType).
Size(10000). // the default elasticsearch allowed limit
Query(buildTSQuery(endTs, lookback)).
IgnoreUnavailable(true).
Expand Down
Loading