diff --git a/core/metrics/README.md b/core/metrics/README.md index 166e993961..5f60dfbfe6 100644 --- a/core/metrics/README.md +++ b/core/metrics/README.md @@ -28,7 +28,9 @@ The additional environment variables to note are: | `OTEL_EXPORTER_OTLP_TRANSPORT_3` | The transport protocol for the third additional OTLP exporter | `http` | | ... | Additional transports can be specified by incrementing the number | `http` | - +You can do the same thing for `OTEL_EXPORTER_OTLP_SECURE_MODE` and `OTEL_EXPORTER_OTLP_HEADERS` + + Note: The OTLP exporter endpoints and transports can be specified for multiple exporters by using incrementing numbers (1, 2, 3, etc.) in the environment variable names. This allows for configuration of multiple OTLP exporters. The primary exporter uses the base names without numbers. diff --git a/core/metrics/multiexporter.go b/core/metrics/multiexporter.go index b21265fee4..cbd59495e4 100644 --- a/core/metrics/multiexporter.go +++ b/core/metrics/multiexporter.go @@ -3,9 +3,11 @@ package metrics import ( "context" "fmt" + "go.uber.org/multierr" + "sync" + "time" "go.opentelemetry.io/otel/sdk/trace" - tracesdk "go.opentelemetry.io/otel/sdk/trace" ) // MultiExporter is an interface that allows exporting spans to multiple OTLP trace exporters. @@ -27,26 +29,50 @@ func NewMultiExporter(exporters ...trace.SpanExporter) MultiExporter { } } +const defaultTimeout = 30 * time.Second + // ExportSpans exports a batch of spans. -func (m *multiExporter) ExportSpans(ctx context.Context, ss []trace.ReadOnlySpan) error { +func (m *multiExporter) ExportSpans(parentCtx context.Context, ss []trace.ReadOnlySpan) error { + return m.doParallel(parentCtx, func(ctx context.Context, exporter trace.SpanExporter) error { + return exporter.ExportSpans(ctx, ss) + }) +} + +func (m *multiExporter) doParallel(parentCtx context.Context, fn func(context.Context, trace.SpanExporter) error) error { + ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + defer cancel() + + var wg sync.WaitGroup + var errors []error + var mu sync.Mutex + + wg.Add(len(m.exporters)) for _, exporter := range m.exporters { - err := exporter.ExportSpans(ctx, ss) - if err != nil { - return fmt.Errorf("could not export spans: %w", err) - } + go func(exporter trace.SpanExporter) { + defer wg.Done() + err := fn(ctx, exporter) + if err != nil { + mu.Lock() + errors = append(errors, fmt.Errorf("error in doMultiple: %w", err)) + mu.Unlock() + } + }(exporter) } + + wg.Wait() + if len(errors) > 0 { + // nolint: wrapcheck + return multierr.Combine(errors...) + } + return nil } // Shutdown notifies the exporter of a pending halt to operations. func (m *multiExporter) Shutdown(ctx context.Context) error { - for _, exporter := range m.exporters { - err := exporter.Shutdown(ctx) - if err != nil { - return fmt.Errorf("could not stop exporter: %w", err) - } - } - return nil + return m.doParallel(ctx, func(ctx context.Context, exporter trace.SpanExporter) error { + return exporter.Shutdown(ctx) + }) } // AddExporter adds an exporter to the multi exporter. @@ -54,4 +80,4 @@ func (m *multiExporter) AddExporter(exporter trace.SpanExporter) { m.exporters = append(m.exporters, exporter) } -var _ tracesdk.SpanExporter = &multiExporter{} +var _ trace.SpanExporter = &multiExporter{} diff --git a/core/metrics/otlp.go b/core/metrics/otlp.go index 72f21f651c..3dd747f752 100644 --- a/core/metrics/otlp.go +++ b/core/metrics/otlp.go @@ -51,7 +51,7 @@ func (n *otlpHandler) Start(ctx context.Context) (err error) { exporter, err := makeOTLPExporter(ctx, envSuffix) if err != nil { - return fmt.Errorf("could not create exporter %d: %v", i, err) + return fmt.Errorf("could not create exporter %d: %w", i, err) } exporters = append(exporters, exporter) @@ -163,10 +163,10 @@ func makeOTLPExporter(ctx context.Context, envSuffix string) (*otlptrace.Exporte oteltraceClient, err := buildClientFromTransport( transport, - WithURL(url), + withURL(url), // defaults to true - WithSecure(secure), - WithHeaders(headers), + withSecure(secure), + withHeaders(headers), ) if err != nil { return nil, fmt.Errorf("could not create client from transport: %w", err) @@ -211,7 +211,7 @@ type transportOptions struct { // only one will be used in creating the actual client. type Option func(*transportOptions) error -func WithURL(url string) Option { +func withURL(url string) Option { return func(o *transportOptions) error { o.httpOptions = append(o.httpOptions, otlptracehttp.WithEndpointURL(url)) o.grpcOptions = append(o.grpcOptions, otlptracegrpc.WithEndpointURL(url)) @@ -220,7 +220,7 @@ func WithURL(url string) Option { } } -func WithSecure(secure bool) Option { +func withSecure(secure bool) Option { return func(o *transportOptions) error { if secure { tlsCreds := credentials.NewClientTLSFromCert(nil, "") @@ -228,6 +228,7 @@ func WithSecure(secure bool) Option { o.grpcOptions = append(o.grpcOptions, otlptracegrpc.WithTLSCredentials(tlsCreds)) tlsConfig := &tls.Config{ + MinVersion: tls.VersionTLS12, // RootCAs is nil, which means the default system root CAs are used } o.httpOptions = append(o.httpOptions, otlptracehttp.WithTLSClientConfig(tlsConfig)) @@ -240,7 +241,7 @@ func WithSecure(secure bool) Option { } } -func WithHeaders(headers string) Option { +func withHeaders(headers string) Option { return func(o *transportOptions) error { realHeaders := headersToMap(headers) o.httpOptions = append(o.httpOptions, otlptracehttp.WithHeaders(realHeaders))