Skip to content

Commit

Permalink
[Event Hubs Client] fixed AZC0102 and AZC0106 (Azure#8592) (Azure#9431)
Browse files Browse the repository at this point in the history
  • Loading branch information
albertodenatale committed Feb 29, 2020
1 parent c5f9d70 commit b4dad4a
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 15 deletions.
14 changes: 12 additions & 2 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,11 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
/// Closes the connection to the transport producer instance.
/// </summary>
///
/// <param name="isAsync"><c>true</c> if the method will be executed asynchronously; otherwise, false.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
public override async Task CloseAsync(CancellationToken cancellationToken)
public override async Task CloseAsync(bool isAsync,
CancellationToken cancellationToken)
{
if (_closed)
{
Expand All @@ -244,7 +246,15 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
if (SendLink?.TryGetOpenedObject(out var _) == true)
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
await SendLink.CloseAsync().ConfigureAwait(false);

if (isAsync)
{
await SendLink.CloseAsync().ConfigureAwait(false);
}
else
{
SendLink.Close();
}
}

SendLink?.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ public abstract ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatchOptio
/// Closes the connection to the transport producer instance.
/// </summary>
///
/// <param name="isAsync"><c>true</c> if the method will be executed asynchronously; otherwise, false.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
public abstract Task CloseAsync(CancellationToken cancellationToken);
public abstract Task CloseAsync(bool isAsync,
CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public virtual PooledProducer GetPooledProducer(string partitionId,

if (!Pool.TryGetValue(partitionId, out _) && !item.ActiveInstances.Any())
{
return producer.CloseAsync(CancellationToken.None);
return producer.CloseAsync(false, CancellationToken.None);
}

return Task.CompletedTask;
Expand All @@ -167,13 +167,13 @@ public virtual PooledProducer GetPooledProducer(string partitionId,
///
public async Task CloseAsync(CancellationToken cancellationToken = default)
{
await EventHubProducer.CloseAsync(cancellationToken).ConfigureAwait(false);
await EventHubProducer.CloseAsync(true, cancellationToken).ConfigureAwait(false);

var pendingCloses = new List<Task>();

foreach (var poolItem in Pool.Values)
{
pendingCloses.Add(poolItem.PartitionProducer.CloseAsync(CancellationToken.None));
pendingCloses.Add(poolItem.PartitionProducer.CloseAsync(true, CancellationToken.None));
}

Pool.Clear();
Expand Down Expand Up @@ -206,9 +206,9 @@ internal TimerCallback PerformExpiration()
{
// At this point the pool item may have been closed already
// if there was a context switch between the if conditions
// and the pool item clean up kicked in.
// and the pool item clean up started.

poolItem.PartitionProducer.CloseAsync(CancellationToken.None).GetAwaiter().GetResult();
poolItem.PartitionProducer.CloseAsync(false, CancellationToken.None);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public async Task CloseMarksTheProducerAsClosed()
var producer = new AmqpProducer("aHub", "0", Mock.Of<AmqpConnectionScope>(), Mock.Of<AmqpMessageConverter>(), Mock.Of<EventHubsRetryPolicy>());
Assert.That(producer.IsClosed, Is.False, "The producer should not be closed on creation");

await producer.CloseAsync(CancellationToken.None);
await producer.CloseAsync(true, CancellationToken.None);
Assert.That(producer.IsClosed, Is.True, "The producer should be marked as closed after closing");
}

Expand All @@ -95,7 +95,7 @@ public void CloseRespectsTheCancellationToken()
using var cancellationSource = new CancellationTokenSource();

cancellationSource.Cancel();
Assert.That(async () => await producer.CloseAsync(cancellationSource.Token), Throws.InstanceOf<TaskCanceledException>(), "Cancellation should trigger the appropriate exception.");
Assert.That(async () => await producer.CloseAsync(true, cancellationSource.Token), Throws.InstanceOf<TaskCanceledException>(), "Cancellation should trigger the appropriate exception.");
Assert.That(producer.IsClosed, Is.False, "Cancellation should have interrupted closing and left the producer in an open state.");
}

Expand Down Expand Up @@ -285,7 +285,7 @@ public void SendEnumerableValidatesTheEvents()
public async Task SendEnumerableEnsuresNotClosed()
{
var producer = new AmqpProducer("aHub", null, Mock.Of<AmqpConnectionScope>(), new AmqpMessageConverter(), Mock.Of<EventHubsRetryPolicy>());
await producer.CloseAsync(CancellationToken.None);
await producer.CloseAsync(true, CancellationToken.None);

Assert.That(async () => await producer.SendAsync(Enumerable.Empty<EventData>(), new SendEventOptions(), CancellationToken.None), Throws.InstanceOf<EventHubsException>().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.ClientClosed));
}
Expand Down Expand Up @@ -598,7 +598,7 @@ public async Task SendBatchEnsuresNotClosed()

using TransportEventBatch batch = await producer.Object.CreateBatchAsync(options, default);

await producer.Object.CloseAsync(CancellationToken.None);
await producer.Object.CloseAsync(true, CancellationToken.None);
Assert.That(async () => await producer.Object.SendAsync(new EventDataBatch(batch, "ns", "eh", new SendEventOptions()), CancellationToken.None), Throws.InstanceOf<EventHubsException>().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.ClientClosed));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ public async Task CloseAsyncSurfacesExceptionsForTransportConsumers()
var producer = new EventHubProducerClient(mockConnection);

mockTransportProducer
.Setup(producer => producer.CloseAsync(It.IsAny<CancellationToken>()))
.Setup(producer => producer.CloseAsync(false, It.IsAny<CancellationToken>()))
.Returns(Task.FromException(new InvalidCastException()));

try
Expand Down Expand Up @@ -1258,7 +1258,8 @@ public override ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatchOptio
return new ValueTask<TransportEventBatch>(Task.FromResult((TransportEventBatch)new MockTransportBatch()));
}

public override Task CloseAsync(CancellationToken cancellationToken)
public override Task CloseAsync(bool isAsync,
CancellationToken cancellationToken)
{
WasCloseCalled = true;
++CloseCallCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ public override ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatchOptio
return new ValueTask<TransportEventBatch>(Task.FromResult((TransportEventBatch)new MockTransportBatch()));
}

public override Task CloseAsync(CancellationToken cancellationToken)
public override Task CloseAsync(bool isAsync,
CancellationToken cancellationToken)
{
WasCloseCalled = true;
++CloseCallCount;
Expand Down

0 comments on commit b4dad4a

Please sign in to comment.