From 25b9be4d7a43764a8e8ca5833dac4e169c58c7c3 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Fri, 16 Jun 2023 18:25:27 +0100 Subject: [PATCH] PR review improvements --- component/otelcol/config_grpc.go | 17 +++-- component/otelcol/config_http.go | 17 +++-- component/otelcol/exporter/jaeger/jaeger.go | 3 +- component/otelcol/exporter/otlp/otlp.go | 3 +- .../otelcol/exporter/otlphttp/otlphttp.go | 3 +- .../prometheus/internal/transaction_test.go | 73 +++++++------------ .../components/otelcol.auth.bearer.md | 2 +- pkg/traces/instance.go | 16 +++- .../traceutils/otel_meter_settings.go | 2 +- .../servicegraphprocessor/processor_test.go | 2 +- 10 files changed, 69 insertions(+), 69 deletions(-) diff --git a/component/otelcol/config_grpc.go b/component/otelcol/config_grpc.go index 64a7a1b87025..9852d40dbd0c 100644 --- a/component/otelcol/config_grpc.go +++ b/component/otelcol/config_grpc.go @@ -137,11 +137,11 @@ type GRPCClientArguments struct { TLS TLSClientArguments `river:"tls,block,optional"` Keepalive *KeepaliveClientArguments `river:"keepalive,block,optional"` - ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"` - WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"` - WaitForReady bool `river:"wait_for_ready,attr,optional"` - Headers map[string]configopaque.String `river:"headers,attr,optional"` - BalancerName string `river:"balancer_name,attr,optional"` + ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"` + WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"` + WaitForReady bool `river:"wait_for_ready,attr,optional"` + Headers map[string]string `river:"headers,attr,optional"` + BalancerName string `river:"balancer_name,attr,optional"` // Auth is a binding to an otelcol.auth.* component extension which handles // authentication. @@ -154,6 +154,11 @@ func (args *GRPCClientArguments) Convert() *otelconfiggrpc.GRPCClientSettings { return nil } + opaqueHeaders := make(map[string]configopaque.String) + for headerName, headerVal := range args.Headers { + opaqueHeaders[headerName] = configopaque.String(headerVal) + } + // Configure the authentication if args.Auth is set. var auth *otelconfigauth.Authentication if args.Auth != nil { @@ -171,7 +176,7 @@ func (args *GRPCClientArguments) Convert() *otelconfiggrpc.GRPCClientSettings { ReadBufferSize: int(args.ReadBufferSize), WriteBufferSize: int(args.WriteBufferSize), WaitForReady: args.WaitForReady, - Headers: args.Headers, + Headers: opaqueHeaders, BalancerName: args.BalancerName, Auth: auth, diff --git a/component/otelcol/config_http.go b/component/otelcol/config_http.go index ff9baae64443..13ddaf306996 100644 --- a/component/otelcol/config_http.go +++ b/component/otelcol/config_http.go @@ -8,7 +8,7 @@ import ( otelcomponent "go.opentelemetry.io/collector/component" otelconfigauth "go.opentelemetry.io/collector/config/configauth" otelconfighttp "go.opentelemetry.io/collector/config/confighttp" - otelconfigopaque "go.opentelemetry.io/collector/config/configopaque" + "go.opentelemetry.io/collector/config/configopaque" otelextension "go.opentelemetry.io/collector/extension" ) @@ -80,10 +80,10 @@ type HTTPClientArguments struct { TLS TLSClientArguments `river:"tls,block,optional"` - ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"` - WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"` - Timeout time.Duration `river:"timeout,attr,optional"` - Headers map[string]otelconfigopaque.String `river:"headers,attr,optional"` + ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"` + WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"` + Timeout time.Duration `river:"timeout,attr,optional"` + Headers map[string]string `river:"headers,attr,optional"` // CustomRoundTripper func(next http.RoundTripper) (http.RoundTripper, error) TODO (@tpaschalis) MaxIdleConns *int `river:"max_idle_conns,attr,optional"` MaxIdleConnsPerHost *int `river:"max_idle_conns_per_host,attr,optional"` @@ -107,6 +107,11 @@ func (args *HTTPClientArguments) Convert() *otelconfighttp.HTTPClientSettings { auth = &otelconfigauth.Authentication{AuthenticatorID: args.Auth.ID} } + opaqueHeaders := make(map[string]configopaque.String) + for headerName, headerVal := range args.Headers { + opaqueHeaders[headerName] = configopaque.String(headerVal) + } + return &otelconfighttp.HTTPClientSettings{ Endpoint: args.Endpoint, @@ -117,7 +122,7 @@ func (args *HTTPClientArguments) Convert() *otelconfighttp.HTTPClientSettings { ReadBufferSize: int(args.ReadBufferSize), WriteBufferSize: int(args.WriteBufferSize), Timeout: args.Timeout, - Headers: args.Headers, + Headers: opaqueHeaders, // CustomRoundTripper: func(http.RoundTripper) (http.RoundTripper, error) { panic("not implemented") }, TODO (@tpaschalis) MaxIdleConns: args.MaxIdleConns, MaxIdleConnsPerHost: args.MaxIdleConnsPerHost, diff --git a/component/otelcol/exporter/jaeger/jaeger.go b/component/otelcol/exporter/jaeger/jaeger.go index 0dea790f8aea..29fdce5548f7 100644 --- a/component/otelcol/exporter/jaeger/jaeger.go +++ b/component/otelcol/exporter/jaeger/jaeger.go @@ -9,7 +9,6 @@ import ( "github.com/grafana/agent/component/otelcol/exporter" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerexporter" otelcomponent "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configopaque" otelpexporterhelper "go.opentelemetry.io/collector/exporter/exporterhelper" otelextension "go.opentelemetry.io/collector/extension" ) @@ -83,7 +82,7 @@ type GRPCClientArguments otelcol.GRPCClientArguments // DefaultGRPCClientArguments holds component-specific default settings for // GRPCClientArguments. var DefaultGRPCClientArguments = GRPCClientArguments{ - Headers: map[string]configopaque.String{}, + Headers: map[string]string{}, Compression: otelcol.CompressionTypeGzip, WriteBufferSize: 512 * 1024, } diff --git a/component/otelcol/exporter/otlp/otlp.go b/component/otelcol/exporter/otlp/otlp.go index b42ab78f0bae..baf3240af5e2 100644 --- a/component/otelcol/exporter/otlp/otlp.go +++ b/component/otelcol/exporter/otlp/otlp.go @@ -8,7 +8,6 @@ import ( "github.com/grafana/agent/component/otelcol" "github.com/grafana/agent/component/otelcol/exporter" otelcomponent "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configopaque" otelpexporterhelper "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/otlpexporter" otelextension "go.opentelemetry.io/collector/extension" @@ -83,7 +82,7 @@ type GRPCClientArguments otelcol.GRPCClientArguments // DefaultGRPCClientArguments holds component-specific default settings for // GRPCClientArguments. var DefaultGRPCClientArguments = GRPCClientArguments{ - Headers: map[string]configopaque.String{}, + Headers: map[string]string{}, Compression: otelcol.CompressionTypeGzip, WriteBufferSize: 512 * 1024, } diff --git a/component/otelcol/exporter/otlphttp/otlphttp.go b/component/otelcol/exporter/otlphttp/otlphttp.go index fc06f6c08162..8d6e9fc23c3f 100644 --- a/component/otelcol/exporter/otlphttp/otlphttp.go +++ b/component/otelcol/exporter/otlphttp/otlphttp.go @@ -9,7 +9,6 @@ import ( "github.com/grafana/agent/component/otelcol" "github.com/grafana/agent/component/otelcol/exporter" otelcomponent "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/exporter/otlphttpexporter" otelextension "go.opentelemetry.io/collector/extension" ) @@ -98,7 +97,7 @@ var ( IdleConnTimeout: &DefaultIdleConnTimeout, Timeout: 30 * time.Second, - Headers: map[string]configopaque.String{}, + Headers: map[string]string{}, Compression: otelcol.CompressionTypeGzip, ReadBufferSize: 0, WriteBufferSize: 512 * 1024, diff --git a/component/otelcol/receiver/prometheus/internal/transaction_test.go b/component/otelcol/receiver/prometheus/internal/transaction_test.go index 58db6fe02bb0..deb47ff989d5 100644 --- a/component/otelcol/receiver/prometheus/internal/transaction_test.go +++ b/component/otelcol/receiver/prometheus/internal/transaction_test.go @@ -61,33 +61,25 @@ var ( ) func TestTransactionCommitWithoutAdding(t *testing.T) { - recv, err := nopObsRecv() - assert.NoError(t, err) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t)) assert.NoError(t, tr.Commit()) } func TestTransactionRollbackDoesNothing(t *testing.T) { - recv, err := nopObsRecv() - assert.NoError(t, err) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t)) assert.NoError(t, tr.Rollback()) } func TestTransactionUpdateMetadataDoesNothing(t *testing.T) { - recv, err := nopObsRecv() - assert.NoError(t, err) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv) - _, err = tr.UpdateMetadata(0, labels.New(), metadata.Metadata{}) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t)) + _, err := tr.UpdateMetadata(0, labels.New(), metadata.Metadata{}) assert.NoError(t, err) } func TestTransactionAppendNoTarget(t *testing.T) { - recv, err := nopObsRecv() - assert.NoError(t, err) badLabels := labels.FromStrings(model.MetricNameLabel, "counter_test") - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv) - _, err = tr.Append(0, badLabels, time.Now().Unix()*1000, 1.0) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t)) + _, err := tr.Append(0, badLabels, time.Now().Unix()*1000, 1.0) assert.Error(t, err) } @@ -96,20 +88,16 @@ func TestTransactionAppendNoMetricName(t *testing.T) { model.InstanceLabel: "localhost:8080", model.JobLabel: "test2", }) - recv, err := nopObsRecv() - assert.NoError(t, err) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv) - _, err = tr.Append(0, jobNotFoundLb, time.Now().Unix()*1000, 1.0) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t)) + _, err := tr.Append(0, jobNotFoundLb, time.Now().Unix()*1000, 1.0) assert.ErrorIs(t, err, errMetricNameNotFound) assert.ErrorIs(t, tr.Commit(), errNoDataToBuild) } func TestTransactionAppendEmptyMetricName(t *testing.T) { - recv, err := nopObsRecv() - assert.NoError(t, err) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv) - _, err = tr.Append(0, labels.FromMap(map[string]string{ + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t)) + _, err := tr.Append(0, labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test2", model.MetricNameLabel: "", @@ -119,10 +107,8 @@ func TestTransactionAppendEmptyMetricName(t *testing.T) { func TestTransactionAppendResource(t *testing.T) { sink := new(consumertest.MetricsSink) - recv, err := nopObsRecv() - assert.NoError(t, err) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), recv) - _, err = tr.Append(0, labels.FromMap(map[string]string{ + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t)) + _, err := tr.Append(0, labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test", model.MetricNameLabel: "counter_test", @@ -150,10 +136,8 @@ func TestTransactionCommitErrorWhenAdjusterError(t *testing.T) { }) sink := new(consumertest.MetricsSink) adjusterErr := errors.New("adjuster error") - recv, err := nopObsRecv() - assert.NoError(t, err) - tr := newTransaction(scrapeCtx, &errorAdjuster{err: adjusterErr}, sink, nil, receivertest.NewNopCreateSettings(), recv) - _, err = tr.Append(0, goodLabels, time.Now().Unix()*1000, 1.0) + tr := newTransaction(scrapeCtx, &errorAdjuster{err: adjusterErr}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t)) + _, err := tr.Append(0, goodLabels, time.Now().Unix()*1000, 1.0) assert.NoError(t, err) assert.ErrorIs(t, tr.Commit(), adjusterErr) } @@ -161,9 +145,7 @@ func TestTransactionCommitErrorWhenAdjusterError(t *testing.T) { // Ensure that we reject duplicate label keys. See https://github.com/open-telemetry/wg-prometheus/issues/44. func TestTransactionAppendDuplicateLabels(t *testing.T) { sink := new(consumertest.MetricsSink) - recv, err := nopObsRecv() - assert.NoError(t, err) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), recv) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t)) dupLabels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -174,16 +156,14 @@ func TestTransactionAppendDuplicateLabels(t *testing.T) { "z", "9", ) - _, err = tr.Append(0, dupLabels, 1917, 1.0) + _, err := tr.Append(0, dupLabels, 1917, 1.0) require.Error(t, err) assert.Contains(t, err.Error(), `invalid sample: non-unique label names: "a"`) } func TestTransactionAppendHistogramNoLe(t *testing.T) { sink := new(consumertest.MetricsSink) - recv, err := nopObsRecv() - assert.NoError(t, err) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), recv) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t)) goodLabels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -191,15 +171,13 @@ func TestTransactionAppendHistogramNoLe(t *testing.T) { model.MetricNameLabel, "hist_test_bucket", ) - _, err = tr.Append(0, goodLabels, 1917, 1.0) + _, err := tr.Append(0, goodLabels, 1917, 1.0) require.ErrorIs(t, err, errEmptyLeLabel) } func TestTransactionAppendSummaryNoQuantile(t *testing.T) { sink := new(consumertest.MetricsSink) - recv, err := nopObsRecv() - assert.NoError(t, err) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), recv) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t)) goodLabels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -207,16 +185,19 @@ func TestTransactionAppendSummaryNoQuantile(t *testing.T) { model.MetricNameLabel, "summary_test", ) - _, err = tr.Append(0, goodLabels, 1917, 1.0) + _, err := tr.Append(0, goodLabels, 1917, 1.0) require.ErrorIs(t, err, errEmptyQuantileLabel) } -func nopObsRecv() (*obsreport.Receiver, error) { - return obsreport.NewReceiver(obsreport.ReceiverSettings{ +func nopObsRecv(t *testing.T) *obsreport.Receiver { + res, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ ReceiverID: component.NewID("prometheus"), Transport: transport, ReceiverCreateSettings: receivertest.NewNopCreateSettings(), }) + + assert.NoError(t, err) + return res } func TestMetricBuilderCounters(t *testing.T) { @@ -1047,9 +1028,7 @@ func (tt buildTestData) run(t *testing.T) { st := ts for i, page := range tt.inputs { sink := new(consumertest.MetricsSink) - recv, err := nopObsRecv() - assert.NoError(t, err) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), recv) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t)) for _, pt := range page.pts { // set ts for testing pt.t = st diff --git a/docs/sources/flow/reference/components/otelcol.auth.bearer.md b/docs/sources/flow/reference/components/otelcol.auth.bearer.md index eefd25c13fb1..0481c6fccf96 100644 --- a/docs/sources/flow/reference/components/otelcol.auth.bearer.md +++ b/docs/sources/flow/reference/components/otelcol.auth.bearer.md @@ -51,7 +51,7 @@ configuration. `otelcol.auth.bearer` does not expose any component-specific debug information. -## Example +## Examples ### Default scheme via gRPC transport diff --git a/pkg/traces/instance.go b/pkg/traces/instance.go index 2e6950f07d17..6e92d43bab80 100644 --- a/pkg/traces/instance.go +++ b/pkg/traces/instance.go @@ -132,6 +132,20 @@ func (i *Instance) buildAndStartPipeline(ctx context.Context, cfg InstanceConfig Version: build.Version, } + // useOtelForInternalMetrics is required so that the Collector service configures Collector components using the Otel SDK + // instead of OpenCensus. If this is not specified, then the OtelMetricViews and OtelMetricReader parameters which we + // pass to service.New() below will not be taken into account. This would mean that metrics from custom components such as + // the one in pkg/traces/servicegraphprocessor would not work. + // + // disableHighCardinalityMetrics is required so that we don't include labels containing ports and IP addresses in gRPC metrics. + // Example metric with high cardinality... + // rpc_server_duration_bucket{net_sock_peer_addr="127.0.0.1",net_sock_peer_port="59947",rpc_grpc_status_code="0",rpc_method="Export",rpc_service="opentelemetry.proto.collector.trace.v1.TraceService",rpc_system="grpc",traces_config="default",le="7500"} 294 + // ... the same metric when disableHighCardinalityMetrics is switched on looks like this: + // rpc_server_duration_bucket{rpc_grpc_status_code="0",rpc_method="Export",rpc_service="opentelemetry.proto.collector.trace.v1.TraceService",rpc_system="grpc",traces_config="default",le="7500"} 32 + // For more context: + // https://opentelemetry.io/docs/specs/otel/metrics/semantic_conventions/rpc-metrics/ + // https://github.com/open-telemetry/opentelemetry-go-contrib/pull/2700 + // https://github.com/open-telemetry/opentelemetry-collector/pull/6788/files err = enableOtelFeatureGates( "telemetry.useOtelForInternalMetrics", "telemetry.disableHighCardinalityMetrics") @@ -139,7 +153,7 @@ func (i *Instance) buildAndStartPipeline(ctx context.Context, cfg InstanceConfig return err } - promExporter, err := traceutils.PromeheusExporter(reg) + promExporter, err := traceutils.PrometheusExporter(reg) if err != nil { return fmt.Errorf("error creating otel prometheus exporter: %w", err) } diff --git a/pkg/traces/internal/traceutils/otel_meter_settings.go b/pkg/traces/internal/traceutils/otel_meter_settings.go index 2752a30ae66b..b02a3cb07a4a 100644 --- a/pkg/traces/internal/traceutils/otel_meter_settings.go +++ b/pkg/traces/internal/traceutils/otel_meter_settings.go @@ -5,7 +5,7 @@ import ( otelprom "go.opentelemetry.io/otel/exporters/prometheus" ) -func PromeheusExporter(reg prometheus.Registerer) (*otelprom.Exporter, error) { +func PrometheusExporter(reg prometheus.Registerer) (*otelprom.Exporter, error) { return otelprom.New( otelprom.WithRegisterer(reg), otelprom.WithoutUnits(), diff --git a/pkg/traces/servicegraphprocessor/processor_test.go b/pkg/traces/servicegraphprocessor/processor_test.go index 273f6995f962..f9e0d9cb6219 100644 --- a/pkg/traces/servicegraphprocessor/processor_test.go +++ b/pkg/traces/servicegraphprocessor/processor_test.go @@ -109,7 +109,7 @@ func TestConsumeMetrics(t *testing.T) { } func getTestMeterProvider(t *testing.T, reg prometheus.Registerer) *sdkmetric.MeterProvider { - promExporter, err := traceutils.PromeheusExporter(reg) + promExporter, err := traceutils.PrometheusExporter(reg) require.NoError(t, err) mp := sdkmetric.NewMeterProvider(