Skip to content

Commit

Permalink
[Event Hubs Client] Test Adjustments (Azure#11850)
Browse files Browse the repository at this point in the history
The focus of these changes is to address a race condition that was causing
one test to intermittently hang curing CI runs and to make small adjustments
for improving stability of tests that have seen intermittent issues during
nightly runs.
  • Loading branch information
jsquire authored May 7, 2020
1 parent 0ea8e2d commit 53ecaba
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 47 deletions.
2 changes: 1 addition & 1 deletion sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
Thank you to our developer community members who helped to make the Event Hubs client libraries better with their contributions to this release:

- Alberto De Natale _([GitHub](https://github.com/albertodenatale))_
- Daniel Marbach _([GitHub]((https://github.com/danielmarbach)))_
- Daniel Marbach _([GitHub](https://github.com/danielmarbach))_

### Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,8 @@ private async Task<bool> TryStartProcessingPartitionAsync(string partitionId,
}
}

// Create and register the partition processor.
// Create and register the partition processor. Ownership of the cancellationSource is transferred
// to the processor upon creation, including the responsibility for disposal.

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ public async Task ConsumerCannotReadWhenClosed()
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);
var sourceEvents = EventGenerator.CreateEvents(15).ToList();
var sourceEvents = EventGenerator.CreateEvents(25).ToList();

await using (var consumer = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, connectionString))
{
Expand All @@ -744,7 +744,7 @@ public async Task ConsumerCannotReadWhenClosed()

async Task<bool> closeAfterFiveRead(ReadState state)
{
if (state.Events.Count >= 5)
if (state.Events.Count >= 2)
{
await consumer.CloseAsync(cancellationSource.Token).ConfigureAwait(false);
}
Expand Down Expand Up @@ -1371,6 +1371,7 @@ await Task.WhenAll
// The consumer should be able to read events from another partition after being superseded. Start a new reader for the other partition,
// using the same lower level. Wait for both readers to complete and then signal for cancellation.

await Task.Delay(250);
var lowerReadState = await ReadEventsFromPartitionAsync(consumer, partitions[1], sourceEvents.Count, cancellationSource.Token, readOptions: lowerOptions);

await Task.WhenAny(higherMonitor.EndCompletion.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token));
Expand Down Expand Up @@ -1414,7 +1415,7 @@ public async Task ConsumerRespectsTheWaitTimeWhenReading()

var readTime = TimeSpan
.FromSeconds(options.MaximumWaitTime.Value.TotalSeconds * desiredEmptyEvents)
.Add(TimeSpan.FromSeconds(2));
.Add(TimeSpan.FromSeconds(3));

// Attempt to read from the empty partition and verify that no events are observed. Because no events are expected, the
// read operation will not naturally complete; limit the read to only a couple of seconds and trigger cancellation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,9 @@ public async Task ReadEventsFromPartitionAsyncWithOptionsReturnsAnEnumerable()
[Test]
public void ReadEventsFromPartitionAsyncThrowsIfConsumerClosedBeforeRead()
{
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

var events = new List<EventData>
{
new EventData(Encoding.UTF8.GetBytes("One")),
Expand All @@ -473,28 +476,21 @@ public void ReadEventsFromPartitionAsyncThrowsIfConsumerClosedBeforeRead()
var transportConsumer = new PublishingTransportConsumerMock(events);
var mockConnection = new MockConnection(() => transportConsumer);
var consumer = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, mockConnection);
var receivedEvents = 0;

using var cancellation = new CancellationTokenSource();
cancellation.CancelAfter(250);
var receivedEvents = false;

Assert.That(async () =>
{
await consumer.CloseAsync(cancellation.Token);
await consumer.CloseAsync(cancellationSource.Token);

await foreach (PartitionEvent partitionEvent in consumer.ReadEventsFromPartitionAsync("0", EventPosition.FromOffset(12), cancellation.Token))
await foreach (PartitionEvent partitionEvent in consumer.ReadEventsFromPartitionAsync("0", EventPosition.FromOffset(12), cancellationSource.Token))
{
if (partitionEvent.Data == null)
{
break;
}

++receivedEvents;
receivedEvents = true;
break;
}
}, Throws.InstanceOf<EventHubsException>().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.ClientClosed), "The iterator should have indicated the consumer was closed.");

Assert.That(cancellation.IsCancellationRequested, Is.False, "The cancellation should not have been requested.");
Assert.That(receivedEvents, Is.EqualTo(0), "There should have been no events received.");
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation should not have been requested.");
Assert.That(receivedEvents, Is.False, "There should have been no events received.");
}

/// <summary>
Expand Down Expand Up @@ -1288,7 +1284,7 @@ public void ReadEventsAsyncThrowsIfConsumerClosedBeforeRead()
var receivedEvents = 0;

using var cancellation = new CancellationTokenSource();
cancellation.CancelAfter(250);
cancellation.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

Assert.That(async () =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ public async Task BackgroundProcessingStartsProcessingForClaimedPartitions()
var secondPartition = "15";
var partitionIds = new[] { "0", secondPartition, firstPartiton };
var ownedPartitions = new List<string> { firstPartiton };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(1) };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(5) };
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var mockLogger = new Mock<EventHubsEventSource>();
var mockLoadBalancer = new Mock<PartitionLoadBalancer>();
Expand Down Expand Up @@ -562,33 +562,45 @@ public async Task BackgroundProcessingStartsProcessingForClaimedPartitions()
/// </summary>
///
[Test]
[Ignore("Intermittently hanging during CI runs. Tracked by #11731")]
[Timeout(300_000)] // TEMP: Using 5 minutes as an arbitrary safety timeout while troubleshooting a suspected test hang.
public async Task BackgroundProcessingStopsProcessingAllPartitionsWhenShutdown()
{
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

var stopCompletionIndex = -1L;
var expectedProcessingCalls = 2L;
var startProcessingCalls = 0L;
var stopProcessingCalls = 0L;
var firstPartiton = "27";
var secondPartition = "15";
var partitionIds = new[] { "0", secondPartition, firstPartiton };
var ownedPartitions = new List<string>();
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(1) };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(15) };
var startCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var stopCompletionSources = new[] { new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously), new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously) };
var stopCompletionTasks = stopCompletionSources.Select(source => source.Task);
var stopCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var mockLogger = new Mock<EventHubsEventSource>();
var mockLoadBalancer = new Mock<PartitionLoadBalancer>();
var mockConnection = new Mock<EventHubConnection>();
var mockProcessor = new Mock<EventProcessor<EventProcessorPartition>>(65, "consumerGroup", "namespace", "eventHub", Mock.Of<TokenCredential>(), options, mockLoadBalancer.Object) { CallBase = true };

mockLogger
.Setup(log => log.EventProcessorPartitionProcessingStartComplete(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
.Callback(() =>
{
if (Interlocked.Increment(ref startProcessingCalls) >= expectedProcessingCalls)
{
startCompletionSource.TrySetResult(true);
}
});

mockLogger
.Setup(log => log.EventProcessorPartitionProcessingStopComplete(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
.Callback(() =>
{
var activeIndex = Interlocked.Increment(ref stopCompletionIndex);
stopCompletionSources[activeIndex].TrySetResult(true);
if (Interlocked.Increment(ref stopProcessingCalls) >= expectedProcessingCalls)
{
stopCompletionSource.TrySetResult(true);
}
});

mockLoadBalancer
Expand Down Expand Up @@ -623,19 +635,14 @@ public async Task BackgroundProcessingStopsProcessingAllPartitionsWhenShutdown()
.Setup(processor => processor.CreateConsumer(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<EventPosition>(), It.IsAny<EventHubConnection>(), It.IsAny<EventProcessorOptions>()))
.Returns(Mock.Of<TransportConsumer>());

mockProcessor
.Setup(processor => processor.CreatePartitionProcessor(It.Is<EventProcessorPartition>(value => value.PartitionId == secondPartition), It.IsAny<EventPosition>(), It.IsAny<CancellationTokenSource>()))
.Callback(() => startCompletionSource.TrySetResult(true))
.CallBase();

await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token);
Assert.That(mockProcessor.Object.Status, Is.EqualTo(EventProcessorStatus.Running), "The processor should have started.");

await Task.WhenAny(startCompletionSource.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token));
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");

await mockProcessor.Object.StopProcessingAsync(cancellationSource.Token);
await Task.WhenAny(Task.WhenAll(stopCompletionTasks), Task.Delay(Timeout.Infinite, cancellationSource.Token));
await Task.WhenAny(stopCompletionSource.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token));
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");

mockProcessor
Expand Down Expand Up @@ -686,7 +693,7 @@ public async Task BackgroundProcessingLogsHandlerErrorWhenPartitionProcessingSto
var partitionId = "27";
var partitionIds = new[] { "0", partitionId };
var ownedPartitions = new List<string>();
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(1) };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(5) };
var startCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var stopCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var mockLogger = new Mock<EventHubsEventSource>();
Expand Down Expand Up @@ -795,7 +802,7 @@ public async Task BackgroundProcessingStopsProcessingForPartitionsWithLostOwners
var secondPartition = "15";
var partitionIds = new[] { "0", secondPartition, firstPartiton };
var ownedPartitions = new List<string> { firstPartiton };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(1) };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(5) };
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var mockLogger = new Mock<EventHubsEventSource>();
var mockLoadBalancer = new Mock<PartitionLoadBalancer>();
Expand Down Expand Up @@ -887,7 +894,7 @@ public async Task BackgroundProcessingRestartsProcessingForFaultedPartitions()
var partitionId = "27";
var partitionIds = new[] { "0", "14", partitionId };
var ownedPartitions = new List<string> { partitionId };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(1) };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(5) };
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var mockLoadBalancer = new Mock<PartitionLoadBalancer>();
var mockConnection = new Mock<EventHubConnection>();
Expand Down Expand Up @@ -965,7 +972,7 @@ public async Task BackgroundProcessingUsesCheckpointsWhenProcessingPartitions()
var partitionId = "27";
var partitionIds = new[] { "0", partitionId, "11" };
var ownedPartitions = new List<string> { partitionId };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(1) };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(5) };
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var mockLoadBalancer = new Mock<PartitionLoadBalancer>();
var mockConnection = new Mock<EventHubConnection>();
Expand Down Expand Up @@ -1037,7 +1044,7 @@ public async Task BackgroundProcessingDelegatesInitializationWhenProcessingClaim
var partitionId = "27";
var partitionIds = new[] { "0", partitionId, "77" };
var ownedPartitions = new List<string> { partitionId };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(1) };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(5) };
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var mockLoadBalancer = new Mock<PartitionLoadBalancer>();
var mockConnection = new Mock<EventHubConnection>();
Expand Down Expand Up @@ -1111,7 +1118,7 @@ public async Task BackgroundProcessingLogsWhenStartingToProcessClaimedPartitions
var partitionId = "27";
var partitionIds = new[] { "0", partitionId, "111" };
var ownedPartitions = new List<string> { partitionId };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(1) };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(5) };
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var mockLogger = new Mock<EventHubsEventSource>();
var mockLoadBalancer = new Mock<PartitionLoadBalancer>();
Expand Down Expand Up @@ -1196,7 +1203,7 @@ public async Task BackgroundProcessingLogsWhenStartingToProcessClaimedPartitions
var partitionId = "27";
var partitionIds = new[] { "0", partitionId, "111" };
var ownedPartitions = new List<string> { partitionId };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(1) };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(5) };
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var mockLogger = new Mock<EventHubsEventSource>();
var mockLoadBalancer = new Mock<PartitionLoadBalancer>();
Expand Down Expand Up @@ -1293,7 +1300,7 @@ public async Task BackgroundProcessingDispatchesExceptionsWhenStartingToProcessC
var partitionId = "27";
var partitionIds = new[] { "0", partitionId, "111" };
var ownedPartitions = new List<string> { partitionId };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(1) };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(5) };
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var mockLogger = new Mock<EventHubsEventSource>();
var mockLoadBalancer = new Mock<PartitionLoadBalancer>();
Expand Down Expand Up @@ -1383,7 +1390,7 @@ public async Task BackgroundProcessingLogsWhenSurrenderingClaimedPartitions()
var partitionId = "27";
var partitionIds = new[] { "0", partitionId, "111" };
var ownedPartitions = new List<string> { partitionId };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(1) };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(5) };
var startCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var stopCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var mockLogger = new Mock<EventHubsEventSource>();
Expand Down Expand Up @@ -1469,7 +1476,7 @@ public async Task BackgroundProcessingDelegatesStopNotificationWhenSurrenderingC
var partitionId = "27";
var partitionIds = new[] { "0", partitionId, "111" };
var ownedPartitions = new List<string> { partitionId };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(1) };
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(5) };
var startCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var stopCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var mockLogger = new Mock<EventHubsEventSource>();
Expand Down
Loading

0 comments on commit 53ecaba

Please sign in to comment.