Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs Client] Track Two (Test Stability) #9188

Merged
merged 1 commit into from
Dec 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ Task processErrorHandler(ProcessErrorEventArgs eventArgs)
// error be encountered, we'll also add a timed cancellation.

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
cancellationSource.CancelAfter(TimeSpan.FromSeconds(60));

while ((!cancellationSource.IsCancellationRequested) && (eventIndex <= expectedEvents.Count))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ Task processErrorHandler(ProcessErrorEventArgs eventArgs)
// error be encountered, we'll also add a timed cancellation.

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
cancellationSource.CancelAfter(TimeSpan.FromSeconds(60));

while ((!cancellationSource.IsCancellationRequested) && (eventIndex <= expectedEvents.Count))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ Task processErrorHandler(ProcessErrorEventArgs eventArgs)
// cancellation.

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(12));
cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));

while (!cancellationSource.IsCancellationRequested)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ Task processErrorHandler(ProcessErrorEventArgs eventArgs)
// error be encountered, we'll also add a timed cancellation.

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
cancellationSource.CancelAfter(TimeSpan.FromSeconds(60));

while ((!cancellationSource.IsCancellationRequested) && (eventsProcessed <= expectedEvents.Count))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ Task processErrorHandler(ProcessErrorEventArgs eventArgs)
await processor.StartProcessingAsync();

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
cancellationSource.CancelAfter(TimeSpan.FromSeconds(90));

// We'll publish a batch of events for our processor to receive. We'll split the events into a couple of batches to
// increase the chance they'll be spread around to different partitions and introduce a delay between batches to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ Task processErrorHandler(ProcessErrorEventArgs eventArgs)
await processor.StartProcessingAsync();

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
cancellationSource.CancelAfter(TimeSpan.FromSeconds(90));

// We'll publish a batch of events for our processor to receive. We'll split the events into a couple of batches to
// increase the chance they'll be spread around to different partitions and introduce a delay between batches to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public async Task RunAsync(string connectionString,
// safe.

using CancellationTokenSource cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
cancellationSource.CancelAfter(TimeSpan.FromSeconds(90));

int eventsRead = 0;
int maximumEvents = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public async Task RunAsync(string connectionString,
// in the event of a service error where the events we've published cannot be read.

using CancellationTokenSource cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
cancellationSource.CancelAfter(TimeSpan.FromSeconds(90));

// The reading of all events will default to the earliest events available in each partition; in order to begin reading at the
// latest event, we'll need to specify that reading should not start at earliest.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public async Task RunAsync(string connectionString,
// in the event of a service error where the events we've published cannot be read.

using CancellationTokenSource cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
cancellationSource.CancelAfter(TimeSpan.FromSeconds(60));

List<EventData> receivedEvents = new List<EventData>();
bool wereEventsPublished = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public async Task RunAsync(string fullyQualifiedNamespace,
// safe.

using CancellationTokenSource cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
cancellationSource.CancelAfter(TimeSpan.FromSeconds(60));

await foreach (PartitionEvent partitionEvent in consumerClient.ReadEventsAsync(cancellationSource.Token))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public void ReadLastEnqueuedEventPropertiesDelegatesToTheConsumer()
Assert.That(information.Offset, Is.EqualTo(lastEvent.LastPartitionOffset), "The offset should match.");
Assert.That(information.EnqueuedTime, Is.EqualTo(lastEvent.LastPartitionEnqueuedTime), "The last enqueue time should match.");
Assert.That(information.LastReceivedTime, Is.EqualTo(lastEvent.LastPartitionPropertiesRetrievalTime), "The retrieval time should match.");
Assert.That(mockConsumer.IsClosed, Is.False, "The consumer should not have been closed or disposed of.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to assert that? It doesn't feel like something we should worry about in this scenario.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was previously a difficult-to-detect issue where garbage collection was kicking in and the test was failing when the run was slow. Adding this check here is mostly unnecessary other than it offers an active reference to the variable, preventing the compiler from optimizing such that it may be collected.

I had originally left a comment with some silly does-nothing property access outside of an assert, but that looked and felt odd.

}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1956,7 +1956,7 @@ public async Task ConsumerCannotReadEventsSentToAnotherPartition()

var receivedEvents = new List<EventData>();
var wereEventsPublished = false;
var maximumConsecutiveEmpties = 5;
var maximumConsecutiveEmpties = 10;
var consecutiveEmpties = 0;

await foreach (var partitionEvent in consumer.ReadEventsFromPartitionAsync(partitionIds[1], EventPosition.Latest, DefaultReadOptions, cancellationSource.Token))
Expand Down Expand Up @@ -2030,7 +2030,7 @@ public async Task ConsumersInDifferentConsumerGroupsShouldAllReadEvents()

// Read back the events from two different consumer groups.

var maximumConsecutiveEmpties = 5;
var maximumConsecutiveEmpties = 10;
var consecutiveEmpties = 0;
var consumerReceivedEvents = new List<EventData>();
var anotherReceivedEvents = new List<EventData>();
Expand Down Expand Up @@ -2116,7 +2116,6 @@ public async Task ReadStopsWhenMaximumWaitTimeIsReached(int maximumWaitTimeInSec
/// </summary>
///
[Test]
[Ignore("Consistently failing on Linux and macOS. To be fixed with the upcoming test stabilization.")]
public async Task ConsumerCannotReadWhenProxyIsInvalid()
{
await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
Expand All @@ -2140,7 +2139,7 @@ public async Task ConsumerCannotReadWhenProxyIsInvalid()

await using (var invalidProxyConsumer = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, connectionString, options))
{
var readOptions = new ReadEventOptions { MaximumWaitTime = TimeSpan.FromMilliseconds(250) };
var readOptions = new ReadEventOptions { MaximumWaitTime = null };
Copy link
Member Author

@jsquire jsquire Dec 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Behavior on Linux and macOS is different for this scenario than it is on Windows. They do not immediately terminate with a web socket exception, but continue to attempt to connect until the timeout is exhausted. The previous setting was causing the test to terminate before the timeout period. Since it is not important that the read pulls events, setting no wait time and allowing it to block until the timeout period for connections (60 seconds) expires correctly executes on all platforms.

Note that the cancellation token continues to prevent stalling by allowing for twice the connection timeout, just in case.

Assert.That(async () => await ReadNothingAsync(invalidProxyConsumer, partition, EventPosition.Latest, readOptions, 25), Throws.InstanceOf<WebSocketException>().Or.InstanceOf<TimeoutException>());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1779,7 +1779,22 @@ public async Task ReadEventsAsyncRespectsWaitTimeWhenPublishingEvents()

Assert.That(cancellation.IsCancellationRequested, Is.False, "The iteration should have completed normally.");
Assert.That(receivedEvents.Count, Is.AtLeast(events.Count + 1).And.LessThanOrEqualTo(events.Count * thresholdModifier), "There should be empty events present due to the wait time.");
Assert.That(receivedEvents.Where(item => item != null), Is.EquivalentTo(events), "The received events should match the published events when empty events are removed.");
Assert.That(receivedEvents.Where(item => item != null).Count(), Is.EqualTo(events.Count), "The received event count should match the published events when empty events are removed.");

// Validate that each message received appeared in the source set once.

var sourceEventMessages = new HashSet<string>();

foreach (var message in events.Select(item => Encoding.UTF8.GetString(item.Body.ToArray())))
{
sourceEventMessages.Add(message);
}

foreach (var receivedMessage in receivedEvents.Where(item => item != null).Select(item => Encoding.UTF8.GetString(item.Body.ToArray())))
{
Assert.That(sourceEventMessages.Contains(receivedMessage), $"The message: { receivedEvents } was not in the source set or has appeared more than oncesss.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: oncesss => once

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to take a note and defer this in order to avoid pushing just this change.

sourceEventMessages.Remove(receivedMessage);
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ public async Task ProducerDoesNotSendToSpecificPartitionWhenPartitionIdIsNotSpec
{
var receivedEvents = new List<EventData>();
var consecutiveEmpties = 0;
var maximumConsecutiveEmpties = 5;
var maximumConsecutiveEmpties = 10;

await foreach (var partitionEvent in consumer.ReadEventsFromPartitionAsync(partition, EventPosition.Earliest, DefaultReadOptions, cancellationSource.Token))
{
Expand Down Expand Up @@ -923,7 +923,7 @@ public async Task ProducerSendsEventsInTheSameSetToTheSamePartition()
{
var receivedEvents = new List<EventData>();
var consecutiveEmpties = 0;
var maximumConsecutiveEmpties = 5;
var maximumConsecutiveEmpties = 10;

await foreach (var partitionEvent in consumer.ReadEventsFromPartitionAsync(partition, EventPosition.Earliest, DefaultReadOptions, cancellationSource.Token))
{
Expand Down Expand Up @@ -993,7 +993,7 @@ public async Task ProducerSendsEventsWithTheSamePartitionHashKeyToTheSamePartiti
{
var receivedEvents = new List<EventData>();
var consecutiveEmpties = 0;
var maximumConsecutiveEmpties = 5;
var maximumConsecutiveEmpties = 10;

await foreach (var partitionEvent in consumer.ReadEventsFromPartitionAsync(partition, EventPosition.Earliest, DefaultReadOptions, cancellationSource.Token))
{
Expand Down