Skip to content

Commit

Permalink
[processor/lsminterval] Add benchmarks (#232)
Browse files Browse the repository at this point in the history
Add benchmarks to facilitate perf optimization work
  • Loading branch information
carsonip authored Nov 28, 2024
1 parent 95c8db6 commit 3651d70
Showing 1 changed file with 89 additions and 3 deletions.
92 changes: 89 additions & 3 deletions processor/lsmintervalprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package lsmintervalprocessor

import (
"context"
"fmt"
"path/filepath"
"testing"
"time"
Expand All @@ -29,6 +30,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/processor/processortest"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"

Expand All @@ -53,9 +55,6 @@ func TestAggregation(t *testing.T) {
{name: "summary_passthrough", passThrough: true},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for _, tc := range testCases {
config := &Config{
Intervals: []IntervalConfig{
Expand All @@ -76,6 +75,9 @@ func TestAggregation(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

next := &consumertest.MetricsSink{}

factory := NewFactory()
Expand Down Expand Up @@ -118,3 +120,87 @@ func TestAggregation(t *testing.T) {
})
}
}

func benchmarkAggregation(b *testing.B, ottlStatements []string) {
testCases := []struct {
name string
passThrough bool
}{
{name: "sum_cumulative"},
{name: "sum_delta"},
{name: "histogram_cumulative"},
{name: "histogram_delta"},
{name: "exphistogram_cumulative"},
{name: "exphistogram_delta"},
{name: "summary_enabled"},
{name: "summary_passthrough", passThrough: true},
}

for _, tc := range testCases {
config := &Config{
Intervals: []IntervalConfig{
{
Duration: time.Hour,
Statements: ottlStatements,
},
},
PassThrough: PassThrough{
Summary: tc.passThrough,
},
}
b.Run(tc.name, func(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

next := &consumertest.MetricsSink{}

factory := NewFactory()
settings := processortest.NewNopSettings()
settings.TelemetrySettings.Logger = zap.NewNop()
mgp, err := factory.CreateMetrics(
context.Background(),
settings,
config,
next,
)
require.NoError(b, err)

dir := filepath.Join("testdata", tc.name)
md, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml"))
require.NoError(b, err)
md.MarkReadOnly()
b.ResetTimer()

err = mgp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(b, err)
for i := 0; i < b.N; i++ {
mdCopy := pmetric.NewMetrics()
md.CopyTo(mdCopy)
// Overwrites the asdf attribute in metrics such that it becomes high cardinality
mdCopy.ResourceMetrics().At(0).Resource().Attributes().PutStr("asdf", fmt.Sprintf("%d", i))
err = mgp.ConsumeMetrics(ctx, mdCopy)
require.NoError(b, err)
}

err = mgp.(*Processor).Shutdown(context.Background())
require.NoError(b, err)
allMetrics := next.AllMetrics()
// There should be 1 empty metrics for each of the b.N input metrics,
// then at last 1 actual merged metrics on shutdown
assert.Len(b, allMetrics, b.N+1)
})
}
}

func BenchmarkAggregation(b *testing.B) {
benchmarkAggregation(b, nil)
}

func BenchmarkAggregationWithOTTL(b *testing.B) {
benchmarkAggregation(b, []string{
`set(resource.attributes["custom_res_attr"], "res")`,
`set(instrumentation_scope.attributes["custom_scope_attr"], "scope")`,
`set(attributes["custom_dp_attr"], "dp")`,
`set(resource.attributes["dependent_attr"], Concat([attributes["aaa"], "dependent"], "-"))`,
})
}

0 comments on commit 3651d70

Please sign in to comment.