Skip to content

Commit

Permalink
PR review improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Jun 16, 2023
1 parent 356d973 commit 25b9be4
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 69 deletions.
17 changes: 11 additions & 6 deletions component/otelcol/config_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ type GRPCClientArguments struct {
TLS TLSClientArguments `river:"tls,block,optional"`
Keepalive *KeepaliveClientArguments `river:"keepalive,block,optional"`

ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"`
WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"`
WaitForReady bool `river:"wait_for_ready,attr,optional"`
Headers map[string]configopaque.String `river:"headers,attr,optional"`
BalancerName string `river:"balancer_name,attr,optional"`
ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"`
WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"`
WaitForReady bool `river:"wait_for_ready,attr,optional"`
Headers map[string]string `river:"headers,attr,optional"`
BalancerName string `river:"balancer_name,attr,optional"`

// Auth is a binding to an otelcol.auth.* component extension which handles
// authentication.
Expand All @@ -154,6 +154,11 @@ func (args *GRPCClientArguments) Convert() *otelconfiggrpc.GRPCClientSettings {
return nil
}

opaqueHeaders := make(map[string]configopaque.String)
for headerName, headerVal := range args.Headers {
opaqueHeaders[headerName] = configopaque.String(headerVal)
}

// Configure the authentication if args.Auth is set.
var auth *otelconfigauth.Authentication
if args.Auth != nil {
Expand All @@ -171,7 +176,7 @@ func (args *GRPCClientArguments) Convert() *otelconfiggrpc.GRPCClientSettings {
ReadBufferSize: int(args.ReadBufferSize),
WriteBufferSize: int(args.WriteBufferSize),
WaitForReady: args.WaitForReady,
Headers: args.Headers,
Headers: opaqueHeaders,
BalancerName: args.BalancerName,

Auth: auth,
Expand Down
17 changes: 11 additions & 6 deletions component/otelcol/config_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
otelcomponent "go.opentelemetry.io/collector/component"
otelconfigauth "go.opentelemetry.io/collector/config/configauth"
otelconfighttp "go.opentelemetry.io/collector/config/confighttp"
otelconfigopaque "go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configopaque"
otelextension "go.opentelemetry.io/collector/extension"
)

Expand Down Expand Up @@ -80,10 +80,10 @@ type HTTPClientArguments struct {

TLS TLSClientArguments `river:"tls,block,optional"`

ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"`
WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"`
Timeout time.Duration `river:"timeout,attr,optional"`
Headers map[string]otelconfigopaque.String `river:"headers,attr,optional"`
ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"`
WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"`
Timeout time.Duration `river:"timeout,attr,optional"`
Headers map[string]string `river:"headers,attr,optional"`
// CustomRoundTripper func(next http.RoundTripper) (http.RoundTripper, error) TODO (@tpaschalis)
MaxIdleConns *int `river:"max_idle_conns,attr,optional"`
MaxIdleConnsPerHost *int `river:"max_idle_conns_per_host,attr,optional"`
Expand All @@ -107,6 +107,11 @@ func (args *HTTPClientArguments) Convert() *otelconfighttp.HTTPClientSettings {
auth = &otelconfigauth.Authentication{AuthenticatorID: args.Auth.ID}
}

opaqueHeaders := make(map[string]configopaque.String)
for headerName, headerVal := range args.Headers {
opaqueHeaders[headerName] = configopaque.String(headerVal)
}

return &otelconfighttp.HTTPClientSettings{
Endpoint: args.Endpoint,

Expand All @@ -117,7 +122,7 @@ func (args *HTTPClientArguments) Convert() *otelconfighttp.HTTPClientSettings {
ReadBufferSize: int(args.ReadBufferSize),
WriteBufferSize: int(args.WriteBufferSize),
Timeout: args.Timeout,
Headers: args.Headers,
Headers: opaqueHeaders,
// CustomRoundTripper: func(http.RoundTripper) (http.RoundTripper, error) { panic("not implemented") }, TODO (@tpaschalis)
MaxIdleConns: args.MaxIdleConns,
MaxIdleConnsPerHost: args.MaxIdleConnsPerHost,
Expand Down
3 changes: 1 addition & 2 deletions component/otelcol/exporter/jaeger/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/grafana/agent/component/otelcol/exporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerexporter"
otelcomponent "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configopaque"
otelpexporterhelper "go.opentelemetry.io/collector/exporter/exporterhelper"
otelextension "go.opentelemetry.io/collector/extension"
)
Expand Down Expand Up @@ -83,7 +82,7 @@ type GRPCClientArguments otelcol.GRPCClientArguments
// DefaultGRPCClientArguments holds component-specific default settings for
// GRPCClientArguments.
var DefaultGRPCClientArguments = GRPCClientArguments{
Headers: map[string]configopaque.String{},
Headers: map[string]string{},
Compression: otelcol.CompressionTypeGzip,
WriteBufferSize: 512 * 1024,
}
Expand Down
3 changes: 1 addition & 2 deletions component/otelcol/exporter/otlp/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/exporter"
otelcomponent "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configopaque"
otelpexporterhelper "go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/otlpexporter"
otelextension "go.opentelemetry.io/collector/extension"
Expand Down Expand Up @@ -83,7 +82,7 @@ type GRPCClientArguments otelcol.GRPCClientArguments
// DefaultGRPCClientArguments holds component-specific default settings for
// GRPCClientArguments.
var DefaultGRPCClientArguments = GRPCClientArguments{
Headers: map[string]configopaque.String{},
Headers: map[string]string{},
Compression: otelcol.CompressionTypeGzip,
WriteBufferSize: 512 * 1024,
}
Expand Down
3 changes: 1 addition & 2 deletions component/otelcol/exporter/otlphttp/otlphttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/exporter"
otelcomponent "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/exporter/otlphttpexporter"
otelextension "go.opentelemetry.io/collector/extension"
)
Expand Down Expand Up @@ -98,7 +97,7 @@ var (
IdleConnTimeout: &DefaultIdleConnTimeout,

Timeout: 30 * time.Second,
Headers: map[string]configopaque.String{},
Headers: map[string]string{},
Compression: otelcol.CompressionTypeGzip,
ReadBufferSize: 0,
WriteBufferSize: 512 * 1024,
Expand Down
73 changes: 26 additions & 47 deletions component/otelcol/receiver/prometheus/internal/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,33 +61,25 @@ var (
)

func TestTransactionCommitWithoutAdding(t *testing.T) {
recv, err := nopObsRecv()
assert.NoError(t, err)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
assert.NoError(t, tr.Commit())
}

func TestTransactionRollbackDoesNothing(t *testing.T) {
recv, err := nopObsRecv()
assert.NoError(t, err)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
assert.NoError(t, tr.Rollback())
}

func TestTransactionUpdateMetadataDoesNothing(t *testing.T) {
recv, err := nopObsRecv()
assert.NoError(t, err)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv)
_, err = tr.UpdateMetadata(0, labels.New(), metadata.Metadata{})
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
_, err := tr.UpdateMetadata(0, labels.New(), metadata.Metadata{})
assert.NoError(t, err)
}

func TestTransactionAppendNoTarget(t *testing.T) {
recv, err := nopObsRecv()
assert.NoError(t, err)
badLabels := labels.FromStrings(model.MetricNameLabel, "counter_test")
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv)
_, err = tr.Append(0, badLabels, time.Now().Unix()*1000, 1.0)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
_, err := tr.Append(0, badLabels, time.Now().Unix()*1000, 1.0)
assert.Error(t, err)
}

Expand All @@ -96,20 +88,16 @@ func TestTransactionAppendNoMetricName(t *testing.T) {
model.InstanceLabel: "localhost:8080",
model.JobLabel: "test2",
})
recv, err := nopObsRecv()
assert.NoError(t, err)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv)
_, err = tr.Append(0, jobNotFoundLb, time.Now().Unix()*1000, 1.0)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
_, err := tr.Append(0, jobNotFoundLb, time.Now().Unix()*1000, 1.0)
assert.ErrorIs(t, err, errMetricNameNotFound)

assert.ErrorIs(t, tr.Commit(), errNoDataToBuild)
}

func TestTransactionAppendEmptyMetricName(t *testing.T) {
recv, err := nopObsRecv()
assert.NoError(t, err)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv)
_, err = tr.Append(0, labels.FromMap(map[string]string{
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
_, err := tr.Append(0, labels.FromMap(map[string]string{
model.InstanceLabel: "localhost:8080",
model.JobLabel: "test2",
model.MetricNameLabel: "",
Expand All @@ -119,10 +107,8 @@ func TestTransactionAppendEmptyMetricName(t *testing.T) {

func TestTransactionAppendResource(t *testing.T) {
sink := new(consumertest.MetricsSink)
recv, err := nopObsRecv()
assert.NoError(t, err)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), recv)
_, err = tr.Append(0, labels.FromMap(map[string]string{
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
_, err := tr.Append(0, labels.FromMap(map[string]string{
model.InstanceLabel: "localhost:8080",
model.JobLabel: "test",
model.MetricNameLabel: "counter_test",
Expand Down Expand Up @@ -150,20 +136,16 @@ func TestTransactionCommitErrorWhenAdjusterError(t *testing.T) {
})
sink := new(consumertest.MetricsSink)
adjusterErr := errors.New("adjuster error")
recv, err := nopObsRecv()
assert.NoError(t, err)
tr := newTransaction(scrapeCtx, &errorAdjuster{err: adjusterErr}, sink, nil, receivertest.NewNopCreateSettings(), recv)
_, err = tr.Append(0, goodLabels, time.Now().Unix()*1000, 1.0)
tr := newTransaction(scrapeCtx, &errorAdjuster{err: adjusterErr}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
_, err := tr.Append(0, goodLabels, time.Now().Unix()*1000, 1.0)
assert.NoError(t, err)
assert.ErrorIs(t, tr.Commit(), adjusterErr)
}

// Ensure that we reject duplicate label keys. See https://github.com/open-telemetry/wg-prometheus/issues/44.
func TestTransactionAppendDuplicateLabels(t *testing.T) {
sink := new(consumertest.MetricsSink)
recv, err := nopObsRecv()
assert.NoError(t, err)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), recv)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))

dupLabels := labels.FromStrings(
model.InstanceLabel, "0.0.0.0:8855",
Expand All @@ -174,49 +156,48 @@ func TestTransactionAppendDuplicateLabels(t *testing.T) {
"z", "9",
)

_, err = tr.Append(0, dupLabels, 1917, 1.0)
_, err := tr.Append(0, dupLabels, 1917, 1.0)
require.Error(t, err)
assert.Contains(t, err.Error(), `invalid sample: non-unique label names: "a"`)
}

func TestTransactionAppendHistogramNoLe(t *testing.T) {
sink := new(consumertest.MetricsSink)
recv, err := nopObsRecv()
assert.NoError(t, err)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), recv)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))

goodLabels := labels.FromStrings(
model.InstanceLabel, "0.0.0.0:8855",
model.JobLabel, "test",
model.MetricNameLabel, "hist_test_bucket",
)

_, err = tr.Append(0, goodLabels, 1917, 1.0)
_, err := tr.Append(0, goodLabels, 1917, 1.0)
require.ErrorIs(t, err, errEmptyLeLabel)
}

func TestTransactionAppendSummaryNoQuantile(t *testing.T) {
sink := new(consumertest.MetricsSink)
recv, err := nopObsRecv()
assert.NoError(t, err)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), recv)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))

goodLabels := labels.FromStrings(
model.InstanceLabel, "0.0.0.0:8855",
model.JobLabel, "test",
model.MetricNameLabel, "summary_test",
)

_, err = tr.Append(0, goodLabels, 1917, 1.0)
_, err := tr.Append(0, goodLabels, 1917, 1.0)
require.ErrorIs(t, err, errEmptyQuantileLabel)
}

func nopObsRecv() (*obsreport.Receiver, error) {
return obsreport.NewReceiver(obsreport.ReceiverSettings{
func nopObsRecv(t *testing.T) *obsreport.Receiver {
res, err := obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: component.NewID("prometheus"),
Transport: transport,
ReceiverCreateSettings: receivertest.NewNopCreateSettings(),
})

assert.NoError(t, err)
return res
}

func TestMetricBuilderCounters(t *testing.T) {
Expand Down Expand Up @@ -1047,9 +1028,7 @@ func (tt buildTestData) run(t *testing.T) {
st := ts
for i, page := range tt.inputs {
sink := new(consumertest.MetricsSink)
recv, err := nopObsRecv()
assert.NoError(t, err)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), recv)
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
for _, pt := range page.pts {
// set ts for testing
pt.t = st
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ configuration.

`otelcol.auth.bearer` does not expose any component-specific debug information.

## Example
## Examples

### Default scheme via gRPC transport

Expand Down
16 changes: 15 additions & 1 deletion pkg/traces/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,28 @@ func (i *Instance) buildAndStartPipeline(ctx context.Context, cfg InstanceConfig
Version: build.Version,
}

// useOtelForInternalMetrics is required so that the Collector service configures Collector components using the Otel SDK
// instead of OpenCensus. If this is not specified, then the OtelMetricViews and OtelMetricReader parameters which we
// pass to service.New() below will not be taken into account. This would mean that metrics from custom components such as
// the one in pkg/traces/servicegraphprocessor would not work.
//
// disableHighCardinalityMetrics is required so that we don't include labels containing ports and IP addresses in gRPC metrics.
// Example metric with high cardinality...
// rpc_server_duration_bucket{net_sock_peer_addr="127.0.0.1",net_sock_peer_port="59947",rpc_grpc_status_code="0",rpc_method="Export",rpc_service="opentelemetry.proto.collector.trace.v1.TraceService",rpc_system="grpc",traces_config="default",le="7500"} 294
// ... the same metric when disableHighCardinalityMetrics is switched on looks like this:
// rpc_server_duration_bucket{rpc_grpc_status_code="0",rpc_method="Export",rpc_service="opentelemetry.proto.collector.trace.v1.TraceService",rpc_system="grpc",traces_config="default",le="7500"} 32
// For more context:
// https://opentelemetry.io/docs/specs/otel/metrics/semantic_conventions/rpc-metrics/
// https://github.com/open-telemetry/opentelemetry-go-contrib/pull/2700
// https://github.com/open-telemetry/opentelemetry-collector/pull/6788/files
err = enableOtelFeatureGates(
"telemetry.useOtelForInternalMetrics",
"telemetry.disableHighCardinalityMetrics")
if err != nil {
return err
}

promExporter, err := traceutils.PromeheusExporter(reg)
promExporter, err := traceutils.PrometheusExporter(reg)
if err != nil {
return fmt.Errorf("error creating otel prometheus exporter: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/traces/internal/traceutils/otel_meter_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
)

func PromeheusExporter(reg prometheus.Registerer) (*otelprom.Exporter, error) {
func PrometheusExporter(reg prometheus.Registerer) (*otelprom.Exporter, error) {
return otelprom.New(
otelprom.WithRegisterer(reg),
otelprom.WithoutUnits(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/traces/servicegraphprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestConsumeMetrics(t *testing.T) {
}

func getTestMeterProvider(t *testing.T, reg prometheus.Registerer) *sdkmetric.MeterProvider {
promExporter, err := traceutils.PromeheusExporter(reg)
promExporter, err := traceutils.PrometheusExporter(reg)
require.NoError(t, err)

mp := sdkmetric.NewMeterProvider(
Expand Down

0 comments on commit 25b9be4

Please sign in to comment.