diff --git a/exporter/loadbalancingexporter/documentation.md b/exporter/loadbalancingexporter/documentation.md new file mode 100644 index 000000000000..f30a262049ec --- /dev/null +++ b/exporter/loadbalancingexporter/documentation.md @@ -0,0 +1,47 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# loadbalancing + +## Internal Telemetry + +The following telemetry is emitted by this component. + +### loadbalancer_backend_latency + +Response latency in ms for the backends. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| ms | Histogram | Int | + +### loadbalancer_backend_outcome + +Number of successes and failures for each endpoint. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {outcomes} | Sum | Int | true | + +### loadbalancer_num_backend_updates + +Number of times the list of backends was updated. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {updates} | Sum | Int | true | + +### loadbalancer_num_backends + +Current number of backends in use. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| {backends} | Gauge | Int | + +### loadbalancer_num_resolutions + +Number of times the resolver has triggered new resolutions. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {resolutions} | Sum | Int | true | diff --git a/exporter/loadbalancingexporter/factory.go b/exporter/loadbalancingexporter/factory.go index 7288bc54be94..f1c37e151757 100644 --- a/exporter/loadbalancingexporter/factory.go +++ b/exporter/loadbalancingexporter/factory.go @@ -8,7 +8,6 @@ package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry import ( "context" - "go.opencensus.io/stats/view" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/otlpexporter" @@ -18,8 +17,6 @@ import ( // NewFactory creates a factory for the exporter. func NewFactory() exporter.Factory { - _ = view.Register(metricViews()...) - return exporter.NewFactory( metadata.Type, createDefaultConfig, diff --git a/exporter/loadbalancingexporter/generated_component_telemetry_test.go b/exporter/loadbalancingexporter/generated_component_telemetry_test.go new file mode 100644 index 000000000000..9b3268b2052d --- /dev/null +++ b/exporter/loadbalancingexporter/generated_component_telemetry_test.go @@ -0,0 +1,76 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package loadbalancingexporter + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exportertest" +) + +type componentTestTelemetry struct { + reader *sdkmetric.ManualReader + meterProvider *sdkmetric.MeterProvider +} + +func (tt *componentTestTelemetry) NewSettings() exporter.Settings { + settings := exportertest.NewNopSettings() + settings.MeterProvider = tt.meterProvider + settings.ID = component.NewID(component.MustNewType("loadbalancing")) + + return settings +} + +func setupTestTelemetry() componentTestTelemetry { + reader := sdkmetric.NewManualReader() + return componentTestTelemetry{ + reader: reader, + meterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)), + } +} + +func (tt *componentTestTelemetry) assertMetrics(t *testing.T, expected []metricdata.Metrics) { + var md metricdata.ResourceMetrics + require.NoError(t, tt.reader.Collect(context.Background(), &md)) + // ensure all required metrics are present + for _, want := range expected { + got := tt.getMetric(want.Name, md) + metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) + } + + // ensure no additional metrics are emitted + require.Equal(t, len(expected), tt.len(md)) +} + +func (tt *componentTestTelemetry) getMetric(name string, got metricdata.ResourceMetrics) metricdata.Metrics { + for _, sm := range got.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m + } + } + } + + return metricdata.Metrics{} +} + +func (tt *componentTestTelemetry) len(got metricdata.ResourceMetrics) int { + metricsCount := 0 + for _, sm := range got.ScopeMetrics { + metricsCount += len(sm.Metrics) + } + + return metricsCount +} + +func (tt *componentTestTelemetry) Shutdown(ctx context.Context) error { + return tt.meterProvider.Shutdown(ctx) +} diff --git a/exporter/loadbalancingexporter/generated_package_test.go b/exporter/loadbalancingexporter/generated_package_test.go index 9f3b2bb9b31d..469069df786b 100644 --- a/exporter/loadbalancingexporter/generated_package_test.go +++ b/exporter/loadbalancingexporter/generated_package_test.go @@ -9,5 +9,5 @@ import ( ) func TestMain(m *testing.M) { - goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + goleak.VerifyTestMain(m) } diff --git a/exporter/loadbalancingexporter/go.mod b/exporter/loadbalancingexporter/go.mod index 7ffad882a2f5..98be1e66b55f 100644 --- a/exporter/loadbalancingexporter/go.mod +++ b/exporter/loadbalancingexporter/go.mod @@ -12,8 +12,8 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.103.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.103.0 github.com/stretchr/testify v1.9.0 - go.opencensus.io v0.24.0 go.opentelemetry.io/collector/component v0.103.0 + go.opentelemetry.io/collector/config/configtelemetry v0.103.0 go.opentelemetry.io/collector/confmap v0.103.0 go.opentelemetry.io/collector/consumer v0.103.0 go.opentelemetry.io/collector/exporter v0.103.0 @@ -21,7 +21,9 @@ require ( go.opentelemetry.io/collector/otelcol v0.103.0 go.opentelemetry.io/collector/pdata v1.10.0 go.opentelemetry.io/collector/semconv v0.103.0 + go.opentelemetry.io/otel v1.27.0 go.opentelemetry.io/otel/metric v1.27.0 + go.opentelemetry.io/otel/sdk/metric v1.27.0 go.opentelemetry.io/otel/trace v1.27.0 go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 @@ -101,6 +103,7 @@ require ( github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector v0.103.0 // indirect go.opentelemetry.io/collector/config/configauth v0.103.0 // indirect go.opentelemetry.io/collector/config/configcompression v1.10.0 // indirect @@ -108,7 +111,6 @@ require ( go.opentelemetry.io/collector/config/confignet v0.103.0 // indirect go.opentelemetry.io/collector/config/configopaque v1.10.0 // indirect go.opentelemetry.io/collector/config/configretry v0.103.0 // indirect - go.opentelemetry.io/collector/config/configtelemetry v0.103.0 // indirect go.opentelemetry.io/collector/config/configtls v0.103.0 // indirect go.opentelemetry.io/collector/config/internal v0.103.0 // indirect go.opentelemetry.io/collector/confmap/converter/expandconverter v0.103.0 // indirect @@ -128,7 +130,6 @@ require ( go.opentelemetry.io/contrib/config v0.7.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 // indirect go.opentelemetry.io/contrib/propagators/b3 v1.27.0 // indirect - go.opentelemetry.io/otel v1.27.0 // indirect go.opentelemetry.io/otel/bridge/opencensus v1.27.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.27.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.27.0 // indirect @@ -139,7 +140,6 @@ require ( go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.27.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.27.0 // indirect go.opentelemetry.io/otel/sdk v1.27.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.27.0 // indirect go.opentelemetry.io/proto/otlp v1.2.0 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/net v0.26.0 // indirect diff --git a/exporter/loadbalancingexporter/internal/metadata/generated_telemetry.go b/exporter/loadbalancingexporter/internal/metadata/generated_telemetry.go index e22a8fdda8f5..d77f081da550 100644 --- a/exporter/loadbalancingexporter/internal/metadata/generated_telemetry.go +++ b/exporter/loadbalancingexporter/internal/metadata/generated_telemetry.go @@ -3,9 +3,14 @@ package metadata import ( - "go.opentelemetry.io/collector/component" + "errors" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" ) func Meter(settings component.TelemetrySettings) metric.Meter { @@ -15,3 +20,71 @@ func Meter(settings component.TelemetrySettings) metric.Meter { func Tracer(settings component.TelemetrySettings) trace.Tracer { return settings.TracerProvider.Tracer("otelcol/loadbalancing") } + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + LoadbalancerBackendLatency metric.Int64Histogram + LoadbalancerBackendOutcome metric.Int64Counter + LoadbalancerNumBackendUpdates metric.Int64Counter + LoadbalancerNumBackends metric.Int64Gauge + LoadbalancerNumResolutions metric.Int64Counter + level configtelemetry.Level +} + +// telemetryBuilderOption applies changes to default builder. +type telemetryBuilderOption func(*TelemetryBuilder) + +// WithLevel sets the current telemetry level for the component. +func WithLevel(lvl configtelemetry.Level) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.level = lvl + } +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{level: configtelemetry.LevelBasic} + for _, op := range options { + op(&builder) + } + var err, errs error + if builder.level >= configtelemetry.LevelBasic { + builder.meter = Meter(settings) + } else { + builder.meter = noop.Meter{} + } + builder.LoadbalancerBackendLatency, err = builder.meter.Int64Histogram( + "loadbalancer_backend_latency", + metric.WithDescription("Response latency in ms for the backends."), + metric.WithUnit("ms"), metric.WithExplicitBucketBoundaries([]float64{5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000}...), + ) + errs = errors.Join(errs, err) + builder.LoadbalancerBackendOutcome, err = builder.meter.Int64Counter( + "loadbalancer_backend_outcome", + metric.WithDescription("Number of successes and failures for each endpoint."), + metric.WithUnit("{outcomes}"), + ) + errs = errors.Join(errs, err) + builder.LoadbalancerNumBackendUpdates, err = builder.meter.Int64Counter( + "loadbalancer_num_backend_updates", + metric.WithDescription("Number of times the list of backends was updated."), + metric.WithUnit("{updates}"), + ) + errs = errors.Join(errs, err) + builder.LoadbalancerNumBackends, err = builder.meter.Int64Gauge( + "loadbalancer_num_backends", + metric.WithDescription("Current number of backends in use."), + metric.WithUnit("{backends}"), + ) + errs = errors.Join(errs, err) + builder.LoadbalancerNumResolutions, err = builder.meter.Int64Counter( + "loadbalancer_num_resolutions", + metric.WithDescription("Number of times the resolver has triggered new resolutions."), + metric.WithUnit("{resolutions}"), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/exporter/loadbalancingexporter/internal/metadata/generated_telemetry_test.go b/exporter/loadbalancingexporter/internal/metadata/generated_telemetry_test.go index 8b724ff7dece..438c31259094 100644 --- a/exporter/loadbalancingexporter/internal/metadata/generated_telemetry_test.go +++ b/exporter/loadbalancingexporter/internal/metadata/generated_telemetry_test.go @@ -61,3 +61,16 @@ func TestProviders(t *testing.T) { require.Fail(t, "returned Meter not mockTracer") } } + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/exporter/loadbalancingexporter/loadbalancer.go b/exporter/loadbalancingexporter/loadbalancer.go index 83dc1f08655e..494266d130c2 100644 --- a/exporter/loadbalancingexporter/loadbalancer.go +++ b/exporter/loadbalancingexporter/loadbalancer.go @@ -11,8 +11,9 @@ import ( "sync" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/exporter" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" ) const ( @@ -41,7 +42,7 @@ type loadBalancer struct { } // Create new load balancer -func newLoadBalancer(params exporter.Settings, cfg component.Config, factory componentFactory) (*loadBalancer, error) { +func newLoadBalancer(logger *zap.Logger, cfg component.Config, factory componentFactory, telemetry *metadata.TelemetryBuilder) (*loadBalancer, error) { oCfg := cfg.(*Config) var count = 0 @@ -64,37 +65,63 @@ func newLoadBalancer(params exporter.Settings, cfg component.Config, factory com var res resolver if oCfg.Resolver.Static != nil { var err error - res, err = newStaticResolver(oCfg.Resolver.Static.Hostnames) + res, err = newStaticResolver( + oCfg.Resolver.Static.Hostnames, + telemetry, + ) if err != nil { return nil, err } } if oCfg.Resolver.DNS != nil { - dnsLogger := params.Logger.With(zap.String("resolver", "dns")) + dnsLogger := logger.With(zap.String("resolver", "dns")) var err error - res, err = newDNSResolver(dnsLogger, oCfg.Resolver.DNS.Hostname, oCfg.Resolver.DNS.Port, oCfg.Resolver.DNS.Interval, oCfg.Resolver.DNS.Timeout) + res, err = newDNSResolver( + dnsLogger, + oCfg.Resolver.DNS.Hostname, + oCfg.Resolver.DNS.Port, + oCfg.Resolver.DNS.Interval, + oCfg.Resolver.DNS.Timeout, + telemetry, + ) if err != nil { return nil, err } } if oCfg.Resolver.K8sSvc != nil { - k8sLogger := params.Logger.With(zap.String("resolver", "k8s service")) + k8sLogger := logger.With(zap.String("resolver", "k8s service")) clt, err := newInClusterClient() if err != nil { return nil, err } - res, err = newK8sResolver(clt, k8sLogger, oCfg.Resolver.K8sSvc.Service, oCfg.Resolver.K8sSvc.Ports, oCfg.Resolver.K8sSvc.Timeout) + res, err = newK8sResolver( + clt, + k8sLogger, + oCfg.Resolver.K8sSvc.Service, + oCfg.Resolver.K8sSvc.Ports, + oCfg.Resolver.K8sSvc.Timeout, + telemetry, + ) if err != nil { return nil, err } } if oCfg.Resolver.AWSCloudMap != nil { - awsCloudMapLogger := params.Logger.With(zap.String("resolver", "aws_cloud_map")) + awsCloudMapLogger := logger.With(zap.String("resolver", "aws_cloud_map")) var err error - res, err = newCloudMapResolver(awsCloudMapLogger, &oCfg.Resolver.AWSCloudMap.NamespaceName, &oCfg.Resolver.AWSCloudMap.ServiceName, oCfg.Resolver.AWSCloudMap.Port, &oCfg.Resolver.AWSCloudMap.HealthStatus, oCfg.Resolver.AWSCloudMap.Interval, oCfg.Resolver.AWSCloudMap.Timeout) + res, err = newCloudMapResolver( + awsCloudMapLogger, + &oCfg.Resolver.AWSCloudMap.NamespaceName, + &oCfg.Resolver.AWSCloudMap.ServiceName, + oCfg.Resolver.AWSCloudMap.Port, + &oCfg.Resolver.AWSCloudMap.HealthStatus, + oCfg.Resolver.AWSCloudMap.Interval, + oCfg.Resolver.AWSCloudMap.Timeout, + telemetry, + ) if err != nil { return nil, err } @@ -105,7 +132,7 @@ func newLoadBalancer(params exporter.Settings, cfg component.Config, factory com } return &loadBalancer{ - logger: params.Logger, + logger: logger, res: res, componentFactory: factory, exporters: map[string]*wrappedExporter{}, @@ -146,7 +173,7 @@ func (lb *loadBalancer) addMissingExporters(ctx context.Context, endpoints []str lb.logger.Error("failed to create new exporter for endpoint", zap.String("endpoint", endpoint), zap.Error(err)) continue } - we := newWrappedExporter(exp) + we := newWrappedExporter(exp, endpoint) if err = we.Start(ctx, lb.host); err != nil { lb.logger.Error("failed to start new exporter for endpoint", zap.String("endpoint", endpoint), zap.Error(err)) continue diff --git a/exporter/loadbalancingexporter/loadbalancer_test.go b/exporter/loadbalancingexporter/loadbalancer_test.go index 4cf8a6650c4d..88c9efa7fc5a 100644 --- a/exporter/loadbalancingexporter/loadbalancer_test.go +++ b/exporter/loadbalancingexporter/loadbalancer_test.go @@ -20,10 +20,11 @@ import ( func TestNewLoadBalancerNoResolver(t *testing.T) { // prepare + ts, tb := getTelemetryAssets(t) cfg := &Config{} // test - p, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, nil) + p, err := newLoadBalancer(ts.Logger, cfg, nil, tb) // verify require.Nil(t, p) @@ -32,6 +33,7 @@ func TestNewLoadBalancerNoResolver(t *testing.T) { func TestNewLoadBalancerInvalidStaticResolver(t *testing.T) { // prepare + ts, tb := getTelemetryAssets(t) cfg := &Config{ Resolver: ResolverSettings{ Static: &StaticResolver{Hostnames: []string{}}, @@ -39,7 +41,7 @@ func TestNewLoadBalancerInvalidStaticResolver(t *testing.T) { } // test - p, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, nil) + p, err := newLoadBalancer(ts.Logger, cfg, nil, tb) // verify require.Nil(t, p) @@ -48,6 +50,7 @@ func TestNewLoadBalancerInvalidStaticResolver(t *testing.T) { func TestNewLoadBalancerInvalidDNSResolver(t *testing.T) { // prepare + ts, tb := getTelemetryAssets(t) cfg := &Config{ Resolver: ResolverSettings{ DNS: &DNSResolver{ @@ -57,7 +60,7 @@ func TestNewLoadBalancerInvalidDNSResolver(t *testing.T) { } // test - p, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, nil) + p, err := newLoadBalancer(ts.Logger, cfg, nil, tb) // verify require.Nil(t, p) @@ -66,6 +69,7 @@ func TestNewLoadBalancerInvalidDNSResolver(t *testing.T) { func TestNewLoadBalancerInvalidK8sResolver(t *testing.T) { // prepare + ts, tb := getTelemetryAssets(t) cfg := &Config{ Resolver: ResolverSettings{ K8sSvc: &K8sSvcResolver{ @@ -75,7 +79,7 @@ func TestNewLoadBalancerInvalidK8sResolver(t *testing.T) { } // test - p, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, nil) + p, err := newLoadBalancer(ts.Logger, cfg, nil, tb) // verify assert.Nil(t, p) @@ -84,8 +88,10 @@ func TestNewLoadBalancerInvalidK8sResolver(t *testing.T) { func TestLoadBalancerStart(t *testing.T) { // prepare + ts, tb := getTelemetryAssets(t) cfg := simpleConfig() - p, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, nil) + + p, err := newLoadBalancer(ts.Logger, cfg, nil, tb) require.NotNil(t, p) require.NoError(t, err) p.res = &mockResolver{} @@ -100,6 +106,7 @@ func TestLoadBalancerStart(t *testing.T) { } func TestWithDNSResolver(t *testing.T) { + ts, tb := getTelemetryAssets(t) cfg := &Config{ Resolver: ResolverSettings{ DNS: &DNSResolver{ @@ -107,7 +114,8 @@ func TestWithDNSResolver(t *testing.T) { }, }, } - p, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, nil) + + p, err := newLoadBalancer(ts.Logger, cfg, nil, tb) require.NotNil(t, p) require.NoError(t, err) @@ -121,6 +129,7 @@ func TestWithDNSResolver(t *testing.T) { func TestWithDNSResolverNoEndpoints(t *testing.T) { // prepare + ts, tb := getTelemetryAssets(t) cfg := &Config{ Resolver: ResolverSettings{ DNS: &DNSResolver{ @@ -128,7 +137,8 @@ func TestWithDNSResolverNoEndpoints(t *testing.T) { }, }, } - p, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, nil) + + p, err := newLoadBalancer(ts.Logger, cfg, nil, tb) require.NotNil(t, p) require.NoError(t, err) @@ -144,6 +154,7 @@ func TestWithDNSResolverNoEndpoints(t *testing.T) { } func TestMultipleResolvers(t *testing.T) { + ts, tb := getTelemetryAssets(t) cfg := &Config{ Resolver: ResolverSettings{ Static: &StaticResolver{ @@ -156,7 +167,7 @@ func TestMultipleResolvers(t *testing.T) { } // test - p, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, nil) + p, err := newLoadBalancer(ts.Logger, cfg, nil, tb) // verify assert.Nil(t, p) @@ -165,8 +176,10 @@ func TestMultipleResolvers(t *testing.T) { func TestStartFailureStaticResolver(t *testing.T) { // prepare + ts, tb := getTelemetryAssets(t) cfg := simpleConfig() - p, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, nil) + + p, err := newLoadBalancer(ts.Logger, cfg, nil, tb) require.NotNil(t, p) require.NoError(t, err) @@ -200,11 +213,13 @@ func TestLoadBalancerShutdown(t *testing.T) { func TestOnBackendChanges(t *testing.T) { // prepare + ts, tb := getTelemetryAssets(t) cfg := simpleConfig() componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newNopMockExporter(), nil } - p, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, componentFactory) + + p, err := newLoadBalancer(ts.Logger, cfg, componentFactory, tb) require.NotNil(t, p) require.NoError(t, err) @@ -222,11 +237,13 @@ func TestOnBackendChanges(t *testing.T) { func TestRemoveExtraExporters(t *testing.T) { // prepare + ts, tb := getTelemetryAssets(t) cfg := simpleConfig() componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newNopMockExporter(), nil } - p, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, componentFactory) + + p, err := newLoadBalancer(ts.Logger, cfg, componentFactory, tb) require.NotNil(t, p) require.NoError(t, err) @@ -243,6 +260,7 @@ func TestRemoveExtraExporters(t *testing.T) { func TestAddMissingExporters(t *testing.T) { // prepare + ts, tb := getTelemetryAssets(t) cfg := simpleConfig() exporterFactory := exporter.NewFactory(component.MustNewType("otlp"), func() component.Config { return &otlpexporter.Config{} @@ -259,7 +277,7 @@ func TestAddMissingExporters(t *testing.T) { return exporterFactory.CreateTracesExporter(ctx, exportertest.NewNopSettings(), &oCfg) } - p, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, fn) + p, err := newLoadBalancer(ts.Logger, cfg, fn, tb) require.NotNil(t, p) require.NoError(t, err) @@ -276,6 +294,7 @@ func TestAddMissingExporters(t *testing.T) { func TestFailedToAddMissingExporters(t *testing.T) { // prepare + ts, tb := getTelemetryAssets(t) cfg := simpleConfig() expectedErr := errors.New("some expected error") exporterFactory := exporter.NewFactory(component.MustNewType("otlp"), func() component.Config { @@ -293,7 +312,7 @@ func TestFailedToAddMissingExporters(t *testing.T) { return exporterFactory.CreateTracesExporter(ctx, exportertest.NewNopSettings(), &oCfg) } - p, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, fn) + p, err := newLoadBalancer(ts.Logger, cfg, fn, tb) require.NotNil(t, p) require.NoError(t, err) @@ -350,6 +369,7 @@ func TestFailedExporterInRing(t *testing.T) { // this test is based on the discussion in the original PR for this exporter: // https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/1542#discussion_r521268180 // prepare + ts, tb := getTelemetryAssets(t) cfg := &Config{ Resolver: ResolverSettings{ Static: &StaticResolver{Hostnames: []string{"endpoint-1", "endpoint-2"}}, @@ -358,7 +378,7 @@ func TestFailedExporterInRing(t *testing.T) { componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newNopMockExporter(), nil } - p, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, componentFactory) + p, err := newLoadBalancer(ts.Logger, cfg, componentFactory, tb) require.NotNil(t, p) require.NoError(t, err) @@ -392,6 +412,7 @@ func TestFailedExporterInRing(t *testing.T) { func TestNewLoadBalancerInvalidNamespaceAwsResolver(t *testing.T) { // prepare + ts, tb := getTelemetryAssets(t) cfg := &Config{ Resolver: ResolverSettings{ AWSCloudMap: &AWSCloudMapResolver{ @@ -401,7 +422,7 @@ func TestNewLoadBalancerInvalidNamespaceAwsResolver(t *testing.T) { } // test - p, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, nil) + p, err := newLoadBalancer(ts.Logger, cfg, nil, tb) // verify assert.Nil(t, p) @@ -410,6 +431,7 @@ func TestNewLoadBalancerInvalidNamespaceAwsResolver(t *testing.T) { func TestNewLoadBalancerInvalidServiceAwsResolver(t *testing.T) { // prepare + ts, tb := getTelemetryAssets(t) cfg := &Config{ Resolver: ResolverSettings{ AWSCloudMap: &AWSCloudMapResolver{ @@ -420,7 +442,7 @@ func TestNewLoadBalancerInvalidServiceAwsResolver(t *testing.T) { } // test - p, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, nil) + p, err := newLoadBalancer(ts.Logger, cfg, nil, tb) // verify assert.Nil(t, p) @@ -428,5 +450,5 @@ func TestNewLoadBalancerInvalidServiceAwsResolver(t *testing.T) { } func newNopMockExporter() *wrappedExporter { - return newWrappedExporter(mockComponent{}) + return newWrappedExporter(mockComponent{}, "mock") } diff --git a/exporter/loadbalancingexporter/log_exporter.go b/exporter/loadbalancingexporter/log_exporter.go index ea1feca6c670..49068c7d9e44 100644 --- a/exporter/loadbalancingexporter/log_exporter.go +++ b/exporter/loadbalancingexporter/log_exporter.go @@ -9,16 +9,16 @@ import ( "sync" "time" - "go.opencensus.io/stats" - "go.opencensus.io/tag" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/otlpexporter" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/otel/metric" "go.uber.org/multierr" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" ) @@ -29,22 +29,29 @@ type logExporterImp struct { started bool shutdownWg sync.WaitGroup + telemetry *metadata.TelemetryBuilder } // Create new logs exporter func newLogsExporter(params exporter.Settings, cfg component.Config) (*logExporterImp, error) { + telemetry, err := metadata.NewTelemetryBuilder(params.TelemetrySettings) + if err != nil { + return nil, err + } exporterFactory := otlpexporter.NewFactory() - - lb, err := newLoadBalancer(params, cfg, func(ctx context.Context, endpoint string) (component.Component, error) { + cfFunc := func(ctx context.Context, endpoint string) (component.Component, error) { oCfg := buildExporterConfig(cfg.(*Config), endpoint) return exporterFactory.CreateLogsExporter(ctx, params, &oCfg) - }) + } + + lb, err := newLoadBalancer(params.Logger, cfg, cfFunc, telemetry) if err != nil { return nil, err } return &logExporterImp{ loadBalancer: lb, + telemetry: telemetry, }, nil } @@ -87,7 +94,7 @@ func (e *logExporterImp) consumeLog(ctx context.Context, ld plog.Logs) error { balancingKey = random() } - le, endpoint, err := e.loadBalancer.exporterAndEndpoint(balancingKey[:]) + le, _, err := e.loadBalancer.exporterAndEndpoint(balancingKey[:]) if err != nil { return err } @@ -98,16 +105,11 @@ func (e *logExporterImp) consumeLog(ctx context.Context, ld plog.Logs) error { start := time.Now() err = le.ConsumeLogs(ctx, ld) duration := time.Since(start) + e.telemetry.LoadbalancerBackendLatency.Record(ctx, duration.Milliseconds(), metric.WithAttributeSet(le.endpointAttr)) if err == nil { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator}, - mBackendLatency.M(duration.Milliseconds())) + e.telemetry.LoadbalancerBackendOutcome.Add(ctx, 1, metric.WithAttributeSet(le.successAttr)) } else { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, - mBackendLatency.M(duration.Milliseconds())) + e.telemetry.LoadbalancerBackendOutcome.Add(ctx, 1, metric.WithAttributeSet(le.failureAttr)) } return err diff --git a/exporter/loadbalancingexporter/log_exporter_test.go b/exporter/loadbalancingexporter/log_exporter_test.go index b698b55e9881..27181533087a 100644 --- a/exporter/loadbalancingexporter/log_exporter_test.go +++ b/exporter/loadbalancingexporter/log_exporter_test.go @@ -54,6 +54,7 @@ func TestNewLogsExporter(t *testing.T) { } func TestLogExporterStart(t *testing.T) { + ts, tb := getTelemetryAssets(t) for _, tt := range []struct { desc string le *logExporterImp @@ -71,7 +72,8 @@ func TestLogExporterStart(t *testing.T) { "error", func() *logExporterImp { // prepare - lb, _ := newLoadBalancer(exportertest.NewNopSettings(), simpleConfig(), nil) + lb, err := newLoadBalancer(ts.Logger, simpleConfig(), nil, tb) + require.NoError(t, err) p, _ := newLogsExporter(exportertest.NewNopSettings(), simpleConfig()) lb.res = &mockResolver{ @@ -114,14 +116,16 @@ func TestLogExporterShutdown(t *testing.T) { } func TestConsumeLogs(t *testing.T) { + ts, tb := getTelemetryAssets(t) componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newNopMockLogsExporter(), nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), simpleConfig(), componentFactory) + + lb, err := newLoadBalancer(ts.Logger, simpleConfig(), componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newLogsExporter(exportertest.NewNopSettings(), simpleConfig()) + p, err := newLogsExporter(ts, simpleConfig()) require.NotNil(t, p) require.NoError(t, err) @@ -152,11 +156,13 @@ func TestConsumeLogsUnexpectedExporterType(t *testing.T) { componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newNopMockExporter(), nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), simpleConfig(), componentFactory) + ts, tb := getTelemetryAssets(t) + + lb, err := newLoadBalancer(ts.Logger, simpleConfig(), componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newLogsExporter(exportertest.NewNopSettings(), simpleConfig()) + p, err := newLogsExporter(ts, simpleConfig()) require.NotNil(t, p) require.NoError(t, err) @@ -185,15 +191,17 @@ func TestConsumeLogsUnexpectedExporterType(t *testing.T) { } func TestLogBatchWithTwoTraces(t *testing.T) { + ts, tb := getTelemetryAssets(t) sink := new(consumertest.LogsSink) componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newMockLogsExporter(sink.ConsumeLogs), nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), simpleConfig(), componentFactory) + + lb, err := newLoadBalancer(ts.Logger, simpleConfig(), componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newLogsExporter(exportertest.NewNopSettings(), simpleConfig()) + p, err := newLogsExporter(ts, simpleConfig()) require.NotNil(t, p) require.NoError(t, err) @@ -257,15 +265,16 @@ func TestNoLogsInBatch(t *testing.T) { } func TestLogsWithoutTraceID(t *testing.T) { + ts, tb := getTelemetryAssets(t) sink := new(consumertest.LogsSink) componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newMockLogsExporter(sink.ConsumeLogs), nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), simpleConfig(), componentFactory) + lb, err := newLoadBalancer(ts.Logger, simpleConfig(), componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newLogsExporter(exportertest.NewNopSettings(), simpleConfig()) + p, err := newLogsExporter(ts, simpleConfig()) require.NotNil(t, p) require.NoError(t, err) @@ -289,6 +298,7 @@ func TestLogsWithoutTraceID(t *testing.T) { // this test validates that exporter is can concurrently change the endpoints while consuming logs. func TestConsumeLogs_ConcurrentResolverChange(t *testing.T) { + ts, tb := getTelemetryAssets(t) consumeStarted := make(chan struct{}) consumeDone := make(chan struct{}) @@ -302,11 +312,11 @@ func TestConsumeLogs_ConcurrentResolverChange(t *testing.T) { componentFactory := func(_ context.Context, _ string) (component.Component, error) { return te, nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), simpleConfig(), componentFactory) + lb, err := newLoadBalancer(ts.Logger, simpleConfig(), componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newLogsExporter(exportertest.NewNopSettings(), simpleConfig()) + p, err := newLogsExporter(ts, simpleConfig()) require.NotNil(t, p) require.NoError(t, err) @@ -348,7 +358,8 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) { // simulate rolling updates, the dns resolver should resolve in the following order // ["127.0.0.1"] -> ["127.0.0.1", "127.0.0.2"] -> ["127.0.0.2"] - res, err := newDNSResolver(zap.NewNop(), "service-1", "", 5*time.Second, 1*time.Second) + ts, tb := getTelemetryAssets(t) + res, err := newDNSResolver(zap.NewNop(), "service-1", "", 5*time.Second, 1*time.Second, tb) require.NoError(t, err) mu := sync.Mutex{} @@ -399,7 +410,7 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) { componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newNopMockLogsExporter(), nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, componentFactory) + lb, err := newLoadBalancer(ts.Logger, cfg, componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) @@ -412,17 +423,19 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) { counter1 := &atomic.Int64{} counter2 := &atomic.Int64{} + id1 := "127.0.0.1:4317" + id2 := "127.0.0.2:4317" defaultExporters := map[string]*wrappedExporter{ - "127.0.0.1:4317": newWrappedExporter(newMockLogsExporter(func(_ context.Context, _ plog.Logs) error { + id1: newWrappedExporter(newMockLogsExporter(func(_ context.Context, _ plog.Logs) error { counter1.Add(1) // simulate an unreachable backend time.Sleep(10 * time.Second) return nil - })), - "127.0.0.2:4317": newWrappedExporter(newMockLogsExporter(func(_ context.Context, _ plog.Logs) error { + }), id1), + id2: newWrappedExporter(newMockLogsExporter(func(_ context.Context, _ plog.Logs) error { counter2.Add(1) return nil - })), + }), id2), } // test diff --git a/exporter/loadbalancingexporter/metadata.yaml b/exporter/loadbalancingexporter/metadata.yaml index d0077af9cafd..9a23c1ef421b 100644 --- a/exporter/loadbalancingexporter/metadata.yaml +++ b/exporter/loadbalancingexporter/metadata.yaml @@ -25,8 +25,40 @@ tests: - backend-3:4317 - backend-4:4317 expect_consumer_error: true - goleak: - ignore: - top: - # See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information. - - "go.opencensus.io/stats/view.(*worker).start" + +telemetry: + metrics: + loadbalancer_num_resolutions: + enabled: true + description: Number of times the resolver has triggered new resolutions. + unit: "{resolutions}" + sum: + value_type: int + monotonic: true + loadbalancer_num_backends: + enabled: true + description: Current number of backends in use. + unit: "{backends}" + gauge: + value_type: int + loadbalancer_backend_latency: + enabled: true + description: Response latency in ms for the backends. + unit: ms + histogram: + value_type: int + bucket_boundaries: [5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000] + loadbalancer_num_backend_updates: + enabled: true + description: Number of times the list of backends was updated. + unit: "{updates}" + sum: + value_type: int + monotonic: true + loadbalancer_backend_outcome: + enabled: true + description: Number of successes and failures for each endpoint. + unit: "{outcomes}" + sum: + value_type: int + monotonic: true diff --git a/exporter/loadbalancingexporter/metrics.go b/exporter/loadbalancingexporter/metrics.go deleted file mode 100644 index 0bbdb4528d13..000000000000 --- a/exporter/loadbalancingexporter/metrics.go +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter" - -import ( - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" -) - -var ( - mNumResolutions = stats.Int64("loadbalancer_num_resolutions", "Number of times the resolver triggered a new resolutions", stats.UnitDimensionless) - mNumBackends = stats.Int64("loadbalancer_num_backends", "Current number of backends in use", stats.UnitDimensionless) - mBackendLatency = stats.Int64("loadbalancer_backend_latency", "Response latency in ms for the backends", stats.UnitMilliseconds) - - endpointTagKey = tag.MustNewKey("endpoint") - successTrueMutator = tag.Upsert(tag.MustNewKey("success"), "true") - successFalseMutator = tag.Upsert(tag.MustNewKey("success"), "false") -) - -// metricViews return the metrics views according to given telemetry level. -func metricViews() []*view.View { - return []*view.View{ - { - Name: mNumResolutions.Name(), - Measure: mNumResolutions, - Description: mNumResolutions.Description(), - Aggregation: view.Count(), - TagKeys: []tag.Key{ - tag.MustNewKey("resolver"), - tag.MustNewKey("success"), - }, - }, - { - Name: mNumBackends.Name(), - Measure: mNumBackends, - Description: mNumBackends.Description(), - Aggregation: view.LastValue(), - TagKeys: []tag.Key{ - tag.MustNewKey("resolver"), - }, - }, - { - Name: "loadbalancer_num_backend_updates", // counts the number of times the measure was changed - Measure: mNumBackends, - Description: "Number of times the list of backends was updated", - Aggregation: view.Count(), - TagKeys: []tag.Key{ - tag.MustNewKey("resolver"), - }, - }, - { - Name: mBackendLatency.Name(), - Measure: mBackendLatency, - Description: mBackendLatency.Description(), - TagKeys: []tag.Key{ - tag.MustNewKey("endpoint"), - }, - Aggregation: view.Distribution(0, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000), - }, - { - Name: "loadbalancer_backend_outcome", - Measure: mBackendLatency, - Description: "Number of success/failures for each endpoint", - TagKeys: []tag.Key{ - tag.MustNewKey("endpoint"), - tag.MustNewKey("success"), - }, - Aggregation: view.Count(), - }, - } -} diff --git a/exporter/loadbalancingexporter/metrics_exporter.go b/exporter/loadbalancingexporter/metrics_exporter.go index 70b2f4a88593..ab26d6bff305 100644 --- a/exporter/loadbalancingexporter/metrics_exporter.go +++ b/exporter/loadbalancingexporter/metrics_exporter.go @@ -10,16 +10,16 @@ import ( "sync" "time" - "go.opencensus.io/stats" - "go.opencensus.io/tag" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/otlpexporter" "go.opentelemetry.io/collector/pdata/pmetric" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" + "go.opentelemetry.io/otel/metric" "go.uber.org/multierr" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" ) @@ -32,20 +32,30 @@ type metricExporterImp struct { stopped bool shutdownWg sync.WaitGroup + telemetry *metadata.TelemetryBuilder } func newMetricsExporter(params exporter.Settings, cfg component.Config) (*metricExporterImp, error) { + telemetry, err := metadata.NewTelemetryBuilder(params.TelemetrySettings) + if err != nil { + return nil, err + } exporterFactory := otlpexporter.NewFactory() - - lb, err := newLoadBalancer(params, cfg, func(ctx context.Context, endpoint string) (component.Component, error) { + cfFunc := func(ctx context.Context, endpoint string) (component.Component, error) { oCfg := buildExporterConfig(cfg.(*Config), endpoint) return exporterFactory.CreateMetricsExporter(ctx, params, &oCfg) - }) + } + + lb, err := newLoadBalancer(params.Logger, cfg, cfFunc, telemetry) if err != nil { return nil, err } - metricExporter := metricExporterImp{loadBalancer: lb, routingKey: svcRouting} + metricExporter := metricExporterImp{ + loadBalancer: lb, + routingKey: svcRouting, + telemetry: telemetry, + } switch cfg.(*Config).RoutingKey { case svcRoutingStr, "": @@ -122,17 +132,11 @@ func (e *metricExporterImp) ConsumeMetrics(ctx context.Context, md pmetric.Metri exp.consumeWG.Done() errs = multierr.Append(errs, err) - + e.telemetry.LoadbalancerBackendLatency.Record(ctx, duration.Milliseconds(), metric.WithAttributeSet(exp.endpointAttr)) if err == nil { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, exporterEndpoints[exp]), successTrueMutator}, - mBackendLatency.M(duration.Milliseconds())) + e.telemetry.LoadbalancerBackendOutcome.Add(ctx, 1, metric.WithAttributeSet(exp.successAttr)) } else { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, exporterEndpoints[exp]), successFalseMutator}, - mBackendLatency.M(duration.Milliseconds())) + e.telemetry.LoadbalancerBackendOutcome.Add(ctx, 1, metric.WithAttributeSet(exp.failureAttr)) } } diff --git a/exporter/loadbalancingexporter/metrics_exporter_test.go b/exporter/loadbalancingexporter/metrics_exporter_test.go index c3d6907c913f..b19ddedc924d 100644 --- a/exporter/loadbalancingexporter/metrics_exporter_test.go +++ b/exporter/loadbalancingexporter/metrics_exporter_test.go @@ -25,12 +25,10 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/otlpexporter" "go.opentelemetry.io/collector/otelcol/otelcoltest" "go.opentelemetry.io/collector/pdata/pmetric" conventions "go.opentelemetry.io/collector/semconv/v1.9.0" - "go.uber.org/zap" "gopkg.in/yaml.v2" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" @@ -61,6 +59,7 @@ const ( ) func TestNewMetricsExporter(t *testing.T) { + ts, _ := getTelemetryAssets(t) for _, tt := range []struct { desc string config *Config @@ -96,7 +95,7 @@ func TestNewMetricsExporter(t *testing.T) { } { t.Run(tt.desc, func(t *testing.T) { // test - _, err := newMetricsExporter(exportertest.NewNopSettings(), tt.config) + _, err := newMetricsExporter(ts, tt.config) // verify require.Equal(t, tt.err, err) @@ -105,6 +104,7 @@ func TestNewMetricsExporter(t *testing.T) { } func TestMetricsExporterStart(t *testing.T) { + ts, tb := getTelemetryAssets(t) for _, tt := range []struct { desc string te *metricExporterImp @@ -113,7 +113,7 @@ func TestMetricsExporterStart(t *testing.T) { { "ok", func() *metricExporterImp { - p, _ := newMetricsExporter(exportertest.NewNopSettings(), serviceBasedRoutingConfig()) + p, _ := newMetricsExporter(ts, serviceBasedRoutingConfig()) return p }(), nil, @@ -121,8 +121,10 @@ func TestMetricsExporterStart(t *testing.T) { { "error", func() *metricExporterImp { - lb, _ := newLoadBalancer(exportertest.NewNopSettings(), serviceBasedRoutingConfig(), nil) - p, _ := newMetricsExporter(exportertest.NewNopSettings(), serviceBasedRoutingConfig()) + lb, err := newLoadBalancer(ts.Logger, serviceBasedRoutingConfig(), nil, tb) + require.NoError(t, err) + + p, _ := newMetricsExporter(ts, serviceBasedRoutingConfig()) lb.res = &mockResolver{ onStart: func(context.Context) error { @@ -152,7 +154,8 @@ func TestMetricsExporterStart(t *testing.T) { } func TestMetricsExporterShutdown(t *testing.T) { - p, err := newMetricsExporter(exportertest.NewNopSettings(), serviceBasedRoutingConfig()) + ts, _ := getTelemetryAssets(t) + p, err := newMetricsExporter(ts, serviceBasedRoutingConfig()) require.NotNil(t, p) require.NoError(t, err) @@ -301,6 +304,7 @@ func TestSplitMetrics(t *testing.T) { } func TestConsumeMetrics_SingleEndpoint(t *testing.T) { + ts, tb := getTelemetryAssets(t) t.Parallel() testCases := []struct { @@ -328,7 +332,7 @@ func TestConsumeMetrics_SingleEndpoint(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - createSettings := exportertest.NewNopSettings() + createSettings := ts config := &Config{ Resolver: ResolverSettings{ Static: &StaticResolver{Hostnames: []string{"endpoint-1"}}, @@ -348,7 +352,7 @@ func TestConsumeMetrics_SingleEndpoint(t *testing.T) { return newMockMetricsExporter(sink.ConsumeMetrics), nil } - lb, err := newLoadBalancer(createSettings, config, componentFactory) + lb, err := newLoadBalancer(ts.Logger, config, componentFactory, tb) require.NoError(t, err) require.NotNil(t, lb) @@ -399,6 +403,7 @@ func TestConsumeMetrics_SingleEndpoint(t *testing.T) { } func TestConsumeMetrics_TripleEndpoint(t *testing.T) { + ts, tb := getTelemetryAssets(t) // I'm not fully satisfied with the design of this test. // We're hard-reliant on the implementation of the ring hash to give use the routing. // So if that algorithm changes, all these tests will need to be updated. In addition, @@ -432,7 +437,7 @@ func TestConsumeMetrics_TripleEndpoint(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - createSettings := exportertest.NewNopSettings() + createSettings := ts config := &Config{ Resolver: ResolverSettings{ Static: &StaticResolver{Hostnames: []string{"endpoint-1", "endpoint-2", "endpoint-3"}}, @@ -465,7 +470,7 @@ func TestConsumeMetrics_TripleEndpoint(t *testing.T) { return nil, errors.New("invalid endpoint") } - lb, err := newLoadBalancer(createSettings, config, componentFactory) + lb, err := newLoadBalancer(ts.Logger, config, componentFactory, tb) require.NoError(t, err) require.NotNil(t, lb) @@ -529,6 +534,7 @@ func TestConsumeMetrics_TripleEndpoint(t *testing.T) { // this test validates that exporter is can concurrently change the endpoints while consuming metrics. func TestConsumeMetrics_ConcurrentResolverChange(t *testing.T) { + ts, tb := getTelemetryAssets(t) consumeStarted := make(chan struct{}) consumeDone := make(chan struct{}) @@ -542,11 +548,11 @@ func TestConsumeMetrics_ConcurrentResolverChange(t *testing.T) { componentFactory := func(_ context.Context, _ string) (component.Component, error) { return te, nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), simpleConfig(), componentFactory) + lb, err := newLoadBalancer(ts.Logger, simpleConfig(), componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newMetricsExporter(exportertest.NewNopSettings(), simpleConfig()) + p, err := newMetricsExporter(ts, simpleConfig()) require.NotNil(t, p) require.NoError(t, err) @@ -580,14 +586,15 @@ func TestConsumeMetrics_ConcurrentResolverChange(t *testing.T) { } func TestConsumeMetricsExporterNoEndpoint(t *testing.T) { + ts, tb := getTelemetryAssets(t) componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newNopMockMetricsExporter(), nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), serviceBasedRoutingConfig(), componentFactory) + lb, err := newLoadBalancer(ts.Logger, serviceBasedRoutingConfig(), componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newMetricsExporter(exportertest.NewNopSettings(), endpoint2Config()) + p, err := newMetricsExporter(ts, endpoint2Config()) require.NotNil(t, p) require.NoError(t, err) @@ -614,14 +621,15 @@ func TestConsumeMetricsExporterNoEndpoint(t *testing.T) { } func TestConsumeMetricsUnexpectedExporterType(t *testing.T) { + ts, tb := getTelemetryAssets(t) componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newNopMockExporter(), nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), serviceBasedRoutingConfig(), componentFactory) + lb, err := newLoadBalancer(ts.Logger, serviceBasedRoutingConfig(), componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newMetricsExporter(exportertest.NewNopSettings(), serviceBasedRoutingConfig()) + p, err := newMetricsExporter(ts, serviceBasedRoutingConfig()) require.NotNil(t, p) require.NoError(t, err) @@ -680,15 +688,16 @@ func TestBuildExporterConfigUnknown(t *testing.T) { } func TestBatchWithTwoMetrics(t *testing.T) { + ts, tb := getTelemetryAssets(t) sink := new(consumertest.MetricsSink) componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newMockMetricsExporter(sink.ConsumeMetrics), nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), serviceBasedRoutingConfig(), componentFactory) + lb, err := newLoadBalancer(ts.Logger, serviceBasedRoutingConfig(), componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newMetricsExporter(exportertest.NewNopSettings(), serviceBasedRoutingConfig()) + p, err := newMetricsExporter(ts, serviceBasedRoutingConfig()) require.NotNil(t, p) require.NoError(t, err) @@ -710,6 +719,7 @@ func TestBatchWithTwoMetrics(t *testing.T) { func TestRollingUpdatesWhenConsumeMetrics(t *testing.T) { t.Skip("Flaky Test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/13331") + ts, tb := getTelemetryAssets(t) // this test is based on the discussion in the following issue for this exporter: // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/1690 @@ -717,7 +727,7 @@ func TestRollingUpdatesWhenConsumeMetrics(t *testing.T) { // simulate rolling updates, the dns resolver should resolve in the following order // ["127.0.0.1"] -> ["127.0.0.1", "127.0.0.2"] -> ["127.0.0.2"] - res, err := newDNSResolver(zap.NewNop(), "service-1", "", 5*time.Second, 1*time.Second) + res, err := newDNSResolver(ts.Logger, "service-1", "", 5*time.Second, 1*time.Second, tb) require.NoError(t, err) mu := sync.Mutex{} @@ -768,11 +778,11 @@ func TestRollingUpdatesWhenConsumeMetrics(t *testing.T) { componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newNopMockMetricsExporter(), nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, componentFactory) + lb, err := newLoadBalancer(ts.Logger, cfg, componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newMetricsExporter(exportertest.NewNopSettings(), cfg) + p, err := newMetricsExporter(ts, cfg) require.NotNil(t, p) require.NoError(t, err) @@ -787,11 +797,11 @@ func TestRollingUpdatesWhenConsumeMetrics(t *testing.T) { // simulate an unreachable backend time.Sleep(10 * time.Second) return nil - })), + }), "127.0.0.1"), "127.0.0.2:4317": newWrappedExporter(newMockMetricsExporter(func(_ context.Context, _ pmetric.Metrics) error { counter2.Add(1) return nil - })), + }), "127.0.0.2"), } // test @@ -856,6 +866,7 @@ func appendSimpleMetricWithServiceName(metric pmetric.Metrics, serviceName strin } func benchConsumeMetrics(b *testing.B, endpointsCount int, metricsCount int) { + ts, tb := getTelemetryAssets(b) sink := new(consumertest.MetricsSink) componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newMockMetricsExporter(sink.ConsumeMetrics), nil @@ -872,11 +883,11 @@ func benchConsumeMetrics(b *testing.B, endpointsCount int, metricsCount int) { }, } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), config, componentFactory) + lb, err := newLoadBalancer(ts.Logger, config, componentFactory, tb) require.NotNil(b, lb) require.NoError(b, err) - p, err := newMetricsExporter(exportertest.NewNopSettings(), config) + p, err := newMetricsExporter(ts, config) require.NotNil(b, p) require.NoError(b, err) diff --git a/exporter/loadbalancingexporter/metrics_test.go b/exporter/loadbalancingexporter/metrics_test.go deleted file mode 100644 index 2db6ea4183fa..000000000000 --- a/exporter/loadbalancingexporter/metrics_test.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package loadbalancingexporter - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestProcessorMetrics(t *testing.T) { - expectedViewNames := []string{ - "loadbalancer_num_resolutions", - "loadbalancer_num_backends", - "loadbalancer_num_backend_updates", - "loadbalancer_backend_latency", - } - - views := metricViews() - for i, viewName := range expectedViewNames { - assert.Equal(t, viewName, views[i].Name) - } -} diff --git a/exporter/loadbalancingexporter/resolver_aws_cloudmap.go b/exporter/loadbalancingexporter/resolver_aws_cloudmap.go index ea316df105cf..72b03f7e04e4 100644 --- a/exporter/loadbalancingexporter/resolver_aws_cloudmap.go +++ b/exporter/loadbalancingexporter/resolver_aws_cloudmap.go @@ -15,9 +15,11 @@ import ( "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/servicediscovery" "github.com/aws/aws-sdk-go-v2/service/servicediscovery/types" - "go.opencensus.io/stats" - "go.opencensus.io/tag" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" ) const ( @@ -29,10 +31,10 @@ var ( errNoNamespace = errors.New("no Cloud Map namespace specified to resolve the backends") errNoServiceName = errors.New("no Cloud Map service_name specified to resolve the backends") - awsResolverMutator = tag.Upsert(tag.MustNewKey("resolver"), "aws") - - awsResolverSuccessTrueMutators = []tag.Mutator{awsResolverMutator, successTrueMutator} - awsResolverSuccessFalseMutators = []tag.Mutator{awsResolverMutator, successFalseMutator} + awsResolverAttr = attribute.String("resolver", "aws") + awsResolverAttrSet = attribute.NewSet(awsResolverAttr) + awsResolverSuccessAttrSet = attribute.NewSet(awsResolverAttr, attribute.Bool("success", true)) + awsResolverFailureAttrSet = attribute.NewSet(awsResolverAttr, attribute.Bool("success", false)) ) func createDiscoveryFunction(client *servicediscovery.Client) func(params *servicediscovery.DiscoverInstancesInput) (*servicediscovery.DiscoverInstancesOutput, error) { @@ -59,10 +61,19 @@ type cloudMapResolver struct { shutdownWg sync.WaitGroup changeCallbackLock sync.RWMutex discoveryFn func(params *servicediscovery.DiscoverInstancesInput) (*servicediscovery.DiscoverInstancesOutput, error) + telemetry *metadata.TelemetryBuilder } -func newCloudMapResolver(logger *zap.Logger, namespaceName *string, serviceName *string, port *uint16, healthStatus *types.HealthStatusFilter, interval time.Duration, timeout time.Duration) (*cloudMapResolver, error) { - // Using the SDK's default configuration, loading additional config +func newCloudMapResolver( + logger *zap.Logger, + namespaceName *string, + serviceName *string, + port *uint16, + healthStatus *types.HealthStatusFilter, + interval time.Duration, + timeout time.Duration, + tb *metadata.TelemetryBuilder, +) (*cloudMapResolver, error) { // Using the SDK's default configuration, loading additional config // and credentials values from the environment variables, shared // credentials, and shared configuration files cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithDefaultRegion("us-east-1")) @@ -104,6 +115,7 @@ func newCloudMapResolver(logger *zap.Logger, namespaceName *string, serviceName resTimeout: timeout, stopCh: make(chan struct{}), discoveryFn: createDiscoveryFunction(svc), + telemetry: tb, }, nil } @@ -165,11 +177,11 @@ func (r *cloudMapResolver) resolve(ctx context.Context) ([]string, error) { QueryParameters: nil, }) if err != nil { - _ = stats.RecordWithTags(ctx, awsResolverSuccessFalseMutators, mNumResolutions.M(1)) + r.telemetry.LoadbalancerNumResolutions.Add(ctx, 1, metric.WithAttributeSet(awsResolverFailureAttrSet)) return nil, err } - _ = stats.RecordWithTags(ctx, awsResolverSuccessTrueMutators, mNumResolutions.M(1)) + r.telemetry.LoadbalancerNumResolutions.Add(ctx, 1, metric.WithAttributeSet(awsResolverSuccessAttrSet)) r.logger.Debug("resolver has discovered instances ", zap.Int("Instance Count", len(discoverInstancesOutput.Instances))) @@ -200,7 +212,8 @@ func (r *cloudMapResolver) resolve(ctx context.Context) ([]string, error) { r.updateLock.Lock() r.endpoints = backends r.updateLock.Unlock() - _ = stats.RecordWithTags(ctx, awsResolverSuccessTrueMutators, mNumBackends.M(int64(len(backends)))) + r.telemetry.LoadbalancerNumBackends.Record(ctx, int64(len(backends)), metric.WithAttributeSet(awsResolverAttrSet)) + r.telemetry.LoadbalancerNumBackendUpdates.Add(ctx, 1, metric.WithAttributeSet(awsResolverAttrSet)) // propagate the change r.changeCallbackLock.RLock() diff --git a/exporter/loadbalancingexporter/resolver_aws_cloudmap_test.go b/exporter/loadbalancingexporter/resolver_aws_cloudmap_test.go index a236f14bce0b..cbc006610ebe 100644 --- a/exporter/loadbalancingexporter/resolver_aws_cloudmap_test.go +++ b/exporter/loadbalancingexporter/resolver_aws_cloudmap_test.go @@ -25,6 +25,7 @@ var port uint16 = 1234 func TestInitialCloudMapResolution(t *testing.T) { // prepare + _, tb := getTelemetryAssets(t) res := &cloudMapResolver{ logger: zap.NewNop(), @@ -35,6 +36,7 @@ func TestInitialCloudMapResolution(t *testing.T) { resTimeout: 1 * time.Second, stopCh: make(chan struct{}), discoveryFn: mockDiscovery, + telemetry: tb, } // test @@ -56,6 +58,7 @@ func TestInitialCloudMapResolution(t *testing.T) { func TestInitialCloudMapResolutionWithPort(t *testing.T) { // prepare + _, tb := getTelemetryAssets(t) res := &cloudMapResolver{ logger: zap.NewNop(), @@ -67,6 +70,7 @@ func TestInitialCloudMapResolutionWithPort(t *testing.T) { resTimeout: 1 * time.Second, stopCh: make(chan struct{}), discoveryFn: mockDiscovery, + telemetry: tb, } // test diff --git a/exporter/loadbalancingexporter/resolver_dns.go b/exporter/loadbalancingexporter/resolver_dns.go index 97d111d3d029..0d1a5422db27 100644 --- a/exporter/loadbalancingexporter/resolver_dns.go +++ b/exporter/loadbalancingexporter/resolver_dns.go @@ -12,9 +12,11 @@ import ( "sync" "time" - "go.opencensus.io/stats" - "go.opencensus.io/tag" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" ) var _ resolver = (*dnsResolver)(nil) @@ -27,10 +29,10 @@ const ( var ( errNoHostname = errors.New("no hostname specified to resolve the backends") - resolverMutator = tag.Upsert(tag.MustNewKey("resolver"), "dns") - - resolverSuccessTrueMutators = []tag.Mutator{resolverMutator, successTrueMutator} - resolverSuccessFalseMutators = []tag.Mutator{resolverMutator, successFalseMutator} + dnsResolverAttr = attribute.String("resolver", "dns") + dnsResolverAttrSet = attribute.NewSet(dnsResolverAttr) + dnsResolverSuccessAttrSet = attribute.NewSet(dnsResolverAttr, attribute.Bool("success", true)) + dnsResolverFailureAttrSet = attribute.NewSet(dnsResolverAttr, attribute.Bool("success", false)) ) type dnsResolver struct { @@ -49,13 +51,21 @@ type dnsResolver struct { updateLock sync.Mutex shutdownWg sync.WaitGroup changeCallbackLock sync.RWMutex + telemetry *metadata.TelemetryBuilder } type netResolver interface { LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error) } -func newDNSResolver(logger *zap.Logger, hostname string, port string, interval time.Duration, timeout time.Duration) (*dnsResolver, error) { +func newDNSResolver( + logger *zap.Logger, + hostname string, + port string, + interval time.Duration, + timeout time.Duration, + tb *metadata.TelemetryBuilder, +) (*dnsResolver, error) { if len(hostname) == 0 { return nil, errNoHostname } @@ -74,6 +84,7 @@ func newDNSResolver(logger *zap.Logger, hostname string, port string, interval t resInterval: interval, resTimeout: timeout, stopCh: make(chan struct{}), + telemetry: tb, }, nil } @@ -125,11 +136,11 @@ func (r *dnsResolver) resolve(ctx context.Context) ([]string, error) { addrs, err := r.resolver.LookupIPAddr(ctx, r.hostname) if err != nil { - _ = stats.RecordWithTags(ctx, resolverSuccessFalseMutators, mNumResolutions.M(1)) + r.telemetry.LoadbalancerNumResolutions.Add(ctx, 1, metric.WithAttributeSet(dnsResolverFailureAttrSet)) return nil, err } - _ = stats.RecordWithTags(ctx, resolverSuccessTrueMutators, mNumResolutions.M(1)) + r.telemetry.LoadbalancerNumResolutions.Add(ctx, 1, metric.WithAttributeSet(dnsResolverSuccessAttrSet)) backends := make([]string, len(addrs)) for i, ip := range addrs { @@ -160,7 +171,8 @@ func (r *dnsResolver) resolve(ctx context.Context) ([]string, error) { r.updateLock.Lock() r.endpoints = backends r.updateLock.Unlock() - _ = stats.RecordWithTags(ctx, resolverSuccessTrueMutators, mNumBackends.M(int64(len(backends)))) + r.telemetry.LoadbalancerNumBackends.Record(ctx, int64(len(backends)), metric.WithAttributeSet(dnsResolverAttrSet)) + r.telemetry.LoadbalancerNumBackendUpdates.Add(ctx, 1, metric.WithAttributeSet(dnsResolverAttrSet)) // propagate the change r.changeCallbackLock.RLock() diff --git a/exporter/loadbalancingexporter/resolver_dns_test.go b/exporter/loadbalancingexporter/resolver_dns_test.go index 0201c5a9cea5..f432d6cbe723 100644 --- a/exporter/loadbalancingexporter/resolver_dns_test.go +++ b/exporter/loadbalancingexporter/resolver_dns_test.go @@ -19,7 +19,8 @@ import ( func TestInitialDNSResolution(t *testing.T) { // prepare - res, err := newDNSResolver(zap.NewNop(), "service-1", "", 5*time.Second, 1*time.Second) + _, tb := getTelemetryAssets(t) + res, err := newDNSResolver(zap.NewNop(), "service-1", "", 5*time.Second, 1*time.Second, tb) require.NoError(t, err) res.resolver = &mockDNSResolver{ @@ -51,7 +52,8 @@ func TestInitialDNSResolution(t *testing.T) { func TestInitialDNSResolutionWithPort(t *testing.T) { // prepare - res, err := newDNSResolver(zap.NewNop(), "service-1", "55690", 5*time.Second, 1*time.Second) + _, tb := getTelemetryAssets(t) + res, err := newDNSResolver(zap.NewNop(), "service-1", "55690", 5*time.Second, 1*time.Second, tb) require.NoError(t, err) res.resolver = &mockDNSResolver{ @@ -83,7 +85,8 @@ func TestInitialDNSResolutionWithPort(t *testing.T) { func TestErrNoHostname(t *testing.T) { // test - res, err := newDNSResolver(zap.NewNop(), "", "", 5*time.Second, 1*time.Second) + _, tb := getTelemetryAssets(t) + res, err := newDNSResolver(zap.NewNop(), "", "", 5*time.Second, 1*time.Second, tb) // verify assert.Nil(t, res) @@ -92,7 +95,8 @@ func TestErrNoHostname(t *testing.T) { func TestCantResolve(t *testing.T) { // prepare - res, err := newDNSResolver(zap.NewNop(), "service-1", "", 5*time.Second, 1*time.Second) + _, tb := getTelemetryAssets(t) + res, err := newDNSResolver(zap.NewNop(), "service-1", "", 5*time.Second, 1*time.Second, tb) require.NoError(t, err) expectedErr := errors.New("some expected error") @@ -112,7 +116,8 @@ func TestCantResolve(t *testing.T) { func TestOnChange(t *testing.T) { // prepare - res, err := newDNSResolver(zap.NewNop(), "service-1", "", 5*time.Second, 1*time.Second) + _, tb := getTelemetryAssets(t) + res, err := newDNSResolver(zap.NewNop(), "service-1", "", 5*time.Second, 1*time.Second, tb) require.NoError(t, err) resolve := []net.IPAddr{ @@ -179,7 +184,8 @@ func TestEqualStringSlice(t *testing.T) { func TestPeriodicallyResolve(t *testing.T) { // prepare - res, err := newDNSResolver(zap.NewNop(), "service-1", "", 10*time.Millisecond, 1*time.Second) + _, tb := getTelemetryAssets(t) + res, err := newDNSResolver(zap.NewNop(), "service-1", "", 10*time.Millisecond, 1*time.Second, tb) require.NoError(t, err) counter := &atomic.Int64{} @@ -237,7 +243,8 @@ func TestPeriodicallyResolve(t *testing.T) { func TestPeriodicallyResolveFailure(t *testing.T) { // prepare - res, err := newDNSResolver(zap.NewNop(), "service-1", "", 10*time.Millisecond, 1*time.Second) + _, tb := getTelemetryAssets(t) + res, err := newDNSResolver(zap.NewNop(), "service-1", "", 10*time.Millisecond, 1*time.Second, tb) require.NoError(t, err) expectedErr := errors.New("some expected error") @@ -280,7 +287,8 @@ func TestPeriodicallyResolveFailure(t *testing.T) { func TestShutdownClearsCallbacks(t *testing.T) { // prepare - res, err := newDNSResolver(zap.NewNop(), "service-1", "", 5*time.Second, 1*time.Second) + _, tb := getTelemetryAssets(t) + res, err := newDNSResolver(zap.NewNop(), "service-1", "", 5*time.Second, 1*time.Second, tb) require.NoError(t, err) res.resolver = &mockDNSResolver{} diff --git a/exporter/loadbalancingexporter/resolver_k8s.go b/exporter/loadbalancingexporter/resolver_k8s.go index 4a2d64fb6a61..c2a9bbe20afa 100644 --- a/exporter/loadbalancingexporter/resolver_k8s.go +++ b/exporter/loadbalancingexporter/resolver_k8s.go @@ -15,8 +15,8 @@ import ( "sync" "time" - "go.opencensus.io/stats" - "go.opencensus.io/tag" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -27,15 +27,19 @@ import ( "k8s.io/utils/ptr" "k8s.io/utils/strings/slices" "sigs.k8s.io/controller-runtime/pkg/client/config" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" ) var _ resolver = (*k8sResolver)(nil) var ( - errNoSvc = errors.New("no service specified to resolve the backends") - k8sResolverMutator = tag.Upsert(tag.MustNewKey("resolver"), "k8s") - k8sResolverSuccessTrueMutators = []tag.Mutator{k8sResolverMutator, successTrueMutator} - k8sResolverSuccessFalseMutators = []tag.Mutator{k8sResolverMutator, successFalseMutator} + errNoSvc = errors.New("no service specified to resolve the backends") + + k8sResolverAttr = attribute.String("resolver", "k8s") + k8sResolverAttrSet = attribute.NewSet(k8sResolverAttr) + k8sResolverSuccessAttrSet = attribute.NewSet(k8sResolverAttr, attribute.Bool("success", true)) + k8sResolverFailureAttrSet = attribute.NewSet(k8sResolverAttr, attribute.Bool("success", false)) ) const ( @@ -62,12 +66,17 @@ type k8sResolver struct { updateLock sync.RWMutex shutdownWg sync.WaitGroup changeCallbackLock sync.RWMutex + + telemetry *metadata.TelemetryBuilder } func newK8sResolver(clt kubernetes.Interface, logger *zap.Logger, service string, - ports []int32, timeout time.Duration) (*k8sResolver, error) { + ports []int32, + timeout time.Duration, + tb *metadata.TelemetryBuilder, +) (*k8sResolver, error) { if len(service) == 0 { return nil, errNoSvc @@ -106,7 +115,11 @@ func newK8sResolver(clt kubernetes.Interface, } epsStore := &sync.Map{} - h := &handler{endpoints: epsStore, logger: logger} + h := &handler{ + endpoints: epsStore, + logger: logger, + telemetry: tb, + } r := &k8sResolver{ logger: logger, svcName: name, @@ -118,6 +131,7 @@ func newK8sResolver(clt kubernetes.Interface, handler: h, stopCh: make(chan struct{}), lwTimeout: timeout, + telemetry: tb, } h.callback = r.resolve @@ -184,7 +198,7 @@ func (r *k8sResolver) resolve(ctx context.Context) ([]string, error) { } return true }) - _ = stats.RecordWithTags(ctx, k8sResolverSuccessTrueMutators, mNumResolutions.M(1)) + r.telemetry.LoadbalancerNumResolutions.Add(ctx, 1, metric.WithAttributeSet(k8sResolverSuccessAttrSet)) // keep it always in the same order sort.Strings(backends) @@ -197,7 +211,8 @@ func (r *k8sResolver) resolve(ctx context.Context) ([]string, error) { r.updateLock.Lock() r.endpoints = backends r.updateLock.Unlock() - _ = stats.RecordWithTags(ctx, k8sResolverSuccessTrueMutators, mNumBackends.M(int64(len(backends)))) + r.telemetry.LoadbalancerNumBackends.Record(ctx, int64(len(backends)), metric.WithAttributeSet(k8sResolverAttrSet)) + r.telemetry.LoadbalancerNumBackendUpdates.Add(ctx, 1, metric.WithAttributeSet(k8sResolverAttrSet)) // propagate the change r.changeCallbackLock.RLock() diff --git a/exporter/loadbalancingexporter/resolver_k8s_handler.go b/exporter/loadbalancingexporter/resolver_k8s_handler.go index d93cc2d94a88..0eac62ea40d2 100644 --- a/exporter/loadbalancingexporter/resolver_k8s_handler.go +++ b/exporter/loadbalancingexporter/resolver_k8s_handler.go @@ -7,10 +7,12 @@ import ( "context" "sync" - "go.opencensus.io/stats" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" ) var _ cache.ResourceEventHandler = (*handler)(nil) @@ -19,6 +21,7 @@ type handler struct { endpoints *sync.Map callback func(ctx context.Context) ([]string, error) logger *zap.Logger + telemetry *metadata.TelemetryBuilder } func (h handler) OnAdd(obj any, _ bool) { @@ -29,7 +32,7 @@ func (h handler) OnAdd(obj any, _ bool) { endpoints = convertToEndpoints(object) default: // unsupported h.logger.Warn("Got an unexpected Kubernetes data type during the inclusion of a new pods for the service", zap.Any("obj", obj)) - _ = stats.RecordWithTags(context.Background(), k8sResolverSuccessFalseMutators, mNumResolutions.M(1)) + h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet)) return } changed := false @@ -57,7 +60,7 @@ func (h handler) OnUpdate(oldObj, newObj any) { newEps, ok := newObj.(*corev1.Endpoints) if !ok { h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", newObj)) - _ = stats.RecordWithTags(context.Background(), k8sResolverSuccessFalseMutators, mNumResolutions.M(1)) + h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet)) return } changed := false @@ -71,7 +74,7 @@ func (h handler) OnUpdate(oldObj, newObj any) { } default: // unsupported h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", oldObj)) - _ = stats.RecordWithTags(context.Background(), k8sResolverSuccessFalseMutators, mNumResolutions.M(1)) + h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet)) return } } @@ -88,7 +91,7 @@ func (h handler) OnDelete(obj any) { } default: // unsupported h.logger.Warn("Got an unexpected Kubernetes data type during the removal of the pods for a service", zap.Any("obj", obj)) - _ = stats.RecordWithTags(context.Background(), k8sResolverSuccessFalseMutators, mNumResolutions.M(1)) + h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet)) return } if len(endpoints) != 0 { diff --git a/exporter/loadbalancingexporter/resolver_k8s_test.go b/exporter/loadbalancingexporter/resolver_k8s_test.go index b7faf8497743..3225f11fe535 100644 --- a/exporter/loadbalancingexporter/resolver_k8s_test.go +++ b/exporter/loadbalancingexporter/resolver_k8s_test.go @@ -56,7 +56,8 @@ func TestK8sResolve(t *testing.T) { } cl := fake.NewSimpleClientset(endpoint) - res, err := newK8sResolver(cl, zap.NewNop(), service, ports, defaultListWatchTimeout) + _, tb := getTelemetryAssets(t) + res, err := newK8sResolver(cl, zap.NewNop(), service, ports, defaultListWatchTimeout, tb) require.NoError(t, err) require.NoError(t, res.start(context.Background())) @@ -241,7 +242,8 @@ func Test_newK8sResolver(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := newK8sResolver(fake.NewSimpleClientset(), tt.args.logger, tt.args.service, tt.args.ports, defaultListWatchTimeout) + _, tb := getTelemetryAssets(t) + got, err := newK8sResolver(fake.NewSimpleClientset(), tt.args.logger, tt.args.service, tt.args.ports, defaultListWatchTimeout, tb) if tt.wantErr != nil { require.Error(t, err, tt.wantErr) } else { diff --git a/exporter/loadbalancingexporter/resolver_static.go b/exporter/loadbalancingexporter/resolver_static.go index 86759f132bbe..dac67631a0a2 100644 --- a/exporter/loadbalancingexporter/resolver_static.go +++ b/exporter/loadbalancingexporter/resolver_static.go @@ -9,25 +9,30 @@ import ( "sort" "sync" - "go.opencensus.io/stats" - "go.opencensus.io/tag" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" ) var _ resolver = (*staticResolver)(nil) var ( - errNoEndpoints = errors.New("no endpoints specified for the static resolver") - - staticResolverMutators = []tag.Mutator{tag.Upsert(tag.MustNewKey("resolver"), "static"), successTrueMutator} + errNoEndpoints = errors.New("no endpoints specified for the static resolver") + staticResolverAttr = attribute.String("resolver", "static") + staticResolverAttrSet = attribute.NewSet(staticResolverAttr) + staticResolverSuccessAttrSet = attribute.NewSet(staticResolverAttr, attribute.Bool("success", true)) ) type staticResolver struct { endpoints []string onChangeCallbacks []func([]string) once sync.Once // we trigger the onChange only once + + telemetry *metadata.TelemetryBuilder } -func newStaticResolver(endpoints []string) (*staticResolver, error) { +func newStaticResolver(endpoints []string, tb *metadata.TelemetryBuilder) (*staticResolver, error) { if len(endpoints) == 0 { return nil, errNoEndpoints } @@ -41,6 +46,7 @@ func newStaticResolver(endpoints []string) (*staticResolver, error) { return &staticResolver{ endpoints: endpointsCopy, + telemetry: tb, }, nil } @@ -60,11 +66,10 @@ func (r *staticResolver) shutdown(context.Context) error { } func (r *staticResolver) resolve(ctx context.Context) ([]string, error) { - _ = stats.RecordWithTags(ctx, staticResolverMutators, mNumResolutions.M(1)) - + r.telemetry.LoadbalancerNumResolutions.Add(ctx, 1, metric.WithAttributeSet(staticResolverSuccessAttrSet)) r.once.Do(func() { - _ = stats.RecordWithTags(ctx, staticResolverMutators, mNumBackends.M(int64(len(r.endpoints)))) - + r.telemetry.LoadbalancerNumBackends.Record(ctx, int64(len(r.endpoints)), metric.WithAttributeSet(staticResolverAttrSet)) + r.telemetry.LoadbalancerNumBackendUpdates.Add(ctx, 1, metric.WithAttributeSet(staticResolverAttrSet)) for _, callback := range r.onChangeCallbacks { callback(r.endpoints) } diff --git a/exporter/loadbalancingexporter/resolver_static_test.go b/exporter/loadbalancingexporter/resolver_static_test.go index 63dda4be2367..7c17da4bb9a3 100644 --- a/exporter/loadbalancingexporter/resolver_static_test.go +++ b/exporter/loadbalancingexporter/resolver_static_test.go @@ -13,8 +13,9 @@ import ( func TestInitialResolution(t *testing.T) { // prepare + _, tb := getTelemetryAssets(t) provided := []string{"endpoint-2", "endpoint-1"} - res, err := newStaticResolver(provided) + res, err := newStaticResolver(provided, tb) require.NoError(t, err) // test @@ -34,8 +35,9 @@ func TestInitialResolution(t *testing.T) { func TestResolvedOnlyOnce(t *testing.T) { // prepare + _, tb := getTelemetryAssets(t) expected := []string{"endpoint-1", "endpoint-2"} - res, err := newStaticResolver(expected) + res, err := newStaticResolver(expected, tb) require.NoError(t, err) counter := 0 @@ -58,10 +60,11 @@ func TestResolvedOnlyOnce(t *testing.T) { func TestFailOnMissingEndpoints(t *testing.T) { // prepare + _, tb := getTelemetryAssets(t) var expected []string // test - res, err := newStaticResolver(expected) + res, err := newStaticResolver(expected, tb) // verify assert.Equal(t, errNoEndpoints, err) diff --git a/exporter/loadbalancingexporter/telemetry_utils_test.go b/exporter/loadbalancingexporter/telemetry_utils_test.go new file mode 100644 index 000000000000..21a9dc117f81 --- /dev/null +++ b/exporter/loadbalancingexporter/telemetry_utils_test.go @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter" + +import ( + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/exporter" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" +) + +func getTelemetryAssets(t require.TestingT) (exporter.Settings, *metadata.TelemetryBuilder) { + s := setupTestTelemetry() + st := s.NewSettings() + ts := st.TelemetrySettings + tb, err := metadata.NewTelemetryBuilder(ts) + require.NoError(t, err) + return st, tb +} diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 2d441dbdad0b..3e088cea4bf9 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -10,15 +10,15 @@ import ( "sync" "time" - "go.opencensus.io/stats" - "go.opencensus.io/tag" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/otlpexporter" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel/metric" "go.uber.org/multierr" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" ) @@ -32,21 +32,32 @@ type traceExporterImp struct { stopped bool shutdownWg sync.WaitGroup + telemetry *metadata.TelemetryBuilder } // Create new traces exporter func newTracesExporter(params exporter.Settings, cfg component.Config) (*traceExporterImp, error) { - exporterFactory := otlpexporter.NewFactory() + telemetry, err := metadata.NewTelemetryBuilder(params.TelemetrySettings) + if err != nil { + return nil, err + } - lb, err := newLoadBalancer(params, cfg, func(ctx context.Context, endpoint string) (component.Component, error) { + exporterFactory := otlpexporter.NewFactory() + cfFunc := func(ctx context.Context, endpoint string) (component.Component, error) { oCfg := buildExporterConfig(cfg.(*Config), endpoint) return exporterFactory.CreateTracesExporter(ctx, params, &oCfg) - }) + } + + lb, err := newLoadBalancer(params.Logger, cfg, cfFunc, telemetry) if err != nil { return nil, err } - traceExporter := traceExporterImp{loadBalancer: lb, routingKey: traceIDRouting} + traceExporter := traceExporterImp{ + loadBalancer: lb, + routingKey: traceIDRouting, + telemetry: telemetry, + } switch cfg.(*Config).RoutingKey { case svcRoutingStr: @@ -115,17 +126,11 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) exp.consumeWG.Done() errs = multierr.Append(errs, err) duration := time.Since(start) - + e.telemetry.LoadbalancerBackendLatency.Record(ctx, duration.Milliseconds(), metric.WithAttributeSet(exp.endpointAttr)) if err == nil { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoints[exp]), successTrueMutator}, - mBackendLatency.M(duration.Milliseconds())) + e.telemetry.LoadbalancerBackendOutcome.Add(ctx, 1, metric.WithAttributeSet(exp.successAttr)) } else { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoints[exp]), successFalseMutator}, - mBackendLatency.M(duration.Milliseconds())) + e.telemetry.LoadbalancerBackendOutcome.Add(ctx, 1, metric.WithAttributeSet(exp.failureAttr)) } } diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index 1d39c4befd52..3378b37af740 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -28,7 +28,6 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" conventions "go.opentelemetry.io/collector/semconv/v1.9.0" - "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" ) @@ -77,8 +76,9 @@ func TestTracesExporterStart(t *testing.T) { { "error", func() *traceExporterImp { - lb, _ := newLoadBalancer(exportertest.NewNopSettings(), simpleConfig(), nil) - p, _ := newTracesExporter(exportertest.NewNopSettings(), simpleConfig()) + ts, tb := getTelemetryAssets(t) + lb, _ := newLoadBalancer(ts.Logger, simpleConfig(), nil, tb) + p, _ := newTracesExporter(ts, simpleConfig()) lb.res = &mockResolver{ onStart: func(context.Context) error { @@ -120,14 +120,15 @@ func TestTracesExporterShutdown(t *testing.T) { } func TestConsumeTraces(t *testing.T) { + ts, tb := getTelemetryAssets(t) componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newNopMockTracesExporter(), nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), simpleConfig(), componentFactory) + lb, err := newLoadBalancer(ts.Logger, simpleConfig(), componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopSettings(), simpleConfig()) + p, err := newTracesExporter(ts, simpleConfig()) require.NotNil(t, p) require.NoError(t, err) assert.Equal(t, p.routingKey, traceIDRouting) @@ -157,6 +158,7 @@ func TestConsumeTraces(t *testing.T) { // This test validates that exporter is can concurrently change the endpoints while consuming traces. func TestConsumeTraces_ConcurrentResolverChange(t *testing.T) { + ts, tb := getTelemetryAssets(t) consumeStarted := make(chan struct{}) consumeDone := make(chan struct{}) @@ -170,11 +172,11 @@ func TestConsumeTraces_ConcurrentResolverChange(t *testing.T) { componentFactory := func(_ context.Context, _ string) (component.Component, error) { return te, nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), simpleConfig(), componentFactory) + lb, err := newLoadBalancer(ts.Logger, simpleConfig(), componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopSettings(), simpleConfig()) + p, err := newTracesExporter(ts, simpleConfig()) require.NotNil(t, p) require.NoError(t, err) assert.Equal(t, p.routingKey, traceIDRouting) @@ -209,14 +211,15 @@ func TestConsumeTraces_ConcurrentResolverChange(t *testing.T) { } func TestConsumeTracesServiceBased(t *testing.T) { + ts, tb := getTelemetryAssets(t) componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newNopMockTracesExporter(), nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), serviceBasedRoutingConfig(), componentFactory) + lb, err := newLoadBalancer(ts.Logger, serviceBasedRoutingConfig(), componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopSettings(), serviceBasedRoutingConfig()) + p, err := newTracesExporter(ts, serviceBasedRoutingConfig()) require.NotNil(t, p) require.NoError(t, err) assert.Equal(t, p.routingKey, svcRouting) @@ -275,14 +278,15 @@ func TestServiceBasedRoutingForSameTraceId(t *testing.T) { } func TestConsumeTracesExporterNoEndpoint(t *testing.T) { + ts, tb := getTelemetryAssets(t) componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newNopMockTracesExporter(), nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), simpleConfig(), componentFactory) + lb, err := newLoadBalancer(ts.Logger, simpleConfig(), componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopSettings(), simpleConfig()) + p, err := newTracesExporter(ts, simpleConfig()) require.NotNil(t, p) require.NoError(t, err) @@ -309,14 +313,15 @@ func TestConsumeTracesExporterNoEndpoint(t *testing.T) { } func TestConsumeTracesUnexpectedExporterType(t *testing.T) { + ts, tb := getTelemetryAssets(t) componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newNopMockExporter(), nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), simpleConfig(), componentFactory) + lb, err := newLoadBalancer(ts.Logger, simpleConfig(), componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopSettings(), simpleConfig()) + p, err := newTracesExporter(ts, simpleConfig()) require.NotNil(t, p) require.NoError(t, err) @@ -374,15 +379,16 @@ func TestBuildExporterConfig(t *testing.T) { } func TestBatchWithTwoTraces(t *testing.T) { + ts, tb := getTelemetryAssets(t) sink := new(consumertest.TracesSink) componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newMockTracesExporter(sink.ConsumeTraces), nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), simpleConfig(), componentFactory) + lb, err := newLoadBalancer(ts.Logger, simpleConfig(), componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopSettings(), simpleConfig()) + p, err := newTracesExporter(ts, simpleConfig()) require.NotNil(t, p) require.NoError(t, err) @@ -448,6 +454,7 @@ func TestNoTracesInBatch(t *testing.T) { func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { t.Skip("Flaky Test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/13331") + ts, tb := getTelemetryAssets(t) // this test is based on the discussion in the following issue for this exporter: // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/1690 @@ -455,7 +462,7 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { // simulate rolling updates, the dns resolver should resolve in the following order // ["127.0.0.1"] -> ["127.0.0.1", "127.0.0.2"] -> ["127.0.0.2"] - res, err := newDNSResolver(zap.NewNop(), "service-1", "", 5*time.Second, 1*time.Second) + res, err := newDNSResolver(ts.Logger, "service-1", "", 5*time.Second, 1*time.Second, tb) require.NoError(t, err) mu := sync.Mutex{} @@ -506,11 +513,11 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newNopMockTracesExporter(), nil } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), cfg, componentFactory) + lb, err := newLoadBalancer(ts.Logger, cfg, componentFactory, tb) require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopSettings(), cfg) + p, err := newTracesExporter(ts, cfg) require.NotNil(t, p) require.NoError(t, err) @@ -519,17 +526,19 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { counter1 := &atomic.Int64{} counter2 := &atomic.Int64{} + id1 := "127.0.0.1:4317" + id2 := "127.0.0.2:4317" defaultExporters := map[string]*wrappedExporter{ - "127.0.0.1:4317": newWrappedExporter(newMockTracesExporter(func(_ context.Context, _ ptrace.Traces) error { + id1: newWrappedExporter(newMockTracesExporter(func(_ context.Context, _ ptrace.Traces) error { counter1.Add(1) // simulate an unreachable backend time.Sleep(10 * time.Second) return nil - })), - "127.0.0.2:4317": newWrappedExporter(newMockTracesExporter(func(_ context.Context, _ ptrace.Traces) error { + }), id1), + id2: newWrappedExporter(newMockTracesExporter(func(_ context.Context, _ ptrace.Traces) error { counter2.Add(1) return nil - })), + }), id2), } // test @@ -587,6 +596,7 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { } func benchConsumeTraces(b *testing.B, endpointsCount int, tracesCount int) { + ts, tb := getTelemetryAssets(b) sink := new(consumertest.TracesSink) componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newMockTracesExporter(sink.ConsumeTraces), nil @@ -603,7 +613,7 @@ func benchConsumeTraces(b *testing.B, endpointsCount int, tracesCount int) { }, } - lb, err := newLoadBalancer(exportertest.NewNopSettings(), config, componentFactory) + lb, err := newLoadBalancer(ts.Logger, config, componentFactory, tb) require.NotNil(b, lb) require.NoError(b, err) diff --git a/exporter/loadbalancingexporter/wrapped_exporter.go b/exporter/loadbalancingexporter/wrapped_exporter.go index cb2491fc23ff..c6ca5f639850 100644 --- a/exporter/loadbalancingexporter/wrapped_exporter.go +++ b/exporter/loadbalancingexporter/wrapped_exporter.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel/attribute" ) // wrappedExporter is an exporter that waits for the data processing to complete before shutting down. @@ -20,10 +21,21 @@ import ( type wrappedExporter struct { component.Component consumeWG sync.WaitGroup + + // we store the attributes here for both cases, to avoid new allocations on the hot path + endpointAttr attribute.Set + successAttr attribute.Set + failureAttr attribute.Set } -func newWrappedExporter(exp component.Component) *wrappedExporter { - return &wrappedExporter{Component: exp} +func newWrappedExporter(exp component.Component, identifier string) *wrappedExporter { + ea := attribute.String("endpoint", identifier) + return &wrappedExporter{ + Component: exp, + endpointAttr: attribute.NewSet(ea), + successAttr: attribute.NewSet(ea, attribute.Bool("success", true)), + failureAttr: attribute.NewSet(ea, attribute.Bool("success", false)), + } } func (we *wrappedExporter) Shutdown(ctx context.Context) error {