Skip to content

Commit

Permalink
create metrics manually
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Feb 14, 2023
1 parent 0b92477 commit 21bf349
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 86 deletions.
1 change: 1 addition & 0 deletions cmd/telemetrygen/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.1
go.opentelemetry.io/collector/semconv v0.71.0
go.opentelemetry.io/otel v1.13.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.36.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.36.0
Expand Down
2 changes: 2 additions & 0 deletions cmd/telemetrygen/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 6 additions & 14 deletions cmd/telemetrygen/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ import (
"sync"
"time"

semconv "go.opentelemetry.io/collector/semconv/v1.13.0"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/metric/global"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -81,16 +80,7 @@ func Start(cfg *Config) error {
}
}()

reader := sdkmetric.NewPeriodicReader(exp, sdkmetric.WithInterval(cfg.ReportingInterval))

meterProvider := sdkmetric.NewMeterProvider(
sdkmetric.WithResource(resource.NewWithAttributes(semconv.SchemaURL, cfg.GetAttributes()...)),
sdkmetric.WithReader(reader),
)

global.SetMeterProvider(meterProvider)

if err = Run(cfg, logger); err != nil {
if err = Run(cfg, exp, logger); err != nil {
logger.Error("failed to stop the exporter", zap.Error(err))
return err
}
Expand All @@ -99,7 +89,7 @@ func Start(cfg *Config) error {
}

// Run executes the test scenario.
func Run(c *Config, logger *zap.Logger) error {
func Run(c *Config, exp sdkmetric.Exporter, logger *zap.Logger) error {
if c.TotalDuration > 0 {
c.NumMetrics = 0
} else if c.NumMetrics <= 0 {
Expand All @@ -116,6 +106,7 @@ func Run(c *Config, logger *zap.Logger) error {

wg := sync.WaitGroup{}
running := atomic.NewBool(true)
res := resource.NewWithAttributes(semconv.SchemaURL, c.GetAttributes()...)

for i := 0; i < c.WorkerCount; i++ {
wg.Add(1)
Expand All @@ -126,9 +117,10 @@ func Run(c *Config, logger *zap.Logger) error {
running: running,
wg: &wg,
logger: logger.With(zap.Int("worker", i)),
index: i,
}

go w.simulateMetrics()
go w.simulateMetrics(res, exp)
}
if c.TotalDuration > 0 {
time.Sleep(c.TotalDuration)
Expand Down
57 changes: 34 additions & 23 deletions cmd/telemetrygen/internal/metrics/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"sync"
"time"

"go.opentelemetry.io/otel/metric/global"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/time/rate"
Expand All @@ -31,39 +33,48 @@ type worker struct {
totalDuration time.Duration // how long to run the test for (overrides `numMetrics`)
limitPerSecond rate.Limit // how many metrics per second to generate
wg *sync.WaitGroup // notify when done
logger *zap.Logger
logger *zap.Logger // logger
index int // worker index
}

func (w worker) simulateMetrics() {
func (w worker) simulateMetrics(res *resource.Resource, exporter sdkmetric.Exporter) {
limiter := rate.NewLimiter(w.limitPerSecond, 1)
var i int
meter := global.Meter("telemetrygen")
var i int64

index := int64(0)
max := int64(1000)
if w.limitPerSecond != rate.Inf {
max = int64(w.limitPerSecond)
}

counter, _ := meter.Int64Counter("gen")
for w.running.Load() {
counter.Add(context.Background(), 1)
rm := metricdata.ResourceMetrics{
Resource: res,
ScopeMetrics: []metricdata.ScopeMetrics{
{
Metrics: []metricdata.Metrics{
{
Name: "gen",
Data: metricdata.Gauge[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Time: time.Now(),
Value: i,
},
},
},
},
},
},
},
}
if err := exporter.Export(context.Background(), rm); err != nil {
w.logger.Fatal("exporter failed", zap.Error(err))
}
if err := limiter.Wait(context.Background()); err != nil {
w.logger.Fatal("limiter waited failed, retry", zap.Error(err))
w.logger.Fatal("limiter wait failed, retry", zap.Error(err))
}

index++
if index > max {
index = 0
}
i++
if w.numMetrics != 0 {
if i >= w.numMetrics {
break
}
if w.numMetrics != 0 && i >= int64(w.numMetrics) {
break
}
}

w.logger.Info("metrics generated", zap.Int("metrics", i))
w.logger.Info("metrics generated", zap.Int64("metrics", i))
w.wg.Done()
}
88 changes: 39 additions & 49 deletions cmd/telemetrygen/internal/metrics/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,101 +21,91 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric/global"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common"
)

func TestFixedNumberOfMetrics(t *testing.T) {
// prepare
manualReader := sdkmetric.NewManualReader()
metricProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(manualReader))
global.SetMeterProvider(metricProvider)
type mockExporter struct {
rms []*metricdata.ResourceMetrics
}

func (m *mockExporter) Temporality(kind sdkmetric.InstrumentKind) metricdata.Temporality {
return metricdata.DeltaTemporality
}

func (m *mockExporter) Aggregation(kind sdkmetric.InstrumentKind) aggregation.Aggregation {
return aggregation.Default{}
}

func (m *mockExporter) Export(ctx context.Context, metrics metricdata.ResourceMetrics) error {
m.rms = append(m.rms, &metrics)
return nil
}

func (m *mockExporter) ForceFlush(ctx context.Context) error {
return nil
}

func (m *mockExporter) Shutdown(ctx context.Context) error {
return nil
}

func TestFixedNumberOfMetrics(t *testing.T) {
cfg := &Config{
Config: common.Config{
WorkerCount: 1,
},
NumMetrics: 1,
NumMetrics: 5,
}

exp := &mockExporter{}

// test
logger, _ := zap.NewDevelopment()
require.NoError(t, Run(cfg, logger))
require.NoError(t, Run(cfg, exp, logger))

time.Sleep(1 * time.Second)

// verify
m, err := manualReader.Collect(context.Background())
require.Len(t, m.ScopeMetrics, 1)
require.NoError(t, err)
assert.Len(t, m.ScopeMetrics[0].Metrics, 1)
require.Len(t, exp.rms, 5)
}

func TestRateOfMetrics(t *testing.T) {
// prepare
manualReader := sdkmetric.NewManualReader()
metricProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(manualReader))
global.SetMeterProvider(metricProvider)

cfg := &Config{
Config: common.Config{
Rate: 10,
TotalDuration: time.Second / 2,
WorkerCount: 1,
},
}

// sanity check
m, err := manualReader.Collect(context.Background())
require.Len(t, m.ScopeMetrics, 0)
require.NoError(t, err)
exp := &mockExporter{}

// test
require.NoError(t, Run(cfg, zap.NewNop()))
require.NoError(t, Run(cfg, exp, zap.NewNop()))

// verify
// the minimum acceptable number of metrics for the rate of 10/sec for half a second
m, err = manualReader.Collect(context.Background())
require.Len(t, m.ScopeMetrics, 1)
require.NoError(t, err)
assert.True(t, len(m.ScopeMetrics[0].Metrics) >= 6, "there should have been more than 6 metrics, had %d", len(m.ScopeMetrics[0].Metrics))
assert.True(t, len(exp.rms) >= 6, "there should have been more than 6 metrics, had %d", len(exp.rms))
// the maximum acceptable number of metrics for the rate of 10/sec for half a second
assert.True(t, len(m.ScopeMetrics[0].Metrics) <= 20, "there should have been less than 20 metrics, had %d", len(m.ScopeMetrics[0].Metrics))
assert.True(t, len(exp.rms) <= 20, "there should have been less than 20 metrics, had %d", len(exp.rms))
}

func TestUnthrottled(t *testing.T) {
// prepare
manualReader := sdkmetric.NewManualReader()
metricProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(manualReader))
global.SetMeterProvider(metricProvider)

cfg := &Config{
Config: common.Config{
TotalDuration: 1 * time.Second,
WorkerCount: 1,
},
}

// sanity check
m, err := manualReader.Collect(context.Background())
require.Len(t, m.ScopeMetrics, 0)
require.NoError(t, err)
exp := &mockExporter{}

// test
logger, _ := zap.NewDevelopment()
require.NoError(t, Run(cfg, logger))

collected, err := manualReader.Collect(context.Background())
require.Len(t, collected.ScopeMetrics, 1)
count := 0
for _, m := range collected.ScopeMetrics[0].Metrics {
sum := m.Data.(metricdata.Sum[int64])
count += len(sum.DataPoints)
}
assert.True(t, count > 100, "there should have been more than 100 metrics, had %d", count)
require.NoError(t, err)
require.NoError(t, Run(cfg, exp, logger))

assert.True(t, len(exp.rms) > 100, "there should have been more than 100 metrics, had %d", len(exp.rms))
}

0 comments on commit 21bf349

Please sign in to comment.