Skip to content

Commit

Permalink
[Event Hubs Client] Track Two: Second Preview (Retry/Timeout)
Browse files Browse the repository at this point in the history
  General
    - Ran formatting over the project, which now impacts the track one code,
      since it is embedded an no longer an external reference.

    - Fixed parallel scope for unit tests, allowing the fixture and tests to
      be recognized as parallelizable.

    - Increased timeout for some tests dealing with larger payloads to counter
      the difference between nightly test runs and local test runs with respect
      to stability.

    - Misc formatting and minor cleanup.

  Retries
    - Removed the exposed retry policies in favor of a set of retry options
      for use with client options.  These were based on the shape of the
      similar options found in Azure.Core in order to keep the API familiar.

    - A new abstract EventHubsRetryPolicy class has been created to serve as
      the contract for custom retry policy implementations.

    - An internal default retry policy based on the new retry options has been
      created to handle the non-custom needs.

    - A wrapper shim for retry policies was created to allow the track two
      retry policies to be used directly with the track one code, rather than
      being mapped to the RetryExponential class.

    - An option for fixed retry has been added to accompany the exponential
      retry that was in place previously.

  Timeouts
    - Operation timeouts have been moved from the associated client options and
      incorporated into the retry options and retry policies.
  • Loading branch information
jsquire committed Jul 17, 2019
1 parent f006e31 commit 8892fdf
Show file tree
Hide file tree
Showing 73 changed files with 2,946 additions and 949 deletions.
12 changes: 12 additions & 0 deletions sdk/core/Azure.Core/src/Pipeline/RetryMode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,21 @@

namespace Azure.Core.Pipeline
{
/// <summary>
/// The type of approach to apply when calculating the delay
/// between retry attempts.
/// </summary>
public enum RetryMode
{
/// <summary>
/// Retry attempts happen at fixed intervals; each delay is a consistent duration.
/// </summary>
Fixed,

/// <summary>
/// Retry attempts will delay based on a backoff strategy, where each attempt will increase
/// the duration that it waits before retrying.
/// </summary>
Exponential
}
}
13 changes: 9 additions & 4 deletions sdk/core/Azure.Core/src/Pipeline/RetryOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,30 @@

namespace Azure.Core.Pipeline
{
/// <summary>
/// The set of options that can be specified to influence how
/// retry attempts are made, and a failure is eligible to be retried.
/// </summary>
public class RetryOptions
{
/// <summary>
/// Gets or sets the maximum number of retry attempts before giving up.
/// The maximum number of retry attempts before giving up.
/// </summary>
public int MaxRetries { get; set; } = 3;

/// <summary>
/// Gets or sets the timespan used as delay between the retries or as a base for exponential backoff.
/// The delay between retry attempts for a fixed approach or the delay
/// on which to base calculations for a backoff-based approach.
/// </summary>
public TimeSpan Delay { get; set; } = TimeSpan.FromSeconds(0.8);

/// <summary>
/// Gets or sets maximum timespan to pause between requests.
/// The maximum permissible delay between retry attempts.
/// </summary>
public TimeSpan MaxDelay { get; set; } = TimeSpan.FromMinutes(1);

/// <summary>
/// Gets os sets retry mode
/// The approach to use for calculating retry delays.
/// </summary>
public RetryMode Mode { get; set; } = RetryMode.Exponential;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ public async Task RunAsync(string connectionString,

var clientOptions = new EventHubClientOptions
{
DefaultTimeout = TimeSpan.FromMinutes(1),
Retry = new ExponentialRetry(TimeSpan.FromSeconds(0.25), TimeSpan.FromSeconds(30), 5),
TransportType = TransportType.AmqpWebSockets,
Proxy = (IWebProxy)null
};

clientOptions.RetryOptions.MaximumRetries = 5;
clientOptions.RetryOptions.TryTimeout = TimeSpan.FromMinutes(1);

await using (var client = new EventHubClient(connectionString, eventHubName, clientOptions))
{
// Using the client, we will inspect the Event Hub that it is connected to, getting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
<ItemGroup Condition="'$(IsTargetingNetFx)' == 'true'">
<Reference Include="System" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.IdentityModel" />
</ItemGroup>

<ItemGroup Condition="'$(IsTargetingNetStandard)' == 'true'">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ namespace Azure.Messaging.EventHubs.Compatibility
///
internal sealed class TrackOneEventHubClient : TransportEventHubClient
{
/// <summary>The active retry policy for the client.</summary>
private EventHubRetryPolicy _retryPolicy;

/// <summary>A lazy instantiation of the client instance to delegate operation to.</summary>
private Lazy<TrackOne.EventHubClient> _trackOneClient;

Expand All @@ -40,6 +43,7 @@ internal sealed class TrackOneEventHubClient : TransportEventHubClient
/// <param name="eventHubPath">The path of the specific Event Hub to connect the client to.</param>
/// <param name="credential">The Azure managed identity credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requeseted Event Hub, depending on Azure configuration.</param>
/// <param name="clientOptions">A set of options to apply when configuring the client.</param>
/// <param name="defaultRetryPolicy">The default retry policy to use if no retry options were specified in the <paramref name="clientOptions" />.</param>
///
/// <remarks>
/// As an internal type, this class performs only basic sanity checks against its arguments. It
Expand All @@ -53,7 +57,8 @@ internal sealed class TrackOneEventHubClient : TransportEventHubClient
public TrackOneEventHubClient(string host,
string eventHubPath,
TokenCredential credential,
EventHubClientOptions clientOptions) : this(host, eventHubPath, credential, clientOptions, CreateClient)
EventHubClientOptions clientOptions,
EventHubRetryPolicy defaultRetryPolicy) : this(host, eventHubPath, credential, clientOptions, defaultRetryPolicy, CreateClient)
{
}

Expand All @@ -65,6 +70,7 @@ public TrackOneEventHubClient(string host,
/// <param name="eventHubPath">The path of the specific Event Hub to connect the client to.</param>
/// <param name="credential">The Azure managed identity credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requeseted Event Hub, depending on Azure configuration.</param>
/// <param name="clientOptions">A set of options to apply when configuring the client.</param>
/// <param name="defaultRetryPolicy">The default retry policy to use if no retry options were specified in the <paramref name="clientOptions" />.</param>
/// <param name="eventHubClientFactory">A delegate that can be used for creation of the <see cref="TrackOne.EventHubClient" /> to which operations are delegated to.</param>
///
/// <remarks>
Expand All @@ -76,18 +82,21 @@ public TrackOneEventHubClient(string host,
/// caller.
/// </remarks>
///
internal TrackOneEventHubClient(string host,
string eventHubPath,
TokenCredential credential,
EventHubClientOptions clientOptions,
Func<string, string, TokenCredential, EventHubClientOptions, TrackOne.EventHubClient> eventHubClientFactory)
public TrackOneEventHubClient(string host,
string eventHubPath,
TokenCredential credential,
EventHubClientOptions clientOptions,
EventHubRetryPolicy defaultRetryPolicy,
Func<string, string, TokenCredential, EventHubClientOptions, Func<EventHubRetryPolicy>, TrackOne.EventHubClient> eventHubClientFactory)
{
Guard.ArgumentNotNullOrEmpty(nameof(host), host);
Guard.ArgumentNotNullOrEmpty(nameof(eventHubPath), eventHubPath);
Guard.ArgumentNotNull(nameof(credential), credential);
Guard.ArgumentNotNull(nameof(clientOptions), clientOptions);
Guard.ArgumentNotNull(nameof(defaultRetryPolicy), defaultRetryPolicy);

_trackOneClient = new Lazy<TrackOne.EventHubClient>(() => eventHubClientFactory(host, eventHubPath, credential, clientOptions), LazyThreadSafetyMode.PublicationOnly);
_retryPolicy = defaultRetryPolicy;
_trackOneClient = new Lazy<TrackOne.EventHubClient>(() => eventHubClientFactory(host, eventHubPath, credential, clientOptions, () => _retryPolicy), LazyThreadSafetyMode.PublicationOnly);
}

/// <summary>
Expand All @@ -99,13 +108,15 @@ internal TrackOneEventHubClient(string host,
/// <param name="eventHubPath">The path of the specific Event Hub to connect the client to.</param>
/// <param name="credential">The Azure managed identity credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requeseted Event Hub, depending on Azure configuration.</param>
/// <param name="clientOptions">A set of options to apply when configuring the client.</param>
/// <param name="defaultRetryPolicyFactory">A function that retrieves a default retry policy to use if no retry options were specified in the <paramref name="clientOptions" />.</param>
///
/// <returns>The <see cref="TrackOne.EventHubClient" /> to use.</returns>
///
internal static TrackOne.EventHubClient CreateClient(string host,
string eventHubPath,
TokenCredential credential,
EventHubClientOptions clientOptions)
public static TrackOne.EventHubClient CreateClient(string host,
string eventHubPath,
TokenCredential credential,
EventHubClientOptions clientOptions,
Func<EventHubRetryPolicy> defaultRetryPolicyFactory)
{
// Translate the connection type into the corresponding Track One transport type.

Expand Down Expand Up @@ -155,12 +166,34 @@ internal static TrackOne.EventHubClient CreateClient(string host,

// Build and configure the client.

var client = TrackOne.EventHubClient.Create(endpointBuilder.Uri, eventHubPath, tokenProvider, clientOptions.DefaultTimeout, transportType);
var retryPolicy = (clientOptions.RetryOptions != null)
? new BasicRetryPolicy(clientOptions.RetryOptions)
: defaultRetryPolicyFactory();

var client = TrackOne.EventHubClient.Create(endpointBuilder.Uri, eventHubPath, tokenProvider, retryPolicy.CalculateTryTimeout(0), transportType);
client.WebProxy = clientOptions.Proxy;
client.RetryPolicy = new TrackOneRetryPolicy(retryPolicy);

return client;
}

/// <summary>
/// Updates the active retry policy for the client.
/// </summary>
///
/// <param name="newRetryPolicy">The retry policy to set as active.</param>
///
public override void UpdateRetryPolicy(EventHubRetryPolicy newRetryPolicy)
{
_retryPolicy = newRetryPolicy;

if (_trackOneClient.IsValueCreated)
{
TrackOneClient.RetryPolicy = new TrackOneRetryPolicy(newRetryPolicy);
TrackOneClient.ConnectionStringBuilder.OperationTimeout = newRetryPolicy.CalculateTryTimeout(0);
}
}

/// <summary>
/// Retrieves information about an Event Hub, including the number of partitions present
/// and their identifiers.
Expand Down Expand Up @@ -236,26 +269,31 @@ public override async Task<PartitionProperties> GetPartitionPropertiesAsync(stri
/// </summary>
///
/// <param name="producerOptions">The set of options to apply when creating the producer.</param>
/// <param name="defaultRetryPolicy">The default retry policy to use if no retry options were specified in the <paramref name="producerOptions" />.</param>
///
/// <returns>An Event Hub producer configured in the requested manner.</returns>
///
public override EventHubProducer CreateProducer(EventHubProducerOptions producerOptions)
public override EventHubProducer CreateProducer(EventHubProducerOptions producerOptions,
EventHubRetryPolicy defaultRetryPolicy)
{
TrackOne.EventDataSender CreateSenderFactory()
TrackOne.EventDataSender CreateSenderFactory(EventHubRetryPolicy activeRetryPolicy)
{
var producer = TrackOneClient.CreateEventSender(producerOptions.PartitionId);

(TimeSpan minBackoff, TimeSpan maxBackoff, int maxRetries) = ((ExponentialRetry)producerOptions.Retry).GetProperties();
producer.RetryPolicy = new RetryExponential(minBackoff, maxBackoff, maxRetries);
producer.RetryPolicy = new TrackOneRetryPolicy(activeRetryPolicy);

return producer;
}

var initialRetryPolicy = (producerOptions.RetryOptions != null)
? new BasicRetryPolicy(producerOptions.RetryOptions)
: defaultRetryPolicy;

return new EventHubProducer
(
new TrackOneEventHubProducer(CreateSenderFactory),
new TrackOneEventHubProducer(CreateSenderFactory, initialRetryPolicy),
TrackOneClient.EventHubName,
producerOptions
producerOptions,
initialRetryPolicy
);
}

Expand All @@ -279,15 +317,17 @@ TrackOne.EventDataSender CreateSenderFactory()
/// <param name="partitionId">The identifier of the Event Hub partition from which events will be received.</param>
/// <param name="eventPosition">The position within the partition where the consumer should begin reading events.</param>
/// <param name="consumerOptions">The set of options to apply when creating the consumer.</param>
/// <param name="defaultRetryPolicy">The default retry policy to use if no retry options were specified in the <paramref name="consumerOptions" />.</param>
///
/// <returns>An Event Hub consumer configured in the requested manner.</returns>
///
public override EventHubConsumer CreateConsumer(string consumerGroup,
string partitionId,
EventPosition eventPosition,
EventHubConsumerOptions consumerOptions)
EventHubConsumerOptions consumerOptions,
EventHubRetryPolicy defaultRetryPolicy)
{
TrackOne.PartitionReceiver CreateReceiverFactory()
TrackOne.PartitionReceiver CreateReceiverFactory(EventHubRetryPolicy activeRetryPolicy)
{
var position = new TrackOne.EventPosition
{
Expand All @@ -297,7 +337,10 @@ TrackOne.PartitionReceiver CreateReceiverFactory()
EnqueuedTimeUtc = eventPosition.EnqueuedTime?.UtcDateTime
};

var trackOneOptions = new TrackOne.ReceiverOptions { Identifier = consumerOptions.Identifier };
var trackOneOptions = new TrackOne.ReceiverOptions
{
Identifier = consumerOptions.Identifier
};

PartitionReceiver consumer;

Expand All @@ -310,20 +353,24 @@ TrackOne.PartitionReceiver CreateReceiverFactory()
consumer = TrackOneClient.CreateReceiver(consumerGroup, partitionId, position, trackOneOptions);
}

(TimeSpan minBackoff, TimeSpan maxBackoff, int maxRetries) = ((ExponentialRetry)consumerOptions.Retry).GetProperties();
consumer.RetryPolicy = new RetryExponential(minBackoff, maxBackoff, maxRetries);
consumer.RetryPolicy = new TrackOneRetryPolicy(activeRetryPolicy);

return consumer;
}

var initialRetryPolicy = (consumerOptions.RetryOptions != null)
? new BasicRetryPolicy(consumerOptions.RetryOptions)
: defaultRetryPolicy;

return new EventHubConsumer
(
new TrackOneEventHubConsumer(CreateReceiverFactory),
new TrackOneEventHubConsumer(CreateReceiverFactory, initialRetryPolicy),
TrackOneClient.EventHubName,
partitionId,
consumerGroup,
eventPosition,
consumerOptions
consumerOptions,
initialRetryPolicy
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ namespace Azure.Messaging.EventHubs.Compatibility
///
internal sealed class TrackOneEventHubConsumer : TransportEventHubConsumer
{
/// <summary>The active retry policy for the producer.</summary>
private EventHubRetryPolicy _retryPolicy;

/// <summary>A lazy instantiation of the producer instance to delegate operation to.</summary>
private Lazy<TrackOne.PartitionReceiver> _trackOneReceiver;

Expand All @@ -34,6 +37,7 @@ internal sealed class TrackOneEventHubConsumer : TransportEventHubConsumer
/// </summary>
///
/// <param name="trackOneReceiverFactory">A delegate that can be used for creation of the <see cref="TrackOne.PartitionReceiver" /> to which operations are delegated to.</param>
/// <param name="retryPolicy">The retry policy to use when creating the <see cref="TrackOne.PartitionReceiver" />.</param>
///
/// <remarks>
/// As an internal type, this class performs only basic sanity checks against its arguments. It
Expand All @@ -44,10 +48,30 @@ internal sealed class TrackOneEventHubConsumer : TransportEventHubConsumer
/// caller.
/// </remarks>
///
public TrackOneEventHubConsumer(Func<TrackOne.PartitionReceiver> trackOneReceiverFactory)
public TrackOneEventHubConsumer(Func<EventHubRetryPolicy, TrackOne.PartitionReceiver> trackOneReceiverFactory,
EventHubRetryPolicy retryPolicy)
{
Guard.ArgumentNotNull(nameof(trackOneReceiverFactory), trackOneReceiverFactory);
_trackOneReceiver = new Lazy<TrackOne.PartitionReceiver>(trackOneReceiverFactory, LazyThreadSafetyMode.PublicationOnly);
Guard.ArgumentNotNull(nameof(retryPolicy), retryPolicy);

_retryPolicy = retryPolicy;
_trackOneReceiver = new Lazy<TrackOne.PartitionReceiver>(() => trackOneReceiverFactory(_retryPolicy), LazyThreadSafetyMode.PublicationOnly);
}

/// <summary>
/// Updates the active retry policy for the client.
/// </summary>
///
/// <param name="newRetryPolicy">The retry policy to set as active.</param>
///
public override void UpdateRetryPolicy(EventHubRetryPolicy newRetryPolicy)
{
_retryPolicy = newRetryPolicy;

if (_trackOneReceiver.IsValueCreated)
{
TrackOneReceiver.RetryPolicy = new TrackOneRetryPolicy(newRetryPolicy);
}
}

/// <summary>
Expand Down
Loading

0 comments on commit 8892fdf

Please sign in to comment.