Skip to content

Commit

Permalink
[testbed] - Add scenarios to handle large files (open-telemetry#34417)
Browse files Browse the repository at this point in the history
**Description:** 

Add test cases covering large files to existing testbed.
This PR adds the following scenarios:

Scenario 1: Ensure that all logs are captured for files that reach a
size of 2GB.
Scenario 2: Ensure that all logs are captured for files that reach a
size of 6GB.
Scenario 3: Ensure that all logs are captured for a file of
approximately 1.5GB that contains prewritten data.


**Link to tracking Issue:**
open-telemetry#34288

**Testing:** Added

---------

Co-authored-by: Antoine Toulme <[email protected]>
  • Loading branch information
2 people authored and f7o committed Sep 12, 2024
1 parent c7f879a commit 93bddee
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 0 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add-large-file-tests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: testbed

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add test scenarios covering large files to the existing testbed.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34288]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
72 changes: 72 additions & 0 deletions testbed/testbed/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"log"
"reflect"
"sort"
"strconv"
"strings"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
)
Expand Down Expand Up @@ -561,3 +563,73 @@ func populateSpansMap(spansMap map[string]ptrace.Span, tds []ptrace.Traces) {
// traceIDAndSpanIDToString joins a trace ID and span ID into a single
// "traceID-spanID" key, suitable for use as a map key.
func traceIDAndSpanIDToString(traceID pcommon.TraceID, spanID pcommon.SpanID) string {
	return fmt.Sprintf("%s-%s", traceID, spanID)
}

// CorrectnessLogTestValidator checks that every log record produced by the
// data provider was received by the mock backend, using the batch/item index
// attributes stamped on each record.
type CorrectnessLogTestValidator struct {
	// dataProvider is the provider that generated the load; validation only
	// runs when it is a *perfTestDataProvider (see Validate).
	dataProvider DataProvider
}

// NewCorrectnessLogTestValidator returns a CorrectnessLogTestValidator that
// validates against the data generated by the given provider.
func NewCorrectnessLogTestValidator(provider DataProvider) *CorrectnessLogTestValidator {
	return &CorrectnessLogTestValidator{dataProvider: provider}
}

// Validate checks that the mock backend received exactly the set of log
// records that the perf test data provider generated, by matching the
// (batch_index, item_index) attribute pair carried by every record.
// The check only runs when the configured data provider is the
// perfTestDataProvider; for any other provider Validate is a no-op.
func (c *CorrectnessLogTestValidator) Validate(tc *TestCase) {
	dataProvider, ok := c.dataProvider.(*perfTestDataProvider)
	if !ok {
		return
	}
	logsReceived := tc.MockBackend.ReceivedLogs

	// Reconstruct the full set of (batch, item) ids the provider produced;
	// the total count is known, so size the slice up front.
	batches := int(dataProvider.traceIDSequence.Load())
	idsSent := make([][2]string, 0, batches*dataProvider.options.ItemsPerBatch)
	idsReceived := make([][2]string, 0)

	for batch := 0; batch < batches; batch++ {
		for idx := 0; idx < dataProvider.options.ItemsPerBatch; idx++ {
			idsSent = append(idsSent, [2]string{"batch_" + strconv.Itoa(batch), "item_" + strconv.Itoa(idx)})
		}
	}
	// Collect the ids attached to every received log record. The loop
	// variable is named "logs" (not "log") to avoid shadowing the imported
	// log package.
	for _, logs := range logsReceived {
		for i := 0; i < logs.ResourceLogs().Len(); i++ {
			for j := 0; j < logs.ResourceLogs().At(i).ScopeLogs().Len(); j++ {
				s := logs.ResourceLogs().At(i).ScopeLogs().At(j)
				for k := 0; k < s.LogRecords().Len(); k++ {
					logRecord := s.LogRecords().At(k)
					batchIndex, ok := logRecord.Attributes().Get("batch_index")
					require.True(tc.t, ok, "batch_index missing from attributes; use perfDataProvider")
					itemIndex, ok := logRecord.Attributes().Get("item_index")
					require.True(tc.t, ok, "item_index missing from attributes; use perfDataProvider")

					idsReceived = append(idsReceived, [2]string{batchIndex.Str(), itemIndex.Str()})
				}
			}
		}
	}

	// Order does not matter; only set equality between sent and received.
	assert.ElementsMatch(tc.t, idsSent, idsReceived)
}

// RecordResults records a PASS/FAIL summary row for the finished test case,
// including the collector's resource consumption and the sent/received item
// counts.
func (c *CorrectnessLogTestValidator) RecordResults(tc *TestCase) {
	rc := tc.agentProc.GetTotalConsumption()

	result := "PASS"
	if tc.t.Failed() {
		result = "FAIL"
	}

	// Strip the conventional "Test" prefix from the test name. TrimPrefix is
	// a no-op when the prefix is absent, unlike the previous Name()[4:]
	// slice, which would panic on names shorter than four characters and
	// mangle names that do not start with "Test".
	testName := strings.TrimPrefix(tc.t.Name(), "Test")

	tc.resultsSummary.Add(tc.t.Name(), &PerformanceTestResult{
		testName:          testName,
		result:            result,
		receivedSpanCount: tc.MockBackend.DataItemsReceived(),
		sentSpanCount:     tc.LoadGenerator.DataItemsSent(),
		duration:          time.Since(tc.startTime),
		cpuPercentageAvg:  rc.CPUPercentAvg,
		cpuPercentageMax:  rc.CPUPercentMax,
		ramMibAvg:         rc.RAMMiBAvg,
		ramMibMax:         rc.RAMMiBMax,
		errorCause:        tc.errorCause,
	})
}
117 changes: 117 additions & 0 deletions testbed/tests/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@
package tests

import (
"context"
"path"
"path/filepath"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/datareceivers"
Expand Down Expand Up @@ -233,3 +240,113 @@ func TestLogOtlpSendingQueue(t *testing.T) {
})

}

// TestLogLargeFiles verifies that all logs are captured from log files that
// grow to multi-gigabyte sizes over their lifetime, using ScenarioLong to
// drive sustained load through a filelog-based pipeline.
func TestLogLargeFiles(t *testing.T) {
	tests := []struct {
		name         string
		sender       testbed.DataSender
		receiver     testbed.DataReceiver
		loadOptions  testbed.LoadOptions
		sleepSeconds int
	}{
		{
			/*
			 * The FileLogWriter generates strings almost 100 bytes each.
			 * With a rate of 200,000 lines per second over a duration of 100 seconds,
			 * this results in a file size of approximately 2GB over its lifetime.
			 */
			// Name fixed from "2Gb" to "2GB" for consistency with the 6GB
			// case below ("Gb" would mean gigabits).
			name:     "filelog-largefiles-2GB-lifetime",
			sender:   datasenders.NewFileLogWriter(),
			receiver: testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)),
			loadOptions: testbed.LoadOptions{
				DataItemsPerSecond: 200000,
				ItemsPerBatch:      1,
				Parallel:           100,
			},
			sleepSeconds: 100,
		},
		{
			/*
			 * The FileLogWriter generates strings almost 100 bytes each.
			 * With a rate of 330,000 lines per second over a duration of 200 seconds,
			 * this results in a file size of approximately 6GB over its lifetime.
			 */
			name:     "filelog-largefiles-6GB-lifetime",
			sender:   datasenders.NewFileLogWriter(),
			receiver: testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)),
			loadOptions: testbed.LoadOptions{
				DataItemsPerSecond: 330000,
				ItemsPerBatch:      10,
				Parallel:           10,
			},
			sleepSeconds: 200,
		},
	}
	processors := map[string]string{
		"batch": `
  batch:
`}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			ScenarioLong(
				t,
				test.sender,
				test.receiver,
				test.loadOptions,
				performanceResultsSummary,
				test.sleepSeconds,
				processors,
			)
		})
	}
}

// TestLargeFileOnce verifies that all logs are captured from a large file
// whose entire content is written before the collector starts, i.e. the
// receiver must catch up on prewritten data rather than tail a growing file.
func TestLargeFileOnce(t *testing.T) {
	processors := map[string]string{
		"batch": `
  batch:
`,
	}
	resultDir, err := filepath.Abs(path.Join("results", t.Name()))
	require.NoError(t, err)
	sender := datasenders.NewFileLogWriter()
	receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
	// A single batch of 10M items; with ~100-byte lines this yields a file of
	// roughly 1GB+ — TODO confirm the "approximately 1.5GB" figure from the
	// PR description against the actual FileLogWriter line size.
	loadOptions := testbed.LoadOptions{
		DataItemsPerSecond: 1,
		ItemsPerBatch:      10000000,
		Parallel:           1,
	}

	// Write data at once, before starting up the collector
	dataProvider := testbed.NewPerfTestDataProvider(loadOptions)
	dataItemsGenerated := atomic.Uint64{}
	dataProvider.SetLoadGeneratorCounters(&dataItemsGenerated)
	// NOTE(review): the second return value (presumably a "done" flag) is
	// deliberately ignored — a single GenerateLogs call produces the whole batch.
	ld, _ := dataProvider.GenerateLogs()

	// Push the full payload through the file writer before the agent exists.
	require.NoError(t, sender.ConsumeLogs(context.Background(), ld))
	agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))

	configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil)
	configCleanup, err := agentProc.PrepareConfig(configStr)
	require.NoError(t, err)
	defer configCleanup()

	tc := testbed.NewTestCase(
		t,
		dataProvider,
		sender,
		receiver,
		agentProc,
		&testbed.CorrectnessLogTestValidator{},
		performanceResultsSummary,
	)
	t.Cleanup(tc.Stop)

	tc.StartBackend()
	tc.StartAgent()

	// Wait (up to 20s) until the backend has received every generated item.
	tc.WaitForN(func() bool { return dataItemsGenerated.Load() == tc.MockBackend.DataItemsReceived() }, 20*time.Second, "all logs received")

	tc.StopAgent()
	tc.ValidateData()
}
46 changes: 46 additions & 0 deletions testbed/tests/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,52 @@ func ScenarioSendingQueuesNotFull(
tc.ValidateData()
}

// ScenarioLong runs a long-lived load scenario: it starts the collector with
// the given sender/receiver pipeline, drives load for sleepTime seconds, and
// then verifies that every item the load generator sent was received by the
// mock backend.
func ScenarioLong(
	t *testing.T,
	sender testbed.DataSender,
	receiver testbed.DataReceiver,
	loadOptions testbed.LoadOptions,
	resultsSummary testbed.TestResultsSummary,
	sleepTime int,
	processors map[string]string,
) {
	resultsDir, err := filepath.Abs(path.Join("results", t.Name()))
	require.NoError(t, err)

	collector := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))

	cfg := createConfigYaml(t, sender, receiver, resultsDir, processors, nil)
	cleanupCfg, err := collector.PrepareConfig(cfg)
	require.NoError(t, err)
	defer cleanupCfg()

	provider := testbed.NewPerfTestDataProvider(loadOptions)
	tc := testbed.NewTestCase(
		t,
		provider,
		sender,
		receiver,
		collector,
		&testbed.CorrectnessLogTestValidator{},
		resultsSummary,
	)
	t.Cleanup(tc.Stop)

	tc.StartBackend()
	tc.StartAgent()
	tc.StartLoad(loadOptions)

	// Make sure the generator actually produced something before timing the run.
	tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")

	tc.Sleep(time.Duration(sleepTime) * time.Second)

	tc.StopLoad()

	// Everything that was sent must eventually reach the backend.
	tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, "all logs received")

	tc.ValidateData()
}

func constructLoadOptions(test TestCase) testbed.LoadOptions {
options := testbed.LoadOptions{DataItemsPerSecond: 1000, ItemsPerBatch: 10}
options.Attributes = make(map[string]string)
Expand Down

0 comments on commit 93bddee

Please sign in to comment.