Skip to content

Commit

Permalink
[Event Hub Client] Expiring transport producers Azure#8592 (Azure#9431)
Browse files Browse the repository at this point in the history
The focus of these changes is to add a pool for transport producers used by the EventHubProducerClient which manages their lifespan using a sliding time window approach.


Authored-by: Alberto De Natale <[email protected]>
  • Loading branch information
albertodenatale authored Apr 15, 2020
1 parent 99f0a09 commit b2c31ed
Show file tree
Hide file tree
Showing 9 changed files with 1,542 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public static EventHubsRetryOptions Clone(this EventHubsRetryOptions instance) =
/// <param name="instance">The instance that this method was invoked on.</param>
///
/// <returns>The <see cref="EventHubsRetryPolicy" /> represented by the options.</returns>
///
public static EventHubsRetryPolicy ToRetryPolicy(this EventHubsRetryOptions instance) =>
instance.CustomRetryPolicy ?? new BasicRetryPolicy(instance);

Expand Down
8 changes: 8 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpError.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,14 @@ private static Exception CreateException(string condition,
return new EventHubsException(eventHubsResource, description, EventHubsException.FailureReason.QuotaExceeded);
}

// The link was closed, generally this exception would be thrown for partition specific producers and would be caused by race conditions
// between an operation and a request to close a client.

if (string.Equals(condition, AmqpErrorCode.IllegalState.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new EventHubsException(eventHubsResource, description, EventHubsException.FailureReason.ClientClosed);
}

// The service does not understand how to process the request.

if (string.Equals(condition, AmqpErrorCode.NotAllowed.Value, StringComparison.InvariantCultureIgnoreCase))
Expand Down

Large diffs are not rendered by default.

236 changes: 123 additions & 113 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs
100755 → 100644

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public SendEventOptions()
/// <param name="partitionKey">The hashing key to use for influencing the partition to which the events are routed.</param>
///
internal SendEventOptions(string partitionId,
string partitionKey)
string partitionKey)
{
PartitionId = partitionId;
PartitionKey = partitionKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public static IEnumerable<object[]> SimpleConditionExceptionMatchTestCases()
yield return new object[] { AmqpErrorCode.ResourceLimitExceeded, typeof(EventHubsException), EventHubsException.FailureReason.QuotaExceeded };
yield return new object[] { AmqpErrorCode.NotAllowed, typeof(InvalidOperationException), null };
yield return new object[] { AmqpErrorCode.NotImplemented, typeof(NotSupportedException), null };
yield return new object[] { AmqpErrorCode.IllegalState, typeof(EventHubsException), EventHubsException.FailureReason.ClientClosed };
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2422,9 +2422,17 @@ internal override Task<PartitionProperties> GetPartitionPropertiesAsync(string p
return Task.FromResult(default(PartitionProperties));
}

internal override TransportConsumer CreateTransportConsumer(string consumerGroup, string partitionId, EventPosition eventPosition, EventHubsRetryPolicy retryPolicy, bool trackLastEnqueuedEventProperties = true, long? ownerLevel = default, uint? prefetchCount = default) => TransportConsumerFactory();

internal override TransportClient CreateTransportClient(string fullyQualifiedNamespace, string eventHubName, EventHubTokenCredential credential, EventHubConnectionOptions options)
internal override TransportConsumer CreateTransportConsumer(string consumerGroup,
string partitionId, EventPosition eventPosition,
EventHubsRetryPolicy retryPolicy,
bool trackLastEnqueuedEventProperties = true,
long? ownerLevel = default,
uint? prefetchCount = default) => TransportConsumerFactory();

internal override TransportClient CreateTransportClient(string fullyQualifiedNamespace,
string eventHubName,
EventHubTokenCredential credential,
EventHubConnectionOptions options)
{
var client = new Mock<TransportClient>();

Expand Down
681 changes: 652 additions & 29 deletions sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs
100755 → 100644

Large diffs are not rendered by default.

Large diffs are not rendered by default.

0 comments on commit b2c31ed

Please sign in to comment.