diff --git a/cmd/otelcorecol/go.mod b/cmd/otelcorecol/go.mod index 1c6a55bed82..6f44482a27e 100644 --- a/cmd/otelcorecol/go.mod +++ b/cmd/otelcorecol/go.mod @@ -100,6 +100,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.39.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.39.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0 // indirect + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/otel/sdk v1.16.0 // indirect go.opentelemetry.io/otel/sdk/metric v0.39.0 // indirect diff --git a/cmd/otelcorecol/go.sum b/cmd/otelcorecol/go.sum index 5bd39f8151f..1ea52fcaae2 100644 --- a/cmd/otelcorecol/go.sum +++ b/cmd/otelcorecol/go.sum @@ -1038,6 +1038,8 @@ go.opentelemetry.io/otel/exporters/prometheus v0.39.0 h1:whAaiHxOatgtKd+w0dOi//1 go.opentelemetry.io/otel/exporters/prometheus v0.39.0/go.mod h1:4jo5Q4CROlCpSPsXLhymi+LYrDXd2ObU5wbKayfZs7Y= go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0 h1:fl2WmyenEf6LYYlfHAtCUEDyGcpwJNqD4dHGO7PVm4w= go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0/go.mod h1:csyQxQ0UHHKVA8KApS7eUO/klMO5sd/av5CNZNU4O6w= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 h1:+XWJd3jf75RXJq29mxbuXhCXFDG3S3R4vBUeSI2P7tE= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0/go.mod h1:hqgzBPTf4yONMFgdZvL/bK42R/iinTyVQtiWihs3SZc= go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE= diff --git a/go.mod b/go.mod index db0976e308b..ac880cf832c 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.39.0 go.opentelemetry.io/otel/exporters/prometheus v0.39.0 go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 go.opentelemetry.io/otel/metric v1.16.0 go.opentelemetry.io/otel/sdk v1.16.0 go.opentelemetry.io/otel/sdk/metric v0.39.0 diff --git a/go.sum b/go.sum index 2004ffca129..c9185db0d5e 100644 --- a/go.sum +++ b/go.sum @@ -438,6 +438,8 @@ go.opentelemetry.io/otel/exporters/prometheus v0.39.0 h1:whAaiHxOatgtKd+w0dOi//1 go.opentelemetry.io/otel/exporters/prometheus v0.39.0/go.mod h1:4jo5Q4CROlCpSPsXLhymi+LYrDXd2ObU5wbKayfZs7Y= go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0 h1:fl2WmyenEf6LYYlfHAtCUEDyGcpwJNqD4dHGO7PVm4w= go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0/go.mod h1:csyQxQ0UHHKVA8KApS7eUO/klMO5sd/av5CNZNU4O6w= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 h1:+XWJd3jf75RXJq29mxbuXhCXFDG3S3R4vBUeSI2P7tE= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0/go.mod h1:hqgzBPTf4yONMFgdZvL/bK42R/iinTyVQtiWihs3SZc= go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE= diff --git a/service/internal/proctelemetry/config.go b/service/internal/proctelemetry/config.go index 1e759dbe90c..a2ec828bfbf 100644 --- a/service/internal/proctelemetry/config.go +++ b/service/internal/proctelemetry/config.go @@ -22,10 +22,12 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" otelprom "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" "go.opentelemetry.io/otel/sdk/instrumentation" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/collector/obsreport" semconv "go.opentelemetry.io/collector/semconv/v1.18.0" @@ -58,6 +60,9 @@ var ( attribute.String(semconv.AttributeNetHostName, ""), attribute.String(semconv.AttributeNetHostPort, ""), } + + errNoValidMetricExporter = errors.New("no valid metric exporter") + errNoValidSpanExporter = errors.New("no valid span exporter") ) func InitMetricReader(ctx context.Context, reader telemetry.MetricReader, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) { @@ -78,6 +83,56 @@ func InitMetricReader(ctx context.Context, reader telemetry.MetricReader, asyncE return nil, nil, fmt.Errorf("unsupported metric reader type %v", reader) } +func InitSpanProcessor(ctx context.Context, processor telemetry.SpanProcessor) (sdktrace.SpanProcessor, error) { + if processor.Batch != nil { + if processor.Batch.Exporter.Console != nil { + exp, err := stdouttrace.New( + stdouttrace.WithPrettyPrint(), + ) + if err != nil { + return nil, err + } + opts := []sdktrace.BatchSpanProcessorOption{} + if processor.Batch.ExportTimeout != nil { + if *processor.Batch.ExportTimeout < 0 { + return nil, fmt.Errorf("invalid export timeout %d", *processor.Batch.ExportTimeout) + } + opts = append(opts, sdktrace.WithExportTimeout(time.Millisecond*time.Duration(*processor.Batch.ExportTimeout))) + } + if processor.Batch.MaxExportBatchSize != nil { + if *processor.Batch.MaxExportBatchSize < 0 { + return nil, fmt.Errorf("invalid batch size %d", *processor.Batch.MaxExportBatchSize) + } + opts = append(opts, sdktrace.WithMaxExportBatchSize(*processor.Batch.MaxExportBatchSize)) + } + if processor.Batch.MaxQueueSize != nil { + if *processor.Batch.MaxQueueSize < 0 { + return nil, fmt.Errorf("invalid queue size %d", *processor.Batch.MaxQueueSize) + } + opts = append(opts, sdktrace.WithMaxQueueSize(*processor.Batch.MaxQueueSize)) + } + if processor.Batch.ScheduleDelay != nil { + if *processor.Batch.ScheduleDelay < 0 { + return nil, fmt.Errorf("invalid schedule delay %d", *processor.Batch.ScheduleDelay) + } + opts = append(opts, sdktrace.WithBatchTimeout(time.Millisecond*time.Duration(*processor.Batch.ScheduleDelay))) + } + return sdktrace.NewBatchSpanProcessor(exp, opts...), nil + } + return nil, errNoValidSpanExporter + } + return nil, fmt.Errorf("unsupported span processor type %v", processor) +} + +func InitTracerProvider(res *resource.Resource, options []sdktrace.TracerProviderOption) (*sdktrace.TracerProvider, error) { + opts := []sdktrace.TracerProviderOption{ + sdktrace.WithResource(res), + } + + opts = append(opts, options...) + return sdktrace.NewTracerProvider(opts...), nil +} + func InitOpenTelemetry(res *resource.Resource, options []sdkmetric.Option, disableHighCardinality bool) (*sdkmetric.MeterProvider, error) { opts := []sdkmetric.Option{ sdkmetric.WithResource(res), @@ -175,7 +230,7 @@ func initPullExporter(exporter telemetry.MetricExporter, asyncErrorChannel chan if exporter.Prometheus != nil { return initPrometheusExporter(exporter.Prometheus, asyncErrorChannel) } - return nil, nil, fmt.Errorf("no valid exporter") + return nil, nil, errNoValidMetricExporter } func initPeriodicExporter(ctx context.Context, exporter telemetry.MetricExporter, opts ...sdkmetric.PeriodicReaderOption) (sdkmetric.Reader, *http.Server, error) { @@ -207,7 +262,7 @@ func initPeriodicExporter(ctx context.Context, exporter telemetry.MetricExporter } return sdkmetric.NewPeriodicReader(exp, opts...), nil, nil } - return nil, nil, fmt.Errorf("no valid exporter") + return nil, nil, errNoValidMetricExporter } func normalizeEndpoint(endpoint string) string { diff --git a/service/internal/proctelemetry/config_test.go b/service/internal/proctelemetry/config_test.go index 4c3df39e5af..1a1fec4e4b6 100644 --- a/service/internal/proctelemetry/config_test.go +++ b/service/internal/proctelemetry/config_test.go @@ -42,7 +42,7 @@ func TestMetricReader(t *testing.T) { }, }, }, - err: errors.New("no valid exporter"), + err: errNoValidMetricExporter, }, { name: "pull/prometheus-invalid-config-no-host", @@ -93,14 +93,14 @@ func TestMetricReader(t *testing.T) { }, }, }, - err: errors.New("no valid exporter"), + err: errNoValidMetricExporter, }, { name: "periodic/no-exporter", reader: telemetry.MetricReader{ Periodic: &telemetry.PeriodicMetricReader{}, }, - err: errors.New("no valid exporter"), + err: errNoValidMetricExporter, }, { name: "periodic/console-exporter", @@ -344,3 +344,94 @@ func TestMetricReader(t *testing.T) { }) } } + +func TestSpanProcessor(t *testing.T) { + testCases := []struct { + name string + processor telemetry.SpanProcessor + args any + err error + }{ + { + name: "no processor", + err: errors.New("unsupported span processor type { }"), + }, + { + name: "batch processor invalid exporter", + processor: telemetry.SpanProcessor{ + Batch: &telemetry.BatchSpanProcessor{ + Exporter: telemetry.SpanExporter{}, + }, + }, + err: errNoValidSpanExporter, + }, + { + name: "batch processor invalid batch size console exporter", + processor: telemetry.SpanProcessor{ + Batch: &telemetry.BatchSpanProcessor{ + MaxExportBatchSize: intPtr(-1), + Exporter: telemetry.SpanExporter{ + Console: telemetry.Console{}, + }, + }, + }, + err: errors.New("invalid batch size -1"), + }, + { + name: "batch processor invalid export timeout console exporter", + processor: telemetry.SpanProcessor{ + Batch: &telemetry.BatchSpanProcessor{ + ExportTimeout: intPtr(-2), + Exporter: telemetry.SpanExporter{ + Console: telemetry.Console{}, + }, + }, + }, + err: errors.New("invalid export timeout -2"), + }, + { + name: "batch processor invalid queue size console exporter", + processor: telemetry.SpanProcessor{ + Batch: &telemetry.BatchSpanProcessor{ + MaxQueueSize: intPtr(-3), + Exporter: telemetry.SpanExporter{ + Console: telemetry.Console{}, + }, + }, + }, + err: errors.New("invalid queue size -3"), + }, + { + name: "batch processor invalid schedule delay console exporter", + processor: telemetry.SpanProcessor{ + Batch: &telemetry.BatchSpanProcessor{ + ScheduleDelay: intPtr(-4), + Exporter: telemetry.SpanExporter{ + Console: telemetry.Console{}, + }, + }, + }, + err: errors.New("invalid schedule delay -4"), + }, + { + name: "batch processor console exporter", + processor: telemetry.SpanProcessor{ + Batch: &telemetry.BatchSpanProcessor{ + MaxExportBatchSize: intPtr(0), + ExportTimeout: intPtr(0), + MaxQueueSize: intPtr(0), + ScheduleDelay: intPtr(0), + Exporter: telemetry.SpanExporter{ + Console: telemetry.Console{}, + }, + }, + }, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + _, err := InitSpanProcessor(context.Background(), tt.processor) + assert.Equal(t, tt.err, err) + }) + } +} diff --git a/service/service.go b/service/service.go index 0235f547c40..318f99a868c 100644 --- a/service/service.go +++ b/service/service.go @@ -112,6 +112,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { return nil, fmt.Errorf("failed to initialize telemetry: %w", err) } srv.telemetrySettings.MeterProvider = srv.telemetryInitializer.mp + srv.telemetrySettings.TracerProvider = srv.telemetryInitializer.tp // process the configuration and initialize the pipeline if err = srv.initExtensionsAndPipeline(ctx, set, cfg); err != nil { diff --git a/service/telemetry.go b/service/telemetry.go index 519e6bf987a..f99d9af77e3 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -20,10 +20,12 @@ import ( "go.opentelemetry.io/contrib/propagators/b3" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" + noopmetric "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/propagation" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" "go.uber.org/multierr" "go.uber.org/zap" @@ -51,6 +53,7 @@ type telemetryInitializer struct { views []*view.View ocRegistry *ocmetric.Registry mp metric.MeterProvider + tp trace.TracerProvider servers []*http.Server useOtel bool @@ -60,7 +63,8 @@ type telemetryInitializer struct { func newColTelemetry(useOtel bool, disableHighCardinality bool, extendedConfig bool) *telemetryInitializer { return &telemetryInitializer{ - mp: noop.NewMeterProvider(), + mp: noopmetric.NewMeterProvider(), + tp: trace.NewNoopTracerProvider(), useOtel: useOtel, disableHighCardinality: disableHighCardinality, extendedConfig: extendedConfig, @@ -79,6 +83,12 @@ func (tel *telemetryInitializer) init(res *resource.Resource, settings component settings.Logger.Info("Setting up own telemetry...") + if tp, err := tel.initTraces(res, settings.Logger, cfg); err == nil { + tel.tp = tp + } else { + return err + } + if tp, err := textMapPropagatorFromConfig(cfg.Traces.Propagators); err == nil { otel.SetTextMapPropagator(tp) } else { @@ -88,6 +98,18 @@ func (tel *telemetryInitializer) init(res *resource.Resource, settings component return tel.initMetrics(res, settings.Logger, cfg, asyncErrorChannel) } +func (tel *telemetryInitializer) initTraces(res *resource.Resource, logger *zap.Logger, cfg telemetry.Config) (trace.TracerProvider, error) { + opts := []sdktrace.TracerProviderOption{} + for _, processor := range cfg.Traces.Processors { + sp, err := proctelemetry.InitSpanProcessor(context.Background(), processor) + if err != nil { + return nil, err + } + opts = append(opts, sdktrace.WithSpanProcessor(sp)) + } + return proctelemetry.InitTracerProvider(res, opts) +} + func (tel *telemetryInitializer) initMetrics(res *resource.Resource, logger *zap.Logger, cfg telemetry.Config, asyncErrorChannel chan error) error { // Initialize the ocRegistry, still used by the process metrics. tel.ocRegistry = ocmetric.NewRegistry() diff --git a/service/telemetry/config.go b/service/telemetry/config.go index 3bda2956fbe..ac91a6494b0 100644 --- a/service/telemetry/config.go +++ b/service/telemetry/config.go @@ -120,6 +120,9 @@ type TracesConfig struct { // tracecontext and b3 are supported. By default, the value is set to empty list and // context propagation is disabled. Propagators []string `mapstructure:"propagators"` + // Processors allow configuration of span processors to emit spans to + // any number of suported backends. + Processors []SpanProcessor `mapstructure:"processors"` } // Validate checks whether the current configuration is valid @@ -132,6 +135,29 @@ func (c *Config) Validate() error { return nil } +func (sp *SpanProcessor) Unmarshal(conf *confmap.Conf) error { + if !obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled() { + // only unmarshal if feature gate is enabled + return nil + } + + if conf == nil { + return nil + } + + if err := conf.Unmarshal(sp); err != nil { + return fmt.Errorf("invalid span processor configuration: %w", err) + } + + if sp.Batch != nil { + if sp.Batch.Exporter.Console == nil { + return fmt.Errorf("invalid exporter configuration") + } + return nil + } + return fmt.Errorf("unsupported span processor type %s", conf.AllKeys()) +} + func (mr *MetricReader) Unmarshal(conf *confmap.Conf) error { if !obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled() { // only unmarshal if feature gate is enabled diff --git a/service/telemetry/config_test.go b/service/telemetry/config_test.go index bcc464982cc..1e1a4cd8882 100644 --- a/service/telemetry/config_test.go +++ b/service/telemetry/config_test.go @@ -171,3 +171,59 @@ func TestUnmarshalMetricReader(t *testing.T) { }) } } + +func TestUnmarshalSpanProcessor(t *testing.T) { + defer setFeatureGateForTest(t, obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate, true)() + tests := []struct { + name string + cfg *confmap.Conf + err string + }{ + { + name: "invalid config", + cfg: confmap.NewFromStringMap(map[string]any{"invalid": "invalid"}), + err: "unsupported span processor type [invalid]", + }, + { + name: "nil config, nothing to do", + }, + { + name: "invalid batch processor type with valid console exporter", + cfg: confmap.NewFromStringMap(map[string]any{"thing": BatchSpanProcessor{ + Exporter: SpanExporter{ + Console: Console{}, + }, + }}), + err: "unsupported span processor type [thing]", + }, + { + name: "valid batch processor, invalid config", + cfg: confmap.NewFromStringMap(map[string]any{"batch": "garbage"}), + err: "invalid span processor configuration", + }, + { + name: "valid batch processor, no exporter", + cfg: confmap.NewFromStringMap(map[string]any{"batch": BatchSpanProcessor{}}), + err: "invalid exporter configuration", + }, + { + name: "valid batch processor, valid console exporter", + cfg: confmap.NewFromStringMap(map[string]any{"batch": BatchSpanProcessor{ + Exporter: SpanExporter{ + Console: Console{}, + }, + }}), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + processor := SpanProcessor{} + err := processor.Unmarshal(tt.cfg) + if len(tt.err) > 0 { + assert.ErrorContains(t, err, tt.err) + } else { + assert.NoError(t, err) + } + }) + } +}