Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] Add v1 factory converter to v2 storage factory #5497

Merged
merged 21 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions storage_v2/converter/empty_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package converter

import (
"testing"

"github.com/jaegertracing/jaeger/pkg/testutils"
)

func TestMain(m *testing.M) {
testutils.VerifyGoLeaks(m)
}
60 changes: 60 additions & 0 deletions storage_v2/converter/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package converter

import (
"context"
"time"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/metrics"
storage_v1 "github.com/jaegertracing/jaeger/storage"
dependencystore_v1 "github.com/jaegertracing/jaeger/storage/dependencystore"
spanstore_v1 "github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/spanstore"
)

type Factory struct {
ss spanstore.Factory
}

func NewFactory(ss spanstore.Factory) storage_v1.Factory {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
return &Factory{
ss: ss,

Check warning on line 26 in storage_v2/converter/factory.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/factory.go#L24-L26

Added lines #L24 - L26 were not covered by tests
}
}

func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
return f.ss.Initialize(context.Background())

Check warning on line 31 in storage_v2/converter/factory.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/factory.go#L30-L31

Added lines #L30 - L31 were not covered by tests
}

func (f *Factory) CreateSpanReader() (spanstore_v1.Reader, error) {
traceReader, err := f.ss.CreateTraceReader()
if err != nil {
return nil, err

Check warning on line 37 in storage_v2/converter/factory.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/factory.go#L34-L37

Added lines #L34 - L37 were not covered by tests
}

return NewSpanReader(traceReader)

Check warning on line 40 in storage_v2/converter/factory.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/factory.go#L40

Added line #L40 was not covered by tests
}

func (f *Factory) CreateSpanWriter() (spanstore_v1.Writer, error) {
traceWriter, err := f.ss.CreateTraceWriter()
if err != nil {
return nil, err

Check warning on line 46 in storage_v2/converter/factory.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/factory.go#L43-L46

Added lines #L43 - L46 were not covered by tests
}

return NewSpanWriter(traceWriter)

Check warning on line 49 in storage_v2/converter/factory.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/factory.go#L49

Added line #L49 was not covered by tests
}

type UnimplementedDependencyStore struct{}

func (s *UnimplementedDependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
panic("not implemented")

Check warning on line 55 in storage_v2/converter/factory.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/factory.go#L54-L55

Added lines #L54 - L55 were not covered by tests
}

func (f *Factory) CreateDependencyReader() (dependencystore_v1.Reader, error) {
return &UnimplementedDependencyStore{}, nil

Check warning on line 59 in storage_v2/converter/factory.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/factory.go#L58-L59

Added lines #L58 - L59 were not covered by tests
}
153 changes: 153 additions & 0 deletions storage_v2/converter/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package converter

import (
"context"
"encoding/binary"
"fmt"

otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"

"github.com/jaegertracing/jaeger/model"
spanstore_v1 "github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/spanstore"
)

type SpanReader struct {
traceReader spanstore.Reader
}

func NewSpanReader(traceReader spanstore.Reader) (spanstore_v1.Reader, error) {
return &SpanReader{
traceReader: traceReader,
}, nil

Check warning on line 25 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L22-L25

Added lines #L22 - L25 were not covered by tests
}

// FindTraceIDs implements spanstore.Reader.
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
func (s *SpanReader) FindTraceIDs(ctx context.Context, query *spanstore_v1.TraceQueryParameters) ([]model.TraceID, error) {
IDs, err := s.traceReader.FindTraceIDs(ctx, spanstore.TraceQueryParameters{
ServiceName: query.ServiceName,
OperationName: query.OperationName,
Tags: query.Tags,
StartTimeMin: query.StartTimeMin,
StartTimeMax: query.StartTimeMax,
DurationMin: query.DurationMin,
DurationMax: query.DurationMax,
NumTraces: query.NumTraces,
})
if err != nil {
return []model.TraceID{}, err

Check warning on line 41 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L29-L41

Added lines #L29 - L41 were not covered by tests
}

traceIDs := []model.TraceID{}
for _, ID := range IDs {

Check warning on line 45 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L44-L45

Added lines #L44 - L45 were not covered by tests
// otelcol-contrib has the translator to jaeger proto but declared in private function
// similar to https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/idutils/big_endian_converter.go#L21
traceIDHigh, traceIDLow := binary.BigEndian.Uint64(ID[:8]), binary.BigEndian.Uint64(ID[8:])
traceIDs = append(traceIDs, model.TraceID{
Low: traceIDLow,
High: traceIDHigh,
})

Check warning on line 52 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L48-L52

Added lines #L48 - L52 were not covered by tests
}

return traceIDs, nil

Check warning on line 55 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L55

Added line #L55 was not covered by tests
}

// FindTraces implements spanstore.Reader.
func (s *SpanReader) FindTraces(ctx context.Context, query *spanstore_v1.TraceQueryParameters) ([]*model.Trace, error) {
tds, err := s.traceReader.FindTraces(ctx, spanstore.TraceQueryParameters{
ServiceName: query.ServiceName,
OperationName: query.OperationName,
Tags: query.Tags,
StartTimeMin: query.StartTimeMin,
StartTimeMax: query.StartTimeMax,
DurationMin: query.DurationMin,
DurationMax: query.DurationMax,
NumTraces: query.NumTraces,
})
if err != nil {
return []*model.Trace{}, err

Check warning on line 71 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L59-L71

Added lines #L59 - L71 were not covered by tests
}

traces := []*model.Trace{}
for _, td := range tds {
batches, err := otlp2jaeger.ProtoFromTraces(td)
if err != nil {
return []*model.Trace{}, fmt.Errorf("cannot transform OTLP trace to Jaeger format: %w", err)

Check warning on line 78 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L74-L78

Added lines #L74 - L78 were not covered by tests
}

trace := &model.Trace{}
for _, batch := range batches {
for _, span := range batch.Spans {
if span.Process == nil {
span.Process = batch.Process

Check warning on line 85 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L81-L85

Added lines #L81 - L85 were not covered by tests
}
}
trace.Spans = append(trace.Spans, batch.Spans...)

Check warning on line 88 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L88

Added line #L88 was not covered by tests
}
traces = append(traces, trace)

Check warning on line 90 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L90

Added line #L90 was not covered by tests
}

return traces, nil

Check warning on line 93 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L93

Added line #L93 was not covered by tests
}

// GetOperations implements spanstore.Reader.
func (s *SpanReader) GetOperations(ctx context.Context, query spanstore_v1.OperationQueryParameters) ([]spanstore_v1.Operation, error) {
ops, err := s.traceReader.GetOperations(ctx, spanstore.OperationQueryParameters{
ServiceName: query.ServiceName,
SpanKind: query.SpanKind,
})
if err != nil {
return []spanstore_v1.Operation{}, err

Check warning on line 103 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L97-L103

Added lines #L97 - L103 were not covered by tests
}

operations := []spanstore_v1.Operation{}
for _, op := range ops {
operations = append(operations, spanstore_v1.Operation{
Name: op.Name,
SpanKind: op.SpanKind,
})

Check warning on line 111 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L106-L111

Added lines #L106 - L111 were not covered by tests
}
return operations, nil

Check warning on line 113 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L113

Added line #L113 was not covered by tests
}

// GetServices implements spanstore.Reader.
func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) {
services, err := s.traceReader.GetServices(ctx)
if err != nil {
return []string{}, err

Check warning on line 120 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L117-L120

Added lines #L117 - L120 were not covered by tests
}

return services, nil

Check warning on line 123 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L123

Added line #L123 was not covered by tests
}

// GetTrace implements spanstore.Reader.
func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {

Check warning on line 127 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L127

Added line #L127 was not covered by tests
// otelcol-contrib has the translator to pcommon.TraceID but declared in private function
// similar to https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/idutils/big_endian_converter.go#L13
ID := [16]byte{}
binary.BigEndian.PutUint64(ID[:8], traceID.High)
binary.BigEndian.PutUint64(ID[8:], traceID.Low)
td, err := s.traceReader.GetTrace(ctx, ID)
if err != nil {
return nil, err

Check warning on line 135 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L130-L135

Added lines #L130 - L135 were not covered by tests
}

batches, err := otlp2jaeger.ProtoFromTraces(td)
if err != nil {
return nil, fmt.Errorf("cannot transform OTLP trace to Jaeger format: %w", err)

Check warning on line 140 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L138-L140

Added lines #L138 - L140 were not covered by tests
}

trace := &model.Trace{}
for _, batch := range batches {
for _, span := range batch.Spans {
if span.Process == nil {
span.Process = batch.Process

Check warning on line 147 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L143-L147

Added lines #L143 - L147 were not covered by tests
}
}
trace.Spans = append(trace.Spans, batch.Spans...)

Check warning on line 150 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L150

Added line #L150 was not covered by tests
}
return trace, nil

Check warning on line 152 in storage_v2/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/reader.go#L152

Added line #L152 was not covered by tests
}
35 changes: 35 additions & 0 deletions storage_v2/converter/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package converter

import (
"context"
"fmt"

jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"

"github.com/jaegertracing/jaeger/model"
spanstore_v1 "github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/spanstore"
)

type SpanWriter struct {
traceWriter spanstore.Writer
}

func NewSpanWriter(traceWriter spanstore.Writer) (spanstore_v1.Writer, error) {
return &SpanWriter{
traceWriter: traceWriter,
}, nil

Check warning on line 24 in storage_v2/converter/writer.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/writer.go#L21-L24

Added lines #L21 - L24 were not covered by tests
}

// WriteSpan implements spanstore.Writer.
func (s *SpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
batch := []*model.Batch{{Spans: []*model.Span{span}}}
td, err := jaeger2otlp.ProtoToTraces(batch)
if err != nil {
return fmt.Errorf("cannot transform Jaeger span to OTLP trace format: %w", err)

Check warning on line 32 in storage_v2/converter/writer.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/writer.go#L28-L32

Added lines #L28 - L32 were not covered by tests
}
return s.traceWriter.WriteTraces(ctx, td)

Check warning on line 34 in storage_v2/converter/writer.go

View check run for this annotation

Codecov / codecov/patch

storage_v2/converter/writer.go#L34

Added line #L34 was not covered by tests
}
Loading