Skip to content

Commit

Permalink
Use new ES reader implementation in OTEL (#2441)
Browse files Browse the repository at this point in the history
* Use new ES reader implementation in OTEL

Signed-off-by: Pavol Loffay <[email protected]>

* fmt

Signed-off-by: Pavol Loffay <[email protected]>

* Fix ITest

Signed-off-by: Pavol Loffay <[email protected]>

* fixes after rebase

Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay authored Sep 3, 2020
1 parent 21de269 commit 043d00c
Show file tree
Hide file tree
Showing 11 changed files with 469 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// new creates Elasticsearch exporter/storage.
func new(ctx context.Context, config *Config, params component.ExporterCreateParams) (component.TraceExporter, error) {
esCfg := config.GetPrimary()
w, err := newEsSpanWriter(*esCfg, params.Logger)
w, err := newEsSpanWriter(*esCfg, params.Logger, false)
if err != nil {
return nil, err
}
Expand Down
51 changes: 51 additions & 0 deletions cmd/opentelemetry/app/exporter/elasticsearchexporter/index_name.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package elasticsearchexporter

import "time"

func newIndexNameProvider(index, prefix string, useAliases bool, useArchive bool) indexNameProvider {
if prefix != "" {
prefix = prefix + "-"
index = prefix + index
}
index = index + "-"
if useArchive {
index = index + "archive"
}
if useAliases {
if index[len(index)-1] != '-' {
index += "-"
}
index = index + "write"
}
return indexNameProvider{
index: index,
useSingleIndex: useAliases || useArchive,
}
}

type indexNameProvider struct {
index string
useSingleIndex bool
}

func (n indexNameProvider) get(date time.Time) string {
if n.useSingleIndex {
return n.index
}
spanDate := date.UTC().Format(indexDateFormat)
return n.index + spanDate
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package elasticsearchexporter

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestIndexName(t *testing.T) {
tests := []struct {
name string
index string
nameProvider indexNameProvider
date time.Time
end time.Time
}{
{
name: "index prefix",
nameProvider: newIndexNameProvider("myindex", "production", false, false),
index: "production-myindex-0001-01-01",
},
{
name: "no prefix",
nameProvider: newIndexNameProvider("myindex", "", false, false),
index: "myindex-2020-08-28",
date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
},
{
name: "use aliases",
nameProvider: newIndexNameProvider("myindex", "", true, false),
index: "myindex-write",
date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
},
{
name: "use archive",
nameProvider: newIndexNameProvider("myindex", "", false, true),
index: "myindex-archive",
date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
},
{
name: "use archive alias",
nameProvider: newIndexNameProvider("myindex", "", true, true),
index: "myindex-archive-write",
date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
indices := test.nameProvider.get(test.date)
assert.Equal(t, test.index, indices)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error {
AllAsFields: allTagsAsFields,
},
}
w, err := newEsSpanWriter(cfg, s.logger)
w, err := newEsSpanWriter(cfg, s.logger, false)
if err != nil {
return err
}
Expand Down
53 changes: 15 additions & 38 deletions cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ type esSpanWriter struct {
spanIndexName indexNameProvider
serviceIndexName indexNameProvider
translator *esmodeltranslator.Translator
isArchive bool
}

// newEsSpanWriter creates new instance of esSpanWriter
func newEsSpanWriter(params config.Configuration, logger *zap.Logger) (*esSpanWriter, error) {
func newEsSpanWriter(params config.Configuration, logger *zap.Logger, archive bool) (*esSpanWriter, error) {
client, err := esclient.NewElasticsearchClient(params, logger)
if err != nil {
return nil, err
Expand All @@ -66,9 +67,10 @@ func newEsSpanWriter(params config.Configuration, logger *zap.Logger) (*esSpanWr
}
return &esSpanWriter{
client: client,
spanIndexName: newIndexNameProvider(spanIndexBaseName, params.IndexPrefix, params.UseReadWriteAliases),
serviceIndexName: newIndexNameProvider(serviceIndexBaseName, params.IndexPrefix, params.UseReadWriteAliases),
spanIndexName: newIndexNameProvider(spanIndexBaseName, params.IndexPrefix, params.UseReadWriteAliases, archive),
serviceIndexName: newIndexNameProvider(serviceIndexBaseName, params.IndexPrefix, params.UseReadWriteAliases, archive),
translator: esmodeltranslator.NewTranslator(params.Tags.AllAsFields, tagsKeysAsFields, params.GetTagDotReplacement()),
isArchive: archive,
serviceCache: cache.NewLRUWithOptions(
// we do not expect more than 100k unique services
100_000,
Expand All @@ -79,34 +81,6 @@ func newEsSpanWriter(params config.Configuration, logger *zap.Logger) (*esSpanWr
}, nil
}

func newIndexNameProvider(index, prefix string, useAliases bool) indexNameProvider {
if prefix != "" {
prefix = prefix + "-"
index = prefix + index
}
index = index + "-"
if useAliases {
index = index + "write"
}
return indexNameProvider{
index: index,
useAlias: useAliases,
}
}

type indexNameProvider struct {
index string
useAlias bool
}

func (n indexNameProvider) get(date time.Time) string {
if n.useAlias {
return n.index
}
spanDate := date.UTC().Format(indexDateFormat)
return n.index + spanDate
}

// CreateTemplates creates index templates.
func (w *esSpanWriter) CreateTemplates(ctx context.Context, spanTemplate, serviceTemplate string) error {
err := w.client.PutTemplate(context.Background(), spanIndexBaseName, strings.NewReader(spanTemplate))
Expand Down Expand Up @@ -145,13 +119,16 @@ func (w *esSpanWriter) writeSpans(ctx context.Context, spans []*dbmodel.Span) (i
indexName := w.spanIndexName.get(model.EpochMicrosecondsAsTime(span.StartTime))
bulkOperations = append(bulkOperations, bulkItem{span: span, isService: false})
w.client.AddDataToBulkBuffer(buffer, data, indexName, spanTypeName)
write, err := w.writeService(span, buffer)
if err != nil {
errs = append(errs, err)
// dropped is not increased since this is only service name, the span could be written well
continue
} else if write {
bulkOperations = append(bulkOperations, bulkItem{span: span, isService: true})

if !w.isArchive {
storeService, err := w.writeService(span, buffer)
if err != nil {
errs = append(errs, err)
// dropped is not increased since this is only service name, the span could be written well
continue
} else if storeService {
bulkOperations = append(bulkOperations, bulkItem{span: span, isService: true})
}
}
}
res, err := w.client.Bulk(ctx, bytes.NewReader(buffer.Bytes()))
Expand Down
152 changes: 152 additions & 0 deletions cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package elasticsearchexporter

import (
"context"

"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/internal/esclient"
"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/internal/reader/es/esdependencyreader"
"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/internal/reader/es/esspanreader"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const archiveNamespace = "es-archive"

// StorageFactory implements storage.Factory and storage.ArchiveFactory
type StorageFactory struct {
Options *es.Options
logger *zap.Logger
}

var _ storage.Factory = (*StorageFactory)(nil)
var _ storage.ArchiveFactory = (*StorageFactory)(nil)

// NewStorageFactory creates StorageFactory
func NewStorageFactory(opts *es.Options, logger *zap.Logger) *StorageFactory {
return &StorageFactory{
Options: opts,
logger: logger,
}
}

// Initialize initializes StorageFactory
func (s *StorageFactory) Initialize(_ metrics.Factory, logger *zap.Logger) error {
s.logger = logger
return nil
}

// CreateSpanWriter creates spanstore.Writer
func (s *StorageFactory) CreateSpanWriter() (spanstore.Writer, error) {
cfg := s.Options.GetPrimary()
writer, err := newEsSpanWriter(*cfg, s.logger, false)
if err != nil {
return nil, err
}
fields, err := cfg.TagKeysAsFields()
if err != nil {
return nil, err
}
return &singleSpanWriter{
converter: dbmodel.NewFromDomain(cfg.GetAllTagsAsFields(), fields, cfg.GetTagDotReplacement()),
writer: writer,
}, nil
}

// CreateSpanReader creates spanstore.Reader
func (s *StorageFactory) CreateSpanReader() (spanstore.Reader, error) {
cfg := s.Options.GetPrimary()
client, err := esclient.NewElasticsearchClient(*cfg, s.logger)
if err != nil {
return nil, err
}
return esspanreader.NewEsSpanReader(client, s.logger, esspanreader.Config{
Archive: false,
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
IndexPrefix: cfg.GetIndexPrefix(),
MaxSpanAge: cfg.GetMaxSpanAge(),
MaxNumSpans: cfg.GetMaxNumSpans(),
TagDotReplacement: cfg.GetTagDotReplacement(),
}), nil
}

// CreateDependencyReader creates dependencystore.Reader
func (s *StorageFactory) CreateDependencyReader() (dependencystore.Reader, error) {
cfg := s.Options.GetPrimary()
client, err := esclient.NewElasticsearchClient(*cfg, s.logger)
if err != nil {
return nil, err
}
return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix()), nil
}

// CreateArchiveSpanReader creates archive spanstore.Reader
func (s *StorageFactory) CreateArchiveSpanReader() (spanstore.Reader, error) {
cfg := s.Options.Get(archiveNamespace)
client, err := esclient.NewElasticsearchClient(*cfg, s.logger)
if err != nil {
return nil, err
}
return esspanreader.NewEsSpanReader(client, s.logger, esspanreader.Config{
Archive: true,
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
IndexPrefix: cfg.GetIndexPrefix(),
MaxSpanAge: cfg.GetMaxSpanAge(),
MaxNumSpans: cfg.GetMaxNumSpans(),
TagDotReplacement: cfg.GetTagDotReplacement(),
}), nil
}

// CreateArchiveSpanWriter creates archive spanstore.Writer
func (s *StorageFactory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
cfg := s.Options.Get(archiveNamespace)
writer, err := newEsSpanWriter(*cfg, s.logger, true)
if err != nil {
return nil, err
}
fields, err := cfg.TagKeysAsFields()
if err != nil {
return nil, err
}
return &singleSpanWriter{
converter: dbmodel.NewFromDomain(cfg.GetAllTagsAsFields(), fields, cfg.GetTagDotReplacement()),
writer: writer,
}, nil
}

type singleSpanWriter struct {
writer batchSpanWriter
converter dbmodel.FromDomain
}

type batchSpanWriter interface {
writeSpans(context.Context, []*dbmodel.Span) (int, error)
}

var _ spanstore.Writer = (*singleSpanWriter)(nil)

func (s singleSpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
dbSpan := s.converter.FromDomainEmbedProcess(span)
_, err := s.writer.writeSpans(ctx, []*dbmodel.Span{dbSpan})
return err
}
Loading

0 comments on commit 043d00c

Please sign in to comment.