Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs Client] Processor Stop - Aborts Links #21242

Merged
merged 1 commit into from
May 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@

## 5.5.0-beta.1 (Unreleased)

## 5.5.0-beta.1 (Unreleased)

### Acknowledgments

Thank you to our developer community members who helped to make the Event Hubs client libraries better with their contributions to this release:

- Daniel Marbach _([GitHub](https://github.com/danielmarbach))_

### Changes

#### New Features

- When stopping, the `EventProcessorClient` will now attempt to force-close the connection to the Event Hubs service to abort in-process read operations blocked on their timeout. This should significantly help reduce the amount of time the processor takes to stop in many scenarios. _(Based on a community prototype contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_

## 5.4.1 (2021-05-11)

Expand Down Expand Up @@ -33,7 +46,7 @@ Thank you to our developer community members who helped to make the Event Hubs c

- The `EventProcessorClient` now supports shared key and shared access signature authentication using the `AzureNamedKeyCredential` and `AzureSasCredential` types in addition to the connection string. Use of the credential allows the shared key or SAS to be updated without the need to create a new processor.

- Multiple enhancements were made to the AMQP transport paths for reading events to reduce memory allocations and increase performance. (A community contribution, courtesy of _[danielmarbach](https://github.com/danielmarbach))_
- Multiple enhancements were made to the AMQP transport paths for reading events to reduce memory allocations and increase performance. _(A community contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_

#### Key Bug Fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs" />
<!-- RESTORE PACKAGE REFERENCE FOR RELEASE -->
<!--PackageReference Include="Azure.Messaging.EventHubs" /-->
<ProjectReference Include="$(MSBuildThisFileDirectory)..\..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj" />
<!-- END RESTORE -->

<PackageReference Include="Azure.Core" />
<PackageReference Include="Azure.Storage.Blobs" />
<PackageReference Include="Microsoft.Azure.Amqp" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,145 @@ public async Task ProcessorClientDetectsAnInvalidStorageContainer(bool async)
cancellationSource.Cancel();
}

/// <summary>
/// Verifies that the <see cref="EventProcessorClient" /> can stop when no events are
/// available to read.
/// </summary>
///
[Test]
[TestCase(true)]
[TestCase(false)]
public async Task ProcessorClientStopsWithoutWaitingForTimeoutWhenPartitionsAreEmpty(bool async)
jsquire marked this conversation as resolved.
Show resolved Hide resolved
{
// Setup the environment.

await using EventHubScope scope = await EventHubScope.CreateAsync(4);
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

// Send a single event.

var sentCount = await SendEvents(connectionString, EventGenerator.CreateEvents(1), cancellationSource.Token);
Assert.That(sentCount, Is.EqualTo(1), "A single event should have been sent.");

// Attempt to read events using the longest possible TryTimeout.

var options = new EventProcessorOptions { LoadBalancingStrategy = LoadBalancingStrategy.Greedy, MaximumWaitTime = null };
options.RetryOptions.TryTimeout = EventHubsTestEnvironment.Instance.TestExecutionTimeLimit.Add(TimeSpan.FromSeconds(30));

var processedEvents = new ConcurrentDictionary<string, EventData>();
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var processor = CreateProcessor(scope.ConsumerGroups.First(), connectionString, options: options);

processor.ProcessErrorAsync += CreateAssertingErrorHandler();
processor.ProcessEventAsync += CreateEventTrackingHandler(sentCount, processedEvents, completionSource, cancellationSource.Token);

await processor.StartProcessingAsync(cancellationSource.Token);

// Once the event has confirmed to have been read, then at least one partition is owned. Any further
// receive attempts will block for the duration of the TryTimeout, which is set to the test execution limit.

await Task.WhenAny(completionSource.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token));
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");

// Stopping should close the consumers used by the processor and allow completion before the
// receive timeout expires. If this isn't successful, the cancellation token will have been signaled.

if (async)
{
await processor.StopProcessingAsync(cancellationSource.Token);
}
else
{
processor.StopProcessing(cancellationSource.Token);
jsquire marked this conversation as resolved.
Show resolved Hide resolved
}

Assert.That(processor.IsRunning, Is.False, "The processor should have stopped.");
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");
jsquire marked this conversation as resolved.
Show resolved Hide resolved

cancellationSource.Cancel();
}

/// <summary>
/// Verifies that the <see cref="EventProcessorClient" /> can stop when no events are
/// available to read.
/// </summary>
///
[Test]
public async Task ProcessorClientCanRestartAfterStopping()
{
// Setup the environment.

await using EventHubScope scope = await EventHubScope.CreateAsync(4);
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

// Send a single event.

var sentCount = await SendEvents(connectionString, EventGenerator.CreateEvents(1), cancellationSource.Token);
Assert.That(sentCount, Is.EqualTo(1), "A single event should have been sent.");

// Attempt to read events using the longest possible TryTimeout.

var options = new EventProcessorOptions { LoadBalancingStrategy = LoadBalancingStrategy.Greedy, MaximumWaitTime = null };
options.RetryOptions.TryTimeout = EventHubsTestEnvironment.Instance.TestExecutionTimeLimit.Add(TimeSpan.FromSeconds(30));

var processedEvents = new ConcurrentDictionary<string, EventData>();
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var processor = CreateProcessor(scope.ConsumerGroups.First(), connectionString, options: options);

var activeEventHandler = CreateEventTrackingHandler(sentCount, processedEvents, completionSource, cancellationSource.Token);
processor.ProcessEventAsync += activeEventHandler;

processor.ProcessErrorAsync += CreateAssertingErrorHandler();
await processor.StartProcessingAsync(cancellationSource.Token);

// Once the event has confirmed to have been read, then at least one partition is owned. Any further
// receive attempts will block for the duration of the TryTimeout, which is set to the test execution limit.

await Task.WhenAny(completionSource.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token));
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");

// Stopping should close the consumers used by the processor and allow completion before the
// receive timeout expires. If this isn't successful, the cancellation token will have been signaled.

await processor.StopProcessingAsync(cancellationSource.Token);

Assert.That(processor.IsRunning, Is.False, "The processor should have stopped.");
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");

// Send another single event to prove restart was successful.

sentCount = await SendEvents(connectionString, EventGenerator.CreateEvents(1), cancellationSource.Token);
Assert.That(sentCount, Is.EqualTo(1), "A single event should have been sent.");

// Reset the event handler so that it uses a completion source that hasn't been signaled..

completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

processor.ProcessEventAsync -= activeEventHandler;
processor.ProcessEventAsync += CreateEventTrackingHandler(sentCount, processedEvents, completionSource, cancellationSource.Token);

// Restart the processor and confirm that the event was read.

await processor.StartProcessingAsync(cancellationSource.Token);

await Task.WhenAny(completionSource.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token));
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");

// Confirm that an event was read, then stop the processor.

await Task.WhenAny(completionSource.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token));
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");

await processor.StopProcessingAsync().IgnoreExceptions();
cancellationSource.Cancel();
}

/// <summary>
/// Creates an <see cref="EventProcessorClient" /> that uses mock storage and
/// a connection based on a connection string.
Expand Down
13 changes: 12 additions & 1 deletion sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@

## 5.5.0-beta.1 (Unreleased)

### Acknowledgments

Thank you to our developer community members who helped to make the Event Hubs client libraries better with their contributions to this release:

- Daniel Marbach _([GitHub](https://github.com/danielmarbach))_

### Changes

#### New Features

- When stopping, the `EventProcessor<TPartition>` will now attempt to force-close the connection to the Event Hubs service to abort in-process read operations blocked on their timeout. This should significantly help reduce the amount of time the processor takes to stop in many scenarios. _(Based on a community prototype contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_

## 5.4.1 (2021-05-11)

Expand Down Expand Up @@ -41,7 +52,7 @@ Thank you to our developer community members who helped to make the Event Hubs c

- The `SystemProperties` collection used by `EventData` will not use a shared empty set for events that have not been read from the Event Hubs service, reducing memory allocation.

- Multiple enhancements were made to the transport paths for publishing and reading events to reduce memory allocations and increase performance. (A community contribution, courtesy of _[danielmarbach](https://github.com/danielmarbach))_
- Multiple enhancements were made to the transport paths for publishing and reading events to reduce memory allocations and increase performance. _(A community contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_

#### Key Bug Fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,28 @@ public virtual void IdempotentPublishInitializeState(string eventHubName,
}
}

/// <summary>
/// Indicates that an <see cref="EventProcessor{TPartition}" /> instance has closed the transport consumer in response
/// to a stop request and a receive operation was aborted.
/// </summary>
///
/// <param name="partitionId">The identifier of the Event Hub partition whose processing is stopping.</param>
/// <param name="identifier">A unique name used to identify the event processor.</param>
/// <param name="eventHubName">The name of the Event Hub that the processor is associated with.</param>
/// <param name="consumerGroup">The name of the consumer group that the processor is associated with.</param>
///
[Event(54, Level = EventLevel.Verbose, Message = "Event Processor successfully closed the transport consumer when stopping processing for partition '{0}' by processor instance with identifier '{1}' for Event Hub: {2} and Consumer Group: {3}.")]
public virtual void EventProcessorPartitionProcessingStopConsumerClose(string partitionId,
string identifier,
string eventHubName,
string consumerGroup)
{
if (IsEnabled())
{
WriteEvent(54, partitionId ?? string.Empty, identifier ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty);
}
}

/// <summary>
/// Indicates that an exception was encountered in an unexpected code path, not directly associated with
/// an Event Hubs operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,18 @@ async Task performProcessing()
{
consumer = CreateConsumer(ConsumerGroup, partition.PartitionId, startingPosition, connection, Options);

// Register for notification when the cancellation token is triggered. Attempt to close the consumer
// in response to force-close the link and short-circuit any receive operation that is blocked and
// awaiting timeout.

using var cancellationRegistration = cancellationSource.Token.Register(static state =>
{
// Because this is a best-effort attempt and exceptions are expected and not relevant to
// callers, use a fire-and-forget approach rather than awaiting.

_ = ((TransportConsumer)state).CloseAsync(CancellationToken.None);
}, consumer, useSynchronizationContext: false);

// Allow the core dispatching loop to apply an additional set of retries over any provided by the consumer
// itself, as a processor should be as resilient as possible and retain partition ownership if processing is
// able to make forward progress. If the retries are exhausted or a non-retriable exception occurs, the
Expand Down Expand Up @@ -719,6 +731,14 @@ async Task performProcessing()

throw;
}
catch (EventHubsException ex)
when ((ex.Reason == EventHubsException.FailureReason.ClientClosed) && (cancellationSource.IsCancellationRequested))
{
// Do not log as an exception; this is an expected scenario when partition processing is asked to stop.

Logger.EventProcessorPartitionProcessingStopConsumerClose(partition.PartitionId, Identifier, EventHubName, ConsumerGroup);
throw new TaskCanceledException();
}
catch (Exception ex) when (ex.IsNotType<DeveloperCodeException>())
{
// The error handler is invoked as a fire-and-forget task; the processor does not assume responsibility
Expand Down
Loading