diff --git a/eng/Packages.Data.props b/eng/Packages.Data.props index bc9ff86e2735e..99617b3caccad 100644 --- a/eng/Packages.Data.props +++ b/eng/Packages.Data.props @@ -187,6 +187,7 @@ + diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Azure.Messaging.EventHubs.Shared.Stress.projitems b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Azure.Messaging.EventHubs.Shared.Stress.projitems new file mode 100644 index 0000000000000..bf907dbcbbb52 --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Azure.Messaging.EventHubs.Shared.Stress.projitems @@ -0,0 +1,15 @@ + + + + $(MSBuildAllProjects);$(MSBuildThisFileFullPath) + true + + + + Azure.Messaging.EventHubs.Tests + + + + + + diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventGenerator.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventGenerator.cs index c9e54e697ace3..517a6cf9f72d6 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventGenerator.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventGenerator.cs @@ -64,6 +64,47 @@ public static IEnumerable CreateEvents(int numberOfEvents) } } + /// + /// Creates a random set of events with random data and random body size within the specified constraints. + /// + /// + /// The maximum size of the batch. + /// The number of events to create. + /// The percentage of events that should be large. + /// The minimum size of the event body. + /// The maximum size of the event body. + /// + /// The requested set of events. + /// + public static IEnumerable CreateEvents(long maximumBatchSize, + int numberOfEvents, + int largeMessageRandomFactor = 30, + int minimumBodySize = 15, + int maximumBodySize = 83886) + { + var activeMinimumBodySize = minimumBodySize; + var activeMaximumBodySize = maximumBodySize; + var totalBytesGenerated = 0; + + if (RandomNumberGenerator.Value.Next(1, 100) < largeMessageRandomFactor) + { + activeMinimumBodySize = (int)Math.Ceiling(maximumBodySize * 0.65); + } + else + { + activeMaximumBodySize = (int)Math.Floor((maximumBatchSize * 1.0f) / numberOfEvents); + } + + for (var index = 0; ((index < numberOfEvents) && (totalBytesGenerated <= maximumBatchSize)); ++index) + { + var buffer = new byte[RandomNumberGenerator.Value.Next(activeMinimumBodySize, activeMaximumBodySize)]; + RandomNumberGenerator.Value.NextBytes(buffer); + totalBytesGenerated += buffer.Length; + + yield return CreateEventFromBody(buffer); + } + } + /// /// Creates a set of events with random data and a small body size. /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/.gitignore b/sdk/eventhub/Azure.Messaging.EventHubs/stress/.gitignore new file mode 100644 index 0000000000000..5e7951dbc6be5 --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/.gitignore @@ -0,0 +1,2 @@ +stress-test-resources.json +charts \ No newline at end of file diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/Chart.lock b/sdk/eventhub/Azure.Messaging.EventHubs/stress/Chart.lock new file mode 100644 index 0000000000000..7ce2e1e916768 --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/Chart.lock @@ -0,0 +1,6 @@ +dependencies: +- name: stress-test-addons + repository: https://stresstestcharts.blob.core.windows.net/helm/ + version: 0.1.13 +digest: sha256:007ec9983233e0f8c8ad8aa7f1df5f57b1b4c127d223d59d233b3c5366bd5173 +generated: "2022-04-12T11:20:37.5250017-07:00" diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/Chart.yaml b/sdk/eventhub/Azure.Messaging.EventHubs/stress/Chart.yaml new file mode 100644 index 0000000000000..20c2c58e20296 --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/Chart.yaml @@ -0,0 +1,15 @@ +apiVersion: v2 +name: eventhub-net-stress-test-2 +version: 0.1.2 +description: Stress tests for Event Hubs for .NET + +dependencies: +- name: stress-test-addons + version: 0.1.13 + repository: https://stresstestcharts.blob.core.windows.net/helm/ + +annotations: + stressTest: 'true' + namespace: 'net' + dockerbuilddir: '../../../..' + dockerfile: './Dockerfile' \ No newline at end of file diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/Dockerfile b/sdk/eventhub/Azure.Messaging.EventHubs/stress/Dockerfile new file mode 100644 index 0000000000000..43f02cd87fb51 --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/Dockerfile @@ -0,0 +1,34 @@ +# right version of dotnet? +FROM mcr.microsoft.com/dotnet/sdk:6.0-cbl-mariner1.0 AS build-env + +# Copy in engineering system needed to build +COPY ./eng /app/eng +COPY ./tools /app/tools +COPY ./*.proj /app +COPY ./Directory.Build.props /app +COPY ./Directory.Build.targets /app +COPY ./NuGet.Config /app + +# Copy in Event Hubs Source Code +COPY ./sdk/eventhub/Azure.Messaging.EventHubs/ /app/sdk/eventhub/Azure.Messaging.EventHubs/ +COPY ./sdk/eventhub/Azure.Messaging.EventHubs.Shared /app/sdk/eventhub/Azure.Messaging.EventHubs.Shared + +# Copy in Core Source Code +COPY ./sdk/core /app/sdk/core + +# Run restore and build, copy the .dll files over so they can used in the running container +WORKDIR /app +RUN dotnet build './sdk/eventhub/Azure.Messaging.EventHubs/stress/src/' --configuration Release + +# Copy in the dll files to be ready to run +FROM build-env as publish +WORKDIR /app +COPY --from=build-env /app/artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net6.0 /app/artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net6.0 +COPY --from=build-env /app/sdk/eventhub/Azure.Messaging.EventHubs.Shared /app/sdk/evethub/Azure.Messaging.EventHubs.Shared +COPY --from=build-env /app/sdk/eventhub/Azure.Messaging.EventHubs/stress /app/sdk/eventhub/Azure.Messaging.EventHubs/stress + +WORKDIR /app/artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net6.0 + +# The default is running just the "EventProducerTest" +ENTRYPOINT ["dotnet Azure.Messaging.EventHubs.Stress.dll", "--tests"] +CMD ["EventProducerTest"] \ No newline at end of file diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/README.md b/sdk/eventhub/Azure.Messaging.EventHubs/stress/README.md new file mode 100644 index 0000000000000..344a54b153bff --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/README.md @@ -0,0 +1,120 @@ +# Azure Event Hubs client library for .NET +The scenarios in this directory provide a suite of stress tests that test the Event Hubs producer and buffered producer client types for long-term durability and reliability. For more in-depth information about the Azure SDK stress test tools, see the [stress test readme](https://github.com/Azure/azure-sdk-tools/blob/main/tools/stress-cluster/chaos/README.md). + +Test runs can call any of the following tests: +- "EventProducerTest" +- "EventBufferedProducerTest" +- "BurstBufferedProducerTest" +- "ConcurrentBufferedProducerTest" + +Or, you can run all tests with the "--all" flag. + +## Getting started + +### Install the package + +```cmd +(env) /sdk/eventhub/Azure.Messaging.EventHubs/stress/src> dotnet clean +(env) /sdk/eventhub/Azure.Messaging.EventHubs/stress/src> dotnet publish +``` + +### Prerequisites + +When tests are run locally, Azure resources need to be created prior to running the test. This can be done through the Azure CLI, an ARM pr bicep file, or the Azure Portal. The bicep file included in this directory can be used to [deploy all resources](https://docs.microsoft.com/azure/azure-resource-manager/bicep/deploy-to-resource-group?tabs=azure-cli) aside from the application insights portal. + +To run the compiled .dll file, navigate to the `/artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net6.0` directory. + +### Authenticate the client + +The user is required to input the connection strings upon request on the command line when the test is being run, or include them in a .env file. To use the CLI input, add the `-i` or `--interactive` flag to the call: + + +#### Running stress tests locally +```cmd +(env) /sdk/eventhub/Azure.Messaging.EventHubs/stress/src> dotnet build './sdk/eventhub/Azure.Messaging.EventHubs/stress/src/' --configuration Release +(env) /artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net6.0> dotnet Azure.Messaging.EventHubs.Stress.dll --tests --interactive +``` + +To see the variable names for the environment file, see `EnvironmentVariables.cs`, The path to the environment file needs to be stored in the environment variable `$ENV_FILE`. In this case, do not include the `--interactive flag`. For more information about what specific resources are needed for each test, see the "Scenario Information" section below. + +The recommended approach is to run tests one at a time when running locally. +To run any one test, run the following: +```cmd +(env) /artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net6.0> dotnet Azure.Messaging.EventHubs.Stress.dll --tests --interactive +``` +If running all tests is desired, run the following: +```cmd +(env) /artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net6.0> dotnet Azure.Messaging.EventHubs.Stress.dll --all --interactive +``` + +## Key concepts + +### Scenario: Event Producer Test +This test requires an event hub namespace, an event hub, and an application insights resource. Note that an event hub may experience throttling if too few partitions are used. This test creates 2 producers and has 5 concurrent processes per producer sending batches of events. This is a long-running, consistent volume test. To run this test in interactive mode, run the following: +```cmd +(env) /artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net6.0> dotnet Azure.Messaging.EventHubs.Stress.dll --tests EventProducerTest --interactive +``` + +### Scenario: Event Buffered Producer Test +This test requires an event hub namespace, an event hub, and an application insights resource. High CPU usage may be experienced if too few partitions are used. This test creates 2 producers and continuously sends to each producer separately. This is a long-running, consistent volume test. To run this test in interactive mode, run the following: +```cmd +(env) /artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net6.0> dotnet Azure.Messaging.EventHubs.Stress.dll --tests BufferedProducerTest --interactive +``` + +### Scenario: Concurrent Buffered Producer Test +This test requires an event hub namespace, an event hub, and an application insights resource. High CPU usage may be experienced if too few partitions are used. This test creates 2 producers and has 5 concurrent processes per producer continuously sending events. This is a long-running, consistent volume test. To run this test in interactive mode, run the following: +```cmd +(env) /artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net6.0> dotnet Azure.Messaging.EventHubs.Stress.dll --tests ConcurrentBufferedProducerTest --interactive +``` + +### Scenario: Burst Buffered Producer Test +This test requires an event hub namespace, an event hub, and an application insights resource. Note that this test may have a high CPU usage. This test creates 2 producers and sends sets of events to each producer separately every 15 minutes. This is a long-running, variable volume test. To run this test in interactive mode, run the following: +```cmd +(env) /artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net6.0> dotnet Azure.Messaging.EventHubs.Stress.dll --tests BurstBufferedProducerTest --interactive +``` + +### Seeing Metrics and Logging in App Insights +All metrics and logging are sent to App Insights via the Instrumentation Key provided during the initialization of the test. A brief explanation of the metrics collection approach is described below. +- Any exceptions that occur are tracked by the telemetry client as exception telemetry. They can be accessed through the logs by filtering for exceptions, or through the application insights portal in the "Exceptions" blade. If an exception occured during send, the exception telemetry will include a "process" property containing "send" +- Successful enqueues and sends are tracked through Metrics. For the buffered producer, the total number of enqueues is tracked, and the actual sends include the ability to separate counts into partition Ids. + +## Examples + +#### Local Stress Test Run + +With a .env file: +```cmd +(env) /artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net6.0> dotnet Azure.Messaging.EventHubs.Stress.dll --all +``` + +Without a .env file: +```cmd +(env) /artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net6.0> dotnet Azure.Messaging.EventHubs.Stress.dll --all --interactive +``` + +## Troubleshooting + +- Ensure that all connection strings, istrumentation keys, and event hub names are correct. + +## Next steps + +### Deploying a stress test +In order to deploy stress tests to be run in kubernetes clusters, run: +```cmd +(env) /eng/common/scripts/stress-testing/deploy-stress-tests.ps1 ` +>> -Login ` +>> -PushImages +``` +This command requires Azure login credentials. + +## Contributing + +This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com. + +When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA. + +This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. + +Please see our [contributing guide](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventhub/Azure.Messaging.EventHubs/CONTRIBUTING.md) for more information. + +![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-net%2Fsdk%2Feventhub%2FAzure.Messaging.EventHubs%2Fstress%2FREADME.png) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/charts/stress-test-addons-0.1.13.tgz b/sdk/eventhub/Azure.Messaging.EventHubs/stress/charts/stress-test-addons-0.1.13.tgz new file mode 100644 index 0000000000000..80e119cb82ddf Binary files /dev/null and b/sdk/eventhub/Azure.Messaging.EventHubs/stress/charts/stress-test-addons-0.1.13.tgz differ diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Azure.Messaging.EventHubs.Stress.csproj b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Azure.Messaging.EventHubs.Stress.csproj new file mode 100644 index 0000000000000..6c86a756831eb --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Azure.Messaging.EventHubs.Stress.csproj @@ -0,0 +1,29 @@ + + + 1.0.0 + Exe + latest + true + false + + + + + + + + + + + + + + + + + + + + + + diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Configurations/BufferedProducerTestConfig.cs b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Configurations/BufferedProducerTestConfig.cs new file mode 100644 index 0000000000000..b9c8f4f5d0a9c --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Configurations/BufferedProducerTestConfig.cs @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; + +namespace Azure.Messaging.EventHubs.Stress +{ + internal class BufferedProducerTestConfig : TestConfig + { + // Test Configurations + + // The number of events to generate and enqueue during each iteration of PerformEnqueue + public int EventEnqueueListSize = 50; + public long MaximumEventListSize = 1_000_000; + + // Buffered Producer Configuration + + public TimeSpan MaxWaitTime = TimeSpan.FromSeconds(5); + } +} \ No newline at end of file diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Configurations/EventProducerTestConfig.cs b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Configurations/EventProducerTestConfig.cs new file mode 100644 index 0000000000000..98ef893607fdf --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Configurations/EventProducerTestConfig.cs @@ -0,0 +1,13 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; + +namespace Azure.Messaging.EventHubs.Stress +{ + internal class EventProducerTestConfig : TestConfig + { + // The number of events to generate and put into a batch during each iteration of PerformSend + public int PublishBatchSize = 50; + } +} \ No newline at end of file diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Configurations/TestConfig.cs b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Configurations/TestConfig.cs new file mode 100644 index 0000000000000..073d2cf03336b --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Configurations/TestConfig.cs @@ -0,0 +1,32 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; + +namespace Azure.Messaging.EventHubs.Stress +{ + internal class TestConfig + { + // true if the given test should be run. + public bool Run; + + // Resource Configurations + + public string EventHubsConnectionString; + public string EventHub; + public string InstrumentationKey; + + // Test Run Configurations + + public int DurationInHours = 120; + + // Publishing Configurations + + public int ConcurrentSends = 5; + public int PublishingBodyMinBytes = 100; + public int PublishingBodyRegularMaxBytes = 262144; + public int LargeMessageRandomFactorPercent = 50; + public TimeSpan SendTimeout = TimeSpan.FromMinutes(3); + public TimeSpan? ProducerPublishingDelay = TimeSpan.FromMilliseconds(4000); + } +} \ No newline at end of file diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Infrastructure/BufferedPublisher.cs b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Infrastructure/BufferedPublisher.cs new file mode 100644 index 0000000000000..9609d0b0f29e5 --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Infrastructure/BufferedPublisher.cs @@ -0,0 +1,175 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.EventHubs.Consumer; +using Azure.Messaging.EventHubs.Producer; +using Azure.Messaging.EventHubs.Tests; + +namespace Azure.Messaging.EventHubs.Stress +{ + internal class BufferedPublisher + { + private readonly string _connectionString; + private readonly string _eventHubName; + private readonly Metrics _metrics; + private readonly BufferedProducerTestConfig _testConfiguration; + + public BufferedPublisher(BufferedProducerTestConfig testConfiguration, + Metrics metrics) + { + _connectionString = testConfiguration.EventHubsConnectionString; + _eventHubName = testConfiguration.EventHub; + _metrics = metrics; + _testConfiguration = testConfiguration; + } + + public async Task Start(CancellationToken cancellationToken) + { + var enqueueTasks = new List(); + + while (!cancellationToken.IsCancellationRequested) + { + using var backgroundCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + + // Create the buffered producer client and register the success and failure handlers + + var options = new EventHubBufferedProducerClientOptions + { + RetryOptions = new EventHubsRetryOptions + { + TryTimeout = _testConfiguration.SendTimeout, + MaximumRetries = 15 + }, + MaximumWaitTime = _testConfiguration.MaxWaitTime + }; + var producer = new EventHubBufferedProducerClient(_connectionString, _eventHubName, options); + + try + { + producer.SendEventBatchSucceededAsync += args => + { + var numEvents = args.EventBatch.Count; + + _metrics.Client.GetMetric(_metrics.SuccessfullySentFromQueue, "PartitionId").TrackValue(numEvents, args.PartitionId); + _metrics.Client.GetMetric(_metrics.BatchesPublished).TrackValue(1); + + return Task.CompletedTask; + }; + + producer.SendEventBatchFailedAsync += args => + { + var numEvents = args.EventBatch.Count; + + _metrics.Client.GetMetric(_metrics.EventsNotSentAfterEnqueue, "PartitionId").TrackValue(numEvents, args.PartitionId); + _metrics.Client.TrackException(args.Exception); + + return Task.CompletedTask; + }; + + // Start concurrent enqueuing tasks + + if (_testConfiguration.ConcurrentSends > 1) + { + for (var index = 0; index < _testConfiguration.ConcurrentSends - 1; ++index) + { + enqueueTasks.Add(Task.Run(async () => + { + while (!cancellationToken.IsCancellationRequested) + { + await PerformEnqueue(producer, cancellationToken).ConfigureAwait(false); + + if ((_testConfiguration.ProducerPublishingDelay.HasValue) && (_testConfiguration.ProducerPublishingDelay.Value > TimeSpan.Zero)) + { + await Task.Delay(_testConfiguration.ProducerPublishingDelay.Value, backgroundCancellationSource.Token).ConfigureAwait(false); + } + } + })); + } + } + // Perform one of the sends in the foreground, which will allow easier detection of a + // processor-level issue. + + while (!cancellationToken.IsCancellationRequested) + { + try + { + await PerformEnqueue(producer, cancellationToken).ConfigureAwait(false); + + if ((_testConfiguration.ProducerPublishingDelay.HasValue) && (_testConfiguration.ProducerPublishingDelay.Value > TimeSpan.Zero)) + { + await Task.Delay(_testConfiguration.ProducerPublishingDelay.Value, cancellationToken).ConfigureAwait(false); + } + } + catch (TaskCanceledException) + { + backgroundCancellationSource.Cancel(); + await Task.WhenAll(enqueueTasks).ConfigureAwait(false); + } + } + } + catch (TaskCanceledException) + { + // No action needed, the cancellation token has been cancelled. + } + catch (Exception ex) when + (ex is OutOfMemoryException + || ex is StackOverflowException + || ex is ThreadAbortException) + { + throw; + } + catch (Exception ex) + { + // If this catch is hit, it means the producer has restarted, collect metrics. + + _metrics.Client.GetMetric(_metrics.ProducerRestarted).TrackValue(1); + _metrics.Client.TrackException(ex); + } + finally + { + await producer.CloseAsync(false); + } + } + } + + private async Task PerformEnqueue(EventHubBufferedProducerClient producer, + CancellationToken cancellationToken) + { + var events = EventGenerator.CreateEvents(_testConfiguration.MaximumEventListSize, + _testConfiguration.EventEnqueueListSize, + _testConfiguration.LargeMessageRandomFactorPercent, + _testConfiguration.PublishingBodyMinBytes, + _testConfiguration.PublishingBodyRegularMaxBytes); + + try + { + await producer.EnqueueEventsAsync(events, cancellationToken).ConfigureAwait(false); + + _metrics.Client.GetMetric(_metrics.EventsEnqueued).TrackValue(_testConfiguration.EventEnqueueListSize); + } + catch (TaskCanceledException) + { + // Run is completed. + } + catch (Exception ex) + { + var eventProperties = new Dictionary(); + + // Track that the exception took place during the enqueuing of an event + + eventProperties.Add("Process", "Enqueue"); + + _metrics.Client.TrackException(ex, eventProperties); + } + } + } +} \ No newline at end of file diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Infrastructure/EnvironmentReader.cs b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Infrastructure/EnvironmentReader.cs new file mode 100644 index 0000000000000..fe96a934b643f --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Infrastructure/EnvironmentReader.cs @@ -0,0 +1,126 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.IO; +using Azure.Core; + +namespace Azure.Messaging.EventHubs.Stress; + +/// +/// Allows for reading and parsing a set of environment information in the +/// dotenv format. +/// +/// +/// +/// +internal static class EnvironmentReader +{ + /// + /// Loads a dotenv file and parses the contents. + /// + /// + /// The full path, including filename, to the dotenv file. + /// + /// The set of environment content parsed from the file. + /// + /// Occurs when is null or empty. + /// Occurs when does not exist. + /// Occurs when a line of the file is malformed or the file contains duplicate environment variables. + /// + public static Dictionary LoadFromFile(string filePath) + { + Argument.AssertNotNullOrEmpty(filePath, nameof(filePath)); + + if (!File.Exists(filePath)) + { + throw new FileNotFoundException($"The environment file was not found.in '{ filePath }'"); + } + + var environment = new Dictionary(); + var count = 0; + + foreach (var fileLine in ReadFileLines(filePath)) + { + ++count; + + var line = fileLine.AsSpan(); + var firstCharacterPos = FindFirstCharacterIndex(line); + + if ((firstCharacterPos == -1) || (line[firstCharacterPos] == '#')) + { + continue; + } + + var separator = line.IndexOf('='); + + if (separator == -1) + { + throw new FormatException($"The environment file is malformed at line { count }: '{ line.ToString() }'"); + } + + try + { + var parsedKey = line.Slice(firstCharacterPos, separator).Trim().ToString(); + var parsedValue = line.Slice(separator + 1).Trim().ToString(); + + if (environment.ContainsKey(parsedKey)) + { + environment[parsedKey] = parsedValue; + } + else + { + environment.Add(parsedKey, parsedValue); + } + } + catch (ArgumentException ex) + { + throw new FormatException(ex.Message, ex); + } + } + + return environment; + } + + /// + /// Reads all lines from a file. + /// + /// + /// The full path, including filename, to the file to be read. + /// + /// An enumerable of the lines in a file. + /// + private static IEnumerable ReadFileLines(string filePath) + { + using var reader = new StreamReader(filePath); + + string line; + + while ((line = reader.ReadLine()) != null) + { + yield return line; + } + } + + /// + /// Finds the first non-whitespace character in a line of text. + /// + /// + /// The line of text to consider. + /// + /// The index of the first non-whitespace character; -1, if no character was found. + /// + private static int FindFirstCharacterIndex(ReadOnlySpan line) + { + var position = 0; + var length = line.Length; + + while ((position < length) && (char.IsWhiteSpace(line[position]))) + { + ++position; + } + + return (position < length) ? position : -1; + } +} diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Infrastructure/EnvironmentVariables.cs b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Infrastructure/EnvironmentVariables.cs new file mode 100644 index 0000000000000..3a21bd955d4d4 --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Infrastructure/EnvironmentVariables.cs @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; + +namespace Azure.Messaging.EventHubs.Stress +{ + public static class EnvironmentVariables + { + // Shared Resources + public const string ApplicationInsightsKey = "APPINSIGHTS_INSTRUMENTATIONKEY"; + public const string EventHubsConnectionString = "EVENTHUB_NAMESPACE_CONNECTION_STRING"; + + // Event Hub Names + public const string EventHubBufferedProducerTest = "EVENTHUB_NAME_BUFFERED_PRODUCER_TEST"; + public const string EventHubEventProducerTest = "EVENTHUB_NAME_EVENT_PRODUCER_TEST"; + public const string EventHubBurstBufferedProducerTest = "EVENTHUB_NAME_BURST_BUFFERED_PRODUCER_TEST"; + public const string EventHubConcurrentBufferedProducerTest = "EVENTHUB_NAME_CONCURRENT_BUFFERED_PRODUCER_TEST"; + } +} \ No newline at end of file diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Infrastructure/Metrics.cs b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Infrastructure/Metrics.cs new file mode 100644 index 0000000000000..db8153003033c --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Infrastructure/Metrics.cs @@ -0,0 +1,47 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Threading; +using Microsoft.ApplicationInsights; +using Microsoft.ApplicationInsights.Metrics; +using Microsoft.ApplicationInsights.Extensibility; + +namespace Azure.Messaging.EventHubs.Stress +{ + // Metrics names are shared between tests, since test information is collected through + // the container orchestration. Metrics are still filterable by test with application insights. + internal class Metrics + { + public TelemetryClient Client; + + // Environment statistics - Garbage Collection + public MetricIdentifier GenerationZeroCollections = new MetricIdentifier("GenerationZeroCollections"); + public MetricIdentifier GenerationOneCollections = new MetricIdentifier("GenerationOneCollections"); + public MetricIdentifier GenerationTwoCollections = new MetricIdentifier("GenerationTwoCollections"); + + // Shared Producer statistics + public MetricIdentifier BatchesPublished = new MetricIdentifier("BatchesPublished"); + public MetricIdentifier ProducerRestarted = new MetricIdentifier("ProducerRestarted"); + + // Buffered Producer statistics + public string EventsNotSentAfterEnqueue = "EventsNotSentAfterEnqueue"; + public string SuccessfullySentFromQueue = "SuccessfullySentFromQueue"; + public MetricIdentifier EventsEnqueued = new MetricIdentifier("EventsEnqueued"); + + // Event Producer statistics + public MetricIdentifier EventsPublished = new MetricIdentifier("EventsPublished"); + public MetricIdentifier PublishAttempts = new MetricIdentifier("PublishAttempts"); + public MetricIdentifier TotalPublishedSizeBytes = new MetricIdentifier("TotalPublishedSizeBytes"); + + public Metrics(string instrumentationKey) + { + var configuration = TelemetryConfiguration.CreateDefault(); + configuration.InstrumentationKey = instrumentationKey; + + Client = new TelemetryClient(configuration); + } + } +} \ No newline at end of file diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Infrastructure/Publisher.cs b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Infrastructure/Publisher.cs new file mode 100644 index 0000000000000..584cdeb13f402 --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Infrastructure/Publisher.cs @@ -0,0 +1,170 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.EventHubs; +using Azure.Messaging.EventHubs.Producer; +using Azure.Messaging.EventHubs.Tests; +using Microsoft.ApplicationInsights; + +namespace Azure.Messaging.EventHubs.Stress +{ + internal class Publisher + { + private readonly Metrics _metrics; + private readonly EventProducerTestConfig _testConfiguration; + + public Publisher(EventProducerTestConfig configuration, + Metrics metrics) + { + _testConfiguration = configuration; + _metrics = metrics; + } + + public async Task Start(CancellationToken cancellationToken) + { + var sendTasks = new List(); + + while (!cancellationToken.IsCancellationRequested) + { + using var backgroundCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + var options = new EventHubProducerClientOptions + { + RetryOptions = new EventHubsRetryOptions + { + TryTimeout = _testConfiguration.SendTimeout + } + }; + var producer = new EventHubProducerClient(_testConfiguration.EventHubsConnectionString, _testConfiguration.EventHub, options); + + try + { + sendTasks.Clear(); + + // Create a set of background tasks to handle all but one of the concurrent sends. The final + // send will be performed directly. + + if (_testConfiguration.ConcurrentSends > 1) + { + for (var index = 0; index < _testConfiguration.ConcurrentSends - 1; ++index) + { + sendTasks.Add(Task.Run(async () => + { + while (!backgroundCancellationSource.Token.IsCancellationRequested) + { + await PerformSend(producer, backgroundCancellationSource.Token).ConfigureAwait(false); + + if ((_testConfiguration.ProducerPublishingDelay.HasValue) && (_testConfiguration.ProducerPublishingDelay.Value > TimeSpan.Zero)) + { + await Task.Delay(_testConfiguration.ProducerPublishingDelay.Value, backgroundCancellationSource.Token).ConfigureAwait(false); + } + } + })); + } + } + // Perform one of the sends in the foreground, which will allow easier detection of a + // processor-level issue. + + while (!cancellationToken.IsCancellationRequested) + { + try + { + await PerformSend(producer, cancellationToken).ConfigureAwait(false); + + if ((_testConfiguration.ProducerPublishingDelay.HasValue) && (_testConfiguration.ProducerPublishingDelay.Value > TimeSpan.Zero)) + { + await Task.Delay(_testConfiguration.ProducerPublishingDelay.Value, cancellationToken).ConfigureAwait(false); + } + } + catch (TaskCanceledException) + { + backgroundCancellationSource.Cancel(); + await Task.WhenAll(sendTasks).ConfigureAwait(false); + } + } + } + catch (TaskCanceledException) + { + // No action needed. + } + catch (Exception ex) when + (ex is OutOfMemoryException + || ex is StackOverflowException + || ex is ThreadAbortException) + { + throw; + } + catch (Exception ex) + { + // If this catch is hit, there's a problem with the producer itself; cancel the + // background sending and wait before allowing the producer to restart. + + backgroundCancellationSource.Cancel(); + await Task.WhenAll(sendTasks).ConfigureAwait(false); + + _metrics.Client.GetMetric(_metrics.ProducerRestarted).TrackValue(1); + _metrics.Client.TrackException(ex); + } + finally + { + await producer.CloseAsync(); + } + } + } + + private async Task PerformSend(EventHubProducerClient producer, + CancellationToken cancellationToken) + { + // Create the batch and generate a set of random events, keeping only those that were able to fit into the batch. + // Because there is a side-effect of TryAdd in the statement, ensure that ToList is called to materialize the set + // or the batch will be empty at send. + + using var batch = await producer.CreateBatchAsync().ConfigureAwait(false); + + var events = EventGenerator.CreateEvents(batch.MaximumSizeInBytes, + _testConfiguration.PublishBatchSize, + _testConfiguration.LargeMessageRandomFactorPercent, + _testConfiguration.PublishingBodyMinBytes, + _testConfiguration.PublishingBodyRegularMaxBytes); + + foreach (var currentEvent in events) + { + if (!batch.TryAdd(currentEvent)) + { + break; + } + } + + // Publish the events and report them, capturing any failures specific to the send operation. + + try + { + if (batch.Count > 0) + { + _metrics.Client.GetMetric(_metrics.PublishAttempts).TrackValue(1); + await producer.SendAsync(batch, cancellationToken).ConfigureAwait(false); + } + + _metrics.Client.GetMetric(_metrics.EventsPublished).TrackValue(batch.Count); + _metrics.Client.GetMetric(_metrics.BatchesPublished).TrackValue(1); + _metrics.Client.GetMetric(_metrics.TotalPublishedSizeBytes).TrackValue(batch.SizeInBytes); + } + catch (TaskCanceledException) + { + _metrics.Client.GetMetric(_metrics.PublishAttempts).TrackValue(-1); + } + catch (Exception ex) + { + var eventProperties = new Dictionary(); + eventProperties.Add("Process", "Send"); + + _metrics.Client.TrackException(ex, eventProperties); + } + } + } +} \ No newline at end of file diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Program.cs b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Program.cs new file mode 100644 index 0000000000000..f276fe836a5eb --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Program.cs @@ -0,0 +1,227 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Text; +using CommandLine; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.EventHubs.Consumer; +using Azure.Messaging.EventHubs.Producer; + +namespace Azure.Messaging.EventHubs.Stress +{ + /// + /// The main program thread that allows for both local and deployed test runs. + /// Determines which tests should be run and starts them in the background. + /// + /// + public class Program + { + /// + /// Parses the command line arguments and runs the specified tests. + /// + /// + /// The command line inputs. + /// + public static async Task Main(string[] args) + { + // Parse command line arguments + + await CommandLine.Parser.Default.ParseArguments(args).WithParsedAsync(RunOptions).ConfigureAwait(false); + } + + /// + /// Starts a background task for each test that needs to be run, and waits for all + /// test runs to completed before returning. + /// + /// + /// The parsed command line inputs. + /// + private static async Task RunOptions(Options opts) + { + var environment = new Dictionary(); + var appInsightsKey = String.Empty; + var eventHubsConnectionString = String.Empty; + var eventProducerTestConfig = new EventProducerTestConfig(); + var bufferedProducerTestConfig = new BufferedProducerTestConfig(); + var burstBufferedProducerTestConfig = new BufferedProducerTestConfig(); + var concurrentBufferedProducerTestConfig = new BufferedProducerTestConfig(); + + var testRunTasks = new List(); + + // Determine which tests should be run based on the command line args + + if (opts.All) + { + eventProducerTestConfig.Run = true; + bufferedProducerTestConfig.Run = true; + burstBufferedProducerTestConfig.Run = true; + concurrentBufferedProducerTestConfig.Run = true; + } + + foreach (var testRun in opts.Tests) + { + if (testRun == "EventProd" || testRun == "EventProducerTest") + { + eventProducerTestConfig.Run = true; + } + if (testRun == "BuffProd" || testRun == "BufferedProducerTest") + { + bufferedProducerTestConfig.Run = true; + } + if (testRun == "BurstBuffProd" || testRun == "BurstBufferedProducerTest") + { + burstBufferedProducerTestConfig.Run = true; + } + if (testRun == "ConcurBuffProd" || testRun == "ConcurrentBufferedProducerTest") + { + concurrentBufferedProducerTestConfig.Run = true; + } + } + + // See if there are environment variables available to use in the .env file + + var environmentFile = Environment.GetEnvironmentVariable("ENV_FILE"); + if (!(string.IsNullOrEmpty(environmentFile))) + { + environment = EnvironmentReader.LoadFromFile(environmentFile); + } + + environment.TryGetValue(EnvironmentVariables.ApplicationInsightsKey, out appInsightsKey); + environment.TryGetValue(EnvironmentVariables.EventHubsConnectionString, out eventHubsConnectionString); + + // Prompt for needed resources if interactive mode is enabled + + if (opts.Interactive) + { + eventHubsConnectionString = _promptForResources("Event Hubs connection string", "all test runs", eventHubsConnectionString); + appInsightsKey = _promptForResources("Application Insights key", "all test runs", appInsightsKey); + } + + // Run the event producer test + + if (eventProducerTestConfig.Run) + { + eventProducerTestConfig.InstrumentationKey = appInsightsKey; + eventProducerTestConfig.EventHubsConnectionString = eventHubsConnectionString; + + environment.TryGetValue(EnvironmentVariables.EventHubEventProducerTest, out eventProducerTestConfig.EventHub); + + if (opts.Interactive) + { + eventProducerTestConfig.EventHub = _promptForResources("Event Hub name", "event producer test", eventProducerTestConfig.EventHub); + } + + var testRun = new EventProducerTest(eventProducerTestConfig); + testRunTasks.Add(testRun.Run()); + } + + // Run the buffered producer test + + if (bufferedProducerTestConfig.Run) + { + bufferedProducerTestConfig.InstrumentationKey = appInsightsKey; + bufferedProducerTestConfig.EventHubsConnectionString = eventHubsConnectionString; + + environment.TryGetValue(EnvironmentVariables.EventHubBufferedProducerTest, out bufferedProducerTestConfig.EventHub); + + if (opts.Interactive) + { + bufferedProducerTestConfig.EventHub = _promptForResources("Event Hub name", "buffered producer test", bufferedProducerTestConfig.EventHub); + } + + bufferedProducerTestConfig.ConcurrentSends = 1; + + var testRun = new BufferedProducerTest(bufferedProducerTestConfig); + testRunTasks.Add(testRun.Run()); + } + + // Run the burst buffered producer test + + if (burstBufferedProducerTestConfig.Run) + { + burstBufferedProducerTestConfig.InstrumentationKey = appInsightsKey; + burstBufferedProducerTestConfig.EventHubsConnectionString = eventHubsConnectionString; + + environment.TryGetValue(EnvironmentVariables.EventHubBurstBufferedProducerTest, out burstBufferedProducerTestConfig.EventHub); + + if (opts.Interactive) + { + burstBufferedProducerTestConfig.EventHub = _promptForResources("Event Hub name", "burst buffered producer test", burstBufferedProducerTestConfig.EventHub); + } + + burstBufferedProducerTestConfig.ConcurrentSends = 5; + burstBufferedProducerTestConfig.ProducerPublishingDelay = TimeSpan.FromMinutes(15); + + var testRun = new BufferedProducerTest(burstBufferedProducerTestConfig); + testRunTasks.Add(testRun.Run()); + } + + // Run the concurrent buffered producer test + + if (concurrentBufferedProducerTestConfig.Run) + { + concurrentBufferedProducerTestConfig.InstrumentationKey = appInsightsKey; + concurrentBufferedProducerTestConfig.EventHubsConnectionString = eventHubsConnectionString; + + environment.TryGetValue(EnvironmentVariables.EventHubConcurrentBufferedProducerTest, out concurrentBufferedProducerTestConfig.EventHub); + + if (opts.Interactive) + { + concurrentBufferedProducerTestConfig.EventHub = _promptForResources("Event Hub name", "concurrent buffered producer test", concurrentBufferedProducerTestConfig.EventHub); + } + + concurrentBufferedProducerTestConfig.ConcurrentSends = 5; + + var testRun = new BufferedProducerTest(concurrentBufferedProducerTestConfig); + testRunTasks.Add(testRun.Run()); + } + + // Wait for all test runs to complete + + await Task.WhenAll(testRunTasks).ConfigureAwait(false); + } + + /// + /// Prompts the user using the command line for resources if they have not been provided yet. + /// + /// + /// The name of the needed resource. + /// Which test(s) for which the resource is needed. + /// The current value of the resource. + /// + private static string _promptForResources(string resourceName, string testName, string currentValue) + { + // If the resource hasn't been provided already, wait for it to be provided through the CLI + + while (string.IsNullOrEmpty(currentValue)) + { + Console.Write($"Please provide the {resourceName} for {testName}: "); + currentValue = Console.ReadLine().Trim(); + } + return currentValue; + } + + /// + /// The available command line options that can be parsed. + /// + /// + private class Options + { + [Option('i', "interactive", HelpText = "Set up stress tests in interactive mode.")] + public bool Interactive { get; set; } + + [Option("all", HelpText = "Run all available tests.")] + public bool All { get; set; } + + [Option('t', "tests", HelpText = "Enter which tests to run.")] + public IEnumerable Tests { get; set; } + } + } +} \ No newline at end of file diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Scenarios/BufferedProducerTest.cs b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Scenarios/BufferedProducerTest.cs new file mode 100644 index 0000000000000..b520dcdce8f42 --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Scenarios/BufferedProducerTest.cs @@ -0,0 +1,100 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Azure.Core.Diagnostics; +using Azure.Messaging.EventHubs.Consumer; +using Azure.Messaging.EventHubs.Producer; +using Azure.Messaging.EventHubs.Tests; +using System.Diagnostics.Tracing; + +namespace Azure.Messaging.EventHubs.Stress +{ + internal class BufferedProducerTest + { + private Metrics _metrics; + private BufferedProducerTestConfig _testConfiguration; + + public BufferedProducerTest(BufferedProducerTestConfig testConfiguration) + { + _testConfiguration = testConfiguration; + _metrics = new Metrics(testConfiguration.InstrumentationKey); + } + + public async Task Run() + { + // Create a new metrics instance for the given test run. + _metrics = new Metrics(_testConfiguration.InstrumentationKey); + + // Set up cancellation token + using var enqueueingCancellationSource = new CancellationTokenSource(); + var runDuration = TimeSpan.FromHours(_testConfiguration.DurationInHours); + enqueueingCancellationSource.CancelAfter(runDuration); + + using var azureEventListener = new AzureEventSourceListener(SendHeardException, EventLevel.Error); + + var enqueuingTasks = default(IEnumerable); + + // Start two buffered producer background tasks + try + { + enqueuingTasks = Enumerable + .Range(0, 2) + .Select(_ => Task.Run(() => new BufferedPublisher(_testConfiguration, _metrics).Start(enqueueingCancellationSource.Token))) + .ToList(); + + // Periodically update garbage collection metrics + while (!enqueueingCancellationSource.Token.IsCancellationRequested) + { + UpdateEnvironmentStatistics(_metrics); + await Task.Delay(TimeSpan.FromMinutes(1), enqueueingCancellationSource.Token).ConfigureAwait(false); + } + + await Task.WhenAll(enqueuingTasks).ConfigureAwait(false); + } + catch (TaskCanceledException) + { + // No action needed. + } + catch (Exception ex) when + (ex is OutOfMemoryException + || ex is StackOverflowException + || ex is ThreadAbortException) + { + _metrics.Client.TrackException(ex); + Environment.FailFast(ex.Message); + } + catch (Exception ex) + { + _metrics.Client.TrackException(ex); + } + finally + { + // Flush and wait for all metrics to be aggregated and sent to application insights + _metrics.Client.Flush(); + await Task.Delay(60000).ConfigureAwait(false); + } + } + + private void UpdateEnvironmentStatistics(Metrics _metrics) + { + _metrics.Client.GetMetric(_metrics.GenerationZeroCollections).TrackValue(GC.CollectionCount(0)); + _metrics.Client.GetMetric(_metrics.GenerationOneCollections).TrackValue(GC.CollectionCount(1)); + _metrics.Client.GetMetric(_metrics.GenerationTwoCollections).TrackValue(GC.CollectionCount(2)); + } + + private void SendHeardException(EventWrittenEventArgs args, string level) + { + var output = args.ToString(); + _metrics.Client.TrackTrace($"EventWritten: {output} Level: {level}."); + } + } +} \ No newline at end of file diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Scenarios/EventProducerTest.cs b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Scenarios/EventProducerTest.cs new file mode 100644 index 0000000000000..f0a5f46947c95 --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Scenarios/EventProducerTest.cs @@ -0,0 +1,110 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.EventHubs.Consumer; +using Azure.Messaging.EventHubs.Producer; +using Microsoft.ApplicationInsights; +using Microsoft.ApplicationInsights.Extensibility; +using System.Diagnostics.Tracing; +using Azure.Core.Diagnostics; + +namespace Azure.Messaging.EventHubs.Stress +{ + internal class EventProducerTest + { + private Metrics _metrics; + private EventProducerTestConfig _testConfiguration; + + public EventProducerTest(EventProducerTestConfig testConfiguration) + { + _testConfiguration = testConfiguration; + _metrics = new Metrics(testConfiguration.InstrumentationKey); + } + + public async Task Run() + { + using var publishCancellationSource = new CancellationTokenSource(); + + var runDuration = TimeSpan.FromHours(_testConfiguration.DurationInHours); + publishCancellationSource.CancelAfter(runDuration); + + using var azureEventListener = new AzureEventSourceListener(SendHeardException, EventLevel.Error); + + var publishingTasks = default(IEnumerable); + + try + { + // Begin publishing events in the background. + + publishingTasks = Enumerable + .Range(0, 2) + .Select(_ => Task.Run(() => new Publisher(_testConfiguration, _metrics).Start(publishCancellationSource.Token))) + .ToList(); + + // Periodically update garbage collection metrics + while (!publishCancellationSource.Token.IsCancellationRequested) + { + UpdateEnvironmentStatistics(_metrics); + await Task.Delay(TimeSpan.FromMinutes(1), publishCancellationSource.Token).ConfigureAwait(false); + } + + await Task.WhenAll(publishingTasks).ConfigureAwait(false); + } + catch (TaskCanceledException) + { + // No action needed. + } + catch (Exception ex) when + (ex is OutOfMemoryException + || ex is StackOverflowException + || ex is ThreadAbortException) + { + throw; + } + catch (Exception ex) + { + _metrics.Client.TrackException(ex); + } + + // The run is ending. Clean up the outstanding background operations. + + try + { + publishCancellationSource.Cancel(); + await Task.WhenAll(publishingTasks).ConfigureAwait(false); + } + catch (Exception ex) + { + _metrics.Client.TrackException(ex); + } + finally + { + // Flush and wait for all metrics to be aggregated and sent to application insights + _metrics.Client.Flush(); + await Task.Delay(60000).ConfigureAwait(false); + } + } + + private void UpdateEnvironmentStatistics(Metrics _metrics) + { + _metrics.Client.GetMetric(_metrics.GenerationZeroCollections).TrackValue(GC.CollectionCount(0)); + _metrics.Client.GetMetric(_metrics.GenerationOneCollections).TrackValue(GC.CollectionCount(1)); + _metrics.Client.GetMetric(_metrics.GenerationTwoCollections).TrackValue(GC.CollectionCount(2)); + } + + private void SendHeardException(EventWrittenEventArgs args, string level) + { + var output = args.ToString(); + _metrics.Client.TrackTrace($"EventWritten: {output} Level: {level}."); + } + } +} \ No newline at end of file diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/stress-test-resources.bicep b/sdk/eventhub/Azure.Messaging.EventHubs/stress/stress-test-resources.bicep new file mode 100644 index 0000000000000..6690ad3c942e3 --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/stress-test-resources.bicep @@ -0,0 +1,93 @@ +@description('The url suffix to use when creating storage connection strings.') +param storageEndpointSuffix string = environment().suffixes.storage + +@description('The maximum duration, in minutes, that a single test is permitted to run before it is considered at-risk for being hung.') +param perTestExecutionLimitMinutes string = '15' + +@description('The client OID to grant access to test resources.') +param testApplicationOid string + +@description('The location of the resource group.') +param location string = resourceGroup().location + +var eventHubsNamespace_var = resourceGroup().name +var defaultSASKeyName = 'RootManageSharedAccessKey' +var eventHubsAuthRuleResourceId = resourceId('Microsoft.EventHub/namespaces/authorizationRules', eventHubsNamespace_var, defaultSASKeyName) + +var bufferedProducerEventHubName = 'bufferedproducertest' +var bufferproducerEventHubPartitions = 10 + +var eventProducerEventHubName = 'eventproducertest' +var eventProducerEventHubPartitions = 10 + +var burstBufferedProducerEventHubName = 'burstbufferedproducertest' +var burstBufferedProducerEventHubPartitions = 10 + +var concurrentBufferedProducerEventHubName = 'concurrentbufferedproducertest' +var concurrentBufferedProducerEventHubPartitions = 10 + +// Event Hubs Namespace Creation +resource eventHubsNamespace 'Microsoft.EventHub/Namespaces@2015-08-01' = { + name: eventHubsNamespace_var + location: location + sku: { + name: 'Standard' + tier: 'Standard' + } +} + +// Event Hubs Creation +resource eventHubBufferedProducer 'Microsoft.EventHub/namespaces/eventhubs@2021-11-01' = { + parent: eventHubsNamespace + name: bufferedProducerEventHubName + properties: { + messageRetentionInDays: 7 + partitionCount: bufferproducerEventHubPartitions + } +} + +resource eventHubEventProducer 'Microsoft.EventHub/namespaces/eventhubs@2021-11-01' = { + parent: eventHubsNamespace + name: eventProducerEventHubName + properties: { + messageRetentionInDays: 7 + partitionCount: eventProducerEventHubPartitions + } +} + +resource eventHubBurstBufferedProducer 'Microsoft.EventHub/namespaces/eventhubs@2021-11-01' = { + parent: eventHubsNamespace + name: burstBufferedProducerEventHubName + properties: { + messageRetentionInDays: 7 + partitionCount: burstBufferedProducerEventHubPartitions + } +} + +resource eventHubConcurrentBufferedProducer 'Microsoft.EventHub/namespaces/eventhubs@2021-11-01' = { + parent: eventHubsNamespace + name: concurrentBufferedProducerEventHubName + properties: { + messageRetentionInDays: 7 + partitionCount: concurrentBufferedProducerEventHubPartitions + } +} + +// Shared Resource output +output AZURE_CLIENT_OID string = testApplicationOid +output RESOURCE_GROUP string = resourceGroup().name +output EVENTHUB_STORAGE_ENDPOINT_SUFFIX string = storageEndpointSuffix +output EVENTHUB_PER_TEST_LIMIT_MINUTES string = perTestExecutionLimitMinutes +output EVENTHUB_NAMESPACE_CONNECTION_STRING string = listkeys(eventHubsAuthRuleResourceId, '2015-08-01').primaryConnectionString + +// Outputs for the BufferedProducerTest scenario +output EVENTHUB_NAME_BUFFERED_PRODUCER_TEST string = eventHubBufferedProducer.name + +// Outputs for the EventProducerTest +output EVENTHUB_NAME_EVENT_PRODUCER_TEST string = eventHubEventProducer.name + +// Outputs for the BurstBufferedProducerTest scenario +output EVENTHUB_NAME_BURST_BUFFERED_PRODUCER_TEST string = eventHubBurstBufferedProducer.name + +// Outputs for the ConcurrentBufferedProducerTest scenario +output EVENTHUB_NAME_CONCURRENT_BUFFERED_PRODUCER_TEST string = eventHubConcurrentBufferedProducer.name diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/templates/deploy-job.yaml b/sdk/eventhub/Azure.Messaging.EventHubs/stress/templates/deploy-job.yaml new file mode 100644 index 0000000000000..e7694ad916e59 --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/templates/deploy-job.yaml @@ -0,0 +1,14 @@ +{{- include "stress-test-addons.deploy-job-template.from-pod" (list . "stress.event-hub") -}} +{{- define "stress.event-hub" -}} +metadata: + labels: + testName: "net-eh-stress-{{ .Scenario }}" + testInstance: "eventhubs-{{.Scenario }}-{{ .Release.Name }}-{{ .Release.Revision }}" +spec: + containers: + - name: net-eh-stress + image: {{ .Values.image }} + imagePullPolicy: Always + command: ['dotnet', 'Azure.Messaging.EventHubs.Stress.dll', '--tests', {{ .Scenario }}] + {{- include "stress-test-addons.container-env" . | nindent 6 }} +{{- end -}} diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/stress/values.yaml b/sdk/eventhub/Azure.Messaging.EventHubs/stress/values.yaml new file mode 100644 index 0000000000000..7f493eecc758a --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/stress/values.yaml @@ -0,0 +1,5 @@ +scenarios: +- "EventProd" +- "BuffProd" +- "BurstBuffProd" +- "ConcurBuffProd" \ No newline at end of file