[chore][testbed] - Further testbed enhancements (#35209)
PR to add memory limiter test cases.

This tests scenarios where the collector is under stress; we verify that all
the data is received, with no drops or duplication.
VihasMakwana authored Nov 7, 2024
1 parent 9ae4b5b commit cd2970a
Showing 3 changed files with 133 additions and 1 deletion.
9 changes: 8 additions & 1 deletion testbed/testbed/receivers.go
@@ -55,6 +55,7 @@ type BaseOTLPDataReceiver struct {
compression string
retry string
sendingQueue string
timeout string
}

func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error {
@@ -98,6 +99,11 @@ func (bor *BaseOTLPDataReceiver) WithQueue(sendingQueue string) *BaseOTLPDataReceiver {
return bor
}

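// WithTimeout sets the timeout configuration (a YAML fragment) that GenConfigYAMLStr
// appends to the generated exporter config.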
func (bor *BaseOTLPDataReceiver) WithTimeout(timeout string) *BaseOTLPDataReceiver {
bor.timeout = timeout
return bor
}

func (bor *BaseOTLPDataReceiver) Stop() error {
// we reuse the receiver across signals. Shutting down the log receiver shuts down the metrics and traces receiver.
return bor.logReceiver.Shutdown(context.Background())
@@ -118,8 +124,9 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string {
endpoint: "%s"
%s
%s
%s
tls:
insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue)
insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue, bor.timeout)
comp := "none"
if bor.compression != "" {
comp = bor.compression
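For context, a minimal, hypothetical usage sketch (not part of this commit) of how the new WithTimeout option is used alongside the existing builder methods, and roughly what GenConfigYAMLStr is then expected to render; the port, YAML fragments, and rendered output below are assumptions based on the surrounding testbed code.

package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

func main() {
	// Configure an OTLP data receiver with retry, queueing, and the new timeout option.
	rcv := testbed.NewOTLPDataReceiver(4317)
	rcv.WithRetry("  retry_on_failure:\n    enabled: true\n")
	rcv.WithQueue("  sending_queue:\n    enabled: true\n")
	rcv.WithTimeout("  timeout: 0s\n")

	// The generated exporter section should look roughly like:
	//   otlp:
	//     endpoint: "127.0.0.1:4317"
	//     retry_on_failure:
	//       enabled: true
	//     sending_queue:
	//       enabled: true
	//     timeout: 0s
	//     tls:
	//       insecure: true
	fmt.Println(rcv.GenConfigYAMLStr())
}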
36 changes: 36 additions & 0 deletions testbed/tests/log_test.go
@@ -369,3 +369,39 @@ func TestLargeFileOnce(t *testing.T) {
tc.StopAgent()
tc.ValidateData()
}

func TestMemoryLimiterHit(t *testing.T) {
otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
otlpreceiver.WithRetry(`
retry_on_failure:
enabled: true
max_interval: 5s
`)
otlpreceiver.WithQueue(`
sending_queue:
enabled: true
queue_size: 100000
num_consumers: 20
`)
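// timeout: 0s disables the exporter request timeout (zero means no timeout in
// exporterhelper), so sends held up by the memory limiter's refusals are retried
// rather than aborted (assumed intent).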
otlpreceiver.WithTimeout(`
timeout: 0s
`)
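// With limit_mib: 300 and spike_limit_mib: 150, the memory_limiter's soft limit is
// 150 MiB; crossing it produces the "Memory usage is above soft limit" refusals
// this scenario waits for.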
processors := map[string]string{
"memory_limiter": `
memory_limiter:
check_interval: 1s
limit_mib: 300
spike_limit_mib: 150
`,
}
ScenarioMemoryLimiterHit(
t,
testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)),
otlpreceiver,
testbed.LoadOptions{
DataItemsPerSecond: 100000,
ItemsPerBatch: 1000,
Parallel: 1,
},
performanceResultsSummary, 100, processors)
}
89 changes: 89 additions & 0 deletions testbed/tests/scenarios.go
@@ -552,6 +552,95 @@ func ScenarioLong(
tc.ValidateData()
}

func ScenarioMemoryLimiterHit(
t *testing.T,
sender testbed.DataSender,
receiver testbed.DataReceiver,
loadOptions testbed.LoadOptions,
resultsSummary testbed.TestResultsSummary,
sleepTime int,
processors map[string]string,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)

agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))

configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil)
fmt.Println(configStr)
configCleanup, err := agentProc.PrepareConfig(configStr)
require.NoError(t, err)
defer configCleanup()
dataProvider := testbed.NewPerfTestDataProvider(loadOptions)
dataChannel := make(chan bool)
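// While the test keeps signalling on dataChannel, the mock backend's decision
// function below keeps returning retryable (non-permanent) errors; closing the
// channel lets data through (see the WaitForN loop further down). Assumed
// semantics of GenerateNonPernamentErrorUntil.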
tc := testbed.NewTestCase(
t,
dataProvider,
sender,
receiver,
agentProc,
&testbed.CorrectnessLogTestValidator{},
resultsSummary,
testbed.WithDecisionFunc(func() error { return testbed.GenerateNonPernamentErrorUntil(dataChannel) }),
)
t.Cleanup(tc.Stop)
tc.MockBackend.EnableRecording()

tc.StartBackend()
tc.StartAgent()

tc.StartLoad(loadOptions)

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")

var timer *time.Timer

// check for "Memory usage is above soft limit. Refusing data."
tc.WaitForN(func() bool {
logFound := tc.AgentLogsContains("Memory usage is above soft limit. Refusing data.")
if !logFound {
dataChannel <- true
return false
}
// Log found. Keep the collector under stress for 10 more seconds so it keeps refusing data
if timer == nil {
timer = time.NewTimer(10 * time.Second)
}
select {
case <-timer.C:
default:
return false
}
close(dataChannel)
return logFound
}, time.Second*time.Duration(sleepTime), "memory limit not hit")

// check if data started to be received successfully
tc.WaitForN(func() bool {
return tc.MockBackend.DataItemsReceived() > 0
}, time.Second*time.Duration(sleepTime), "data started to be successfully received")

// stop sending any more data
tc.StopLoad()

tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, time.Second*time.Duration(sleepTime), "all logs received")

tc.WaitForN(func() bool {
// get IDs from logs to retry
logsToRetry := getLogsID(tc.MockBackend.LogsToRetry)

// get IDs from logs received successfully
successfulLogs := getLogsID(tc.MockBackend.ReceivedLogs)

// check if all the logs to retry were actually retried
logsWereRetried := allElementsExistInSlice(logsToRetry, successfulLogs)
return logsWereRetried
}, time.Second*time.Duration(sleepTime), "all logs were retried successfully")

tc.StopAgent()
tc.ValidateData()
}
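The helpers used above (getLogsID, allElementsExistInSlice) are defined elsewhere in the testbed; the following is a rough, hypothetical sketch of the containment check, assuming both helpers work on []string IDs (the real implementations may differ in signature and detail).

// allElementsExistInSliceSketch reports whether every ID in needles also appears
// in haystack, i.e. every log that had to be retried was eventually received.
func allElementsExistInSliceSketch(needles, haystack []string) bool {
	seen := make(map[string]struct{}, len(haystack))
	for _, id := range haystack {
		seen[id] = struct{}{}
	}
	for _, id := range needles {
		if _, ok := seen[id]; !ok {
			return false
		}
	}
	return true
}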

func constructLoadOptions(test TestCase) testbed.LoadOptions {
options := testbed.LoadOptions{DataItemsPerSecond: 1000, ItemsPerBatch: 10}
options.Attributes = make(map[string]string)