Skip to content

Commit

Permalink
[Event Hubs Client] Processor Stop Aborts Links
Browse files Browse the repository at this point in the history
The focus of these changes is to detect that an event processor is attempting
to stop and force-close the AMQP links being used to read events.  This
should help to avoid pauses when stopping due to an active AMQP operation
in the transport consumer being blocked because no events were available.
  • Loading branch information
jsquire committed May 21, 2021
1 parent 2103c38 commit 2b179cf
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 3 deletions.
15 changes: 14 additions & 1 deletion sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@

## 5.5.0-beta.1 (Unreleased)

## 5.5.0-beta.1 (Unreleased)

### Acknowledgments

Thank you to our developer community members who helped to make the Event Hubs client libraries better with their contributions to this release:

- Daniel Marbach _([GitHub](https://github.com/danielmarbach))_

### Changes

#### New Features

- When stopping, the `EventProcessorClient` will now attempt to force-close the connection to the Event Hubs service to abort in-process read operations blocked on their timeout. This should significantly help reduce the amount of time the processor takes to stop in many scenarios. _(Based on a community prototype contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_

## 5.4.1 (2021-05-11)

Expand Down Expand Up @@ -33,7 +46,7 @@ Thank you to our developer community members who helped to make the Event Hubs c

- The `EventProcessorClient` now supports shared key and shared access signature authentication using the `AzureNamedKeyCredential` and `AzureSasCredential` types in addition to the connection string. Use of the credential allows the shared key or SAS to be updated without the need to create a new processor.

- Multiple enhancements were made to the AMQP transport paths for reading events to reduce memory allocations and increase performance. (A community contribution, courtesy of _[danielmarbach](https://github.com/danielmarbach))_
- Multiple enhancements were made to the AMQP transport paths for reading events to reduce memory allocations and increase performance. _(A community contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_

#### Key Bug Fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs" />
<!-- RESTORE PACKAGE REFERENCE FOR RELEASE -->
<!--PackageReference Include="Azure.Messaging.EventHubs" /-->
<ProjectReference Include="$(MSBuildThisFileDirectory)..\..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj" />
<!-- END RESTORE -->

<PackageReference Include="Azure.Core" />
<PackageReference Include="Azure.Storage.Blobs" />
<PackageReference Include="Microsoft.Azure.Amqp" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,145 @@ public async Task ProcessorClientDetectsAnInvalidStorageContainer(bool async)
cancellationSource.Cancel();
}

/// <summary>
/// Verifies that the <see cref="EventProcessorClient" /> can stop when no events are
/// available to read.
/// </summary>
///
[Test]
[TestCase(true)]
[TestCase(false)]
public async Task ProcessorClientStopsWithoutWaitingForTimeoutWhenPartitionsAreEmpty(bool async)
{
// Setup the environment.

await using EventHubScope scope = await EventHubScope.CreateAsync(4);
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

// Send a single event.

var sentCount = await SendEvents(connectionString, EventGenerator.CreateEvents(1), cancellationSource.Token);
Assert.That(sentCount, Is.EqualTo(1), "A single event should have been sent.");

// Attempt to read events using the longest possible TryTimeout.

var options = new EventProcessorOptions { LoadBalancingStrategy = LoadBalancingStrategy.Greedy, MaximumWaitTime = null };
options.RetryOptions.TryTimeout = EventHubsTestEnvironment.Instance.TestExecutionTimeLimit.Add(TimeSpan.FromSeconds(30));

var processedEvents = new ConcurrentDictionary<string, EventData>();
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var processor = CreateProcessor(scope.ConsumerGroups.First(), connectionString, options: options);

processor.ProcessErrorAsync += CreateAssertingErrorHandler();
processor.ProcessEventAsync += CreateEventTrackingHandler(sentCount, processedEvents, completionSource, cancellationSource.Token);

await processor.StartProcessingAsync(cancellationSource.Token);

// Once the event has confirmed to have been read, then at least one partition is owned. Any further
// receive attempts will block for the duration of the TryTimeout, which is set to the test execution limit.

await Task.WhenAny(completionSource.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token));
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");

// Stopping should close the consumers used by the processor and allow completion before the
// receive timeout expires. If this isn't successful, the cancellation token will have been signaled.

if (async)
{
await processor.StopProcessingAsync(cancellationSource.Token);
}
else
{
processor.StopProcessing(cancellationSource.Token);
}

Assert.That(processor.IsRunning, Is.False, "The processor should have stopped.");
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");

cancellationSource.Cancel();
}

/// <summary>
/// Verifies that the <see cref="EventProcessorClient" /> can stop when no events are
/// available to read.
/// </summary>
///
[Test]
public async Task ProcessorClientCanRestartAfterStopping()
{
// Setup the environment.

await using EventHubScope scope = await EventHubScope.CreateAsync(4);
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

// Send a single event.

var sentCount = await SendEvents(connectionString, EventGenerator.CreateEvents(1), cancellationSource.Token);
Assert.That(sentCount, Is.EqualTo(1), "A single event should have been sent.");

// Attempt to read events using the longest possible TryTimeout.

var options = new EventProcessorOptions { LoadBalancingStrategy = LoadBalancingStrategy.Greedy, MaximumWaitTime = null };
options.RetryOptions.TryTimeout = EventHubsTestEnvironment.Instance.TestExecutionTimeLimit.Add(TimeSpan.FromSeconds(30));

var processedEvents = new ConcurrentDictionary<string, EventData>();
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var processor = CreateProcessor(scope.ConsumerGroups.First(), connectionString, options: options);

var activeEventHandler = CreateEventTrackingHandler(sentCount, processedEvents, completionSource, cancellationSource.Token);
processor.ProcessEventAsync += activeEventHandler;

processor.ProcessErrorAsync += CreateAssertingErrorHandler();
await processor.StartProcessingAsync(cancellationSource.Token);

// Once the event has confirmed to have been read, then at least one partition is owned. Any further
// receive attempts will block for the duration of the TryTimeout, which is set to the test execution limit.

await Task.WhenAny(completionSource.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token));
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");

// Stopping should close the consumers used by the processor and allow completion before the
// receive timeout expires. If this isn't successful, the cancellation token will have been signaled.

await processor.StopProcessingAsync(cancellationSource.Token);

Assert.That(processor.IsRunning, Is.False, "The processor should have stopped.");
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");

// Send another single event to prove restart was successful.

sentCount = await SendEvents(connectionString, EventGenerator.CreateEvents(1), cancellationSource.Token);
Assert.That(sentCount, Is.EqualTo(1), "A single event should have been sent.");

// Reset the event handler so that it uses a completion source that hasn't been signaled..

completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

processor.ProcessEventAsync -= activeEventHandler;
processor.ProcessEventAsync += CreateEventTrackingHandler(sentCount, processedEvents, completionSource, cancellationSource.Token);

// Restart the processor and confirm that the event was read.

await processor.StartProcessingAsync(cancellationSource.Token);

await Task.WhenAny(completionSource.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token));
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");

// Confirm that an event was read, then stop the processor.

await Task.WhenAny(completionSource.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token));
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");

await processor.StopProcessingAsync().IgnoreExceptions();
cancellationSource.Cancel();
}

/// <summary>
/// Creates an <see cref="EventProcessorClient" /> that uses mock storage and
/// a connection based on a connection string.
Expand Down
13 changes: 12 additions & 1 deletion sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@

## 5.5.0-beta.1 (Unreleased)

### Acknowledgments

Thank you to our developer community members who helped to make the Event Hubs client libraries better with their contributions to this release:

- Daniel Marbach _([GitHub](https://github.com/danielmarbach))_

### Changes

#### New Features

- When stopping, the `EventProcessor<TPartition>` will now attempt to force-close the connection to the Event Hubs service to abort in-process read operations blocked on their timeout. This should significantly help reduce the amount of time the processor takes to stop in many scenarios. _(Based on a community prototype contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_

## 5.4.1 (2021-05-11)

Expand Down Expand Up @@ -41,7 +52,7 @@ Thank you to our developer community members who helped to make the Event Hubs c

- The `SystemProperties` collection used by `EventData` will not use a shared empty set for events that have not been read from the Event Hubs service, reducing memory allocation.

- Multiple enhancements were made to the transport paths for publishing and reading events to reduce memory allocations and increase performance. (A community contribution, courtesy of _[danielmarbach](https://github.com/danielmarbach))_
- Multiple enhancements were made to the transport paths for publishing and reading events to reduce memory allocations and increase performance. _(A community contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_

#### Key Bug Fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,28 @@ public virtual void IdempotentPublishInitializeState(string eventHubName,
}
}

/// <summary>
/// Indicates that an <see cref="EventProcessor{TPartition}" /> instance has closed the transport consumer in response
/// to a stop request and a receive operation was aborted.
/// </summary>
///
/// <param name="partitionId">The identifier of the Event Hub partition whose processing is stopping.</param>
/// <param name="identifier">A unique name used to identify the event processor.</param>
/// <param name="eventHubName">The name of the Event Hub that the processor is associated with.</param>
/// <param name="consumerGroup">The name of the consumer group that the processor is associated with.</param>
///
[Event(54, Level = EventLevel.Verbose, Message = "Event Processor successfully closed the transport consumer when stopping processing for partition '{0}' by processor instance with identifier '{1}' for Event Hub: {2} and Consumer Group: {3}.")]
public virtual void EventProcessorPartitionProcessingStopConsumerClose(string partitionId,
string identifier,
string eventHubName,
string consumerGroup)
{
if (IsEnabled())
{
WriteEvent(54, partitionId ?? string.Empty, identifier ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty);
}
}

/// <summary>
/// Indicates that an exception was encountered in an unexpected code path, not directly associated with
/// an Event Hubs operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,18 @@ async Task performProcessing()
{
consumer = CreateConsumer(ConsumerGroup, partition.PartitionId, startingPosition, connection, Options);

// Register for notification when the cancellation token is triggered. Attempt to close the consumer
// in response to force-close the link and short-circuit any receive operation that is blocked and
// awaiting timeout.

using var cancellationRegistration = cancellationSource.Token.Register(static state =>
{
// Because this is a best-effort attempt and exceptions are expected and not relevant to
// callers, use a fire-and-forget approach rather than awaiting.

_ = ((TransportConsumer)state).CloseAsync(CancellationToken.None);
}, consumer, useSynchronizationContext: false);

// Allow the core dispatching loop to apply an additional set of retries over any provided by the consumer
// itself, as a processor should be as resilient as possible and retain partition ownership if processing is
// able to make forward progress. If the retries are exhausted or a non-retriable exception occurs, the
Expand Down Expand Up @@ -719,6 +731,14 @@ async Task performProcessing()

throw;
}
catch (EventHubsException ex)
when ((ex.Reason == EventHubsException.FailureReason.ClientClosed) && (cancellationSource.IsCancellationRequested))
{
// Do not log as an exception; this is an expected scenario when partition processing is asked to stop.

Logger.EventProcessorPartitionProcessingStopConsumerClose(partition.PartitionId, Identifier, EventHubName, ConsumerGroup);
throw new TaskCanceledException();
}
catch (Exception ex) when (ex.IsNotType<DeveloperCodeException>())
{
// The error handler is invoked as a fire-and-forget task; the processor does not assume responsibility
Expand Down
Loading

0 comments on commit 2b179cf

Please sign in to comment.