From 1235a9c7b00149f6bce187dfe3b56793c502c5ae Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Wed, 31 Mar 2021 01:11:14 +0200 Subject: [PATCH] Ability to cancel receive operations (#19955) * A crude test to start with * Provide the ability for the ReceiveMessagesAsyncInternal to be canceled * Simplify processor catch * Higher try timeout * Receiver Test * SessionTest * Materialize exceptions from EndReceiveMessages --- .../src/Amqp/AmqpReceiver.cs | 90 +++++++++++++------ .../src/Processor/ServiceBusProcessor.cs | 2 +- .../tests/Processor/ProcessorLiveTests.cs | 40 +++++++++ .../tests/Receiver/ReceiverLiveTests.cs | 37 ++++++++ .../Receiver/SessionReceiverLiveTests.cs | 43 +++++++++ 5 files changed, 182 insertions(+), 30 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index d9f1c8af8861b..523f7aad4195f 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -282,57 +282,89 @@ private async Task> ReceiveMessagesAsyn { if (!_receiveLink.TryGetOpenedObject(out link)) { - link = await _receiveLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)).ConfigureAwait(false); + link = await _receiveLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)) + .ConfigureAwait(false); } + cancellationToken.ThrowIfCancellationRequested(); - var messagesReceived = await Task.Factory - .FromAsync<(ReceivingAmqpLink, int, TimeSpan?, TimeSpan), IEnumerable> - ( - static (arguments, callback, state) => - { - var (link, maxMessages, maxWaitTime, timeout) = arguments; - return link.BeginReceiveRemoteMessages( - maxMessages, - TimeSpan.FromMilliseconds(20), - maxWaitTime ?? timeout, - callback, - link); - }, - static asyncResult => - { - var link = (ReceivingAmqpLink)asyncResult.AsyncState; - bool received = link.EndReceiveMessages(asyncResult, out IEnumerable amqpMessages); - return received ? amqpMessages : Enumerable.Empty(); - }, - (link, maxMessages, maxWaitTime, timeout), - default + var receiveMessagesCompletionSource = + new TaskCompletionSource>(TaskCreationOptions + .RunContinuationsAsynchronously); + + using var registration = cancellationToken.Register(static state => + { + var tcs = (TaskCompletionSource>)state; + tcs.TrySetCanceled(); + }, receiveMessagesCompletionSource, useSynchronizationContext: false); + + // in case BeginReceiveRemoteMessages throws exception will be materialized on the synchronous path + _ = Task.Factory + .FromAsync<(ReceivingAmqpLink, int, TimeSpan?, TimeSpan, + TaskCompletionSource>)> + ( + static(arguments, callback, state) => + { + var (link, maxMessages, maxWaitTime, timeout, receiveMessagesCompletionSource) = arguments; + return link.BeginReceiveRemoteMessages( + maxMessages, + TimeSpan.FromMilliseconds(20), + maxWaitTime ?? timeout, + callback, + (link, receiveMessagesCompletionSource)); + }, + static asyncResult => + { + var (link, receiveMessagesCompletionSource) = + ((ReceivingAmqpLink, TaskCompletionSource>))asyncResult + .AsyncState; + try + { + bool received = + link.EndReceiveMessages(asyncResult, out IEnumerable amqpMessages); + receiveMessagesCompletionSource.TrySetResult(received + ? amqpMessages + : Enumerable.Empty()); + } + catch (Exception e) + { + receiveMessagesCompletionSource.TrySetException(e); + } + }, + (link, maxMessages, maxWaitTime, timeout, receiveMessagesCompletionSource), + default ).ConfigureAwait(false); - cancellationToken.ThrowIfCancellationRequested(); + var messagesReceived = await receiveMessagesCompletionSource.Task + .ConfigureAwait(false); + // If event messages were received, then package them for consumption and // return them. - foreach (AmqpMessage message in messagesReceived) { if (_receiveMode == ServiceBusReceiveMode.ReceiveAndDelete) { link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome); } + receivedMessages.Add(AmqpMessageConverter.AmqpMessageToSBMessage(message)); message.Dispose(); } return receivedMessages; } + catch (OperationCanceledException) + { + throw; + } catch (Exception exception) { ExceptionDispatchInfo.Capture(AmqpExceptionHelper.TranslateException( - exception, - link?.GetTrackingId(), - null, - !cancellationToken.IsCancellationRequested && HasLinkCommunicationError(link))) - .Throw(); + exception, + link?.GetTrackingId(), + null, + !cancellationToken.IsCancellationRequested && HasLinkCommunicationError(link))) + .Throw(); throw; // will never be reached } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs index c8f5edacad6dd..01d4a54df5a38 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs @@ -637,7 +637,7 @@ public virtual async Task StopProcessingAsync(CancellationToken cancellationToke { await ActiveReceiveTask.ConfigureAwait(false); } - catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) + catch (OperationCanceledException) { // Nothing to do here. These exceptions are expected. } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs index 34e63e6fd4fce..df46ee23ee6e3 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -513,6 +514,45 @@ Task ProcessMessage(ProcessMessageEventArgs args) } } + [Test] + public async Task StopProcessingAdheresToTokenSLA() + { + await using (var scope = await ServiceBusScope.CreateWithQueue( + enablePartitioning: false, + enableSession: false)) + { + // very long timeout + await using var client = CreateClient(tryTimeout: 120); + var sender = client.CreateSender(scope.QueueName); + await sender.SendMessageAsync(GetMessage()); + await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions + { + AutoCompleteMessages = true, + }); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + Task ProcessMessage(ProcessMessageEventArgs args) + { + tcs.TrySetResult(true); + return Task.CompletedTask; + } + processor.ProcessMessageAsync += ProcessMessage; + processor.ProcessErrorAsync += ExceptionHandler; + + await processor.StartProcessingAsync(); + await tcs.Task; + await Task.Delay(10000); // wait long enough to be hanging in the next receive on the empty queue + + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + + var start = DateTime.UtcNow; + await processor.StopProcessingAsync(cancellationTokenSource.Token); + var stop = DateTime.UtcNow; + + Assert.That(stop - start, Is.EqualTo(TimeSpan.FromSeconds(3)).Within(TimeSpan.FromSeconds(3))); + } + } + [Test] [TestCase("")] [TestCase("Abandon")] diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs index 2ca1af130a4c5..2f279728f37de 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs @@ -99,6 +99,43 @@ public async Task PeekSingleMessage() } } + [Test] + public async Task ReceiveMessagesWhenQueueEmpty() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString, new ServiceBusClientOptions + { + RetryOptions = + { + // very high TryTimeout + TryTimeout = TimeSpan.FromSeconds(120) + } + }); + + var messageCount = 2; + ServiceBusSender sender = client.CreateSender(scope.QueueName); + using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); + IEnumerable messages = AddMessages(batch, messageCount).AsEnumerable(); + await sender.SendMessagesAsync(batch); + + var receiver = client.CreateReceiver(scope.QueueName); + + foreach (var message in await receiver.ReceiveMessagesAsync(2)) + { + await receiver.CompleteMessageAsync(message); + } + + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + + var start = DateTime.UtcNow; + Assert.ThrowsAsync(async () => await receiver.ReceiveMessagesAsync(1, cancellationToken: cancellationTokenSource.Token)); + var stop = DateTime.UtcNow; + + Assert.That(stop - start, Is.EqualTo(TimeSpan.FromSeconds(3)).Within(TimeSpan.FromSeconds(3))); + } + } + [Test] public async Task ReceiveMessagesInPeekLockMode() { diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs index c2cb0d312ab3e..2efa190a19945 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs @@ -200,6 +200,49 @@ public async Task RoundRobinSessions() } } + [Test] + public async Task ReceiveMessagesWhenQueueEmpty() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString, new ServiceBusClientOptions + { + RetryOptions = + { + // very high TryTimeout + TryTimeout = TimeSpan.FromSeconds(120) + } + }); + + var messageCount = 2; + var sessionId = "sessionId1"; + ServiceBusSender sender = client.CreateSender(scope.QueueName); + using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); + IEnumerable messages = AddMessages(batch, messageCount, sessionId).AsEnumerable(); + await sender.SendMessagesAsync(batch); + + ServiceBusReceiver receiver = await client.AcceptNextSessionAsync( + scope.QueueName, + new ServiceBusSessionReceiverOptions + { + PrefetchCount = 100 + }); + + foreach (var message in await receiver.ReceiveMessagesAsync(2)) + { + await receiver.CompleteMessageAsync(message); + } + + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + + var start = DateTime.UtcNow; + Assert.ThrowsAsync(async () => await receiver.ReceiveMessagesAsync(1, cancellationToken: cancellationTokenSource.Token)); + var stop = DateTime.UtcNow; + + Assert.That(stop - start, Is.EqualTo(TimeSpan.FromSeconds(3)).Within(TimeSpan.FromSeconds(3))); + } + } + [Test] public async Task ReceiveMessagesInPeekLockMode() {