diff --git a/.chloggen/add-large-file-tests.yaml b/.chloggen/add-large-file-tests.yaml
new file mode 100644
index 000000000000..9cee3a4a0ebc
--- /dev/null
+++ b/.chloggen/add-large-file-tests.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: testbed
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add test case scenarios to handle large files to existing testbed.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [34288]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/testbed/testbed/validator.go b/testbed/testbed/validator.go
index d7cc2fcfb48c..76d3073abc40 100644
--- a/testbed/testbed/validator.go
+++ b/testbed/testbed/validator.go
@@ -8,10 +8,12 @@ import (
 	"log"
 	"reflect"
 	"sort"
+	"strconv"
 	"strings"
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/ptrace"
 )
@@ -561,3 +563,94 @@ func populateSpansMap(spansMap map[string]ptrace.Span, tds []ptrace.Traces) {
 func traceIDAndSpanIDToString(traceID pcommon.TraceID, spanID pcommon.SpanID) string {
 	return fmt.Sprintf("%s-%s", traceID, spanID)
 }
+
+// CorrectnessLogTestValidator validates received log records against the
+// batch/item identifiers derived from a perfTestDataProvider.
+type CorrectnessLogTestValidator struct {
+	dataProvider DataProvider
+}
+
+// NewCorrectnessLogTestValidator returns a CorrectnessLogTestValidator that
+// validates against the given data provider.
+func NewCorrectnessLogTestValidator(provider DataProvider) *CorrectnessLogTestValidator {
+	return &CorrectnessLogTestValidator{
+		dataProvider: provider,
+	}
+}
+
+// Validate asserts that the (batch_index, item_index) attribute pairs of the
+// log records received by the mock backend match, ignoring order, the pairs
+// derived from the provider's traceIDSequence counter and ItemsPerBatch option.
+//
+// It is a no-op when the validator's data provider is not a
+// *perfTestDataProvider, which includes a zero-value validator whose provider
+// is nil.
+func (c *CorrectnessLogTestValidator) Validate(tc *TestCase) {
+	dataProvider, ok := c.dataProvider.(*perfTestDataProvider)
+	if !ok {
+		return
+	}
+
+	// Expected identifiers: one pair per item index of every batch index.
+	batches := int(dataProvider.traceIDSequence.Load())
+	itemsPerBatch := dataProvider.options.ItemsPerBatch
+	idsSent := make([][2]string, 0)
+	for batch := 0; batch < batches; batch++ {
+		for idx := 0; idx < itemsPerBatch; idx++ {
+			idsSent = append(idsSent, [2]string{"batch_" + strconv.Itoa(batch), "item_" + strconv.Itoa(idx)})
+		}
+	}
+
+	// Actual identifiers, read from the attributes of every received record.
+	// The loop variable is named ld so it does not shadow the log package.
+	idsReceived := make([][2]string, 0)
+	for _, ld := range tc.MockBackend.ReceivedLogs {
+		resourceLogs := ld.ResourceLogs()
+		for i := 0; i < resourceLogs.Len(); i++ {
+			scopeLogs := resourceLogs.At(i).ScopeLogs()
+			for j := 0; j < scopeLogs.Len(); j++ {
+				logRecords := scopeLogs.At(j).LogRecords()
+				for k := 0; k < logRecords.Len(); k++ {
+					attrs := logRecords.At(k).Attributes()
+					batchIndex, ok := attrs.Get("batch_index")
+					require.True(tc.t, ok, "batch_index missing from attributes; use perfDataProvider")
+					itemIndex, ok := attrs.Get("item_index")
+					require.True(tc.t, ok, "item_index missing from attributes; use perfDataProvider")
+
+					idsReceived = append(idsReceived, [2]string{batchIndex.Str(), itemIndex.Str()})
+				}
+			}
+		}
+	}
+
+	assert.ElementsMatch(tc.t, idsSent, idsReceived)
+}
+
+// RecordResults adds a PerformanceTestResult for the test case, with its
+// PASS/FAIL status, item counts, duration and the agent's resource
+// consumption, to the test case's results summary.
+func (c *CorrectnessLogTestValidator) RecordResults(tc *TestCase) {
+	rc := tc.agentProc.GetTotalConsumption()
+
+	result := "PASS"
+	if tc.t.Failed() {
+		result = "FAIL"
+	}
+
+	// Remove "Test" prefix from test name. TrimPrefix is safe for names that
+	// are shorter than the prefix or do not start with it.
+	testName := strings.TrimPrefix(tc.t.Name(), "Test")
+
+	tc.resultsSummary.Add(tc.t.Name(), &PerformanceTestResult{
+		testName:          testName,
+		result:            result,
+		receivedSpanCount: tc.MockBackend.DataItemsReceived(),
+		sentSpanCount:     tc.LoadGenerator.DataItemsSent(),
+		duration:          time.Since(tc.startTime),
+		cpuPercentageAvg:  rc.CPUPercentAvg,
+		cpuPercentageMax:  rc.CPUPercentMax,
+		ramMibAvg:         rc.RAMMiBAvg,
+		ramMibMax:         rc.RAMMiBMax,
+		errorCause:        tc.errorCause,
+	})
+}
diff --git a/testbed/tests/log_test.go b/testbed/tests/log_test.go
index e9e1437a467e..2d1a0ae1b481 100644
--- a/testbed/tests/log_test.go
+++ b/testbed/tests/log_test.go
@@ -7,7 +7,14 @@
 package tests
 
 import (
+	"context"
+	"path"
+	"path/filepath"
+	"sync/atomic"
 	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/datareceivers"
@@ -233,3 +240,122 @@ func TestLogOtlpSendingQueue(t *testing.T) {
 	})
 
 }
+
+// TestLogLargeFiles runs ScenarioLong with a FileLogWriter sender at sustained
+// rates so that the written log data adds up to several gigabytes per case.
+func TestLogLargeFiles(t *testing.T) {
+	tests := []struct {
+		name         string
+		sender       testbed.DataSender
+		receiver     testbed.DataReceiver
+		loadOptions  testbed.LoadOptions
+		sleepSeconds int
+	}{
+		{
+			/*
+			 * The FileLogWriter generates strings of almost 100 bytes each.
+			 * With a rate of 200,000 lines per second over a duration of 100 seconds,
+			 * this results in a file size of approximately 2GB over its lifetime.
+			 */
+			name:     "filelog-largefiles-2Gb-lifetime",
+			sender:   datasenders.NewFileLogWriter(),
+			receiver: testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)),
+			loadOptions: testbed.LoadOptions{
+				DataItemsPerSecond: 200000,
+				ItemsPerBatch:      1,
+				Parallel:           100,
+			},
+			sleepSeconds: 100,
+		},
+		{
+			/*
+			 * The FileLogWriter generates strings of almost 100 bytes each.
+			 * With a rate of 330,000 lines per second over a duration of 200 seconds,
+			 * this results in a file size of approximately 6GB over its lifetime.
+			 */
+			name:     "filelog-largefiles-6GB-lifetime",
+			sender:   datasenders.NewFileLogWriter(),
+			receiver: testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)),
+			loadOptions: testbed.LoadOptions{
+				DataItemsPerSecond: 330000,
+				ItemsPerBatch:      10,
+				Parallel:           10,
+			},
+			sleepSeconds: 200,
+		},
+	}
+	processors := map[string]string{
+		"batch": `
+  batch:
+`}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ScenarioLong(
+				t,
+				test.sender,
+				test.receiver,
+				test.loadOptions,
+				performanceResultsSummary,
+				test.sleepSeconds,
+				processors,
+			)
+		})
+	}
+}
+
+// TestLargeFileOnce passes one generated batch of logs to the FileLogWriter
+// before the collector starts, then waits until the mock backend's received
+// item count equals the generated item counter.
+func TestLargeFileOnce(t *testing.T) {
+	processors := map[string]string{
+		"batch": `
+  batch:
+`,
+	}
+	resultDir, err := filepath.Abs(path.Join("results", t.Name()))
+	require.NoError(t, err)
+	sender := datasenders.NewFileLogWriter()
+	receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
+	loadOptions := testbed.LoadOptions{
+		DataItemsPerSecond: 1,
+		ItemsPerBatch:      10000000,
+		Parallel:           1,
+	}
+
+	// Write data at once, before starting up the collector
+	dataProvider := testbed.NewPerfTestDataProvider(loadOptions)
+	var dataItemsGenerated atomic.Uint64
+	dataProvider.SetLoadGeneratorCounters(&dataItemsGenerated)
+	// NOTE(review): the second return value of GenerateLogs is discarded;
+	// confirm it carries nothing this test needs to check.
+	ld, _ := dataProvider.GenerateLogs()
+
+	require.NoError(t, sender.ConsumeLogs(context.Background(), ld))
+	agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))
+
+	configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil)
+	configCleanup, err := agentProc.PrepareConfig(configStr)
+	require.NoError(t, err)
+	defer configCleanup()
+
+	tc := testbed.NewTestCase(
+		t,
+		dataProvider,
+		sender,
+		receiver,
+		agentProc,
+		// NOTE(review): a zero-value validator has no data provider, so its
+		// Validate method performs no checks; confirm that is intended.
+		&testbed.CorrectnessLogTestValidator{},
+		performanceResultsSummary,
+	)
+	t.Cleanup(tc.Stop)
+
+	tc.StartBackend()
+	tc.StartAgent()
+
+	tc.WaitForN(func() bool { return dataItemsGenerated.Load() == tc.MockBackend.DataItemsReceived() }, 20*time.Second, "all logs received")
+
+	tc.StopAgent()
+	tc.ValidateData()
+}
diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go
index 5aa05c135853..78128d612d7e 100644
--- a/testbed/tests/scenarios.go
+++ b/testbed/tests/scenarios.go
@@ -501,6 +501,58 @@ func ScenarioSendingQueuesNotFull(
 	tc.ValidateData()
 }
 
+// ScenarioLong starts the backend and a child-process collector configured from
+// the sender, receiver and processors, generates load with loadOptions, keeps
+// it running for sleepTime seconds, then stops the load and waits until the
+// mock backend has received as many items as the load generator sent.
+func ScenarioLong(
+	t *testing.T,
+	sender testbed.DataSender,
+	receiver testbed.DataReceiver,
+	loadOptions testbed.LoadOptions,
+	resultsSummary testbed.TestResultsSummary,
+	sleepTime int,
+	processors map[string]string,
+) {
+	resultDir, err := filepath.Abs(path.Join("results", t.Name()))
+	require.NoError(t, err)
+
+	agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))
+
+	configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil)
+	configCleanup, err := agentProc.PrepareConfig(configStr)
+	require.NoError(t, err)
+	defer configCleanup()
+	dataProvider := testbed.NewPerfTestDataProvider(loadOptions)
+	tc := testbed.NewTestCase(
+		t,
+		dataProvider,
+		sender,
+		receiver,
+		agentProc,
+		// NOTE(review): a zero-value validator has no data provider, so its
+		// Validate method performs no checks; confirm that is intended.
+		&testbed.CorrectnessLogTestValidator{},
+		resultsSummary,
+	)
+	t.Cleanup(tc.Stop)
+
+	tc.StartBackend()
+	tc.StartAgent()
+
+	tc.StartLoad(loadOptions)
+
+	tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")
+
+	tc.Sleep(time.Duration(sleepTime) * time.Second)
+
+	tc.StopLoad()
+
+	tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, "all logs received")
+
+	tc.ValidateData()
+}
+
 func constructLoadOptions(test TestCase) testbed.LoadOptions {
 	options := testbed.LoadOptions{DataItemsPerSecond: 1000, ItemsPerBatch: 10}
 	options.Attributes = make(map[string]string)