diff --git a/cmd/telemetrygen/go.mod b/cmd/telemetrygen/go.mod index c351e9b73da3..527bc98e6f5f 100644 --- a/cmd/telemetrygen/go.mod +++ b/cmd/telemetrygen/go.mod @@ -7,6 +7,7 @@ require ( github.com/spf13/cobra v1.6.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.1 + go.opentelemetry.io/collector/semconv v0.71.0 go.opentelemetry.io/otel v1.13.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.36.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.36.0 diff --git a/cmd/telemetrygen/go.sum b/cmd/telemetrygen/go.sum index df33c90a1a3a..b8909f5147d6 100644 --- a/cmd/telemetrygen/go.sum +++ b/cmd/telemetrygen/go.sum @@ -191,6 +191,8 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opentelemetry.io/collector/semconv v0.71.0 h1:g2bMdtciW2BmKximUxaF0L962U/EIlH9ms3iOGblCKE= +go.opentelemetry.io/collector/semconv v0.71.0/go.mod h1:UAp+qAMqEXOD0eEBmWJ3IJ5+LkF7zVTgmfufwpHmL8w= go.opentelemetry.io/otel v1.13.0 h1:1ZAKnNQKwBBxFtww/GwxNUyTf0AxkZzrukO8MeXqe4Y= go.opentelemetry.io/otel v1.13.0/go.mod h1:FH3RtdZCzRkJYFTCsAKDy9l/XYjMdNv6QrkFFB8DvVg= go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.13.0 h1:pa05sNT/P8OsIQ8mPZKTIyiBuzS/xDGLVx+DCt0y6Vs= diff --git a/cmd/telemetrygen/internal/metrics/metrics.go b/cmd/telemetrygen/internal/metrics/metrics.go index d90459a6334e..82f3bd0059eb 100644 --- a/cmd/telemetrygen/internal/metrics/metrics.go +++ b/cmd/telemetrygen/internal/metrics/metrics.go @@ -20,12 +20,11 @@ import ( "sync" "time" + semconv "go.opentelemetry.io/collector/semconv/v1.13.0" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" - "go.opentelemetry.io/otel/metric/global" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/time/rate" @@ -81,16 +80,7 @@ func Start(cfg *Config) error { } }() - reader := sdkmetric.NewPeriodicReader(exp, sdkmetric.WithInterval(cfg.ReportingInterval)) - - meterProvider := sdkmetric.NewMeterProvider( - sdkmetric.WithResource(resource.NewWithAttributes(semconv.SchemaURL, cfg.GetAttributes()...)), - sdkmetric.WithReader(reader), - ) - - global.SetMeterProvider(meterProvider) - - if err = Run(cfg, logger); err != nil { + if err = Run(cfg, exp, logger); err != nil { logger.Error("failed to stop the exporter", zap.Error(err)) return err } @@ -99,7 +89,7 @@ func Start(cfg *Config) error { } // Run executes the test scenario. -func Run(c *Config, logger *zap.Logger) error { +func Run(c *Config, exp sdkmetric.Exporter, logger *zap.Logger) error { if c.TotalDuration > 0 { c.NumMetrics = 0 } else if c.NumMetrics <= 0 { @@ -116,6 +106,7 @@ func Run(c *Config, logger *zap.Logger) error { wg := sync.WaitGroup{} running := atomic.NewBool(true) + res := resource.NewWithAttributes(semconv.SchemaURL, c.GetAttributes()...) for i := 0; i < c.WorkerCount; i++ { wg.Add(1) @@ -126,9 +117,10 @@ func Run(c *Config, logger *zap.Logger) error { running: running, wg: &wg, logger: logger.With(zap.Int("worker", i)), + index: i, } - go w.simulateMetrics() + go w.simulateMetrics(res, exp) } if c.TotalDuration > 0 { time.Sleep(c.TotalDuration) diff --git a/cmd/telemetrygen/internal/metrics/worker.go b/cmd/telemetrygen/internal/metrics/worker.go index 4ab3566e8dae..3612fa617d36 100644 --- a/cmd/telemetrygen/internal/metrics/worker.go +++ b/cmd/telemetrygen/internal/metrics/worker.go @@ -19,7 +19,9 @@ import ( "sync" "time" - "go.opentelemetry.io/otel/metric/global" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/resource" "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/time/rate" @@ -31,39 +33,48 @@ type worker struct { totalDuration time.Duration // how long to run the test for (overrides `numMetrics`) limitPerSecond rate.Limit // how many metrics per second to generate wg *sync.WaitGroup // notify when done - logger *zap.Logger + logger *zap.Logger // logger + index int // worker index } -func (w worker) simulateMetrics() { +func (w worker) simulateMetrics(res *resource.Resource, exporter sdkmetric.Exporter) { limiter := rate.NewLimiter(w.limitPerSecond, 1) - var i int - meter := global.Meter("telemetrygen") + var i int64 - index := int64(0) - max := int64(1000) - if w.limitPerSecond != rate.Inf { - max = int64(w.limitPerSecond) - } - - counter, _ := meter.Int64Counter("gen") for w.running.Load() { - counter.Add(context.Background(), 1) + rm := metricdata.ResourceMetrics{ + Resource: res, + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Metrics: []metricdata.Metrics{ + { + Name: "gen", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Time: time.Now(), + Value: i, + }, + }, + }, + }, + }, + }, + }, + } + if err := exporter.Export(context.Background(), rm); err != nil { + w.logger.Fatal("exporter failed", zap.Error(err)) + } if err := limiter.Wait(context.Background()); err != nil { - w.logger.Fatal("limiter waited failed, retry", zap.Error(err)) + w.logger.Fatal("limiter wait failed, retry", zap.Error(err)) } - index++ - if index > max { - index = 0 - } i++ - if w.numMetrics != 0 { - if i >= w.numMetrics { - break - } + if w.numMetrics != 0 && i >= int64(w.numMetrics) { + break } } - w.logger.Info("metrics generated", zap.Int("metrics", i)) + w.logger.Info("metrics generated", zap.Int64("metrics", i)) w.wg.Done() } diff --git a/cmd/telemetrygen/internal/metrics/worker_test.go b/cmd/telemetrygen/internal/metrics/worker_test.go index 9e9face6a158..fc8c9b2edbf8 100644 --- a/cmd/telemetrygen/internal/metrics/worker_test.go +++ b/cmd/telemetrygen/internal/metrics/worker_test.go @@ -21,46 +21,60 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/metric/global" sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common" ) -func TestFixedNumberOfMetrics(t *testing.T) { - // prepare - manualReader := sdkmetric.NewManualReader() - metricProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(manualReader)) - global.SetMeterProvider(metricProvider) +type mockExporter struct { + rms []*metricdata.ResourceMetrics +} + +func (m *mockExporter) Temporality(kind sdkmetric.InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality +} + +func (m *mockExporter) Aggregation(kind sdkmetric.InstrumentKind) aggregation.Aggregation { + return aggregation.Default{} +} + +func (m *mockExporter) Export(ctx context.Context, metrics metricdata.ResourceMetrics) error { + m.rms = append(m.rms, &metrics) + return nil +} + +func (m *mockExporter) ForceFlush(ctx context.Context) error { + return nil +} +func (m *mockExporter) Shutdown(ctx context.Context) error { + return nil +} + +func TestFixedNumberOfMetrics(t *testing.T) { cfg := &Config{ Config: common.Config{ WorkerCount: 1, }, - NumMetrics: 1, + NumMetrics: 5, } + exp := &mockExporter{} + // test logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, logger)) + require.NoError(t, Run(cfg, exp, logger)) time.Sleep(1 * time.Second) // verify - m, err := manualReader.Collect(context.Background()) - require.Len(t, m.ScopeMetrics, 1) - require.NoError(t, err) - assert.Len(t, m.ScopeMetrics[0].Metrics, 1) + require.Len(t, exp.rms, 5) } func TestRateOfMetrics(t *testing.T) { - // prepare - manualReader := sdkmetric.NewManualReader() - metricProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(manualReader)) - global.SetMeterProvider(metricProvider) - cfg := &Config{ Config: common.Config{ Rate: 10, @@ -68,54 +82,30 @@ func TestRateOfMetrics(t *testing.T) { WorkerCount: 1, }, } - - // sanity check - m, err := manualReader.Collect(context.Background()) - require.Len(t, m.ScopeMetrics, 0) - require.NoError(t, err) + exp := &mockExporter{} // test - require.NoError(t, Run(cfg, zap.NewNop())) + require.NoError(t, Run(cfg, exp, zap.NewNop())) // verify // the minimum acceptable number of metrics for the rate of 10/sec for half a second - m, err = manualReader.Collect(context.Background()) - require.Len(t, m.ScopeMetrics, 1) - require.NoError(t, err) - assert.True(t, len(m.ScopeMetrics[0].Metrics) >= 6, "there should have been more than 6 metrics, had %d", len(m.ScopeMetrics[0].Metrics)) + assert.True(t, len(exp.rms) >= 6, "there should have been more than 6 metrics, had %d", len(exp.rms)) // the maximum acceptable number of metrics for the rate of 10/sec for half a second - assert.True(t, len(m.ScopeMetrics[0].Metrics) <= 20, "there should have been less than 20 metrics, had %d", len(m.ScopeMetrics[0].Metrics)) + assert.True(t, len(exp.rms) <= 20, "there should have been less than 20 metrics, had %d", len(exp.rms)) } func TestUnthrottled(t *testing.T) { - // prepare - manualReader := sdkmetric.NewManualReader() - metricProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(manualReader)) - global.SetMeterProvider(metricProvider) - cfg := &Config{ Config: common.Config{ TotalDuration: 1 * time.Second, WorkerCount: 1, }, } - - // sanity check - m, err := manualReader.Collect(context.Background()) - require.Len(t, m.ScopeMetrics, 0) - require.NoError(t, err) + exp := &mockExporter{} // test logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, logger)) - - collected, err := manualReader.Collect(context.Background()) - require.Len(t, collected.ScopeMetrics, 1) - count := 0 - for _, m := range collected.ScopeMetrics[0].Metrics { - sum := m.Data.(metricdata.Sum[int64]) - count += len(sum.DataPoints) - } - assert.True(t, count > 100, "there should have been more than 100 metrics, had %d", count) - require.NoError(t, err) + require.NoError(t, Run(cfg, exp, logger)) + + assert.True(t, len(exp.rms) > 100, "there should have been more than 100 metrics, had %d", len(exp.rms)) }