Skip to content

Commit

Permalink
Switch OTLP exporter to internal data format (#803)
Browse files Browse the repository at this point in the history
This commit switches OTLP exporter to use internal data format instead of OC which allows to use internal data for the whole OTLP-based pipeline.
  • Loading branch information
dmitryax authored Apr 8, 2020
1 parent 0b866c7 commit 0e1b2e3
Show file tree
Hide file tree
Showing 23 changed files with 881 additions and 379 deletions.
77 changes: 0 additions & 77 deletions consumer/converter.go

This file was deleted.

128 changes: 128 additions & 0 deletions consumer/converter/converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package converter

import (
"context"

"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/internal/data"
"github.com/open-telemetry/opentelemetry-collector/translator/internaldata"
)

// NewInternalToOCTraceConverter creates new internalToOCTraceConverter that takes TraceConsumer and
// implements ConsumeTrace interface.
func NewInternalToOCTraceConverter(tc consumer.TraceConsumerOld) consumer.TraceConsumer {
return &internalToOCTraceConverter{tc}
}

// internalToOCTraceConverter is a internal to oc translation shim that takes TraceConsumer and
// implements ConsumeTrace interface.
type internalToOCTraceConverter struct {
traceConsumer consumer.TraceConsumerOld
}

// ConsumeTrace takes new-style data.TraceData method, converts it to OC and uses old-style ConsumeTraceData method
// to process the trace data.
func (tc *internalToOCTraceConverter) ConsumeTrace(ctx context.Context, td data.TraceData) error {
ocTraces := internaldata.TraceDataToOC(td)
for i := range ocTraces {
err := tc.traceConsumer.ConsumeTraceData(ctx, ocTraces[i])
if err != nil {
return err
}
}
return nil
}

var _ consumer.TraceConsumer = (*internalToOCTraceConverter)(nil)

// NewInternalToOCMetricsConverter creates new internalToOCMetricsConverter that takes MetricsConsumer and
// implements ConsumeTrace interface.
func NewInternalToOCMetricsConverter(tc consumer.MetricsConsumerOld) consumer.MetricsConsumer {
return &internalToOCMetricsConverter{tc}
}

// internalToOCMetricsConverter is a internal to oc translation shim that takes MetricsConsumer and
// implements ConsumeMetrics interface.
type internalToOCMetricsConverter struct {
metricsConsumer consumer.MetricsConsumerOld
}

// ConsumeMetrics takes new-style data.MetricData method, converts it to OC and uses old-style ConsumeMetricsData method
// to process the metrics data.
func (tc *internalToOCMetricsConverter) ConsumeMetrics(ctx context.Context, td data.MetricData) error {
ocMetrics := internaldata.MetricDataToOC(td)
for i := range ocMetrics {
err := tc.metricsConsumer.ConsumeMetricsData(ctx, ocMetrics[i])
if err != nil {
return err
}
}
return nil
}

var _ consumer.MetricsConsumer = (*internalToOCMetricsConverter)(nil)

// NewOCToInternalTraceConverter creates new ocToInternalTraceConverter that takes TraceConsumer and
// implements ConsumeTrace interface.
func NewOCToInternalTraceConverter(tc consumer.TraceConsumer) consumer.TraceConsumerOld {
return &ocToInternalTraceConverter{tc}
}

// ocToInternalTraceConverter is a internal to oc translation shim that takes TraceConsumer and
// implements ConsumeTrace interface.
type ocToInternalTraceConverter struct {
traceConsumer consumer.TraceConsumer
}

// ConsumeTrace takes new-style data.TraceData method, converts it to OC and uses old-style ConsumeTraceData method
// to process the trace data.
func (tc *ocToInternalTraceConverter) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
traceData := internaldata.OCToTraceData(td)
err := tc.traceConsumer.ConsumeTrace(ctx, traceData)
if err != nil {
return err
}

return nil
}

var _ consumer.TraceConsumerOld = (*ocToInternalTraceConverter)(nil)

// NewOCToInternalMetricsConverter creates new ocToInternalMetricsConverter that takes MetricsConsumer and
// implements ConsumeTrace interface.
func NewOCToInternalMetricsConverter(tc consumer.MetricsConsumer) consumer.MetricsConsumerOld {
return &ocToInternalMetricsConverter{tc}
}

// ocToInternalMetricsConverter is a internal to oc translation shim that takes MetricsConsumer and
// implements ConsumeMetrics interface.
type ocToInternalMetricsConverter struct {
metricsConsumer consumer.MetricsConsumer
}

// ConsumeMetrics takes new-style data.MetricData method, converts it to OC and uses old-style ConsumeMetricsData method
// to process the metrics data.
func (tc *ocToInternalMetricsConverter) ConsumeMetricsData(ctx context.Context, td consumerdata.MetricsData) error {
metricsData := internaldata.OCToMetricData(td)
err := tc.metricsConsumer.ConsumeMetrics(ctx, metricsData)
if err != nil {
return err
}
return nil
}

var _ consumer.MetricsConsumerOld = (*ocToInternalMetricsConverter)(nil)
100 changes: 100 additions & 0 deletions consumer/converter/converter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package converter

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
"github.com/open-telemetry/opentelemetry-collector/internal/data/testdata"
"github.com/open-telemetry/opentelemetry-collector/translator/internaldata"
)

func TestNewInternalToOCTraceConverter(t *testing.T) {
td := testdata.GenerateTraceDataTwoSpansSameResourceOneDifferent()
traceExporterOld := new(exportertest.SinkTraceExporterOld)
converter := NewInternalToOCTraceConverter(traceExporterOld)

err := converter.ConsumeTrace(context.Background(), td)
assert.Nil(t, err)

ocTraces := traceExporterOld.AllTraces()
assert.Equal(t, len(ocTraces), 2)
assert.EqualValues(t, ocTraces, internaldata.TraceDataToOC(td))

traceExporterOld.SetConsumeTraceError(fmt.Errorf("consumer error"))
err = converter.ConsumeTrace(context.Background(), td)
assert.NotNil(t, err)
}

func TestNewInternalToOCMetricsConverter(t *testing.T) {
md := testdata.GenerateMetricDataOneMetric()
metricsExporterOld := new(exportertest.SinkMetricsExporterOld)
converter := NewInternalToOCMetricsConverter(metricsExporterOld)

err := converter.ConsumeMetrics(context.Background(), md)
assert.Nil(t, err)

ocMetrics := metricsExporterOld.AllMetrics()
assert.Equal(t, len(ocMetrics), 1)
assert.EqualValues(t, ocMetrics, internaldata.MetricDataToOC(md))

metricsExporterOld.SetConsumeMetricsError(fmt.Errorf("consumer error"))
err = converter.ConsumeMetrics(context.Background(), md)
assert.NotNil(t, err)
}

func TestNewOCTraceToInternalTraceConverter(t *testing.T) {
td := testdata.GenerateTraceDataOneSpan()
ocTraceData := internaldata.TraceDataToOC(td)[0]
traceExporter := new(exportertest.SinkTraceExporter)
converter := NewOCToInternalTraceConverter(traceExporter)

err := converter.ConsumeTraceData(context.Background(), ocTraceData)
assert.Nil(t, err)
err = converter.ConsumeTraceData(context.Background(), ocTraceData)
assert.Nil(t, err)

ocTraces := traceExporter.AllTraces()
assert.Equal(t, len(ocTraces), 2)
assert.EqualValues(t, ocTraces[0], td)

traceExporter.SetConsumeTraceError(fmt.Errorf("consumer error"))
err = converter.ConsumeTraceData(context.Background(), ocTraceData)
assert.NotNil(t, err)
}

func TestNewOCToInternalMetricsConverter(t *testing.T) {
md := testdata.GenerateMetricDataOneMetric()
ocMetricData := internaldata.MetricDataToOC(md)[0]
metricsExporter := new(exportertest.SinkMetricsExporter)
converter := NewOCToInternalMetricsConverter(metricsExporter)

err := converter.ConsumeMetricsData(context.Background(), ocMetricData)
assert.Nil(t, err)
err = converter.ConsumeMetricsData(context.Background(), ocMetricData)
assert.Nil(t, err)

ocMetrics := metricsExporter.AllMetrics()
assert.Equal(t, len(ocMetrics), 2)
assert.EqualValues(t, ocMetrics[0], md)

metricsExporter.SetConsumeMetricsError(fmt.Errorf("consumer error"))
err = converter.ConsumeMetricsData(context.Background(), ocMetricData)
assert.NotNil(t, err)
}
Loading

0 comments on commit 0e1b2e3

Please sign in to comment.