Skip to content

Commit

Permalink
Add a split protocol driver
Browse files Browse the repository at this point in the history
This is a wrapper around two other protocol drivers, so it makes it
possible to send traces using a different protocol than the one used
for metrics.
  • Loading branch information
krnowak committed Dec 21, 2020
1 parent 439cd31 commit 0b67c2f
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 15 deletions.
9 changes: 5 additions & 4 deletions exporters/otlp/otlp_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package otlp
package otlp_test

import (
"context"
Expand All @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/exporters/otlp"
commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1"
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
resourcepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/resource/v1"
Expand Down Expand Up @@ -692,8 +693,8 @@ func TestStatelessExportKind(t *testing.T) {
t.Run(k.name, func(t *testing.T) {
runMetricExportTests(
t,
[]ExporterOption{
WithMetricExportKindSelector(
[]otlp.ExporterOption{
otlp.WithMetricExportKindSelector(
metricsdk.StatelessExportKindSelector(),
),
},
Expand Down Expand Up @@ -740,7 +741,7 @@ func TestStatelessExportKind(t *testing.T) {
}
}

func runMetricExportTests(t *testing.T, opts []ExporterOption, rs []record, expected []metricpb.ResourceMetrics) {
func runMetricExportTests(t *testing.T, opts []otlp.ExporterOption, rs []record, expected []metricpb.ResourceMetrics) {
exp, driver := newExporter(t, opts...)

recs := map[label.Distinct][]metricsdk.Record{}
Expand Down
2 changes: 1 addition & 1 deletion exporters/otlp/otlp_span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package otlp
package otlp_test

import (
"context"
Expand Down
127 changes: 117 additions & 10 deletions exporters/otlp/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package otlp
package otlp_test

import (
"context"
Expand All @@ -21,8 +21,10 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/exporters/otlp"
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1"
"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
Expand All @@ -32,31 +34,42 @@ import (
)

type stubProtocolDriver struct {
started int
stopped int
tracesExported int
metricsExported int

injectedStartError error
injectedStopError error

rm []metricpb.ResourceMetrics
rs []tracepb.ResourceSpans
}

var _ ProtocolDriver = (*stubProtocolDriver)(nil)
var _ otlp.ProtocolDriver = (*stubProtocolDriver)(nil)

func (m *stubProtocolDriver) Start(ctx context.Context) error {
m.started++
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
return m.injectedStartError
}
}

func (m *stubProtocolDriver) Stop(ctx context.Context) error {
m.stopped++
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
return m.injectedStopError
}
}

func (m *stubProtocolDriver) ExportMetrics(parent context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error {
m.metricsExported++
rms, err := transform.CheckpointSet(parent, selector, cps, 1)
if err != nil {
return err
Expand All @@ -71,6 +84,7 @@ func (m *stubProtocolDriver) ExportMetrics(parent context.Context, cps metricsdk
}

func (m *stubProtocolDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error {
m.tracesExported++
for _, rs := range transform.SpanData(ss) {
if rs == nil {
continue
Expand All @@ -85,9 +99,9 @@ func (m *stubProtocolDriver) Reset() {
m.rs = nil
}

func newExporter(t *testing.T, opts ...ExporterOption) (*Exporter, *stubProtocolDriver) {
func newExporter(t *testing.T, opts ...otlp.ExporterOption) (*otlp.Exporter, *stubProtocolDriver) {
driver := &stubProtocolDriver{}
exp, err := NewExporter(context.Background(), driver, opts...)
exp, err := otlp.NewExporter(context.Background(), driver, opts...)
require.NoError(t, err)
return exp, driver
}
Expand All @@ -96,7 +110,7 @@ func TestExporterShutdownHonorsTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

e := NewUnstartedExporter(&stubProtocolDriver{})
e := otlp.NewUnstartedExporter(&stubProtocolDriver{})
if err := e.Start(ctx); err != nil {
t.Fatalf("failed to start exporter: %v", err)
}
Expand All @@ -115,7 +129,7 @@ func TestExporterShutdownHonorsCancel(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

e := NewUnstartedExporter(&stubProtocolDriver{})
e := otlp.NewUnstartedExporter(&stubProtocolDriver{})
if err := e.Start(ctx); err != nil {
t.Fatalf("failed to start exporter: %v", err)
}
Expand All @@ -134,7 +148,7 @@ func TestExporterShutdownNoError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

e := NewUnstartedExporter(&stubProtocolDriver{})
e := otlp.NewUnstartedExporter(&stubProtocolDriver{})
if err := e.Start(ctx); err != nil {
t.Fatalf("failed to start exporter: %v", err)
}
Expand All @@ -146,7 +160,7 @@ func TestExporterShutdownNoError(t *testing.T) {

func TestExporterShutdownManyTimes(t *testing.T) {
ctx := context.Background()
e, err := NewExporter(ctx, &stubProtocolDriver{})
e, err := otlp.NewExporter(ctx, &stubProtocolDriver{})
if err != nil {
t.Fatalf("failed to start an exporter: %v", err)
}
Expand All @@ -170,3 +184,96 @@ func TestExporterShutdownManyTimes(t *testing.T) {
}
}
}

func TestSplitDriver(t *testing.T) {
driverTraces := &stubProtocolDriver{}
driverMetrics := &stubProtocolDriver{}
config := otlp.SplitConfig{
ForMetrics: driverMetrics,
ForTraces: driverTraces,
}
driver := otlp.NewSplitDriver(config)
ctx := context.Background()
assert.NoError(t, driver.Start(ctx))
assert.Equal(t, 1, driverTraces.started)
assert.Equal(t, 1, driverMetrics.started)
assert.Equal(t, 0, driverTraces.stopped)
assert.Equal(t, 0, driverMetrics.stopped)
assert.Equal(t, 0, driverTraces.tracesExported)
assert.Equal(t, 0, driverTraces.metricsExported)
assert.Equal(t, 0, driverMetrics.tracesExported)
assert.Equal(t, 0, driverMetrics.metricsExported)

assert.NoError(t, driver.ExportMetrics(ctx, discCheckpointSet{}, metricsdk.StatelessExportKindSelector()))
assert.NoError(t, driver.ExportTraces(ctx, []*tracesdk.SpanSnapshot{discSpanSnapshot()}))
assert.Len(t, driverTraces.rm, 0)
assert.Len(t, driverTraces.rs, 1)
assert.Len(t, driverMetrics.rm, 1)
assert.Len(t, driverMetrics.rs, 0)
assert.Equal(t, 1, driverTraces.tracesExported)
assert.Equal(t, 0, driverTraces.metricsExported)
assert.Equal(t, 0, driverMetrics.tracesExported)
assert.Equal(t, 1, driverMetrics.metricsExported)

assert.NoError(t, driver.Stop(ctx))
assert.Equal(t, 1, driverTraces.started)
assert.Equal(t, 1, driverMetrics.started)
assert.Equal(t, 1, driverTraces.stopped)
assert.Equal(t, 1, driverMetrics.stopped)
assert.Equal(t, 1, driverTraces.tracesExported)
assert.Equal(t, 0, driverTraces.metricsExported)
assert.Equal(t, 0, driverMetrics.tracesExported)
assert.Equal(t, 1, driverMetrics.metricsExported)
}

func TestSplitDriverFail(t *testing.T) {
ctx := context.Background()
for i := 0; i < 16; i++ {
var (
errStartMetric error
errStartTrace error
errStopMetric error
errStopTrace error
)
if (i & 1) != 0 {
errStartTrace = errors.New("trace start failed")
}
if (i & 2) != 0 {
errStopTrace = errors.New("trace stop failed")
}
if (i & 4) != 0 {
errStartMetric = errors.New("metric start failed")
}
if (i & 8) != 0 {
errStopMetric = errors.New("metric stop failed")
}
shouldStartFail := errStartTrace != nil || errStartMetric != nil
shouldStopFail := errStopTrace != nil || errStopMetric != nil

driverTraces := &stubProtocolDriver{
injectedStartError: errStartTrace,
injectedStopError: errStopTrace,
}
driverMetrics := &stubProtocolDriver{
injectedStartError: errStartMetric,
injectedStopError: errStopMetric,
}
config := otlp.SplitConfig{
ForMetrics: driverMetrics,
ForTraces: driverTraces,
}
driver := otlp.NewSplitDriver(config)
errStart := driver.Start(ctx)
if shouldStartFail {
assert.Error(t, errStart)
} else {
assert.NoError(t, errStart)
}
errStop := driver.Stop(ctx)
if shouldStopFail {
assert.Error(t, errStop)
} else {
assert.NoError(t, errStop)
}
}
}
81 changes: 81 additions & 0 deletions exporters/otlp/protocoldriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package otlp // import "go.opentelemetry.io/otel/exporters/otlp"

import (
"context"
"sync"

metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
Expand Down Expand Up @@ -49,3 +50,83 @@ type ProtocolDriver interface {
// take this into account by doing proper locking.
ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error
}

type SplitConfig struct {
ForMetrics ProtocolDriver
ForTraces ProtocolDriver
}

type splitDriver struct {
metric ProtocolDriver
trace ProtocolDriver
}

var _ ProtocolDriver = (*splitDriver)(nil)

// NewSplitDriver creates a protocol driver which contains two other
// protocol drivers and will forward traces to one of them and metrics
// to another.
func NewSplitDriver(cfg SplitConfig) ProtocolDriver {
return &splitDriver{
metric: cfg.ForMetrics,
trace: cfg.ForTraces,
}
}

func (d *splitDriver) Start(ctx context.Context) error {
wg := sync.WaitGroup{}
wg.Add(2)
var (
metricErr error
traceErr error
)
go func() {
defer wg.Done()
metricErr = d.metric.Start(ctx)
}()
go func() {
defer wg.Done()
traceErr = d.trace.Start(ctx)
}()
wg.Wait()
if metricErr != nil {
return metricErr
}
if traceErr != nil {
return traceErr
}
return nil
}

func (d *splitDriver) Stop(ctx context.Context) error {
wg := sync.WaitGroup{}
wg.Add(2)
var (
metricErr error
traceErr error
)
go func() {
defer wg.Done()
metricErr = d.metric.Stop(ctx)
}()
go func() {
defer wg.Done()
traceErr = d.trace.Stop(ctx)
}()
wg.Wait()
if metricErr != nil {
return metricErr
}
if traceErr != nil {
return traceErr
}
return nil
}

func (d *splitDriver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error {
return d.metric.ExportMetrics(ctx, cps, selector)
}

func (d *splitDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error {
return d.trace.ExportTraces(ctx, ss)
}

0 comments on commit 0b67c2f

Please sign in to comment.