Skip to content

Commit

Permalink
add span processor configuration
Browse files Browse the repository at this point in the history
This PR adds support for configuring batch span processor for internal
collector traces. It adds support to export them via the stdout exporter.

Signed-off-by: Alex Boten <[email protected]>
  • Loading branch information
Alex Boten committed Jul 20, 2023
1 parent b1d0f13 commit 1f30fae
Show file tree
Hide file tree
Showing 10 changed files with 264 additions and 7 deletions.
1 change: 1 addition & 0 deletions cmd/otelcorecol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.39.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.39.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/sdk v1.16.0 // indirect
go.opentelemetry.io/otel/sdk/metric v0.39.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions cmd/otelcorecol/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,8 @@ go.opentelemetry.io/otel/exporters/prometheus v0.39.0 h1:whAaiHxOatgtKd+w0dOi//1
go.opentelemetry.io/otel/exporters/prometheus v0.39.0/go.mod h1:4jo5Q4CROlCpSPsXLhymi+LYrDXd2ObU5wbKayfZs7Y=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0 h1:fl2WmyenEf6LYYlfHAtCUEDyGcpwJNqD4dHGO7PVm4w=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0/go.mod h1:csyQxQ0UHHKVA8KApS7eUO/klMO5sd/av5CNZNU4O6w=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 h1:+XWJd3jf75RXJq29mxbuXhCXFDG3S3R4vBUeSI2P7tE=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0/go.mod h1:hqgzBPTf4yONMFgdZvL/bK42R/iinTyVQtiWihs3SZc=
go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4=
go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE=
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.39.0
go.opentelemetry.io/otel/exporters/prometheus v0.39.0
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0
go.opentelemetry.io/otel/metric v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/sdk/metric v0.39.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@ go.opentelemetry.io/otel/exporters/prometheus v0.39.0 h1:whAaiHxOatgtKd+w0dOi//1
go.opentelemetry.io/otel/exporters/prometheus v0.39.0/go.mod h1:4jo5Q4CROlCpSPsXLhymi+LYrDXd2ObU5wbKayfZs7Y=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0 h1:fl2WmyenEf6LYYlfHAtCUEDyGcpwJNqD4dHGO7PVm4w=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0/go.mod h1:csyQxQ0UHHKVA8KApS7eUO/klMO5sd/av5CNZNU4O6w=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 h1:+XWJd3jf75RXJq29mxbuXhCXFDG3S3R4vBUeSI2P7tE=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0/go.mod h1:hqgzBPTf4yONMFgdZvL/bK42R/iinTyVQtiWihs3SZc=
go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4=
go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE=
Expand Down
59 changes: 57 additions & 2 deletions service/internal/proctelemetry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"

"go.opentelemetry.io/collector/obsreport"
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
Expand Down Expand Up @@ -58,6 +60,9 @@ var (
attribute.String(semconv.AttributeNetHostName, ""),
attribute.String(semconv.AttributeNetHostPort, ""),
}

errNoValidMetricExporter = errors.New("no valid metric exporter")
errNoValidSpanExporter = errors.New("no valid span exporter")
)

func InitMetricReader(ctx context.Context, reader telemetry.MetricReader, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) {
Expand All @@ -78,6 +83,56 @@ func InitMetricReader(ctx context.Context, reader telemetry.MetricReader, asyncE
return nil, nil, fmt.Errorf("unsupported metric reader type %v", reader)
}

func InitSpanProcessor(ctx context.Context, processor telemetry.SpanProcessor) (sdktrace.SpanProcessor, error) {
if processor.Batch != nil {
if processor.Batch.Exporter.Console != nil {
exp, err := stdouttrace.New(
stdouttrace.WithPrettyPrint(),
)
if err != nil {
return nil, err
}
opts := []sdktrace.BatchSpanProcessorOption{}
if processor.Batch.ExportTimeout != nil {
if *processor.Batch.ExportTimeout < 0 {
return nil, fmt.Errorf("invalid export timeout %d", *processor.Batch.ExportTimeout)
}
opts = append(opts, sdktrace.WithExportTimeout(time.Millisecond*time.Duration(*processor.Batch.ExportTimeout)))
}
if processor.Batch.MaxExportBatchSize != nil {
if *processor.Batch.MaxExportBatchSize < 0 {
return nil, fmt.Errorf("invalid batch size %d", *processor.Batch.MaxExportBatchSize)
}
opts = append(opts, sdktrace.WithMaxExportBatchSize(*processor.Batch.MaxExportBatchSize))
}
if processor.Batch.MaxQueueSize != nil {
if *processor.Batch.MaxQueueSize < 0 {
return nil, fmt.Errorf("invalid queue size %d", *processor.Batch.MaxQueueSize)
}
opts = append(opts, sdktrace.WithMaxQueueSize(*processor.Batch.MaxQueueSize))
}
if processor.Batch.ScheduleDelay != nil {
if *processor.Batch.ScheduleDelay < 0 {
return nil, fmt.Errorf("invalid schedule delay %d", *processor.Batch.ScheduleDelay)
}
opts = append(opts, sdktrace.WithBatchTimeout(time.Millisecond*time.Duration(*processor.Batch.ScheduleDelay)))
}
return sdktrace.NewBatchSpanProcessor(exp, opts...), nil
}
return nil, errNoValidSpanExporter
}
return nil, fmt.Errorf("unsupported span processor type %v", processor)
}

func InitTracerProvider(res *resource.Resource, options []sdktrace.TracerProviderOption) (*sdktrace.TracerProvider, error) {
opts := []sdktrace.TracerProviderOption{
sdktrace.WithResource(res),
}

opts = append(opts, options...)
return sdktrace.NewTracerProvider(opts...), nil
}

func InitOpenTelemetry(res *resource.Resource, options []sdkmetric.Option, disableHighCardinality bool) (*sdkmetric.MeterProvider, error) {
opts := []sdkmetric.Option{
sdkmetric.WithResource(res),
Expand Down Expand Up @@ -175,7 +230,7 @@ func initPullExporter(exporter telemetry.MetricExporter, asyncErrorChannel chan
if exporter.Prometheus != nil {
return initPrometheusExporter(exporter.Prometheus, asyncErrorChannel)
}
return nil, nil, fmt.Errorf("no valid exporter")
return nil, nil, errNoValidMetricExporter
}

func initPeriodicExporter(ctx context.Context, exporter telemetry.MetricExporter, opts ...sdkmetric.PeriodicReaderOption) (sdkmetric.Reader, *http.Server, error) {
Expand Down Expand Up @@ -207,7 +262,7 @@ func initPeriodicExporter(ctx context.Context, exporter telemetry.MetricExporter
}
return sdkmetric.NewPeriodicReader(exp, opts...), nil, nil
}
return nil, nil, fmt.Errorf("no valid exporter")
return nil, nil, errNoValidMetricExporter
}

func normalizeEndpoint(endpoint string) string {
Expand Down
97 changes: 94 additions & 3 deletions service/internal/proctelemetry/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestMetricReader(t *testing.T) {
},
},
},
err: errors.New("no valid exporter"),
err: errNoValidMetricExporter,
},
{
name: "pull/prometheus-invalid-config-no-host",
Expand Down Expand Up @@ -93,14 +93,14 @@ func TestMetricReader(t *testing.T) {
},
},
},
err: errors.New("no valid exporter"),
err: errNoValidMetricExporter,
},
{
name: "periodic/no-exporter",
reader: telemetry.MetricReader{
Periodic: &telemetry.PeriodicMetricReader{},
},
err: errors.New("no valid exporter"),
err: errNoValidMetricExporter,
},
{
name: "periodic/console-exporter",
Expand Down Expand Up @@ -344,3 +344,94 @@ func TestMetricReader(t *testing.T) {
})
}
}

func TestSpanProcessor(t *testing.T) {
testCases := []struct {
name string
processor telemetry.SpanProcessor
args any
err error
}{
{
name: "no processor",
err: errors.New("unsupported span processor type {<nil> <nil>}"),
},
{
name: "batch processor invalid exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
Exporter: telemetry.SpanExporter{},
},
},
err: errNoValidSpanExporter,
},
{
name: "batch processor invalid batch size console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
MaxExportBatchSize: intPtr(-1),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
err: errors.New("invalid batch size -1"),
},
{
name: "batch processor invalid export timeout console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
ExportTimeout: intPtr(-2),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
err: errors.New("invalid export timeout -2"),
},
{
name: "batch processor invalid queue size console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
MaxQueueSize: intPtr(-3),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
err: errors.New("invalid queue size -3"),
},
{
name: "batch processor invalid schedule delay console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
ScheduleDelay: intPtr(-4),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
err: errors.New("invalid schedule delay -4"),
},
{
name: "batch processor console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
MaxExportBatchSize: intPtr(0),
ExportTimeout: intPtr(0),
MaxQueueSize: intPtr(0),
ScheduleDelay: intPtr(0),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
_, err := InitSpanProcessor(context.Background(), tt.processor)
assert.Equal(t, tt.err, err)
})
}
}
1 change: 1 addition & 0 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
}
srv.telemetrySettings.MeterProvider = srv.telemetryInitializer.mp
srv.telemetrySettings.TracerProvider = srv.telemetryInitializer.tp

// process the configuration and initialize the pipeline
if err = srv.initExtensionsAndPipeline(ctx, set, cfg); err != nil {
Expand Down
26 changes: 24 additions & 2 deletions service/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
noopmetric "go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/propagation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

Expand Down Expand Up @@ -51,6 +53,7 @@ type telemetryInitializer struct {
views []*view.View
ocRegistry *ocmetric.Registry
mp metric.MeterProvider
tp trace.TracerProvider
servers []*http.Server

useOtel bool
Expand All @@ -60,7 +63,8 @@ type telemetryInitializer struct {

func newColTelemetry(useOtel bool, disableHighCardinality bool, extendedConfig bool) *telemetryInitializer {
return &telemetryInitializer{
mp: noop.NewMeterProvider(),
mp: noopmetric.NewMeterProvider(),
tp: trace.NewNoopTracerProvider(),
useOtel: useOtel,
disableHighCardinality: disableHighCardinality,
extendedConfig: extendedConfig,
Expand All @@ -79,6 +83,12 @@ func (tel *telemetryInitializer) init(res *resource.Resource, settings component

settings.Logger.Info("Setting up own telemetry...")

if tp, err := tel.initTraces(res, settings.Logger, cfg); err == nil {
tel.tp = tp
} else {
return err
}

if tp, err := textMapPropagatorFromConfig(cfg.Traces.Propagators); err == nil {
otel.SetTextMapPropagator(tp)
} else {
Expand All @@ -88,6 +98,18 @@ func (tel *telemetryInitializer) init(res *resource.Resource, settings component
return tel.initMetrics(res, settings.Logger, cfg, asyncErrorChannel)
}

func (tel *telemetryInitializer) initTraces(res *resource.Resource, logger *zap.Logger, cfg telemetry.Config) (trace.TracerProvider, error) {
opts := []sdktrace.TracerProviderOption{}
for _, processor := range cfg.Traces.Processors {
sp, err := proctelemetry.InitSpanProcessor(context.Background(), processor)
if err != nil {
return nil, err
}
opts = append(opts, sdktrace.WithSpanProcessor(sp))
}
return proctelemetry.InitTracerProvider(res, opts)
}

func (tel *telemetryInitializer) initMetrics(res *resource.Resource, logger *zap.Logger, cfg telemetry.Config, asyncErrorChannel chan error) error {
// Initialize the ocRegistry, still used by the process metrics.
tel.ocRegistry = ocmetric.NewRegistry()
Expand Down
26 changes: 26 additions & 0 deletions service/telemetry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ type TracesConfig struct {
// tracecontext and b3 are supported. By default, the value is set to empty list and
// context propagation is disabled.
Propagators []string `mapstructure:"propagators"`
// Processors allow configuration of span processors to emit spans to
// any number of suported backends.
Processors []SpanProcessor `mapstructure:"processors"`
}

// Validate checks whether the current configuration is valid
Expand All @@ -132,6 +135,29 @@ func (c *Config) Validate() error {
return nil
}

func (sp *SpanProcessor) Unmarshal(conf *confmap.Conf) error {
if !obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled() {
// only unmarshal if feature gate is enabled
return nil
}

if conf == nil {
return nil
}

if err := conf.Unmarshal(sp); err != nil {
return fmt.Errorf("invalid span processor configuration: %w", err)
}

if sp.Batch != nil {
if sp.Batch.Exporter.Console == nil {
return fmt.Errorf("invalid exporter configuration")
}
return nil
}
return fmt.Errorf("unsupported span processor type %s", conf.AllKeys())
}

func (mr *MetricReader) Unmarshal(conf *confmap.Conf) error {
if !obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled() {
// only unmarshal if feature gate is enabled
Expand Down
Loading

0 comments on commit 1f30fae

Please sign in to comment.