Skip to content

Commit

Permalink
[Event Hubs Client] PR amends on transport producer expiration (Azure…
Browse files Browse the repository at this point in the history
  • Loading branch information
albertodenatale committed Feb 16, 2020
1 parent f004809 commit 4ac7fb1
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace Azure.Messaging.EventHubs.Core
///
internal class TransportProducerPool : IAsyncDisposable
{
/// <summary>The period in minutes after which <see cref="PerformExpiration" /> is run.</summary>
/// <summary>The period after which <see cref="PerformExpiration" /> is run.</summary>
private static readonly TimeSpan DefaultPerformExpirationPeriod = TimeSpan.FromMinutes(10);

/// <summary>
Expand Down Expand Up @@ -113,7 +113,7 @@ public virtual PooledProducer GetPooledProducer(string partitionId,
// the returned PoolItem if it had expired. The probability is very low and
// possible exceptions should be handled by the invoking methods.

if (item.PartitionProducer.IsClosed == true || !item.ActiveInstances.TryAdd(identifier, 0))
if (item.PartitionProducer.IsClosed || !item.ActiveInstances.TryAdd(identifier, 0))
{
identifier = Guid.NewGuid().ToString();
item = Pool.GetOrAdd(partitionId, id => new PoolItem(partitionId, Connection.CreateTransportProducer(id, RetryPolicy), removeAfterDuration));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,6 @@ internal virtual async Task SendAsync(IEnumerable<EventData> events,
Argument.AssertNotNull(events, nameof(events));
AssertSinglePartitionReference(options.PartitionId, options.PartitionKey);

bool isMessageSent = false;
int attempts = 0;

using DiagnosticScope scope = CreateDiagnosticScope();
Expand All @@ -410,22 +409,20 @@ internal virtual async Task SendAsync(IEnumerable<EventData> events,

TransportProducerPool.PooledProducer pooledProducer = PartitionProducerPool.GetPooledProducer(options.PartitionId, PartitionProducerLifespan);

while (!cancellationToken.IsCancellationRequested
&& ++attempts <= MaximumCreateProducerAttempts
&& !isMessageSent)
while (!cancellationToken.IsCancellationRequested)
{
try
{
await using var _ = pooledProducer.ConfigureAwait(false);

await pooledProducer.TransportProducer.SendAsync(events, options, cancellationToken).ConfigureAwait(false);

isMessageSent = true;
return;
}
catch (EventHubsException eventHubException) when (eventHubException.Reason == EventHubsException.FailureReason.ClientClosed
&& ShouldRecreateProducer(pooledProducer.TransportProducer, options.PartitionId))
&& ShouldRecreateProducer(pooledProducer.TransportProducer, options.PartitionId))
{
if (attempts >= MaximumCreateProducerAttempts)
if (++attempts >= MaximumCreateProducerAttempts)
{
scope.Failed(eventHubException);
throw;
Expand All @@ -439,6 +436,8 @@ internal virtual async Task SendAsync(IEnumerable<EventData> events,
throw;
}
}

throw new TaskCanceledException();
}

/// <summary>
Expand All @@ -463,27 +462,24 @@ public virtual async Task SendAsync(EventDataBatch eventBatch,
AssertSinglePartitionReference(eventBatch.SendOptions.PartitionId, eventBatch.SendOptions.PartitionKey);

int attempts = 0;
bool isMessageSent = false;
using DiagnosticScope scope = CreateDiagnosticScope();

var pooledProducer = PartitionProducerPool.GetPooledProducer(eventBatch.SendOptions.PartitionId, PartitionProducerLifespan);

while (!cancellationToken.IsCancellationRequested
&& ++attempts <= MaximumCreateProducerAttempts
&& !isMessageSent)
while (!cancellationToken.IsCancellationRequested)
{
try
{
await using var _ = pooledProducer.ConfigureAwait(false);

await pooledProducer.TransportProducer.SendAsync(eventBatch, cancellationToken).ConfigureAwait(false);

isMessageSent = true;
return;
}
catch (EventHubsException eventHubException) when (eventHubException.Reason == EventHubsException.FailureReason.ClientClosed
&& ShouldRecreateProducer(pooledProducer.TransportProducer, eventBatch.SendOptions.PartitionId))
&& ShouldRecreateProducer(pooledProducer.TransportProducer, eventBatch.SendOptions.PartitionId))
{
if (attempts >= MaximumCreateProducerAttempts)
if (++attempts >= MaximumCreateProducerAttempts)
{
scope.Failed(eventHubException);
throw;
Expand All @@ -497,6 +493,8 @@ public virtual async Task SendAsync(EventDataBatch eventBatch,
throw;
}
}

throw new TaskCanceledException();
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.ComponentModel;

namespace Azure.Messaging.EventHubs.Producer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2419,9 +2419,17 @@ internal override Task<PartitionProperties> GetPartitionPropertiesAsync(string p
return Task.FromResult(default(PartitionProperties));
}

internal override TransportConsumer CreateTransportConsumer(string consumerGroup, string partitionId, EventPosition eventPosition, EventHubsRetryPolicy retryPolicy, bool trackLastEnqueuedEventProperties = true, long? ownerLevel = default, uint? prefetchCount = default) => TransportConsumerFactory();

internal override TransportClient CreateTransportClient(string fullyQualifiedNamespace, string eventHubName, EventHubTokenCredential credential, EventHubConnectionOptions options)
internal override TransportConsumer CreateTransportConsumer(string consumerGroup,
string partitionId, EventPosition eventPosition,
EventHubsRetryPolicy retryPolicy,
bool trackLastEnqueuedEventProperties = true,
long? ownerLevel = default,
uint? prefetchCount = default) => TransportConsumerFactory();

internal override TransportClient CreateTransportClient(string fullyQualifiedNamespace,
string eventHubName,
EventHubTokenCredential credential,
EventHubConnectionOptions options)
{
var client = new Mock<TransportClient>();

Expand Down
Loading

0 comments on commit 4ac7fb1

Please sign in to comment.