Skip to content

Commit

Permalink
[chore] add receiver adapter benchmark tests (#35833)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

This PR introduces a benchmark test for the receiver adapter which tests
the performance of handling the log entries sent to the `LogEmitter`,
which are then received and converted by the receiver adapter. The
changes have been made based on the comment linked here:
#35669 (comment)

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Part of #35453

<!--Describe what testing was performed and which tests were added.-->
#### Testing

Added additional benchmark tests

---------

Signed-off-by: Florian Bacher <[email protected]>
  • Loading branch information
bacherfl authored Oct 24, 2024
1 parent 2ec4906 commit 6a343fe
Showing 1 changed file with 190 additions and 0 deletions.
190 changes: 190 additions & 0 deletions pkg/stanza/adapter/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.opentelemetry.io/collector/receiver/receivertest"
"gopkg.in/yaml.v2"

Expand Down Expand Up @@ -164,6 +167,102 @@ func TestShutdownFlush(t *testing.T) {
)
}

func BenchmarkReceiver(b *testing.B) {
b.Run(
"1 Log entry per iteration",
func(b *testing.B) {
benchmarkReceiver(b, 1)
},
)
b.Run(
"10 Log entries per iteration",
func(b *testing.B) {
benchmarkReceiver(b, 10)
},
)
b.Run(
"100 Log entries per iteration",
func(b *testing.B) {
benchmarkReceiver(b, 100)
},
)
b.Run(
"1_000 Log entries per iteration",
func(b *testing.B) {
benchmarkReceiver(b, 1_000)
},
)
b.Run(
"10_000 Log entries per iteration",
func(b *testing.B) {
benchmarkReceiver(b, 10_000)
},
)
}

func benchmarkReceiver(b *testing.B, logsPerIteration int) {
iterationComplete := make(chan struct{})
nextIteration := make(chan struct{})

inputBuilder := &testInputBuilder{
numberOfLogEntries: logsPerIteration,
nextIteration: nextIteration,
}
inputCfg := operator.Config{
Builder: inputBuilder,
}

set := componenttest.NewNopTelemetrySettings()
emitter := helper.NewLogEmitter(set)
defer func() {
require.NoError(b, emitter.Stop())
}()

pipe, err := pipeline.Config{
Operators: []operator.Config{inputCfg},
DefaultOutput: emitter,
}.Build(set)
require.NoError(b, err)

storageClient := storagetest.NewInMemoryClient(
component.KindReceiver,
component.MustNewID("foolog"),
"test",
)

converter := NewConverter(set)

obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()})
require.NoError(b, err)

mockConsumer := &testConsumer{
receivedAllLogs: iterationComplete,
expectedLogs: uint32(logsPerIteration),
receivedLogs: atomic.Uint32{},
}
rcv := &receiver{
set: set,
pipe: pipe,
emitter: emitter,
consumer: mockConsumer,
converter: converter,
obsrecv: obsrecv,
storageClient: storageClient,
}

b.ResetTimer()

require.NoError(b, rcv.Start(context.Background(), nil))

for i := 0; i < b.N; i++ {
nextIteration <- struct{}{}
<-iterationComplete
mockConsumer.receivedLogs.Store(0)
}

require.NoError(b, rcv.Shutdown(context.Background()))
}

func BenchmarkReadLine(b *testing.B) {
filePath := filepath.Join(b.TempDir(), "bench.log")

Expand Down Expand Up @@ -281,3 +380,94 @@ func BenchmarkParseAndMap(b *testing.B) {
}
}
}

const testInputOperatorTypeStr = "test_input"

type testInputBuilder struct {
numberOfLogEntries int
nextIteration chan struct{}
}

func (t *testInputBuilder) ID() string {
return testInputOperatorTypeStr
}

func (t *testInputBuilder) Type() string {
return testInputOperatorTypeStr
}

func (t *testInputBuilder) Build(settings component.TelemetrySettings) (operator.Operator, error) {
inputConfig := helper.NewInputConfig(t.ID(), testInputOperatorTypeStr)
inputOperator, err := inputConfig.Build(settings)
if err != nil {
return nil, err
}

return &testInputOperator{
InputOperator: inputOperator,
numberOfLogEntries: t.numberOfLogEntries,
nextIteration: t.nextIteration,
}, nil
}

func (t *testInputBuilder) SetID(_ string) {}

var _ operator.Operator = &testInputOperator{}

type testInputOperator struct {
helper.InputOperator
numberOfLogEntries int
nextIteration chan struct{}
cancelFunc context.CancelFunc
}

func (t *testInputOperator) ID() string {
return testInputOperatorTypeStr
}

func (t *testInputOperator) Type() string {
return testInputOperatorTypeStr
}

func (t *testInputOperator) Start(_ operator.Persister) error {
ctx, cancelFunc := context.WithCancel(context.Background())
t.cancelFunc = cancelFunc

e := complexEntry()
go func() {
for {
select {
case <-t.nextIteration:
for i := 0; i < t.numberOfLogEntries; i++ {
_ = t.Write(context.Background(), e)
}
case <-ctx.Done():
return
}
}

}()
return nil
}

func (t *testInputOperator) Stop() error {
t.cancelFunc()
return nil
}

type testConsumer struct {
receivedAllLogs chan struct{}
expectedLogs uint32
receivedLogs atomic.Uint32
}

func (t *testConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{}
}

func (t *testConsumer) ConsumeLogs(_ context.Context, ld plog.Logs) error {
if t.receivedLogs.Add(uint32(ld.LogRecordCount())) >= t.expectedLogs {
t.receivedAllLogs <- struct{}{}
}
return nil
}

0 comments on commit 6a343fe

Please sign in to comment.