From 30b8937b9061f4abd6f3c3a8a766fa5a775b9b82 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Sat, 27 Mar 2021 09:34:19 +0100 Subject: [PATCH 1/7] A crude test to start with --- .../tests/Processor/ProcessorLiveTests.cs | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs index 34e63e6fd4fce..2afea4befd276 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -513,6 +514,45 @@ Task ProcessMessage(ProcessMessageEventArgs args) } } + [Test] + public async Task StopProcessingAdheresToTokenSLA() + { + await using (var scope = await ServiceBusScope.CreateWithQueue( + enablePartitioning: false, + enableSession: false)) + { + // very long timeout + await using var client = CreateClient(tryTimeout: 60); + var sender = client.CreateSender(scope.QueueName); + await sender.SendMessageAsync(GetMessage()); + await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions + { + AutoCompleteMessages = true, + }); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + Task ProcessMessage(ProcessMessageEventArgs args) + { + tcs.TrySetResult(true); + return Task.CompletedTask; + } + processor.ProcessMessageAsync += ProcessMessage; + processor.ProcessErrorAsync += ExceptionHandler; + + await processor.StartProcessingAsync(); + await tcs.Task; + await Task.Delay(2000); // better way to do this? + + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + + var start = DateTime.UtcNow; + await processor.StopProcessingAsync(cancellationTokenSource.Token); + var stop = DateTime.UtcNow; + + Assert.That(stop - start, Is.EqualTo(TimeSpan.FromSeconds(3)).Within(TimeSpan.FromSeconds(3))); + } + } + [Test] [TestCase("")] [TestCase("Abandon")] From 9be9a9de5619ae249b655a2329c320fdb8f9afdc Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Tue, 30 Mar 2021 21:39:52 +0200 Subject: [PATCH 2/7] Provide the ability for the ReceiveMessagesAsyncInternal to be canceled --- .../src/Amqp/AmqpReceiver.cs | 83 ++++++++++++------- 1 file changed, 54 insertions(+), 29 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index d9f1c8af8861b..114b895817501 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -282,57 +282,82 @@ private async Task> ReceiveMessagesAsyn { if (!_receiveLink.TryGetOpenedObject(out link)) { - link = await _receiveLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)).ConfigureAwait(false); + link = await _receiveLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)) + .ConfigureAwait(false); } + cancellationToken.ThrowIfCancellationRequested(); - var messagesReceived = await Task.Factory - .FromAsync<(ReceivingAmqpLink, int, TimeSpan?, TimeSpan), IEnumerable> - ( - static (arguments, callback, state) => - { - var (link, maxMessages, maxWaitTime, timeout) = arguments; - return link.BeginReceiveRemoteMessages( - maxMessages, - TimeSpan.FromMilliseconds(20), - maxWaitTime ?? timeout, - callback, - link); - }, - static asyncResult => - { - var link = (ReceivingAmqpLink)asyncResult.AsyncState; - bool received = link.EndReceiveMessages(asyncResult, out IEnumerable amqpMessages); - return received ? amqpMessages : Enumerable.Empty(); - }, - (link, maxMessages, maxWaitTime, timeout), - default + var receiveMessagesCompletionSource = + new TaskCompletionSource>(TaskCreationOptions + .RunContinuationsAsynchronously); + + using var registration = cancellationToken.Register(static state => + { + var tcs = (TaskCompletionSource>)state; + tcs.TrySetCanceled(); + }, receiveMessagesCompletionSource, useSynchronizationContext: false); + + // in case BeginReceiveRemoteMessages throws exception will be materialized on the synchronous path + _ = Task.Factory + .FromAsync<(ReceivingAmqpLink, int, TimeSpan?, TimeSpan, + TaskCompletionSource>)> + ( + static(arguments, callback, state) => + { + var (link, maxMessages, maxWaitTime, timeout, receiveMessagesCompletionSource) = arguments; + return link.BeginReceiveRemoteMessages( + maxMessages, + TimeSpan.FromMilliseconds(20), + maxWaitTime ?? timeout, + callback, + (link, receiveMessagesCompletionSource)); + }, + static asyncResult => + { + var (link, receiveMessagesCompletionSource) = + ((ReceivingAmqpLink, TaskCompletionSource>))asyncResult + .AsyncState; + bool received = + link.EndReceiveMessages(asyncResult, out IEnumerable amqpMessages); + receiveMessagesCompletionSource.TrySetResult(received + ? amqpMessages + : Enumerable.Empty()); + }, + (link, maxMessages, maxWaitTime, timeout, receiveMessagesCompletionSource), + default ).ConfigureAwait(false); - cancellationToken.ThrowIfCancellationRequested(); + var messagesReceived = await receiveMessagesCompletionSource.Task + .ConfigureAwait(false); + // If event messages were received, then package them for consumption and // return them. - foreach (AmqpMessage message in messagesReceived) { if (_receiveMode == ServiceBusReceiveMode.ReceiveAndDelete) { link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome); } + receivedMessages.Add(AmqpMessageConverter.AmqpMessageToSBMessage(message)); message.Dispose(); } return receivedMessages; } + catch (OperationCanceledException) + { + throw; + } catch (Exception exception) { ExceptionDispatchInfo.Capture(AmqpExceptionHelper.TranslateException( - exception, - link?.GetTrackingId(), - null, - !cancellationToken.IsCancellationRequested && HasLinkCommunicationError(link))) - .Throw(); + exception, + link?.GetTrackingId(), + null, + !cancellationToken.IsCancellationRequested && HasLinkCommunicationError(link))) + .Throw(); throw; // will never be reached } From 0605a2edcf4b6ac22639514a9aa8d8dacafbbd5f Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Tue, 30 Mar 2021 21:40:17 +0200 Subject: [PATCH 3/7] Simplify processor catch --- .../src/Processor/ServiceBusProcessor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs index c8f5edacad6dd..01d4a54df5a38 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs @@ -637,7 +637,7 @@ public virtual async Task StopProcessingAsync(CancellationToken cancellationToke { await ActiveReceiveTask.ConfigureAwait(false); } - catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) + catch (OperationCanceledException) { // Nothing to do here. These exceptions are expected. } From 8518f24be808d8b3da070fa608361fd9287d95fe Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Tue, 30 Mar 2021 21:44:13 +0200 Subject: [PATCH 4/7] Higher try timeout --- .../tests/Processor/ProcessorLiveTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs index 2afea4befd276..df46ee23ee6e3 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs @@ -522,7 +522,7 @@ public async Task StopProcessingAdheresToTokenSLA() enableSession: false)) { // very long timeout - await using var client = CreateClient(tryTimeout: 60); + await using var client = CreateClient(tryTimeout: 120); var sender = client.CreateSender(scope.QueueName); await sender.SendMessageAsync(GetMessage()); await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions @@ -541,7 +541,7 @@ Task ProcessMessage(ProcessMessageEventArgs args) await processor.StartProcessingAsync(); await tcs.Task; - await Task.Delay(2000); // better way to do this? + await Task.Delay(10000); // wait long enough to be hanging in the next receive on the empty queue using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3)); From 6048076bff41fd7bf622da5b642912c298cd06ea Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Tue, 30 Mar 2021 21:53:03 +0200 Subject: [PATCH 5/7] Receiver Test --- .../tests/Receiver/ReceiverLiveTests.cs | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs index 2ca1af130a4c5..2f279728f37de 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs @@ -99,6 +99,43 @@ public async Task PeekSingleMessage() } } + [Test] + public async Task ReceiveMessagesWhenQueueEmpty() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString, new ServiceBusClientOptions + { + RetryOptions = + { + // very high TryTimeout + TryTimeout = TimeSpan.FromSeconds(120) + } + }); + + var messageCount = 2; + ServiceBusSender sender = client.CreateSender(scope.QueueName); + using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); + IEnumerable messages = AddMessages(batch, messageCount).AsEnumerable(); + await sender.SendMessagesAsync(batch); + + var receiver = client.CreateReceiver(scope.QueueName); + + foreach (var message in await receiver.ReceiveMessagesAsync(2)) + { + await receiver.CompleteMessageAsync(message); + } + + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + + var start = DateTime.UtcNow; + Assert.ThrowsAsync(async () => await receiver.ReceiveMessagesAsync(1, cancellationToken: cancellationTokenSource.Token)); + var stop = DateTime.UtcNow; + + Assert.That(stop - start, Is.EqualTo(TimeSpan.FromSeconds(3)).Within(TimeSpan.FromSeconds(3))); + } + } + [Test] public async Task ReceiveMessagesInPeekLockMode() { From 34812779725bffbabd23c0aa40b7b9d8395ab365 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Tue, 30 Mar 2021 21:57:59 +0200 Subject: [PATCH 6/7] SessionTest --- .../Receiver/SessionReceiverLiveTests.cs | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs index c2cb0d312ab3e..2efa190a19945 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs @@ -200,6 +200,49 @@ public async Task RoundRobinSessions() } } + [Test] + public async Task ReceiveMessagesWhenQueueEmpty() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString, new ServiceBusClientOptions + { + RetryOptions = + { + // very high TryTimeout + TryTimeout = TimeSpan.FromSeconds(120) + } + }); + + var messageCount = 2; + var sessionId = "sessionId1"; + ServiceBusSender sender = client.CreateSender(scope.QueueName); + using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); + IEnumerable messages = AddMessages(batch, messageCount, sessionId).AsEnumerable(); + await sender.SendMessagesAsync(batch); + + ServiceBusReceiver receiver = await client.AcceptNextSessionAsync( + scope.QueueName, + new ServiceBusSessionReceiverOptions + { + PrefetchCount = 100 + }); + + foreach (var message in await receiver.ReceiveMessagesAsync(2)) + { + await receiver.CompleteMessageAsync(message); + } + + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + + var start = DateTime.UtcNow; + Assert.ThrowsAsync(async () => await receiver.ReceiveMessagesAsync(1, cancellationToken: cancellationTokenSource.Token)); + var stop = DateTime.UtcNow; + + Assert.That(stop - start, Is.EqualTo(TimeSpan.FromSeconds(3)).Within(TimeSpan.FromSeconds(3))); + } + } + [Test] public async Task ReceiveMessagesInPeekLockMode() { From 72ff1f78acfa1f2c694a06cb40427cc2db7d1f4d Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Tue, 30 Mar 2021 22:10:22 +0200 Subject: [PATCH 7/7] Materialize exceptions from EndReceiveMessages --- .../src/Amqp/AmqpReceiver.cs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index 114b895817501..523f7aad4195f 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -318,11 +318,18 @@ private async Task> ReceiveMessagesAsyn var (link, receiveMessagesCompletionSource) = ((ReceivingAmqpLink, TaskCompletionSource>))asyncResult .AsyncState; - bool received = - link.EndReceiveMessages(asyncResult, out IEnumerable amqpMessages); - receiveMessagesCompletionSource.TrySetResult(received - ? amqpMessages - : Enumerable.Empty()); + try + { + bool received = + link.EndReceiveMessages(asyncResult, out IEnumerable amqpMessages); + receiveMessagesCompletionSource.TrySetResult(received + ? amqpMessages + : Enumerable.Empty()); + } + catch (Exception e) + { + receiveMessagesCompletionSource.TrySetException(e); + } }, (link, maxMessages, maxWaitTime, timeout, receiveMessagesCompletionSource), default