Skip to content

Commit

Permalink
Ability to cancel receive operations (#19955)
Browse files Browse the repository at this point in the history
* A crude test to start with

* Provide the ability for the ReceiveMessagesAsyncInternal to be canceled

* Simplify processor catch

* Higher try timeout

* Receiver Test

* SessionTest

* Materialize exceptions from EndReceiveMessages
  • Loading branch information
danielmarbach authored Mar 30, 2021
1 parent 37421b7 commit 1235a9c
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 30 deletions.
90 changes: 61 additions & 29 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -282,57 +282,89 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
{
if (!_receiveLink.TryGetOpenedObject(out link))
{
link = await _receiveLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)).ConfigureAwait(false);
link = await _receiveLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout))
.ConfigureAwait(false);
}

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var messagesReceived = await Task.Factory
.FromAsync<(ReceivingAmqpLink, int, TimeSpan?, TimeSpan), IEnumerable<AmqpMessage>>
(
static (arguments, callback, state) =>
{
var (link, maxMessages, maxWaitTime, timeout) = arguments;
return link.BeginReceiveRemoteMessages(
maxMessages,
TimeSpan.FromMilliseconds(20),
maxWaitTime ?? timeout,
callback,
link);
},
static asyncResult =>
{
var link = (ReceivingAmqpLink)asyncResult.AsyncState;
bool received = link.EndReceiveMessages(asyncResult, out IEnumerable<AmqpMessage> amqpMessages);
return received ? amqpMessages : Enumerable.Empty<AmqpMessage>();
},
(link, maxMessages, maxWaitTime, timeout),
default
var receiveMessagesCompletionSource =
new TaskCompletionSource<IEnumerable<AmqpMessage>>(TaskCreationOptions
.RunContinuationsAsynchronously);

using var registration = cancellationToken.Register(static state =>
{
var tcs = (TaskCompletionSource<IEnumerable<AmqpMessage>>)state;
tcs.TrySetCanceled();
}, receiveMessagesCompletionSource, useSynchronizationContext: false);

// in case BeginReceiveRemoteMessages throws exception will be materialized on the synchronous path
_ = Task.Factory
.FromAsync<(ReceivingAmqpLink, int, TimeSpan?, TimeSpan,
TaskCompletionSource<IEnumerable<AmqpMessage>>)>
(
static(arguments, callback, state) =>
{
var (link, maxMessages, maxWaitTime, timeout, receiveMessagesCompletionSource) = arguments;
return link.BeginReceiveRemoteMessages(
maxMessages,
TimeSpan.FromMilliseconds(20),
maxWaitTime ?? timeout,
callback,
(link, receiveMessagesCompletionSource));
},
static asyncResult =>
{
var (link, receiveMessagesCompletionSource) =
((ReceivingAmqpLink, TaskCompletionSource<IEnumerable<AmqpMessage>>))asyncResult
.AsyncState;
try
{
bool received =
link.EndReceiveMessages(asyncResult, out IEnumerable<AmqpMessage> amqpMessages);
receiveMessagesCompletionSource.TrySetResult(received
? amqpMessages
: Enumerable.Empty<AmqpMessage>());
}
catch (Exception e)
{
receiveMessagesCompletionSource.TrySetException(e);
}
},
(link, maxMessages, maxWaitTime, timeout, receiveMessagesCompletionSource),
default
).ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
var messagesReceived = await receiveMessagesCompletionSource.Task
.ConfigureAwait(false);

// If event messages were received, then package them for consumption and
// return them.

foreach (AmqpMessage message in messagesReceived)
{
if (_receiveMode == ServiceBusReceiveMode.ReceiveAndDelete)
{
link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome);
}

receivedMessages.Add(AmqpMessageConverter.AmqpMessageToSBMessage(message));
message.Dispose();
}

return receivedMessages;
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception exception)
{
ExceptionDispatchInfo.Capture(AmqpExceptionHelper.TranslateException(
exception,
link?.GetTrackingId(),
null,
!cancellationToken.IsCancellationRequested && HasLinkCommunicationError(link)))
.Throw();
exception,
link?.GetTrackingId(),
null,
!cancellationToken.IsCancellationRequested && HasLinkCommunicationError(link)))
.Throw();

throw; // will never be reached
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ public virtual async Task StopProcessingAsync(CancellationToken cancellationToke
{
await ActiveReceiveTask.ConfigureAwait(false);
}
catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException)
catch (OperationCanceledException)
{
// Nothing to do here. These exceptions are expected.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -513,6 +514,45 @@ Task ProcessMessage(ProcessMessageEventArgs args)
}
}

[Test]
public async Task StopProcessingAdheresToTokenSLA()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(
enablePartitioning: false,
enableSession: false))
{
// very long timeout
await using var client = CreateClient(tryTimeout: 120);
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage());
await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
{
AutoCompleteMessages = true,
});
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

Task ProcessMessage(ProcessMessageEventArgs args)
{
tcs.TrySetResult(true);
return Task.CompletedTask;
}
processor.ProcessMessageAsync += ProcessMessage;
processor.ProcessErrorAsync += ExceptionHandler;

await processor.StartProcessingAsync();
await tcs.Task;
await Task.Delay(10000); // wait long enough to be hanging in the next receive on the empty queue

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3));

var start = DateTime.UtcNow;
await processor.StopProcessingAsync(cancellationTokenSource.Token);
var stop = DateTime.UtcNow;

Assert.That(stop - start, Is.EqualTo(TimeSpan.FromSeconds(3)).Within(TimeSpan.FromSeconds(3)));
}
}

[Test]
[TestCase("")]
[TestCase("Abandon")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,43 @@ public async Task PeekSingleMessage()
}
}

[Test]
public async Task ReceiveMessagesWhenQueueEmpty()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString, new ServiceBusClientOptions
{
RetryOptions =
{
// very high TryTimeout
TryTimeout = TimeSpan.FromSeconds(120)
}
});

var messageCount = 2;
ServiceBusSender sender = client.CreateSender(scope.QueueName);
using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync();
IEnumerable<ServiceBusMessage> messages = AddMessages(batch, messageCount).AsEnumerable<ServiceBusMessage>();
await sender.SendMessagesAsync(batch);

var receiver = client.CreateReceiver(scope.QueueName);

foreach (var message in await receiver.ReceiveMessagesAsync(2))
{
await receiver.CompleteMessageAsync(message);
}

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3));

var start = DateTime.UtcNow;
Assert.ThrowsAsync<TaskCanceledException>(async () => await receiver.ReceiveMessagesAsync(1, cancellationToken: cancellationTokenSource.Token));
var stop = DateTime.UtcNow;

Assert.That(stop - start, Is.EqualTo(TimeSpan.FromSeconds(3)).Within(TimeSpan.FromSeconds(3)));
}
}

[Test]
public async Task ReceiveMessagesInPeekLockMode()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,49 @@ public async Task RoundRobinSessions()
}
}

[Test]
public async Task ReceiveMessagesWhenQueueEmpty()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString, new ServiceBusClientOptions
{
RetryOptions =
{
// very high TryTimeout
TryTimeout = TimeSpan.FromSeconds(120)
}
});

var messageCount = 2;
var sessionId = "sessionId1";
ServiceBusSender sender = client.CreateSender(scope.QueueName);
using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync();
IEnumerable<ServiceBusMessage> messages = AddMessages(batch, messageCount, sessionId).AsEnumerable<ServiceBusMessage>();
await sender.SendMessagesAsync(batch);

ServiceBusReceiver receiver = await client.AcceptNextSessionAsync(
scope.QueueName,
new ServiceBusSessionReceiverOptions
{
PrefetchCount = 100
});

foreach (var message in await receiver.ReceiveMessagesAsync(2))
{
await receiver.CompleteMessageAsync(message);
}

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3));

var start = DateTime.UtcNow;
Assert.ThrowsAsync<TaskCanceledException>(async () => await receiver.ReceiveMessagesAsync(1, cancellationToken: cancellationTokenSource.Token));
var stop = DateTime.UtcNow;

Assert.That(stop - start, Is.EqualTo(TimeSpan.FromSeconds(3)).Within(TimeSpan.FromSeconds(3)));
}
}

[Test]
public async Task ReceiveMessagesInPeekLockMode()
{
Expand Down

0 comments on commit 1235a9c

Please sign in to comment.