From d8c7e9b8a508b75df47811b9c0b161eda7745d98 Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Tue, 17 Dec 2019 15:23:43 -0500 Subject: [PATCH] [Event Hubs Client] Track Two (Test Stability) The focus of these changes is to tweak some of the timings and loosen some assumptions around the non-deterministic areas of the tests to offer better stability in the nightly runs, where there is a higher resource contention and timing tends to vary more than with local runs. --- .../samples/Sample03_BasicEventProcessing.cs | 2 +- .../samples/Sample04_BasicCheckpointing.cs | 2 +- .../samples/Sample05_InitializeAPartition.cs | 2 +- .../Sample06_TrackWhenAPartitionIsClosed.cs | 2 +- .../Sample08_EventProcessingHeartbeat.cs | 2 +- .../samples/Sample09_ProcessEventsByBatch.cs | 2 +- .../samples/Sample04_ReadEvents.cs | 2 +- .../samples/Sample08_ReadOnlyNewEvents.cs | 2 +- .../Sample09_ReadEventsFromAKnownPosition.cs | 2 +- ...11_AuthenticateWithClientSecretCredential.cs | 2 +- .../tests/Core/PartitionContextTests.cs | 1 + .../EventHubConsumerClientLiveTests.cs | 7 +++---- .../EventHubConsumerClientTests.cs | 17 ++++++++++++++++- .../EventHubProducerClientLiveTests.cs | 6 +++--- 14 files changed, 33 insertions(+), 18 deletions(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample03_BasicEventProcessing.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample03_BasicEventProcessing.cs index 92ec5978feca2..8a837526c61c6 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample03_BasicEventProcessing.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample03_BasicEventProcessing.cs @@ -209,7 +209,7 @@ Task processErrorHandler(ProcessErrorEventArgs eventArgs) // error be encountered, we'll also add a timed cancellation. using var cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(60)); while ((!cancellationSource.IsCancellationRequested) && (eventIndex <= expectedEvents.Count)) { diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample04_BasicCheckpointing.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample04_BasicCheckpointing.cs index 9377113a136dc..5d6de3e548ee0 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample04_BasicCheckpointing.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample04_BasicCheckpointing.cs @@ -204,7 +204,7 @@ Task processErrorHandler(ProcessErrorEventArgs eventArgs) // error be encountered, we'll also add a timed cancellation. using var cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(60)); while ((!cancellationSource.IsCancellationRequested) && (eventIndex <= expectedEvents.Count)) { diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample05_InitializeAPartition.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample05_InitializeAPartition.cs index 5bbdcc77fd5ad..cbdda6cd1fa0d 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample05_InitializeAPartition.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample05_InitializeAPartition.cs @@ -144,7 +144,7 @@ Task processErrorHandler(ProcessErrorEventArgs eventArgs) // cancellation. using var cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(TimeSpan.FromSeconds(12)); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); while (!cancellationSource.IsCancellationRequested) { diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample06_TrackWhenAPartitionIsClosed.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample06_TrackWhenAPartitionIsClosed.cs index 1d0acf89edf68..0f2ba2615b9aa 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample06_TrackWhenAPartitionIsClosed.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample06_TrackWhenAPartitionIsClosed.cs @@ -232,7 +232,7 @@ Task processErrorHandler(ProcessErrorEventArgs eventArgs) // error be encountered, we'll also add a timed cancellation. using var cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(60)); while ((!cancellationSource.IsCancellationRequested) && (eventsProcessed <= expectedEvents.Count)) { diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample08_EventProcessingHeartbeat.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample08_EventProcessingHeartbeat.cs index cb34cc5fe9560..643f291050346 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample08_EventProcessingHeartbeat.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample08_EventProcessingHeartbeat.cs @@ -134,7 +134,7 @@ Task processErrorHandler(ProcessErrorEventArgs eventArgs) await processor.StartProcessingAsync(); using var cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(TimeSpan.FromSeconds(45)); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(90)); // We'll publish a batch of events for our processor to receive. We'll split the events into a couple of batches to // increase the chance they'll be spread around to different partitions and introduce a delay between batches to diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample09_ProcessEventsByBatch.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample09_ProcessEventsByBatch.cs index b3419791b46f7..cc9853ea23737 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample09_ProcessEventsByBatch.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample09_ProcessEventsByBatch.cs @@ -180,7 +180,7 @@ Task processErrorHandler(ProcessErrorEventArgs eventArgs) await processor.StartProcessingAsync(); using var cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(TimeSpan.FromSeconds(45)); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(90)); // We'll publish a batch of events for our processor to receive. We'll split the events into a couple of batches to // increase the chance they'll be spread around to different partitions and introduce a delay between batches to diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample04_ReadEvents.cs b/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample04_ReadEvents.cs index b13fd58399859..9248703b6404d 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample04_ReadEvents.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample04_ReadEvents.cs @@ -81,7 +81,7 @@ public async Task RunAsync(string connectionString, // safe. using CancellationTokenSource cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(90)); int eventsRead = 0; int maximumEvents = 3; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample08_ReadOnlyNewEvents.cs b/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample08_ReadOnlyNewEvents.cs index 814241472eb20..fa08ce4744925 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample08_ReadOnlyNewEvents.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample08_ReadOnlyNewEvents.cs @@ -77,7 +77,7 @@ public async Task RunAsync(string connectionString, // in the event of a service error where the events we've published cannot be read. using CancellationTokenSource cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(90)); // The reading of all events will default to the earliest events available in each partition; in order to begin reading at the // latest event, we'll need to specify that reading should not start at earliest. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample09_ReadEventsFromAKnownPosition.cs b/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample09_ReadEventsFromAKnownPosition.cs index 487b41e9d8e6a..ec886b3a93023 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample09_ReadEventsFromAKnownPosition.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample09_ReadEventsFromAKnownPosition.cs @@ -87,7 +87,7 @@ public async Task RunAsync(string connectionString, // in the event of a service error where the events we've published cannot be read. using CancellationTokenSource cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(60)); List receivedEvents = new List(); bool wereEventsPublished = false; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample11_AuthenticateWithClientSecretCredential.cs b/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample11_AuthenticateWithClientSecretCredential.cs index 456ccf014b953..e67be77ea6c65 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample11_AuthenticateWithClientSecretCredential.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample11_AuthenticateWithClientSecretCredential.cs @@ -94,7 +94,7 @@ public async Task RunAsync(string fullyQualifiedNamespace, // safe. using CancellationTokenSource cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(60)); await foreach (PartitionEvent partitionEvent in consumerClient.ReadEventsAsync(cancellationSource.Token)) { diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/PartitionContextTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/PartitionContextTests.cs index 403f8149e343b..8e79609999668 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/PartitionContextTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/PartitionContextTests.cs @@ -69,6 +69,7 @@ public void ReadLastEnqueuedEventPropertiesDelegatesToTheConsumer() Assert.That(information.Offset, Is.EqualTo(lastEvent.LastPartitionOffset), "The offset should match."); Assert.That(information.EnqueuedTime, Is.EqualTo(lastEvent.LastPartitionEnqueuedTime), "The last enqueue time should match."); Assert.That(information.LastReceivedTime, Is.EqualTo(lastEvent.LastPartitionPropertiesRetrievalTime), "The retrieval time should match."); + Assert.That(mockConsumer.IsClosed, Is.False, "The consumer should not have been closed or disposed of."); } /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/EventHubConsumerClient/EventHubConsumerClientLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/EventHubConsumerClient/EventHubConsumerClientLiveTests.cs index 8cf1848cbacd9..a453c63a0ebd2 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/EventHubConsumerClient/EventHubConsumerClientLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/EventHubConsumerClient/EventHubConsumerClientLiveTests.cs @@ -1956,7 +1956,7 @@ public async Task ConsumerCannotReadEventsSentToAnotherPartition() var receivedEvents = new List(); var wereEventsPublished = false; - var maximumConsecutiveEmpties = 5; + var maximumConsecutiveEmpties = 10; var consecutiveEmpties = 0; await foreach (var partitionEvent in consumer.ReadEventsFromPartitionAsync(partitionIds[1], EventPosition.Latest, DefaultReadOptions, cancellationSource.Token)) @@ -2030,7 +2030,7 @@ public async Task ConsumersInDifferentConsumerGroupsShouldAllReadEvents() // Read back the events from two different consumer groups. - var maximumConsecutiveEmpties = 5; + var maximumConsecutiveEmpties = 10; var consecutiveEmpties = 0; var consumerReceivedEvents = new List(); var anotherReceivedEvents = new List(); @@ -2116,7 +2116,6 @@ public async Task ReadStopsWhenMaximumWaitTimeIsReached(int maximumWaitTimeInSec /// /// [Test] - [Ignore("Consistently failing on Linux and macOS. To be fixed with the upcoming test stabilization.")] public async Task ConsumerCannotReadWhenProxyIsInvalid() { await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) @@ -2140,7 +2139,7 @@ public async Task ConsumerCannotReadWhenProxyIsInvalid() await using (var invalidProxyConsumer = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, connectionString, options)) { - var readOptions = new ReadEventOptions { MaximumWaitTime = TimeSpan.FromMilliseconds(250) }; + var readOptions = new ReadEventOptions { MaximumWaitTime = null }; Assert.That(async () => await ReadNothingAsync(invalidProxyConsumer, partition, EventPosition.Latest, readOptions, 25), Throws.InstanceOf().Or.InstanceOf()); } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/EventHubConsumerClient/EventHubConsumerClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/EventHubConsumerClient/EventHubConsumerClientTests.cs index dac8075f253fd..de3425b38f011 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/EventHubConsumerClient/EventHubConsumerClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/EventHubConsumerClient/EventHubConsumerClientTests.cs @@ -1779,7 +1779,22 @@ public async Task ReadEventsAsyncRespectsWaitTimeWhenPublishingEvents() Assert.That(cancellation.IsCancellationRequested, Is.False, "The iteration should have completed normally."); Assert.That(receivedEvents.Count, Is.AtLeast(events.Count + 1).And.LessThanOrEqualTo(events.Count * thresholdModifier), "There should be empty events present due to the wait time."); - Assert.That(receivedEvents.Where(item => item != null), Is.EquivalentTo(events), "The received events should match the published events when empty events are removed."); + Assert.That(receivedEvents.Where(item => item != null).Count(), Is.EqualTo(events.Count), "The received event count should match the published events when empty events are removed."); + + // Validate that each message received appeared in the source set once. + + var sourceEventMessages = new HashSet(); + + foreach (var message in events.Select(item => Encoding.UTF8.GetString(item.Body.ToArray()))) + { + sourceEventMessages.Add(message); + } + + foreach (var receivedMessage in receivedEvents.Where(item => item != null).Select(item => Encoding.UTF8.GetString(item.Body.ToArray()))) + { + Assert.That(sourceEventMessages.Contains(receivedMessage), $"The message: { receivedEvents } was not in the source set or has appeared more than oncesss."); + sourceEventMessages.Remove(receivedMessage); + } } /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/EventHubProducerClient/EventHubProducerClientLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/EventHubProducerClient/EventHubProducerClientLiveTests.cs index 400b6cc75947b..c8bc68818c9ff 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/EventHubProducerClient/EventHubProducerClientLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/EventHubProducerClient/EventHubProducerClientLiveTests.cs @@ -856,7 +856,7 @@ public async Task ProducerDoesNotSendToSpecificPartitionWhenPartitionIdIsNotSpec { var receivedEvents = new List(); var consecutiveEmpties = 0; - var maximumConsecutiveEmpties = 5; + var maximumConsecutiveEmpties = 10; await foreach (var partitionEvent in consumer.ReadEventsFromPartitionAsync(partition, EventPosition.Earliest, DefaultReadOptions, cancellationSource.Token)) { @@ -923,7 +923,7 @@ public async Task ProducerSendsEventsInTheSameSetToTheSamePartition() { var receivedEvents = new List(); var consecutiveEmpties = 0; - var maximumConsecutiveEmpties = 5; + var maximumConsecutiveEmpties = 10; await foreach (var partitionEvent in consumer.ReadEventsFromPartitionAsync(partition, EventPosition.Earliest, DefaultReadOptions, cancellationSource.Token)) { @@ -993,7 +993,7 @@ public async Task ProducerSendsEventsWithTheSamePartitionHashKeyToTheSamePartiti { var receivedEvents = new List(); var consecutiveEmpties = 0; - var maximumConsecutiveEmpties = 5; + var maximumConsecutiveEmpties = 10; await foreach (var partitionEvent in consumer.ReadEventsFromPartitionAsync(partition, EventPosition.Earliest, DefaultReadOptions, cancellationSource.Token)) {