From b2c31ed8de74ca8e2b1ca2b92a3a5e49ebd32bf9 Mon Sep 17 00:00:00 2001 From: Alberto De Natale Date: Wed, 15 Apr 2020 18:10:09 +0200 Subject: [PATCH] [Event Hub Client] Expiring transport producers #8592 (#9431) The focus of these changes is to add a pool for transport producers used by the EventHubProducerClient which manages their lifespan using a sliding time window approach. Authored-by: Alberto De Natale --- .../Core/EventHubsRetryOptionsExtensions.cs | 1 + .../src/Amqp/AmqpError.cs | 8 + .../src/Core/TransportProducerPool.cs | 355 +++++++++ .../src/Producer/EventHubProducerClient.cs | 236 +++--- .../src/Producer/SendEventOptions.cs | 2 +- .../tests/Amqp/AmqpErrorTests.cs | 1 + .../Consumer/EventHubConsumerClientTests.cs | 14 +- .../Producer/EventHubProducerClientTests.cs | 681 +++++++++++++++++- .../Producer/TransportProducerPoolTests.cs | 390 ++++++++++ 9 files changed, 1542 insertions(+), 146 deletions(-) create mode 100644 sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs create mode 100644 sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Core/EventHubsRetryOptionsExtensions.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Core/EventHubsRetryOptionsExtensions.cs index ad62dbc5949c7..0895b53e7642f 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Core/EventHubsRetryOptionsExtensions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Core/EventHubsRetryOptionsExtensions.cs @@ -38,6 +38,7 @@ public static EventHubsRetryOptions Clone(this EventHubsRetryOptions instance) = /// The instance that this method was invoked on. /// /// The represented by the options. + /// public static EventHubsRetryPolicy ToRetryPolicy(this EventHubsRetryOptions instance) => instance.CustomRetryPolicy ?? new BasicRetryPolicy(instance); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpError.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpError.cs index b9e7abfcd0312..f05502d8e2992 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpError.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpError.cs @@ -193,6 +193,14 @@ private static Exception CreateException(string condition, return new EventHubsException(eventHubsResource, description, EventHubsException.FailureReason.QuotaExceeded); } + // The link was closed, generally this exception would be thrown for partition specific producers and would be caused by race conditions + // between an operation and a request to close a client. + + if (string.Equals(condition, AmqpErrorCode.IllegalState.Value, StringComparison.InvariantCultureIgnoreCase)) + { + return new EventHubsException(eventHubsResource, description, EventHubsException.FailureReason.ClientClosed); + } + // The service does not understand how to process the request. if (string.Equals(condition, AmqpErrorCode.NotAllowed.Value, StringComparison.InvariantCultureIgnoreCase)) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs new file mode 100644 index 0000000000000..16430a3c0c9e0 --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs @@ -0,0 +1,355 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Azure.Core; + +namespace Azure.Messaging.EventHubs.Core +{ + /// + /// A pool of instances that automatically expire after a period of inactivity. + /// + /// + internal class TransportProducerPool + { + /// The period after which is run. + private static readonly TimeSpan DefaultPerformExpirationPeriod = TimeSpan.FromMinutes(10); + + /// + /// The set of active Event Hub transport-specific producers specific to a given partition; + /// intended to perform delegated operations. + /// + /// + private ConcurrentDictionary Pool { get; } + + /// + /// An abstracted Event Hub transport-specific producer that is associated with the + /// Event Hub gateway rather than a specific partition; intended to perform delegated operations. + /// + /// + public TransportProducer EventHubProducer { get; } + + /// + /// The active connection to the Azure Event Hubs service, enabling client communications for metadata + /// about the associated Event Hub and access to a transport-aware producer. + /// + /// + private EventHubConnection Connection { get; } + + /// + /// The policy to use for determining retry behavior for when an operation fails. + /// + /// + private EventHubsRetryPolicy RetryPolicy { get; } + + /// + /// A reference to a periodically checking every + /// the that are in use and those that can be closed. + /// + /// + private Timer ExpirationTimer { get; } + + /// + /// Initializes a new instance of the class. + /// + /// + internal TransportProducerPool() + { + } + + /// + /// Initializes a new instance of the class. + /// + /// + /// The connection to use for communication with the Event Hubs service. + /// The policy to use for determining retry behavior for when an operation fails. + /// The pool of that is going to be used to store the partition specific . + /// The period after which is run. Overrides . + /// An abstracted Event Hub transport-specific producer that is associated with the Event Hub gateway rather than a specific partition. + /// + public TransportProducerPool(EventHubConnection connection, + EventHubsRetryPolicy retryPolicy, + ConcurrentDictionary pool = default, + TimeSpan? performExpirationPeriod = default, + TransportProducer eventHubProducer = default) + { + Connection = connection; + RetryPolicy = retryPolicy; + Pool = pool ?? new ConcurrentDictionary(); + performExpirationPeriod ??= DefaultPerformExpirationPeriod; + EventHubProducer = eventHubProducer ?? connection.CreateTransportProducer(null, retryPolicy); + + ExpirationTimer = new Timer(CreateExpirationTimerCallback(), + null, + performExpirationPeriod.Value, + performExpirationPeriod.Value); + } + + /// + /// Retrieves a for the requested partition, + /// creating one if needed or extending the expiration for an existing instance. + /// + /// + /// The unique identifier of a partition associated with the Event Hub. + /// The period of inactivity after which a will become eligible for eviction. Overrides . + /// + /// A mapping to the partition id passed in as parameter. + /// + /// + /// There is a slight probability that the returned producer may be closed at any time + /// after it is returned and the caller may want to handle that scenario. + /// + /// + public virtual PooledProducer GetPooledProducer(string partitionId, + TimeSpan? removeAfterDuration = default) + { + if (string.IsNullOrEmpty(partitionId)) + { + return new PooledProducer(EventHubProducer); + } + + var identifier = Guid.NewGuid().ToString(); + + var item = Pool.GetOrAdd(partitionId, id => new PoolItem(partitionId, Connection.CreateTransportProducer(id, RetryPolicy), removeAfterDuration)); + + // A race condition at this point may end with CloseAsync called on + // the returned PoolItem if it had expired. The probability is very low and + // possible exceptions should be handled by the invoking methods. + + if (item.PartitionProducer.IsClosed || !item.ActiveInstances.TryAdd(identifier, 0)) + { + identifier = Guid.NewGuid().ToString(); + item = Pool.GetOrAdd(partitionId, id => new PoolItem(partitionId, Connection.CreateTransportProducer(id, RetryPolicy), removeAfterDuration)); + item.ActiveInstances.TryAdd(identifier, 0); + } + + item.ExtendRemoveAfter(removeAfterDuration); + + return new PooledProducer(item.PartitionProducer, cleanUp: producer => + { + Argument.AssertNotNull(producer, nameof(producer)); + + if (Pool.TryGetValue(partitionId, out PoolItem pooledItem)) + { + pooledItem.ExtendRemoveAfter(removeAfterDuration); + } + + // If TryRemove returned false the PoolItem would not be closed deterministically + // and the ExpirationTimer callback would eventually remove it from the + // Pool leaving to the Garbage Collector the responsability of closing + // the TransportProducer and the AMQP link. + + item.ActiveInstances.TryRemove(identifier, out _); + + // The second TryGetValue runs after the extension would have been seen, so it + // is intended to be sure that the item wasn't removed in the meantime. + + if (!Pool.TryGetValue(partitionId, out _) && !item.ActiveInstances.Any()) + { + return producer.CloseAsync(CancellationToken.None); + } + + return Task.CompletedTask; + }); + } + + /// + /// Closes the producers in the pool and performs any cleanup necessary + /// for resources used by the . + /// + /// + /// A task to be resolved on when the operation has completed. + /// + public virtual async Task CloseAsync(CancellationToken cancellationToken = default) + { + try + { + ExpirationTimer.Dispose(); + } + catch (Exception) + { + } + + var pendingCloses = new List(); + + pendingCloses.Add(EventHubProducer.CloseAsync(cancellationToken)); + + foreach (var poolItem in Pool.Values) + { + pendingCloses.Add(poolItem.PartitionProducer.CloseAsync(cancellationToken)); + } + + Pool.Clear(); + + await Task.WhenAll(pendingCloses).ConfigureAwait(false); + } + + /// + /// Returns a that will search for all the expired + /// in the and will try to close those that have expired. + /// + /// + /// A that is periodically run every . + /// + private TimerCallback CreateExpirationTimerCallback() + { + return _ => + { + // Capture the timestamp to use a consistent value. + var now = DateTimeOffset.UtcNow; + + foreach (var key in Pool.Keys.ToList()) + { + if (Pool.TryGetValue(key, out var poolItem)) + { + if (poolItem.RemoveAfter <= now) + { + if (Pool.TryRemove(key, out var _) && !poolItem.ActiveInstances.Any()) + { + // At this point the pool item may have been closed already + // if there was a context switch between the if conditions + // and the pool item clean up kicked in. + +#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extension method instead. + poolItem.PartitionProducer.CloseAsync(CancellationToken.None).GetAwaiter().GetResult(); +#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extension method instead. + } + } + } + } + }; + } + + /// + /// An item of the pool, adding tracking information to a . + /// + /// + internal class PoolItem + { + /// The period of inactivity after which an item would be removed from the pool. + internal static readonly TimeSpan DefaultRemoveAfterDuration = TimeSpan.FromMinutes(10); + + /// + /// An abstracted Event Hub transport-specific that is associated with a specific partition. + /// + /// + public TransportProducer PartitionProducer { get; private set; } + + /// + /// The unique identifier of a partition associated with the Event Hub. + /// + /// + public string PartitionId { get; private set; } + + /// + /// A set of unique identifiers used to track which instances of a are active. + /// + /// + public ConcurrentDictionary ActiveInstances { get; } = new ConcurrentDictionary(); + + /// + /// The UTC date and time when a will become eligible for eviction. + /// + /// + public DateTimeOffset RemoveAfter { get; set; } + + /// + /// Extends the UTC date and time when will become eligible for eviction. + /// + /// + /// The period of inactivity after which a will become eligible for eviction. + /// + public void ExtendRemoveAfter(TimeSpan? removeAfterDuration) + { + RemoveAfter = DateTimeOffset.UtcNow.Add(removeAfterDuration ?? DefaultRemoveAfterDuration); + } + + /// + /// Initializes a new instance of the class with a default timespan of . + /// + /// + /// The unique identifier of a partition associated with the Event Hub. + /// An Event Hub transport-specific producer specific to a given partition. + /// The interval after which a will become eligible for eviction. Overrides . + /// The UTC date and time when a will become eligible for eviction. + /// + public PoolItem(string partitionId, + TransportProducer partitionProducer, + TimeSpan? removeAfterDuration = default, + DateTimeOffset? removeAfter = default) + { + Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId)); + Argument.AssertNotNull(partitionProducer, nameof(partitionProducer)); + + PartitionProducer = partitionProducer; + PartitionId = partitionId; + + if (removeAfter == default) + { + ExtendRemoveAfter(removeAfterDuration); + } + else + { + RemoveAfter = removeAfter.Value; + } + } + } + + /// + /// A class wrapping a , triggering a clean-up when the object is disposed. + /// + /// + internal class PooledProducer: IAsyncDisposable + { + /// + /// A function responsible of cleaning up the resources in use. + /// + /// + private Func CleanUp { get; } + + /// + /// An abstracted Event Hub transport-specific producer that is associated with the + /// Event Hub gateway or a specific partition. + /// + /// + public TransportProducer TransportProducer { get; } + + /// + /// Initializes a new instance of the class. + /// + /// + /// An abstracted Event Hub transport-specific producer that is associated with the Event Hub gateway or a specific partition. + /// The function responsible of cleaning up the resources in use. + /// + public PooledProducer(TransportProducer transportProducer, + Func cleanUp = default) + { + Argument.AssertNotNull(transportProducer, nameof(transportProducer)); + + TransportProducer = transportProducer; + CleanUp = cleanUp; + } + + /// + /// Performs the task needed to clean up resources used by the . + /// + /// + /// A task to be resolved on when the operation has completed. + /// + public virtual ValueTask DisposeAsync() + { + if (CleanUp != default) + { + return new ValueTask(CleanUp(TransportProducer)); + } + + return new ValueTask(Task.CompletedTask); + } + } + } +} diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs old mode 100755 new mode 100644 index 8642a13cf38e9..5725bb8fcc489 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs @@ -2,7 +2,6 @@ // Licensed under the MIT License. using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.ComponentModel; using System.Diagnostics.CodeAnalysis; @@ -36,12 +35,18 @@ namespace Azure.Messaging.EventHubs.Producer /// public class EventHubProducerClient : IAsyncDisposable { + /// The maximum number of attempts that may be made to get a from the pool. + internal const int MaximumCreateProducerAttempts = 3; + /// The minimum allowable size, in bytes, for a batch to be sent. internal const int MinimumBatchSizeLimit = 24; /// The set of default publishing options to use when no specific options are requested. private static readonly SendEventOptions DefaultSendOptions = new SendEventOptions(); + /// Sets how long a dedicated would sit in memory before its would remove and close it. + private static readonly TimeSpan PartitionProducerLifespan = TimeSpan.FromMinutes(5); + /// /// The fully qualified Event Hubs namespace that the producer is associated with. This is likely /// to be similar to {yournamespace}.servicebus.windows.net. @@ -57,7 +62,7 @@ public class EventHubProducerClient : IAsyncDisposable public string EventHubName => Connection.EventHubName; /// - /// Indicates whether or not this has been closed. + /// Indicates whether or not this has been closed. /// /// /// @@ -87,21 +92,13 @@ public class EventHubProducerClient : IAsyncDisposable private EventHubConnection Connection { get; } /// - /// An abstracted Event Hub transport-specific producer that is associated with the - /// Event Hub gateway rather than a specific partition; intended to perform delegated operations. - /// - /// - private TransportProducer EventHubProducer { get; } - - /// - /// The set of active Event Hub transport-specific producers created by this client which are specific to - /// a given partition; intended to perform delegated operations. + /// A used to manage a set of partition specific . /// /// - private ConcurrentDictionary PartitionProducers { get; } = new ConcurrentDictionary(); + private TransportProducerPool PartitionProducerPool { get; } /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// /// /// The connection string to use for connecting to the Event Hubs namespace; it is expected that the Event Hub name and the shared key properties are contained in this connection string. @@ -122,7 +119,7 @@ public EventHubProducerClient(string connectionString) : this(connectionString, } /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// /// /// The connection string to use for connecting to the Event Hubs namespace; it is expected that the Event Hub name and the shared key properties are contained in this connection string. @@ -145,7 +142,7 @@ public EventHubProducerClient(string connectionString, } /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// /// /// The connection string to use for connecting to the Event Hubs namespace; it is expected that the shared key properties are contained in this connection string, but not the Event Hub name. @@ -165,7 +162,7 @@ public EventHubProducerClient(string connectionString, } /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// /// /// The connection string to use for connecting to the Event Hubs namespace; it is expected that the shared key properties are contained in this connection string, but not the Event Hub name. @@ -190,11 +187,11 @@ public EventHubProducerClient(string connectionString, OwnsConnection = true; Connection = new EventHubConnection(connectionString, eventHubName, clientOptions.ConnectionOptions); RetryPolicy = clientOptions.RetryOptions.ToRetryPolicy(); - EventHubProducer = Connection.CreateTransportProducer(null, RetryPolicy); + PartitionProducerPool = new TransportProducerPool(Connection, RetryPolicy); } /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// /// /// The fully qualified Event Hubs namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net. @@ -216,11 +213,11 @@ public EventHubProducerClient(string fullyQualifiedNamespace, OwnsConnection = true; Connection = new EventHubConnection(fullyQualifiedNamespace, eventHubName, credential, clientOptions.ConnectionOptions); RetryPolicy = clientOptions.RetryOptions.ToRetryPolicy(); - EventHubProducer = Connection.CreateTransportProducer(null, RetryPolicy); + PartitionProducerPool = new TransportProducerPool(Connection, RetryPolicy); } /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// /// /// The connection to use for communication with the Event Hubs service. @@ -235,15 +232,16 @@ public EventHubProducerClient(EventHubConnection connection, OwnsConnection = false; Connection = connection; RetryPolicy = clientOptions.RetryOptions.ToRetryPolicy(); - EventHubProducer = Connection.CreateTransportProducer(null, RetryPolicy); + PartitionProducerPool = new TransportProducerPool(Connection, RetryPolicy); } /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// /// /// The connection to use as the basis for delegation of client-type operations. /// The transport producer instance to use as the basis for service communication. + /// A used to manage a set of partition specific . /// /// /// This constructor is intended to be used internally for functional @@ -251,18 +249,20 @@ public EventHubProducerClient(EventHubConnection connection, /// /// internal EventHubProducerClient(EventHubConnection connection, - TransportProducer transportProducer) + TransportProducer transportProducer, + TransportProducerPool partitionProducerPool = default) { Argument.AssertNotNull(connection, nameof(connection)); Argument.AssertNotNull(transportProducer, nameof(transportProducer)); OwnsConnection = false; Connection = connection; - EventHubProducer = transportProducer; + RetryPolicy = new EventHubsRetryOptions().ToRetryPolicy(); + PartitionProducerPool = partitionProducerPool ?? new TransportProducerPool(Connection, RetryPolicy, eventHubProducer: transportProducer); } /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// /// protected EventHubProducerClient() @@ -275,7 +275,7 @@ protected EventHubProducerClient() /// the number of partitions present and their identifiers. /// /// - /// An optional instance to signal the request to cancel the operation. + /// An optional instance to signal the request to cancel the operation. /// /// The set of information for the Event Hub that this client is associated with. /// @@ -289,12 +289,12 @@ public virtual async Task GetEventHubPropertiesAsync(Cancell /// Retrieves the set of identifiers for the partitions of an Event Hub. /// /// - /// An optional instance to signal the request to cancel the operation. + /// An optional instance to signal the request to cancel the operation. /// /// The set of identifiers for the partitions within the Event Hub that this client is associated with. /// /// - /// This method is synonymous with invoking and reading the + /// This method is synonymous with invoking and reading the /// property that is returned. It is offered as a convenience for quick access to the set of partition identifiers for the associated Event Hub. /// No new or extended information is presented. /// @@ -312,7 +312,7 @@ public virtual async Task GetPartitionIdsAsync(CancellationToken cance /// /// /// The unique identifier of a partition associated with the Event Hub. - /// An optional instance to signal the request to cancel the operation. + /// An optional instance to signal the request to cancel the operation. /// /// The set of information for the requested partition under the Event Hub this client is associated with. /// @@ -329,7 +329,7 @@ public virtual async Task GetPartitionPropertiesAsync(strin /// /// /// The event data to send. - /// An optional instance to signal the request to cancel the operation. + /// An optional instance to signal the request to cancel the operation. /// /// A task to be resolved on when the operation has completed. /// @@ -352,7 +352,7 @@ internal virtual async Task SendAsync(EventData eventData, /// /// The event data to send. /// The set of options to consider when sending this batch. - /// An optional instance to signal the request to cancel the operation. + /// An optional instance to signal the request to cancel the operation. /// /// A task to be resolved on when the operation has completed. /// @@ -375,7 +375,7 @@ internal virtual async Task SendAsync(EventData eventData, /// /// /// The set of event data to send. - /// An optional instance to signal the request to cancel the operation. + /// An optional instance to signal the request to cancel the operation. /// /// A task to be resolved on when the operation has completed. /// @@ -391,7 +391,7 @@ internal virtual async Task SendAsync(IEnumerable events, /// /// The set of event data to send. /// The set of options to consider when sending this batch. - /// An optional instance to signal the request to cancel the operation. + /// An optional instance to signal the request to cancel the operation. /// /// A task to be resolved on when the operation has completed. /// @@ -409,26 +409,7 @@ internal virtual async Task SendAsync(IEnumerable events, Argument.AssertNotNull(events, nameof(events)); AssertSinglePartitionReference(options.PartitionId, options.PartitionKey); - // Determine the transport producer to delegate the send operation to. Because sending to a specific - // partition requires a dedicated client, use (or create) that client if a partition was specified. Otherwise - // the default gateway producer can be used to request automatic routing from the Event Hubs service gateway. - - TransportProducer activeProducer; - - if (string.IsNullOrEmpty(options.PartitionId)) - { - activeProducer = EventHubProducer; - } - else - { - // This assertion is intended as an additional check, not as a guarantee. There still exists a benign - // race condition where a transport producer may be created after the client has been closed; in this case - // the transport producer will be force-closed with the associated connection or, worst case, will close once - // its idle timeout period elapses. - - Argument.AssertNotClosed(IsClosed, nameof(EventHubProducerClient)); - activeProducer = PartitionProducers.GetOrAdd(options.PartitionId, id => Connection.CreateTransportProducer(id, RetryPolicy)); - } + int attempts = 0; events = (events as IList) ?? events.ToList(); InstrumentMessages(events); @@ -445,15 +426,37 @@ internal virtual async Task SendAsync(IEnumerable events, using DiagnosticScope scope = CreateDiagnosticScope(diagnosticIdentifiers); - try - { - await activeProducer.SendAsync(events, options, cancellationToken).ConfigureAwait(false); - } - catch (Exception ex) + var pooledProducer = PartitionProducerPool.GetPooledProducer(options.PartitionId, PartitionProducerLifespan); + + while (!cancellationToken.IsCancellationRequested) { - scope.Failed(ex); - throw; + try + { + await using var _ = pooledProducer.ConfigureAwait(false); + + await pooledProducer.TransportProducer.SendAsync(events, options, cancellationToken).ConfigureAwait(false); + + return; + } + catch (EventHubsException eventHubException) + when (eventHubException.Reason == EventHubsException.FailureReason.ClientClosed && ShouldRecreateProducer(pooledProducer.TransportProducer, options.PartitionId)) + { + if (++attempts >= MaximumCreateProducerAttempts) + { + scope.Failed(eventHubException); + throw; + } + + pooledProducer = PartitionProducerPool.GetPooledProducer(options.PartitionId, PartitionProducerLifespan); + } + catch (Exception ex) + { + scope.Failed(ex); + throw; + } } + + throw new TaskCanceledException(); } /// @@ -461,7 +464,7 @@ internal virtual async Task SendAsync(IEnumerable events, /// /// /// The set of event data to send. A batch may be created using . - /// An optional instance to signal the request to cancel the operation. + /// An optional instance to signal the request to cancel the operation. /// /// A task to be resolved on when the operation has completed. /// @@ -477,43 +480,45 @@ public virtual async Task SendAsync(EventDataBatch eventBatch, Argument.AssertNotNull(eventBatch, nameof(eventBatch)); AssertSinglePartitionReference(eventBatch.SendOptions.PartitionId, eventBatch.SendOptions.PartitionKey); - // Determine the transport producer to delegate the send operation to. Because sending to a specific - // partition requires a dedicated client, use (or create) that client if a partition was specified. Otherwise - // the default gateway producer can be used to request automatic routing from the Event Hubs service gateway. + int attempts = 0; + using DiagnosticScope scope = CreateDiagnosticScope(eventBatch.GetEventDiagnosticIdentifiers()); - TransportProducer activeProducer; + var pooledProducer = PartitionProducerPool.GetPooledProducer(eventBatch.SendOptions.PartitionId, PartitionProducerLifespan); - if (string.IsNullOrEmpty(eventBatch.SendOptions.PartitionId)) - { - activeProducer = EventHubProducer; - } - else + while (!cancellationToken.IsCancellationRequested) { - // This assertion is intended as an additional check, not as a guarantee. There still exists a benign - // race condition where a transport producer may be created after the client has been closed; in this case - // the transport producer will be force-closed with the associated connection or, worst case, will close once - // its idle timeout period elapses. + try + { + await using var _ = pooledProducer.ConfigureAwait(false); - Argument.AssertNotClosed(IsClosed, nameof(EventHubProducerClient)); - activeProducer = PartitionProducers.GetOrAdd(eventBatch.SendOptions.PartitionId, id => Connection.CreateTransportProducer(id, RetryPolicy)); - } + eventBatch.Lock(); + await pooledProducer.TransportProducer.SendAsync(eventBatch, cancellationToken).ConfigureAwait(false); - using DiagnosticScope scope = CreateDiagnosticScope(eventBatch.GetEventDiagnosticIdentifiers()); + return; + } + catch (EventHubsException eventHubException) + when (eventHubException.Reason == EventHubsException.FailureReason.ClientClosed && ShouldRecreateProducer(pooledProducer.TransportProducer, eventBatch.SendOptions.PartitionId)) + { + if (++attempts >= MaximumCreateProducerAttempts) + { + scope.Failed(eventHubException); + throw; + } - try - { - eventBatch.Lock(); - await activeProducer.SendAsync(eventBatch, cancellationToken).ConfigureAwait(false); - } - catch (Exception ex) - { - scope.Failed(ex); - throw; - } - finally - { - eventBatch.Unlock(); + pooledProducer = PartitionProducerPool.GetPooledProducer(eventBatch.SendOptions.PartitionId, PartitionProducerLifespan); + } + catch (Exception ex) + { + scope.Failed(ex); + throw; + } + finally + { + eventBatch.Unlock(); + } } + + throw new TaskCanceledException(); } /// @@ -525,7 +530,7 @@ public virtual async Task SendAsync(EventDataBatch eventBatch, /// attempting to send the events to the Event Hubs service. /// /// - /// An optional instance to signal the request to cancel the operation. + /// An optional instance to signal the request to cancel the operation. /// /// An with the default batch options. /// @@ -543,7 +548,7 @@ public virtual async Task SendAsync(EventDataBatch eventBatch, /// /// /// The set of options to consider when creating this batch. - /// An optional instance to signal the request to cancel the operation. + /// An optional instance to signal the request to cancel the operation. /// /// An with the requested . /// @@ -555,7 +560,7 @@ public virtual async ValueTask CreateBatchAsync(CreateBatchOptio options = options?.Clone() ?? new CreateBatchOptions(); AssertSinglePartitionReference(options.PartitionId, options.PartitionKey); - TransportEventBatch transportBatch = await EventHubProducer.CreateBatchAsync(options, cancellationToken).ConfigureAwait(false); + TransportEventBatch transportBatch = await PartitionProducerPool.EventHubProducer.CreateBatchAsync(options, cancellationToken).ConfigureAwait(false); return new EventDataBatch(transportBatch, FullyQualifiedNamespace, EventHubName, options.ToSendOptions()); } @@ -563,7 +568,7 @@ public virtual async ValueTask CreateBatchAsync(CreateBatchOptio /// Closes the producer. /// /// - /// An optional instance to signal the request to cancel the operation. + /// An optional instance to signal the request to cancel the operation. /// /// A task to be resolved on when the operation has completed. /// @@ -581,29 +586,19 @@ public virtual async Task CloseAsync(CancellationToken cancellationToken = defau var identifier = GetHashCode().ToString(); EventHubsEventSource.Log.ClientCloseStart(typeof(EventHubProducerClient), EventHubName, identifier); - // Attempt to close the active transport producers. In the event that an exception is encountered, + // Attempt to close the pool of producers. In the event that an exception is encountered, // it should not impact the attempt to close the connection, assuming ownership. - var transportProducerException = default(Exception); + var transportProducerPoolException = default(Exception); try { - await EventHubProducer.CloseAsync(cancellationToken).ConfigureAwait(false); - - var pendingCloses = new List(); - - foreach (var producer in PartitionProducers.Values) - { - pendingCloses.Add(producer.CloseAsync(CancellationToken.None)); - } - - PartitionProducers.Clear(); - await Task.WhenAll(pendingCloses).ConfigureAwait(false); + await PartitionProducerPool.CloseAsync(cancellationToken).ConfigureAwait(false); } catch (Exception ex) { EventHubsEventSource.Log.ClientCloseError(typeof(EventHubProducerClient), EventHubName, identifier, ex.Message); - transportProducerException = ex; + transportProducerPoolException = ex; } // An exception when closing the connection supersedes one observed when closing the @@ -626,12 +621,12 @@ public virtual async Task CloseAsync(CancellationToken cancellationToken = defau EventHubsEventSource.Log.ClientCloseComplete(typeof(EventHubProducerClient), EventHubName, identifier); } - // If there was an active exception pending from closing the individual - // transport producers, surface it now. + // If there was an active exception pending from closing the + // transport producer pool, surface it now. - if (transportProducerException != default) + if (transportProducerPoolException != default) { - ExceptionDispatchInfo.Capture(transportProducerException).Throw(); + ExceptionDispatchInfo.Capture(transportProducerPoolException).Throw(); } } @@ -733,5 +728,20 @@ private static void AssertSinglePartitionReference(string partitionId, throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotSendWithPartitionIdAndPartitionKey, partitionId)); } } + + /// + /// Checks if the returned by the is still open. + /// + /// + /// The that has being checked. + /// The unique identifier of a partition associated with the Event Hub. + /// + /// true if the specified is closed; otherwise, false. + /// + private bool ShouldRecreateProducer(TransportProducer producer, + string partitionId) => !string.IsNullOrEmpty(partitionId) + && producer.IsClosed + && !IsClosed + && !Connection.IsClosed; } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/SendEventOptions.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/SendEventOptions.cs index ed4bdab9fb878..04eb6e3eab4bf 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/SendEventOptions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/SendEventOptions.cs @@ -80,7 +80,7 @@ public SendEventOptions() /// The hashing key to use for influencing the partition to which the events are routed. /// internal SendEventOptions(string partitionId, - string partitionKey) + string partitionKey) { PartitionId = partitionId; PartitionKey = partitionKey; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpErrorTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpErrorTests.cs index 2e026a8229dce..3122ceb9f431a 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpErrorTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpErrorTests.cs @@ -41,6 +41,7 @@ public static IEnumerable SimpleConditionExceptionMatchTestCases() yield return new object[] { AmqpErrorCode.ResourceLimitExceeded, typeof(EventHubsException), EventHubsException.FailureReason.QuotaExceeded }; yield return new object[] { AmqpErrorCode.NotAllowed, typeof(InvalidOperationException), null }; yield return new object[] { AmqpErrorCode.NotImplemented, typeof(NotSupportedException), null }; + yield return new object[] { AmqpErrorCode.IllegalState, typeof(EventHubsException), EventHubsException.FailureReason.ClientClosed }; } /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientTests.cs index 28d2a9a9c620d..152f0bdff8ceb 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientTests.cs @@ -2422,9 +2422,17 @@ internal override Task GetPartitionPropertiesAsync(string p return Task.FromResult(default(PartitionProperties)); } - internal override TransportConsumer CreateTransportConsumer(string consumerGroup, string partitionId, EventPosition eventPosition, EventHubsRetryPolicy retryPolicy, bool trackLastEnqueuedEventProperties = true, long? ownerLevel = default, uint? prefetchCount = default) => TransportConsumerFactory(); - - internal override TransportClient CreateTransportClient(string fullyQualifiedNamespace, string eventHubName, EventHubTokenCredential credential, EventHubConnectionOptions options) + internal override TransportConsumer CreateTransportConsumer(string consumerGroup, + string partitionId, EventPosition eventPosition, + EventHubsRetryPolicy retryPolicy, + bool trackLastEnqueuedEventProperties = true, + long? ownerLevel = default, + uint? prefetchCount = default) => TransportConsumerFactory(); + + internal override TransportClient CreateTransportClient(string fullyQualifiedNamespace, + string eventHubName, + EventHubTokenCredential credential, + EventHubConnectionOptions options) { var client = new Mock(); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs old mode 100755 new mode 100644 index c136d1a34d62a..4e39075e9eb91 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Reflection; @@ -215,7 +216,7 @@ public void ProducerDelegatesForTheEventHubName() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -232,7 +233,7 @@ public async Task GetEventHubPropertiesAsyncUsesTheRetryPolicy() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -249,7 +250,7 @@ public async Task GetPartitionIdsUsesTheRetryPolicy() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -266,7 +267,7 @@ public async Task GetPartitionPropertiesUsesTheRetryPolicy() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -278,7 +279,7 @@ public void SendSingleWithoutOptionsRequiresAnEvent() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -290,7 +291,7 @@ public void SendSingleRequiresAnEvent() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -309,7 +310,7 @@ public async Task SendSingleWithoutOptionsDelegatesToBatchSend() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -328,7 +329,7 @@ public async Task SendSingleWitOptionsDelegatesToBatchSend() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -342,7 +343,7 @@ public void SendWithoutOptionsRequiresEvents() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -356,7 +357,7 @@ public void SendRequiresEvents() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -370,7 +371,7 @@ public void SendRequiresTheBatch() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -386,7 +387,7 @@ public void SendAllowsAPartitionHashKey() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -402,7 +403,7 @@ public void SendAllowsAPartitionHashKeyWithABatch() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -418,7 +419,7 @@ public void SendForASpecificPartitionDoesNotAllowAPartitionHashKey() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -434,7 +435,7 @@ public void SendForASpecificPartitionDoesNotAllowAPartitionHashKeyWithABatch() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -454,7 +455,7 @@ public async Task SendWithoutOptionsInvokesTheTransportProducer() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -475,7 +476,7 @@ public async Task SendInvokesTheTransportProducer() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -546,7 +547,7 @@ public void CreateBatchForASpecificPartitionDoesNotAllowAPartitionHashKey() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -566,7 +567,7 @@ public async Task CreateBatchInvokesTheTransportProducer() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -586,7 +587,7 @@ public async Task CreateBatchDefaultsBatchOptions() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// @@ -634,15 +635,16 @@ public async Task CloseAsyncClosesTheTransportProducers() /// /// [Test] - public async Task CloseAsyncSurfacesExceptionsForTransportConsumers() + public async Task CloseAsyncSurfacesExceptionsForTransportProducers() { var mockTransportProducer = new Mock(); var mockConnection = new MockConnection(() => mockTransportProducer.Object); + var mockTransportProducerPool = new Mock(); var mockBatch = new EventDataBatch(new MockTransportBatch(), "ns", "eh", new SendEventOptions { PartitionId = "1" }); - var producer = new EventHubProducerClient(mockConnection); + var producer = new EventHubProducerClient(mockConnection, mockTransportProducer.Object, mockTransportProducerPool.Object); - mockTransportProducer - .Setup(producer => producer.CloseAsync(It.IsAny())) + mockTransportProducerPool + .Setup(pool => pool.CloseAsync(It.IsAny())) .Returns(Task.FromException(new InvalidCastException())); try @@ -685,6 +687,554 @@ public async Task CloseAsyncDoesNotCloseTheConnectionWhenNotOwned() Assert.That(connection.IsClosed, Is.False); } + /// + /// Verifies that when calling + /// a is taken from a + /// when a partition id is specified. + /// + /// + [Test] + public async Task EventHubProducerClientShouldPickAnItemFromPool() + { + var options = new SendEventOptions { PartitionId = "0" }; + var transportProducer = new ObservableTransportProducerMock(); + var eventHubConnection = new MockConnection(() => transportProducer); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(new ObservableTransportProducerMock(), eventHubConnection, retryPolicy); + var mockPooledProducer = mockTransportProducerPool.GetPooledProducer(options.PartitionId) as MockPooledProducer; + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer, mockTransportProducerPool); + var events = new EventData[0]; + + await producerClient.SendAsync(events, options); + + Assert.That(mockTransportProducerPool.GetPooledProducerWasCalled, Is.True, $"The method { nameof(TransportProducerPool.GetPooledProducer) } should have been called."); + } + + /// + /// Verifies that when calling for batches + /// a is taken from a when a partition id is specified. + /// + /// + [Test] + public async Task EventHubProducerClientShouldPickAnItemFromPoolWithABatch() + { + var batchOptions = new CreateBatchOptions { PartitionId = "0" }; + var batch = new EventDataBatch(new MockTransportBatch(), "ns", "eh", batchOptions.ToSendOptions()); + var transportProducer = new ObservableTransportProducerMock(); + var eventHubConnection = new MockConnection(() => transportProducer); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(new ObservableTransportProducerMock(), eventHubConnection, retryPolicy); + var mockPooledProducer = mockTransportProducerPool.GetPooledProducer(batchOptions.PartitionId) as MockPooledProducer; + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer, mockTransportProducerPool); + + await producerClient.SendAsync(batch); + + Assert.That(mockTransportProducerPool.GetPooledProducerWasCalled, Is.True, $"The method { nameof(TransportProducerPool.GetPooledProducer) } should have been called (for a batch)."); + } + + /// + /// Verifies that a is called + /// to signal the usage of a has ended. + /// + /// + /// + /// Users of a , such as , + /// can signal their usage of a has ended + /// by invoking . + /// + /// + [Test] + public async Task EventHubProducerClientShouldCloseAProducer() + { + var batchOptions = new CreateBatchOptions { PartitionId = "0" }; + var batch = new EventDataBatch(new MockTransportBatch(), "ns", "eh", batchOptions.ToSendOptions()); + var transportProducer = new ObservableTransportProducerMock(); + var eventHubConnection = new MockConnection(() => transportProducer); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(new ObservableTransportProducerMock(), eventHubConnection, retryPolicy); + var mockPooledProducer = mockTransportProducerPool.GetPooledProducer(batchOptions.PartitionId) as MockPooledProducer; + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer, mockTransportProducerPool); + + await producerClient.SendAsync(batch); + + Assert.That(mockPooledProducer.WasClosed, Is.True, $"A { nameof(TransportProducerPool.PooledProducer) } should be closed when disposed."); + } + + /// + /// Verifies that a is called + /// to signal the usage of a has ended. + /// + /// + /// + /// Users of a , such as , + /// can signal their usage of a has ended + /// by invoking . + /// + /// + [Test] + public async Task EventHubProducerClientShouldCloseAProducerWithABatch() + { + var options = new SendEventOptions { PartitionId = "0" }; + var transportProducer = new ObservableTransportProducerMock(); + var eventHubConnection = new MockConnection(() => transportProducer); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(new ObservableTransportProducerMock(), eventHubConnection, retryPolicy); + var mockPooledProducer = mockTransportProducerPool.GetPooledProducer(options.PartitionId) as MockPooledProducer; + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer, mockTransportProducerPool); + var events = new EventData[0]; + + await producerClient.SendAsync(events, options); + + Assert.That(mockPooledProducer.WasClosed, Is.True, $"A { nameof(TransportProducerPool.PooledProducer) } should be closed when disposed (for a batch)."); + } + + /// + /// Verifies that an retries sending an + /// event if a partition producer returned by the pool was closed due to a race condition between an + /// AMQP operation and a request to close a client. + /// + /// + [Test] + public void EventHubProducerClientShouldRetrySending() + { + var options = new SendEventOptions { PartitionId = "0" }; + var transportProducer = new Mock(); + var eventHubConnection = new MockConnection(() => transportProducer.Object); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(transportProducer.Object, eventHubConnection, retryPolicy); + var mockPooledProducer = mockTransportProducerPool.GetPooledProducer(options.PartitionId) as MockPooledProducer; + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer.Object, mockTransportProducerPool); + var events = new EventData[0]; + + transportProducer + .Setup(transportProducer => transportProducer.SendAsync(It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Throws(new EventHubsException(false, "test", EventHubsException.FailureReason.ClientClosed)); + + transportProducer + .SetupGet(transportProducer => transportProducer.IsClosed) + .Returns(true); + + Assert.That(async () => await producerClient.SendAsync(events, options), Throws.InstanceOf().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.ClientClosed)); + + transportProducer.Verify(t => t.SendAsync(It.IsAny>(), + It.IsAny(), + It.IsAny()), + Times.Exactly(EventHubProducerClient.MaximumCreateProducerAttempts), + $"The retry logic should have called { nameof(TransportProducer.SendAsync) } { EventHubProducerClient.MaximumCreateProducerAttempts } times."); + } + + /// + /// Verifies that an retries sending an + /// event if a partition producer returned by the pool was closed due to a race condition between an + /// AMQP operation and a request to close a client. + /// + /// + [Test] + public void EventHubProducerClientShouldRetrySendingWithABatch() + { + var batchOptions = new CreateBatchOptions { PartitionId = "0" }; + var batch = new EventDataBatch(new MockTransportBatch(), "ns", "eh", batchOptions.ToSendOptions()); + var transportProducer = new Mock(); + var eventHubConnection = new MockConnection(() => transportProducer.Object); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(transportProducer.Object, eventHubConnection, retryPolicy); + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer.Object, mockTransportProducerPool); + + transportProducer + .Setup(transportProducer => transportProducer.SendAsync(It.IsAny(), + It.IsAny())) + .Throws(new EventHubsException(false, "test", EventHubsException.FailureReason.ClientClosed)); + + transportProducer + .SetupGet(transportProducer => transportProducer.IsClosed) + .Returns(true); + + Assert.That(async () => await producerClient.SendAsync(batch), Throws.InstanceOf().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.ClientClosed)); + + transportProducer.Verify(t => t.SendAsync(It.IsAny(), + It.IsAny()), + Times.Exactly(EventHubProducerClient.MaximumCreateProducerAttempts), + $"The retry logic should have called { nameof(TransportProducer.SendAsync) } { EventHubProducerClient.MaximumCreateProducerAttempts } times (for a batch)."); + } + + /// + /// Verifies that the retry logic does not loop endlessly. + /// + /// + [Test] + public void RetryLogicEnds() + { + var options = new SendEventOptions { PartitionId = "0" }; + var events = new EventData[0]; + var transportProducer = new Mock(); + var eventHubConnection = new MockConnection(() => transportProducer.Object); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(new ObservableTransportProducerMock(), eventHubConnection, retryPolicy); + var mockPooledProducer = mockTransportProducerPool.GetPooledProducer(options.PartitionId) as MockPooledProducer; + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer.Object, mockTransportProducerPool); + var numberOfCalls = 0; + + transportProducer + .Setup(transportProducer => transportProducer.SendAsync(It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Callback(() => + { + if (++numberOfCalls < EventHubProducerClient.MaximumCreateProducerAttempts) + { + throw new EventHubsException(false, string.Empty, EventHubsException.FailureReason.ClientClosed); + } + }) + .Returns(Task.CompletedTask); + + Assert.That(async () => await producerClient.SendAsync(events, options), Throws.Nothing, $"The retry logic should not run endlessly."); + } + + /// + /// Verifies that the retry logic does not loop endlessly for batches. + /// + /// + [Test] + public void RetryLogicEndsWithABatch() + { + var batchOptions = new CreateBatchOptions { PartitionId = "0" }; + var batch = new EventDataBatch(new MockTransportBatch(), "ns", "eh", batchOptions.ToSendOptions()); + var transportProducer = new Mock(); + var eventHubConnection = new MockConnection(() => transportProducer.Object); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(new ObservableTransportProducerMock(), eventHubConnection, retryPolicy); + var mockPooledProducer = mockTransportProducerPool.GetPooledProducer(batchOptions.PartitionId) as MockPooledProducer; + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer.Object, mockTransportProducerPool); + var numberOfCalls = 0; + + transportProducer + .Setup(transportProducer => transportProducer.SendAsync(It.IsAny(), + It.IsAny())) + .Callback(() => + { + if (++numberOfCalls < EventHubProducerClient.MaximumCreateProducerAttempts) + { + throw new EventHubsException(false, string.Empty, EventHubsException.FailureReason.ClientClosed); + } + }) + .Returns(Task.CompletedTask); + + Assert.That(async () => await producerClient.SendAsync(batch), Throws.Nothing, $"The retry logic should not run endlessly (for a batch)."); + } + + /// + /// Retry logic does not kick-in for the main-stream scenario (i.e. partition id is null). + /// + /// + [Test] + public void RetryLogicDoesNotStartWhenPartitionIdIsNull() + { + var options = new SendEventOptions { PartitionId = "0" }; + var events = new EventData[0]; + var transportProducer = new Mock(); + var eventHubConnection = new MockConnection(() => transportProducer.Object); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(transportProducer.Object, eventHubConnection, retryPolicy); + var mockPooledProducer = mockTransportProducerPool.GetPooledProducer(options.PartitionId) as MockPooledProducer; + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer.Object, mockTransportProducerPool); + + transportProducer + .Setup(transportProducer => transportProducer.SendAsync(It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Throws(new EventHubsException(false, "test", EventHubsException.FailureReason.ClientClosed)); + + Assert.That(async () => await producerClient.SendAsync(events, options), Throws.InstanceOf().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.ClientClosed)); + + transportProducer.Verify(t => t.SendAsync(It.IsAny>(), + It.IsAny(), + It.IsAny()), + Times.Once, + $"The retry logic should not start when the partition id is null."); + } + + /// + /// Retry logic does not kick-in for the main-stream scenario (i.e. partition id is null). + /// + /// + [Test] + public void RetryLogicDoesNotStartWhenPartitionIdIsNullWithABatch() + { + var batchOptions = new CreateBatchOptions { PartitionId = "0" }; + var batch = new EventDataBatch(new MockTransportBatch(), "ns", "eh", batchOptions.ToSendOptions()); + var transportProducer = new Mock(); + var eventHubConnection = new MockConnection(() => transportProducer.Object); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(transportProducer.Object, eventHubConnection, retryPolicy); + var mockPooledProducer = mockTransportProducerPool.GetPooledProducer(batchOptions.PartitionId) as MockPooledProducer; + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer.Object, mockTransportProducerPool); + + transportProducer + .Setup(transportProducer => transportProducer.SendAsync(It.IsAny(), + It.IsAny())) + .Throws(new EventHubsException(false, "test", EventHubsException.FailureReason.ClientClosed)); + + Assert.That(async () => await producerClient.SendAsync(batch), Throws.InstanceOf().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.ClientClosed)); + + transportProducer.Verify(t => t.SendAsync(It.IsAny(), + It.IsAny()), + Times.Once, + $"The retry logic should not start when the partition id is null (for a batch)."); + } + + /// + /// Retry logic starts only when the owned by an is open. + /// + /// + [Test] + public async Task RetryLogicDoesNotWorkForClosedConnections() + { + var options = new SendEventOptions { PartitionId = "0" }; + var events = new EventData[0]; + var transportProducer = new Mock(); + var eventHubConnection = new MockConnection(() => transportProducer.Object); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(transportProducer.Object, eventHubConnection, retryPolicy); + var mockPooledProducer = mockTransportProducerPool.GetPooledProducer(options.PartitionId) as MockPooledProducer; + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer.Object, mockTransportProducerPool); + + transportProducer + .Setup(transportProducer => transportProducer.SendAsync(It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Throws(new EventHubsException(false, "test", EventHubsException.FailureReason.ClientClosed)); + + transportProducer + .SetupGet(transportProducer => transportProducer.IsClosed) + .Returns(true); + + await eventHubConnection.CloseAsync(CancellationToken.None); + + Assert.That(async () => await producerClient.SendAsync(events, options), Throws.InstanceOf().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.ClientClosed)); + + transportProducer.Verify(t => t.SendAsync(It.IsAny>(), + It.IsAny(), + It.IsAny()), + Times.Once, + $"The retry logic should not start when the { nameof(EventHubConnection) } was closed."); + } + + /// + /// Retry logic starts only when the owned by an is open. + /// + /// + [Test] + public async Task RetryLogicDoesNotWorkForClosedConnectionsWithABatch() + { + var batchOptions = new CreateBatchOptions { PartitionId = "0" }; + var batch = new EventDataBatch(new MockTransportBatch(), "ns", "eh", batchOptions.ToSendOptions()); + var transportProducer = new Mock(); + var eventHubConnection = new MockConnection(() => transportProducer.Object); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(transportProducer.Object, eventHubConnection, retryPolicy); + var mockPooledProducer = mockTransportProducerPool.GetPooledProducer(batchOptions.PartitionId) as MockPooledProducer; + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer.Object, mockTransportProducerPool); + + transportProducer + .Setup(transportProducer => transportProducer.SendAsync(It.IsAny(), + It.IsAny())) + .Throws(new EventHubsException(false, "test", EventHubsException.FailureReason.ClientClosed)); + + transportProducer + .SetupGet(transportProducer => transportProducer.IsClosed) + .Returns(true); + + await eventHubConnection.CloseAsync(CancellationToken.None); + + Assert.That(async () => await producerClient.SendAsync(batch), Throws.InstanceOf().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.ClientClosed)); + + transportProducer.Verify(t => t.SendAsync(It.IsAny(), + It.IsAny()), + Times.Once, + $"The retry logic should not start when the { nameof(EventHubConnection) } was closed (for a batch)."); + } + + /// + /// Retry logic starts only when the an is open. + /// + /// + [Test] + public void RetryLogicDoesNotWorkForClosedEventHubProducerClients() + { + var options = new SendEventOptions { PartitionId = "0" }; + var events = new EventData[0]; + var transportProducer = new Mock(); + var eventHubConnection = new MockConnection(() => transportProducer.Object); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(transportProducer.Object, eventHubConnection, retryPolicy); + var mockPooledProducer = mockTransportProducerPool.GetPooledProducer(options.PartitionId) as MockPooledProducer; + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer.Object, mockTransportProducerPool); + + transportProducer + .Setup(transportProducer => transportProducer.SendAsync(It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Throws(new EventHubsException(false, "test", EventHubsException.FailureReason.ClientClosed)); + + transportProducer + .SetupGet(transportProducer => transportProducer.IsClosed) + .Returns(true); + + SetIsClosed(producerClient, true); + + Assert.That(async () => await producerClient.SendAsync(events, options), Throws.InstanceOf().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.ClientClosed)); + + transportProducer.Verify(t => t.SendAsync(It.IsAny>(), + It.IsAny(), + It.IsAny()), + Times.Once, + $"The retry logic should not start when a { nameof(TransportProducer) } was closed."); + } + + /// + /// Retry logic starts only when the an is open. + /// + /// + [Test] + public void RetryLogicDoesNotWorkForClosedEventHubProducerClientsWithABatch() + { + var batchOptions = new CreateBatchOptions { PartitionId = "0" }; + var batch = new EventDataBatch(new MockTransportBatch(), "ns", "eh", batchOptions.ToSendOptions()); + var transportProducer = new Mock(); + var eventHubConnection = new MockConnection(() => transportProducer.Object); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(transportProducer.Object, eventHubConnection, retryPolicy); + var mockPooledProducer = mockTransportProducerPool.GetPooledProducer(batchOptions.PartitionId) as MockPooledProducer; + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer.Object, mockTransportProducerPool); + + transportProducer + .Setup(transportProducer => transportProducer.SendAsync(It.IsAny(), + It.IsAny())) + .Throws(new EventHubsException(false, "test", EventHubsException.FailureReason.ClientClosed)); + + transportProducer + .SetupGet(transportProducer => transportProducer.IsClosed) + .Returns(true); + + SetIsClosed(producerClient, true); + + Assert.That(async () => await producerClient.SendAsync(batch), Throws.InstanceOf().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.ClientClosed)); + + transportProducer.Verify(t => t.SendAsync(It.IsAny(), + It.IsAny()), + Times.Once, + $"The retry logic should not start when a { nameof(TransportProducer) } was closed (for a batch)."); + } + + /// + /// Retry logic would not start after a cancellation is requested. + /// + /// + [Test] + public void RetryLogicShouldNotStartWhenCancellationTriggered() + { + var options = new SendEventOptions { PartitionId = "0" }; + var events = new EventData[0]; + var transportProducer = new Mock(); + var eventHubConnection = new MockConnection(() => transportProducer.Object); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(transportProducer.Object, eventHubConnection, retryPolicy); + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer.Object, mockTransportProducerPool); + var cancellationTokenSource = new CancellationTokenSource(); + + cancellationTokenSource.Cancel(); + + Assert.That(async () => await producerClient.SendAsync(events, options, cancellationTokenSource.Token), Throws.InstanceOf()); + + transportProducer.Verify(t => t.SendAsync(It.IsAny>(), + It.IsAny(), + It.IsAny()), + Times.Never, + "The retry logic should not start when cancellation is triggered."); + } + + /// + /// Retry logic would not start after a cancellation is requested. + /// + /// + [Test] + public void RetryLogicShouldNotStartWhenCancellationTriggeredWithABatch() + { + var batchOptions = new CreateBatchOptions { PartitionId = "0" }; + var batch = new EventDataBatch(new MockTransportBatch(), "ns", "eh", batchOptions.ToSendOptions()); + var transportProducer = new Mock(); + var eventHubConnection = new MockConnection(() => transportProducer.Object); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(transportProducer.Object, eventHubConnection, retryPolicy); + var mockPooledProducer = mockTransportProducerPool.GetPooledProducer(batchOptions.PartitionId) as MockPooledProducer; + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer.Object, mockTransportProducerPool); + var cancellationTokenSource = new CancellationTokenSource(); + + cancellationTokenSource.Cancel(); + + Assert.That(async () => await producerClient.SendAsync(batch, cancellationTokenSource.Token), Throws.InstanceOf()); + + transportProducer.Verify(t => t.SendAsync(It.IsAny(), + It.IsAny()), + Times.Never, + "The retry logic should not start when cancellation is triggered (for a batch)."); + } + + /// + /// Retry logic will end if a is thrown + /// by and will rethrow the + /// exception. + /// + /// + [Test] + public void RetryLogicDetectsAnEmbeddedAmqpErrorForOperationCanceled() + { + var options = new SendEventOptions { PartitionId = "0" }; + var events = new EventData[0]; + var transportProducer = new Mock(); + var eventHubConnection = new MockConnection(() => transportProducer.Object); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(transportProducer.Object, eventHubConnection, retryPolicy); + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer.Object, mockTransportProducerPool); + var embeddedException = new OperationCanceledException("", new ArgumentNullException()); + var cancellationTokenSource = new CancellationTokenSource(); + + transportProducer + .Setup(transportProducer => transportProducer.SendAsync(events, options, cancellationTokenSource.Token)) + .Throws(embeddedException); + + Assert.That(async () => await producerClient.SendAsync(events, options, cancellationTokenSource.Token), Throws.InstanceOf()); + } + + /// + /// Retry logic will end if a is thrown + /// by and will rethrow the + /// exception. + /// + /// + [Test] + public void RetryLogicDetectsAnEmbeddedAmqpErrorForOperationCanceledWithABatch() + { + var batchOptions = new CreateBatchOptions { PartitionId = "0" }; + var batch = new EventDataBatch(new MockTransportBatch(), "ns", "eh", batchOptions.ToSendOptions()); + var transportProducer = new Mock(); + var eventHubConnection = new MockConnection(() => transportProducer.Object); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var mockTransportProducerPool = new MockTransportProducerPool(transportProducer.Object, eventHubConnection, retryPolicy); + var mockPooledProducer = mockTransportProducerPool.GetPooledProducer(batchOptions.PartitionId) as MockPooledProducer; + var producerClient = new EventHubProducerClient(eventHubConnection, transportProducer.Object, mockTransportProducerPool); + var embeddedException = new OperationCanceledException("", new ArgumentNullException()); + var cancellationTokenSource = new CancellationTokenSource(); + + transportProducer + .Setup(transportProducer => transportProducer.SendAsync(batch, cancellationTokenSource.Token)) + .Throws(embeddedException); + + Assert.That(async () => await producerClient.SendAsync(batch, cancellationTokenSource.Token), Throws.InstanceOf()); + } + /// /// Retrieves the Connection for the producer using its private accessor. /// @@ -705,6 +1255,20 @@ private static EventHubsRetryPolicy GetRetryPolicy(EventHubProducerClient produc .GetProperty("RetryPolicy", BindingFlags.Instance | BindingFlags.NonPublic) .GetValue(producer); + /// + /// Sets property using its protected accessor. + /// + /// + /// The that should be set to closed. + /// The value for the property. + /// + private static void SetIsClosed(EventHubProducerClient producer, + bool isClosed) => + typeof(EventHubProducerClient) + .GetProperty("IsClosed") + .GetSetMethod(true) + .Invoke(producer, new object[] { isClosed }); + /// /// Allows for observation of operations performed by the producer for testing purposes. /// @@ -757,6 +1321,7 @@ private class MockConnection : EventHubConnection public EventHubsRetryPolicy GetPartitionIdsInvokedWith = null; public EventHubsRetryPolicy GetPartitionPropertiesInvokedWith = null; public Func TransportProducerFactory = () => Mock.Of(); + public Mock InnerClientMock = null; public bool WasClosed = false; @@ -777,7 +1342,7 @@ public MockConnection(Func transportProducerFactory) : this(t } internal override Task GetPropertiesAsync(EventHubsRetryPolicy retryPolicy, - CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default) { GetPropertiesInvokedWith = retryPolicy; return Task.FromResult(new EventHubProperties(EventHubName, DateTimeOffset.Parse("2015-10-27T00:00:00Z"), new string[] { "0", "1" })); @@ -801,15 +1366,25 @@ internal override Task GetPartitionPropertiesAsync(string p internal override TransportProducer CreateTransportProducer(string partitionId, EventHubsRetryPolicy retryPolicy) => TransportProducerFactory(); - internal override TransportClient CreateTransportClient(string fullyQualifiedNamespace, string eventHubName, EventHubTokenCredential credential, EventHubConnectionOptions options) + internal override TransportClient CreateTransportClient(string fullyQualifiedNamespace, + string eventHubName, EventHubTokenCredential credential, + EventHubConnectionOptions options) { - var client = new Mock(); + InnerClientMock = new Mock(); - client + InnerClientMock .Setup(client => client.ServiceEndpoint) .Returns(new Uri($"amgp://{ fullyQualifiedNamespace }.com/{ eventHubName }")); - return client.Object; + return InnerClientMock.Object; + } + + public override Task CloseAsync(CancellationToken cancellationToken = default) + { + InnerClientMock.Setup(client => client.IsClosed) + .Returns(true); + + return Task.CompletedTask; } } @@ -829,5 +1404,53 @@ private class MockTransportBatch : TransportEventBatch public override void Clear() => throw new NotImplementedException(); public override void Dispose() => throw new NotImplementedException(); } + + /// + /// Allows for observation of operations performed by the producer for testing purposes. + /// + /// + private class MockPooledProducer : TransportProducerPool.PooledProducer + { + public bool WasClosed { get; set; } = false; + + public MockPooledProducer(TransportProducer transportProducer): base(transportProducer, (_) => Task.CompletedTask) + { + } + + public override ValueTask DisposeAsync() + { + WasClosed = true; + + return new ValueTask(Task.CompletedTask); + } + } + + /// + /// Allows for observation of operations performed by the producer for testing purposes. + /// + /// + private class MockTransportProducerPool : TransportProducerPool + { + public bool GetPooledProducerWasCalled { get; set; } + + public MockPooledProducer MockPooledProducer { get; } + + public MockTransportProducerPool(TransportProducer transportProducer, + EventHubConnection connection, + EventHubsRetryPolicy retryPolicy, + ConcurrentDictionary pool = default, + TimeSpan? expirationInterval = default): base(connection, retryPolicy, pool, expirationInterval) + { + MockPooledProducer = new MockPooledProducer(transportProducer); + } + + public override PooledProducer GetPooledProducer(string partitionId, + TimeSpan? removeAfterDuration = default) + { + GetPooledProducerWasCalled = true; + + return MockPooledProducer; + } + } } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs new file mode 100644 index 0000000000000..b1b14e95e073f --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs @@ -0,0 +1,390 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Reflection; +using System.Threading; +using System.Threading.Tasks; +using Azure.Core; +using Azure.Messaging.EventHubs.Authorization; +using Azure.Messaging.EventHubs.Core; +using Azure.Messaging.EventHubs.Producer; +using Moq; +using NUnit.Framework; + +namespace Azure.Messaging.EventHubs.Tests +{ + /// + /// The suite of tests for the class. + /// + /// + [TestFixture] + public class TransportProducerPoolTests + { + /// + /// The pool periodically removes and closes expired items. + /// + /// + [Test] + public void TransportProducerPoolRemovesExpiredItems() + { + var transportProducer = new ObservableTransportProducerMock(); + var connection = new MockConnection(() => transportProducer); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + DateTimeOffset oneMinuteAgo = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromMinutes(1)); + var startingPool = new ConcurrentDictionary + { + // An expired item in the pool + ["0"] = new TransportProducerPool.PoolItem("0", transportProducer, removeAfter: oneMinuteAgo), + ["1"] = new TransportProducerPool.PoolItem("0", transportProducer), + ["2"] = new TransportProducerPool.PoolItem("0", transportProducer), + }; + TransportProducerPool transportProducerPool = new TransportProducerPool(connection, retryPolicy, startingPool); + + GetExpirationCallBack(transportProducerPool).Invoke(null); + + Assert.That(startingPool.TryGetValue("0", out _), Is.False, "PerformExpiration should remove an expired producer from the pool."); + Assert.That(transportProducer.CloseCallCount, Is.EqualTo(1), "PerformExpiration should close an expired producer."); + Assert.That(startingPool.TryGetValue("1", out _), Is.True, "PerformExpiration should not remove valid producers."); + Assert.That(startingPool.TryGetValue("2", out _), Is.True, "PerformExpiration should not remove valid producers."); + } + + /// + /// When a is requested + /// its will be increased. + /// + /// + [Test] + public void TransportProducerPoolRefreshesAccessedItems() + { + DateTimeOffset oneMinuteAgo = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromMinutes(1)); + var transportProducer = new ObservableTransportProducerMock(); + var connection = new MockConnection(() => transportProducer); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var startingPool = new ConcurrentDictionary + { + // An expired item in the pool + ["0"] = new TransportProducerPool.PoolItem("0", transportProducer, removeAfter: oneMinuteAgo) + }; + TransportProducerPool transportProducerPool = new TransportProducerPool(connection, retryPolicy, startingPool); + + // This call should refresh the timespan associated to the item + _ = transportProducerPool.GetPooledProducer("0"); + + // The expiration call back should not remove the item + GetExpirationCallBack(transportProducerPool).Invoke(null); + + Assert.That(startingPool.TryGetValue("0", out _), Is.True, "The item in the pool should be refreshed and not have been removed."); + } + + /// + /// When a is disposed, the + /// of the associated is increased. + /// + /// + [Test] + public async Task PoolItemsAreRefreshedOnDisposal() + { + var transportProducer = new ObservableTransportProducerMock(); + var startingPool = new ConcurrentDictionary + { + ["0"] = new TransportProducerPool.PoolItem("0", transportProducer) + }; + var connection = new MockConnection(() => transportProducer); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + TransportProducerPool transportProducerPool = new TransportProducerPool(connection, retryPolicy); + var expectedTime = DateTimeOffset.UtcNow.AddMinutes(10); + + await using var pooledProducer = transportProducerPool.GetPooledProducer("0"); + + // This call should refresh the timespan associated to an item in the pool + await pooledProducer.DisposeAsync(); + + Assert.That(startingPool["0"].RemoveAfter, Is.InRange(expectedTime.AddMinutes(-1), expectedTime.AddMinutes(1)), $"The remove after of a pool item should be extended."); + } + + /// + /// When a partition producer is requested its expiration time will be increased. + /// + /// + [Test] + public async Task TransportProducerPoolTracksAProducerUsage() + { + DateTimeOffset oneMinuteAgo = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromMinutes(1)); + var transportProducer = new ObservableTransportProducerMock(); + var connection = new MockConnection(() => transportProducer); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var startingPool = new ConcurrentDictionary + { + // An expired item in the pool + ["0"] = new TransportProducerPool.PoolItem("0", transportProducer, removeAfter: oneMinuteAgo) + }; + TransportProducerPool transportProducerPool = new TransportProducerPool(connection, retryPolicy, startingPool); + + var pooledProducer = transportProducerPool.GetPooledProducer("0"); + startingPool.TryGetValue("0", out var poolItem); + + await using (pooledProducer) + { + Assert.That(poolItem.ActiveInstances.Count, Is.EqualTo(1), "The usage of a transport producer should be tracked."); + } + + Assert.That(poolItem.ActiveInstances.Count, Is.EqualTo(0), "After usage an active instance should be removed from the pool."); + } + + /// + /// It is possible to configure how long a should sit in memory. + /// + /// + [Test] + public async Task TransportProducerPoolAllowsConfiguringRemoveAfter() + { + var transportProducer = new ObservableTransportProducerMock(); + var connection = new MockConnection(() => transportProducer); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var startingPool = new ConcurrentDictionary + { + ["0"] = new TransportProducerPool.PoolItem("0", transportProducer) + }; + TransportProducerPool transportProducerPool = new TransportProducerPool(connection, retryPolicy, startingPool); + + var pooledProducer = transportProducerPool.GetPooledProducer("0", TimeSpan.FromMinutes(-1)); + + await using (var _ = pooledProducer.ConfigureAwait(false)) + { + }; + + GetExpirationCallBack(transportProducerPool).Invoke(null); + + Assert.That(transportProducer.CloseCallCount, Is.EqualTo(1)); + } + + /// + /// The returns the + /// matching the right partition id. + /// + /// + [Test] + [TestCase(null)] + [TestCase("0")] + public void TransportProducerPoolAllowsTakingTheRightTransportProducer(string partitionId) + { + var transportProducer = new ObservableTransportProducerMock(); + var partitionProducer = new ObservableTransportProducerMock(partitionId); + var connection = new MockConnection(() => partitionProducer); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var startingPool = new ConcurrentDictionary + { + ["0"] = new TransportProducerPool.PoolItem("0", partitionProducer) + }; + TransportProducerPool transportProducerPool = new TransportProducerPool(connection, retryPolicy, eventHubProducer: transportProducer); + + var returnedProducer = transportProducerPool.GetPooledProducer(partitionId).TransportProducer as ObservableTransportProducerMock; + + Assert.That(returnedProducer.PartitionId, Is.EqualTo(partitionId)); + } + + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public void CloseAsyncSurfacesExceptionsForTransportProducer() + { + var transportProducer = new Mock(); + var partitionProducer = new Mock(); + var connection = new MockConnection(() => partitionProducer.Object); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var startingPool = new ConcurrentDictionary + { + ["0"] = new TransportProducerPool.PoolItem("0", partitionProducer.Object) + }; + TransportProducerPool transportProducerPool = new TransportProducerPool(connection, retryPolicy, eventHubProducer: transportProducer.Object); + + transportProducer + .Setup(producer => producer.CloseAsync(It.IsAny())) + .Returns(Task.FromException(new InvalidCastException())); + + var _ = transportProducerPool.GetPooledProducer(null).TransportProducer as ObservableTransportProducerMock; + + Assert.That(async () => await transportProducerPool.CloseAsync(), Throws.InstanceOf()); + } + + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public void CloseAsyncSurfacesExceptionsForPartitionTransportProducer() + { + var transportProducer = new Mock(); + var partitionProducer = new Mock(); + var connection = new MockConnection(() => partitionProducer.Object); + var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy(); + var startingPool = new ConcurrentDictionary + { + ["0"] = new TransportProducerPool.PoolItem("0", partitionProducer.Object) + }; + TransportProducerPool transportProducerPool = new TransportProducerPool(connection, retryPolicy, eventHubProducer: transportProducer.Object); + + partitionProducer + .Setup(producer => producer.CloseAsync(It.IsAny())) + .Returns(Task.FromException(new InvalidCastException())); + + var _ = transportProducerPool.GetPooledProducer("0"); + + Assert.That(async () => await transportProducerPool.CloseAsync(), Throws.InstanceOf()); + } + + /// + /// Gets the routine responsible of finding expired producers. + /// + /// + private static TimerCallback GetExpirationCallBack(TransportProducerPool pool) => + (TimerCallback) + typeof(TransportProducerPool) + .GetMethod("CreateExpirationTimerCallback", BindingFlags.NonPublic | BindingFlags.Instance) + .Invoke(pool, null); + + /// + /// Serves as a non-functional connection for testing producer functionality. + /// + /// + private class MockConnection : EventHubConnection + { + public EventHubsRetryPolicy GetPropertiesInvokedWith = null; + public EventHubsRetryPolicy GetPartitionIdsInvokedWith = null; + public EventHubsRetryPolicy GetPartitionPropertiesInvokedWith = null; + public Func TransportProducerFactory = () => Mock.Of(); + + public bool WasClosed = false; + + public MockConnection(string namespaceName = "fakeNamespace", + string eventHubName = "fakeEventHub") : base(namespaceName, eventHubName, new Mock(Mock.Of(), "{namespace}.servicebus.windows.net").Object) + { + } + + public MockConnection(Func transportProducerFactory, + string namespaceName, + string eventHubName) : this(namespaceName, eventHubName) + { + TransportProducerFactory = transportProducerFactory; + } + + public MockConnection(Func transportProducerFactory) : this(transportProducerFactory, "fakeNamespace", "fakeEventHub") + { + } + + internal override Task GetPropertiesAsync(EventHubsRetryPolicy retryPolicy, + CancellationToken cancellationToken = default) + { + GetPropertiesInvokedWith = retryPolicy; + return Task.FromResult(new EventHubProperties(EventHubName, DateTimeOffset.Parse("2015-10-27T00:00:00Z"), new string[] { "0", "1" })); + } + + internal async override Task GetPartitionIdsAsync(EventHubsRetryPolicy retryPolicy, + CancellationToken cancellationToken = default) + { + GetPartitionIdsInvokedWith = retryPolicy; + return await base.GetPartitionIdsAsync(retryPolicy, cancellationToken); + } + + internal override Task GetPartitionPropertiesAsync(string partitionId, + EventHubsRetryPolicy retryPolicy, + CancellationToken cancellationToken = default) + { + GetPartitionPropertiesInvokedWith = retryPolicy; + return Task.FromResult(default(PartitionProperties)); + } + + internal override TransportProducer CreateTransportProducer(string partitionId, + EventHubsRetryPolicy retryPolicy) => TransportProducerFactory(); + + internal override TransportClient CreateTransportClient(string fullyQualifiedNamespace, + string eventHubName, + EventHubTokenCredential credential, + EventHubConnectionOptions options) + { + var client = new Mock(); + + client + .Setup(client => client.ServiceEndpoint) + .Returns(new Uri($"amgp://{ fullyQualifiedNamespace }.com/{ eventHubName }")); + + return client.Object; + } + } + + /// + /// Allows for observation of operations performed by the producer for testing purposes. + /// + /// + private class ObservableTransportProducerMock : TransportProducer + { + public int CloseCallCount = 0; + public bool WasCloseCalled = false; + public (IEnumerable, SendEventOptions) SendCalledWith; + public EventDataBatch SendBatchCalledWith; + public CreateBatchOptions CreateBatchCalledWith; + public string PartitionId { get; set; } + + public ObservableTransportProducerMock(string partitionId = default) + { + PartitionId = partitionId; + } + + public override Task SendAsync(IEnumerable events, + SendEventOptions sendOptions, + CancellationToken cancellationToken) + { + SendCalledWith = (events, sendOptions); + return Task.CompletedTask; + } + + public override Task SendAsync(EventDataBatch batch, + CancellationToken cancellationToken) + { + SendBatchCalledWith = batch; + return Task.CompletedTask; + } + + public override ValueTask CreateBatchAsync(CreateBatchOptions options, + CancellationToken cancellationToken) + { + CreateBatchCalledWith = options; + return new ValueTask(Task.FromResult((TransportEventBatch)new MockTransportBatch())); + } + + public override Task CloseAsync(CancellationToken cancellationToken) + { + WasCloseCalled = true; + ++CloseCallCount; + return Task.CompletedTask; + } + } + + /// + /// Serves as a non-functional transport event batch for satisfying the + /// non-null constraints of the created by + /// the producer being tested. + /// + /// + private class MockTransportBatch : TransportEventBatch + { + public override long MaximumSizeInBytes { get; } + public override long SizeInBytes { get; } + public override int Count { get; } + public override bool TryAdd(EventData eventData) => throw new NotImplementedException(); + public override IEnumerable AsEnumerable() => throw new NotImplementedException(); + public override void Dispose() => throw new NotImplementedException(); + + public override void Clear() + { + } + } + } +}