diff --git a/exporter/loadbalancingexporter/metrics_exporter_test.go b/exporter/loadbalancingexporter/metrics_exporter_test.go index b19ddedc924d..f7c7f1df55df 100644 --- a/exporter/loadbalancingexporter/metrics_exporter_test.go +++ b/exporter/loadbalancingexporter/metrics_exporter_test.go @@ -11,7 +11,6 @@ import ( "net" "os" "path/filepath" - "strconv" "sync" "sync/atomic" "testing" @@ -27,12 +26,12 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/otlpexporter" "go.opentelemetry.io/collector/otelcol/otelcoltest" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" conventions "go.opentelemetry.io/collector/semconv/v1.9.0" "gopkg.in/yaml.v2" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) @@ -832,7 +831,7 @@ func TestRollingUpdatesWhenConsumeMetrics(t *testing.T) { return case <-ticker.C: go func() { - require.NoError(t, p.ConsumeMetrics(ctx, randomMetrics())) + require.NoError(t, p.ConsumeMetrics(ctx, randomMetrics(t, 1, 1, 1, 1))) }() } } @@ -858,15 +857,61 @@ func TestRollingUpdatesWhenConsumeMetrics(t *testing.T) { require.Greater(t, counter2.Load(), int64(0)) } -func appendSimpleMetricWithServiceName(metric pmetric.Metrics, serviceName string, sigName string) { - metric.ResourceMetrics().EnsureCapacity(1) - rmetrics := metric.ResourceMetrics().AppendEmpty() - rmetrics.Resource().Attributes().PutStr(conventions.AttributeServiceName, serviceName) - rmetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName(sigName) +func randomMetrics(t require.TestingT, rmCount int, smCount int, mCount int, dpCount int) pmetric.Metrics { + md := pmetric.NewMetrics() + + timeStamp := pcommon.Timestamp(rand.Intn(256)) + value := int64(rand.Intn(256)) + + for i := 0; i < rmCount; i++ { + rm := md.ResourceMetrics().AppendEmpty() + err := rm.Resource().Attributes().FromRaw(map[string]any{ + conventions.AttributeServiceName: fmt.Sprintf("service-%d", rand.Intn(512)), + }) + require.NoError(t, err) + + for j := 0; j < smCount; j++ { + sm := rm.ScopeMetrics().AppendEmpty() + scope := sm.Scope() + scope.SetName("MyTestInstrument") + scope.SetVersion("1.2.3") + err = scope.Attributes().FromRaw(map[string]any{ + "scope.key": fmt.Sprintf("scope-%d", rand.Intn(512)), + }) + require.NoError(t, err) + + for k := 0; k < mCount; k++ { + m := sm.Metrics().AppendEmpty() + m.SetName(fmt.Sprintf("metric.%d.test", rand.Intn(512))) + + sum := m.SetEmptySum() + sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + sum.SetIsMonotonic(true) + + for l := 0; l < dpCount; l++ { + dp := sum.DataPoints().AppendEmpty() + + dp.SetTimestamp(timeStamp) + timeStamp += 10 + + dp.SetIntValue(value) + value += 15 + + err = dp.Attributes().FromRaw(map[string]any{ + "datapoint.key": fmt.Sprintf("dp-%d", rand.Intn(512)), + }) + require.NoError(t, err) + } + } + } + } + + return md } -func benchConsumeMetrics(b *testing.B, endpointsCount int, metricsCount int) { +func benchConsumeMetrics(b *testing.B, routingKey string, endpointsCount int, rmCount int, smCount int, mCount int, dpCount int) { ts, tb := getTelemetryAssets(b) + sink := new(consumertest.MetricsSink) componentFactory := func(_ context.Context, _ string) (component.Component, error) { return newMockMetricsExporter(sink.ConsumeMetrics), nil @@ -881,6 +926,7 @@ func benchConsumeMetrics(b *testing.B, endpointsCount int, metricsCount int) { Resolver: ResolverSettings{ Static: &StaticResolver{Hostnames: endpoints}, }, + RoutingKey: routingKey, } lb, err := newLoadBalancer(ts.Logger, config, componentFactory, tb) @@ -896,15 +942,7 @@ func benchConsumeMetrics(b *testing.B, endpointsCount int, metricsCount int) { err = p.Start(context.Background(), componenttest.NewNopHost()) require.NoError(b, err) - metric1 := pmetric.NewMetrics() - metric2 := pmetric.NewMetrics() - for i := 0; i < endpointsCount; i++ { - for j := 0; j < metricsCount/endpointsCount; j++ { - appendSimpleMetricWithServiceName(metric2, fmt.Sprintf("service-%d", i), fmt.Sprintf("sig-%d", i)) - } - } - simpleMetricsWithServiceName() - md := metrics.Merge(metric1, metric2) + md := randomMetrics(b, rmCount, smCount, mCount, dpCount) b.ResetTimer() @@ -918,36 +956,39 @@ func benchConsumeMetrics(b *testing.B, endpointsCount int, metricsCount int) { require.NoError(b, err) } -func BenchmarkConsumeMetrics_1E100T(b *testing.B) { - benchConsumeMetrics(b, 1, 100) -} - -func BenchmarkConsumeMetrics_1E1000T(b *testing.B) { - benchConsumeMetrics(b, 1, 1000) -} - -func BenchmarkConsumeMetrics_5E100T(b *testing.B) { - benchConsumeMetrics(b, 5, 100) -} - -func BenchmarkConsumeMetrics_5E500T(b *testing.B) { - benchConsumeMetrics(b, 5, 500) -} - -func BenchmarkConsumeMetrics_5E1000T(b *testing.B) { - benchConsumeMetrics(b, 5, 1000) -} - -func BenchmarkConsumeMetrics_10E100T(b *testing.B) { - benchConsumeMetrics(b, 10, 100) -} - -func BenchmarkConsumeMetrics_10E500T(b *testing.B) { - benchConsumeMetrics(b, 10, 500) -} +func BenchmarkConsumeMetrics(b *testing.B) { + testCases := []struct { + routingKey string + }{ + { + routingKey: svcRoutingStr, + }, + { + routingKey: resourceRoutingStr, + }, + { + routingKey: metricNameRoutingStr, + }, + } -func BenchmarkConsumeMetrics_10E1000T(b *testing.B) { - benchConsumeMetrics(b, 10, 1000) + for _, tc := range testCases { + b.Run(tc.routingKey, func(b *testing.B) { + for _, endpointCount := range []int{1, 5, 10} { + for _, rmCount := range []int{1, 3} { + for _, smCount := range []int{1, 3} { + for _, totalMCount := range []int{100, 500, 1000} { + mCount := totalMCount / smCount / rmCount + dpCount := 2 + + b.Run(fmt.Sprintf("%dE_%dRM_%dSM_%dM", endpointCount, rmCount, smCount, mCount), func(b *testing.B) { + benchConsumeMetrics(b, tc.routingKey, endpointCount, rmCount, smCount, mCount, dpCount) + }) + } + } + } + } + }) + } } func endpoint2Config() *Config { @@ -977,14 +1018,6 @@ func metricNameBasedRoutingConfig() *Config { } } -func randomMetrics() pmetric.Metrics { - v1 := uint64(rand.Intn(256)) - name := strconv.FormatUint(v1, 10) - metrics := pmetric.NewMetrics() - appendSimpleMetricWithID(metrics.ResourceMetrics().AppendEmpty(), name) - return metrics -} - func simpleMetricsWithServiceName() pmetric.Metrics { metrics := pmetric.NewMetrics() metrics.ResourceMetrics().EnsureCapacity(1)