diff --git a/exporters/otlp/otlp_metric_test.go b/exporters/otlp/otlp_metric_test.go index 660d17ecec5..9d804b84df4 100644 --- a/exporters/otlp/otlp_metric_test.go +++ b/exporters/otlp/otlp_metric_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package otlp +package otlp_test import ( "context" @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/exporters/otlp" commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1" metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1" resourcepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/resource/v1" @@ -692,8 +693,8 @@ func TestStatelessExportKind(t *testing.T) { t.Run(k.name, func(t *testing.T) { runMetricExportTests( t, - []ExporterOption{ - WithMetricExportKindSelector( + []otlp.ExporterOption{ + otlp.WithMetricExportKindSelector( metricsdk.StatelessExportKindSelector(), ), }, @@ -740,7 +741,7 @@ func TestStatelessExportKind(t *testing.T) { } } -func runMetricExportTests(t *testing.T, opts []ExporterOption, rs []record, expected []metricpb.ResourceMetrics) { +func runMetricExportTests(t *testing.T, opts []otlp.ExporterOption, rs []record, expected []metricpb.ResourceMetrics) { exp, driver := newExporter(t, opts...) recs := map[label.Distinct][]metricsdk.Record{} diff --git a/exporters/otlp/otlp_span_test.go b/exporters/otlp/otlp_span_test.go index c2c6aa6c935..b446fc0e39b 100644 --- a/exporters/otlp/otlp_span_test.go +++ b/exporters/otlp/otlp_span_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package otlp +package otlp_test import ( "context" diff --git a/exporters/otlp/otlp_test.go b/exporters/otlp/otlp_test.go index 1a10f5e281b..470c465aae1 100644 --- a/exporters/otlp/otlp_test.go +++ b/exporters/otlp/otlp_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package otlp +package otlp_test import ( "context" @@ -21,8 +21,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/exporters/otlp" metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1" tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1" "go.opentelemetry.io/otel/exporters/otlp/internal/transform" @@ -32,31 +34,42 @@ import ( ) type stubProtocolDriver struct { + started int + stopped int + tracesExported int + metricsExported int + + injectedStartError error + injectedStopError error + rm []metricpb.ResourceMetrics rs []tracepb.ResourceSpans } -var _ ProtocolDriver = (*stubProtocolDriver)(nil) +var _ otlp.ProtocolDriver = (*stubProtocolDriver)(nil) func (m *stubProtocolDriver) Start(ctx context.Context) error { + m.started++ select { case <-ctx.Done(): return ctx.Err() default: - return nil + return m.injectedStartError } } func (m *stubProtocolDriver) Stop(ctx context.Context) error { + m.stopped++ select { case <-ctx.Done(): return ctx.Err() default: - return nil + return m.injectedStopError } } func (m *stubProtocolDriver) ExportMetrics(parent context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error { + m.metricsExported++ rms, err := transform.CheckpointSet(parent, selector, cps, 1) if err != nil { return err @@ -71,6 +84,7 @@ func (m *stubProtocolDriver) ExportMetrics(parent context.Context, cps metricsdk } func (m *stubProtocolDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error { + m.tracesExported++ for _, rs := range transform.SpanData(ss) { if rs == nil { continue @@ -85,9 +99,9 @@ func (m *stubProtocolDriver) Reset() { m.rs = nil } -func newExporter(t *testing.T, opts ...ExporterOption) (*Exporter, *stubProtocolDriver) { +func newExporter(t *testing.T, opts ...otlp.ExporterOption) (*otlp.Exporter, *stubProtocolDriver) { driver := &stubProtocolDriver{} - exp, err := NewExporter(context.Background(), driver, opts...) + exp, err := otlp.NewExporter(context.Background(), driver, opts...) require.NoError(t, err) return exp, driver } @@ -96,7 +110,7 @@ func TestExporterShutdownHonorsTimeout(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() - e := NewUnstartedExporter(&stubProtocolDriver{}) + e := otlp.NewUnstartedExporter(&stubProtocolDriver{}) if err := e.Start(ctx); err != nil { t.Fatalf("failed to start exporter: %v", err) } @@ -115,7 +129,7 @@ func TestExporterShutdownHonorsCancel(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() - e := NewUnstartedExporter(&stubProtocolDriver{}) + e := otlp.NewUnstartedExporter(&stubProtocolDriver{}) if err := e.Start(ctx); err != nil { t.Fatalf("failed to start exporter: %v", err) } @@ -134,7 +148,7 @@ func TestExporterShutdownNoError(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() - e := NewUnstartedExporter(&stubProtocolDriver{}) + e := otlp.NewUnstartedExporter(&stubProtocolDriver{}) if err := e.Start(ctx); err != nil { t.Fatalf("failed to start exporter: %v", err) } @@ -146,7 +160,7 @@ func TestExporterShutdownNoError(t *testing.T) { func TestExporterShutdownManyTimes(t *testing.T) { ctx := context.Background() - e, err := NewExporter(ctx, &stubProtocolDriver{}) + e, err := otlp.NewExporter(ctx, &stubProtocolDriver{}) if err != nil { t.Fatalf("failed to start an exporter: %v", err) } @@ -170,3 +184,96 @@ func TestExporterShutdownManyTimes(t *testing.T) { } } } + +func TestSplitDriver(t *testing.T) { + driverTraces := &stubProtocolDriver{} + driverMetrics := &stubProtocolDriver{} + config := otlp.SplitConfig{ + ForMetrics: driverMetrics, + ForTraces: driverTraces, + } + driver := otlp.NewSplitDriver(config) + ctx := context.Background() + assert.NoError(t, driver.Start(ctx)) + assert.Equal(t, 1, driverTraces.started) + assert.Equal(t, 1, driverMetrics.started) + assert.Equal(t, 0, driverTraces.stopped) + assert.Equal(t, 0, driverMetrics.stopped) + assert.Equal(t, 0, driverTraces.tracesExported) + assert.Equal(t, 0, driverTraces.metricsExported) + assert.Equal(t, 0, driverMetrics.tracesExported) + assert.Equal(t, 0, driverMetrics.metricsExported) + + assert.NoError(t, driver.ExportMetrics(ctx, discCheckpointSet{}, metricsdk.StatelessExportKindSelector())) + assert.NoError(t, driver.ExportTraces(ctx, []*tracesdk.SpanSnapshot{discSpanSnapshot()})) + assert.Len(t, driverTraces.rm, 0) + assert.Len(t, driverTraces.rs, 1) + assert.Len(t, driverMetrics.rm, 1) + assert.Len(t, driverMetrics.rs, 0) + assert.Equal(t, 1, driverTraces.tracesExported) + assert.Equal(t, 0, driverTraces.metricsExported) + assert.Equal(t, 0, driverMetrics.tracesExported) + assert.Equal(t, 1, driverMetrics.metricsExported) + + assert.NoError(t, driver.Stop(ctx)) + assert.Equal(t, 1, driverTraces.started) + assert.Equal(t, 1, driverMetrics.started) + assert.Equal(t, 1, driverTraces.stopped) + assert.Equal(t, 1, driverMetrics.stopped) + assert.Equal(t, 1, driverTraces.tracesExported) + assert.Equal(t, 0, driverTraces.metricsExported) + assert.Equal(t, 0, driverMetrics.tracesExported) + assert.Equal(t, 1, driverMetrics.metricsExported) +} + +func TestSplitDriverFail(t *testing.T) { + ctx := context.Background() + for i := 0; i < 16; i++ { + var ( + errStartMetric error + errStartTrace error + errStopMetric error + errStopTrace error + ) + if (i & 1) != 0 { + errStartTrace = errors.New("trace start failed") + } + if (i & 2) != 0 { + errStopTrace = errors.New("trace stop failed") + } + if (i & 4) != 0 { + errStartMetric = errors.New("metric start failed") + } + if (i & 8) != 0 { + errStopMetric = errors.New("metric stop failed") + } + shouldStartFail := errStartTrace != nil || errStartMetric != nil + shouldStopFail := errStopTrace != nil || errStopMetric != nil + + driverTraces := &stubProtocolDriver{ + injectedStartError: errStartTrace, + injectedStopError: errStopTrace, + } + driverMetrics := &stubProtocolDriver{ + injectedStartError: errStartMetric, + injectedStopError: errStopMetric, + } + config := otlp.SplitConfig{ + ForMetrics: driverMetrics, + ForTraces: driverTraces, + } + driver := otlp.NewSplitDriver(config) + errStart := driver.Start(ctx) + if shouldStartFail { + assert.Error(t, errStart) + } else { + assert.NoError(t, errStart) + } + errStop := driver.Stop(ctx) + if shouldStopFail { + assert.Error(t, errStop) + } else { + assert.NoError(t, errStop) + } + } +} diff --git a/exporters/otlp/protocoldriver.go b/exporters/otlp/protocoldriver.go index c4992ba1e93..dc0ea4fc2bf 100644 --- a/exporters/otlp/protocoldriver.go +++ b/exporters/otlp/protocoldriver.go @@ -16,6 +16,7 @@ package otlp // import "go.opentelemetry.io/otel/exporters/otlp" import ( "context" + "sync" metricsdk "go.opentelemetry.io/otel/sdk/export/metric" tracesdk "go.opentelemetry.io/otel/sdk/export/trace" @@ -49,3 +50,83 @@ type ProtocolDriver interface { // take this into account by doing proper locking. ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error } + +type SplitConfig struct { + ForMetrics ProtocolDriver + ForTraces ProtocolDriver +} + +type splitDriver struct { + metric ProtocolDriver + trace ProtocolDriver +} + +var _ ProtocolDriver = (*splitDriver)(nil) + +// NewSplitDriver creates a protocol driver which contains two other +// protocol drivers and will forward traces to one of them and metrics +// to another. +func NewSplitDriver(cfg SplitConfig) ProtocolDriver { + return &splitDriver{ + metric: cfg.ForMetrics, + trace: cfg.ForTraces, + } +} + +func (d *splitDriver) Start(ctx context.Context) error { + wg := sync.WaitGroup{} + wg.Add(2) + var ( + metricErr error + traceErr error + ) + go func() { + defer wg.Done() + metricErr = d.metric.Start(ctx) + }() + go func() { + defer wg.Done() + traceErr = d.trace.Start(ctx) + }() + wg.Wait() + if metricErr != nil { + return metricErr + } + if traceErr != nil { + return traceErr + } + return nil +} + +func (d *splitDriver) Stop(ctx context.Context) error { + wg := sync.WaitGroup{} + wg.Add(2) + var ( + metricErr error + traceErr error + ) + go func() { + defer wg.Done() + metricErr = d.metric.Stop(ctx) + }() + go func() { + defer wg.Done() + traceErr = d.trace.Stop(ctx) + }() + wg.Wait() + if metricErr != nil { + return metricErr + } + if traceErr != nil { + return traceErr + } + return nil +} + +func (d *splitDriver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error { + return d.metric.ExportMetrics(ctx, cps, selector) +} + +func (d *splitDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error { + return d.trace.ExportTraces(ctx, ss) +}