From 072cae05815f76d655828961f148311eb3cae58e Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Wed, 16 Oct 2024 13:45:25 +0200 Subject: [PATCH 1/4] [chore] rework receiver adapter benchmark tests Signed-off-by: Florian Bacher --- pkg/stanza/adapter/receiver_test.go | 188 ++++++++++++++++++++++++++++ 1 file changed, 188 insertions(+) diff --git a/pkg/stanza/adapter/receiver_test.go b/pkg/stanza/adapter/receiver_test.go index 363590b1f4d4..c29ff0ef89b0 100644 --- a/pkg/stanza/adapter/receiver_test.go +++ b/pkg/stanza/adapter/receiver_test.go @@ -6,6 +6,9 @@ package adapter import ( "context" "fmt" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/receiver/receiverhelper" "os" "path/filepath" "sync/atomic" @@ -164,6 +167,102 @@ func TestShutdownFlush(t *testing.T) { ) } +func BenchmarkReceiver(b *testing.B) { + b.Run( + "1 Log entry per iteration", + func(b *testing.B) { + benchmarkReceiver(b, 1) + }, + ) + b.Run( + "10 Log entries per iteration", + func(b *testing.B) { + benchmarkReceiver(b, 10) + }, + ) + b.Run( + "100 Log entries per iteration", + func(b *testing.B) { + benchmarkReceiver(b, 100) + }, + ) + b.Run( + "1_000 Log entries per iteration", + func(b *testing.B) { + benchmarkReceiver(b, 1_000) + }, + ) + b.Run( + "10_000 Log entries per iteration", + func(b *testing.B) { + benchmarkReceiver(b, 10_000) + }, + ) +} + +func benchmarkReceiver(b *testing.B, logsPerIteration int) { + iterationComplete := make(chan struct{}) + nextIteration := make(chan struct{}) + + inputBuilder := &testInputBuilder{ + numberOfLogEntries: logsPerIteration, + nextIteration: nextIteration, + } + inputCfg := operator.Config{ + Builder: inputBuilder, + } + + set := componenttest.NewNopTelemetrySettings() + emitter := helper.NewLogEmitter(set) + defer func() { + require.NoError(b, emitter.Stop()) + }() + + pipe, err := pipeline.Config{ + Operators: []operator.Config{inputCfg}, + DefaultOutput: emitter, + }.Build(set) + require.NoError(b, err) + + storageClient := storagetest.NewInMemoryClient( + component.KindReceiver, + component.MustNewID("foolog"), + "test", + ) + + converter := NewConverter(set) + + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) + require.NoError(b, err) + + mockConsumer := &testConsumer{ + receivedAllLogs: iterationComplete, + expectedLogs: uint32(logsPerIteration), + receivedLogs: atomic.Uint32{}, + } + rcv := &receiver{ + set: set, + pipe: pipe, + emitter: emitter, + consumer: mockConsumer, + converter: converter, + obsrecv: obsrecv, + storageClient: storageClient, + } + + b.ResetTimer() + + require.NoError(b, rcv.Start(context.Background(), nil)) + + for i := 0; i < b.N; i++ { + nextIteration <- struct{}{} + <-iterationComplete + mockConsumer.receivedLogs.Store(0) + } + + require.Nil(b, rcv.Shutdown(context.Background())) +} + func BenchmarkReadLine(b *testing.B) { filePath := filepath.Join(b.TempDir(), "bench.log") @@ -281,3 +380,92 @@ func BenchmarkParseAndMap(b *testing.B) { } } } + +const testInputOperatorTypeStr = "test_input" + +type testInputBuilder struct { + numberOfLogEntries int + nextIteration chan struct{} +} + +func (t *testInputBuilder) ID() string { + return testInputOperatorTypeStr +} + +func (t *testInputBuilder) Type() string { + return testInputOperatorTypeStr +} + +func (t *testInputBuilder) Build(settings component.TelemetrySettings) (operator.Operator, error) { + inputConfig := helper.NewInputConfig(t.ID(), testInputOperatorTypeStr) + inputOperator, err := inputConfig.Build(settings) + if err != nil { + return nil, err + } + + return &testInputOperator{ + InputOperator: inputOperator, + numberOfLogEntries: t.numberOfLogEntries, + nextIteration: t.nextIteration, + }, nil +} + +func (t *testInputBuilder) SetID(_ string) {} + +var _ operator.Operator = &testInputOperator{} + +type testInputOperator struct { + helper.InputOperator + numberOfLogEntries int + nextIteration chan struct{} + cancelFunc context.CancelFunc +} + +func (t *testInputOperator) ID() string { + return testInputOperatorTypeStr +} + +func (t *testInputOperator) Type() string { + return testInputOperatorTypeStr +} + +func (t *testInputOperator) Start(_ operator.Persister) error { + ctx, cancelFunc := context.WithCancel(context.Background()) + t.cancelFunc = cancelFunc + go func() { + for { + select { + case <-t.nextIteration: + for i := 0; i < t.numberOfLogEntries; i++ { + _ = t.Write(context.Background(), entry.New()) + } + case <-ctx.Done(): + return + } + } + + }() + return nil +} + +func (t *testInputOperator) Stop() error { + t.cancelFunc() + return nil +} + +type testConsumer struct { + receivedAllLogs chan struct{} + expectedLogs uint32 + receivedLogs atomic.Uint32 +} + +func (t *testConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{} +} + +func (t *testConsumer) ConsumeLogs(_ context.Context, ld plog.Logs) error { + if t.receivedLogs.Add(uint32(ld.LogRecordCount())) >= t.expectedLogs { + t.receivedAllLogs <- struct{}{} + } + return nil +} From 0b4a6f19370a7dff448e15bb3866637beadea137 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Wed, 16 Oct 2024 15:17:33 +0200 Subject: [PATCH 2/4] trigger CI Signed-off-by: Florian Bacher From c4f1505cf659f34ab72f9fe6690bf95734d914dc Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 17 Oct 2024 07:01:42 +0200 Subject: [PATCH 3/4] use complex entry for benchmark test Signed-off-by: Florian Bacher --- pkg/stanza/adapter/receiver_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/stanza/adapter/receiver_test.go b/pkg/stanza/adapter/receiver_test.go index c29ff0ef89b0..82e4d6c72e05 100644 --- a/pkg/stanza/adapter/receiver_test.go +++ b/pkg/stanza/adapter/receiver_test.go @@ -260,7 +260,7 @@ func benchmarkReceiver(b *testing.B, logsPerIteration int) { mockConsumer.receivedLogs.Store(0) } - require.Nil(b, rcv.Shutdown(context.Background())) + require.NoError(b, rcv.Shutdown(context.Background())) } func BenchmarkReadLine(b *testing.B) { @@ -432,12 +432,14 @@ func (t *testInputOperator) Type() string { func (t *testInputOperator) Start(_ operator.Persister) error { ctx, cancelFunc := context.WithCancel(context.Background()) t.cancelFunc = cancelFunc + + e := complexEntry() go func() { for { select { case <-t.nextIteration: for i := 0; i < t.numberOfLogEntries; i++ { - _ = t.Write(context.Background(), entry.New()) + _ = t.Write(context.Background(), e) } case <-ctx.Done(): return From fd9988013a53e6384c8e30a0a6999dabb59109eb Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 17 Oct 2024 07:05:38 +0200 Subject: [PATCH 4/4] fix linting Signed-off-by: Florian Bacher --- pkg/stanza/adapter/receiver_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/stanza/adapter/receiver_test.go b/pkg/stanza/adapter/receiver_test.go index 82e4d6c72e05..a5349a479866 100644 --- a/pkg/stanza/adapter/receiver_test.go +++ b/pkg/stanza/adapter/receiver_test.go @@ -6,9 +6,6 @@ package adapter import ( "context" "fmt" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/receiver/receiverhelper" "os" "path/filepath" "sync/atomic" @@ -19,7 +16,10 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" "gopkg.in/yaml.v2"