Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to cancel receive operations #19955

Merged
merged 7 commits into from
Mar 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 61 additions & 29 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -282,57 +282,89 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
{
if (!_receiveLink.TryGetOpenedObject(out link))
{
link = await _receiveLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)).ConfigureAwait(false);
link = await _receiveLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout))
.ConfigureAwait(false);
}

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var messagesReceived = await Task.Factory
.FromAsync<(ReceivingAmqpLink, int, TimeSpan?, TimeSpan), IEnumerable<AmqpMessage>>
(
static (arguments, callback, state) =>
{
var (link, maxMessages, maxWaitTime, timeout) = arguments;
return link.BeginReceiveRemoteMessages(
maxMessages,
TimeSpan.FromMilliseconds(20),
maxWaitTime ?? timeout,
callback,
link);
},
static asyncResult =>
{
var link = (ReceivingAmqpLink)asyncResult.AsyncState;
bool received = link.EndReceiveMessages(asyncResult, out IEnumerable<AmqpMessage> amqpMessages);
return received ? amqpMessages : Enumerable.Empty<AmqpMessage>();
},
(link, maxMessages, maxWaitTime, timeout),
default
var receiveMessagesCompletionSource =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, I think we may need to revert this. @jsquire pointed out that with this approach we will just be leaving receive operations hanging on the AMQP link, which will cause a backup. I confirmed this with a test that attempts to receive after a previous cancel. I think we will need to either limit the scope of this change to just StopProcessing calls, because in that case it is okay that receive operations are blocked, or better yet, see if we can contribute Cancellation token support to the AMQP library.
/cc @xinchen10

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        public async Task CancellingDoesNotBlockSubsequentReceives(bool prefetch)
        {
            await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
            {
                await using var client = CreateClient();

                ServiceBusSender sender = client.CreateSender(scope.QueueName);
                var receiver = client.CreateReceiver(scope.QueueName, new ServiceBusReceiverOptions { PrefetchCount = prefetch ? 10 : 0 });

                using var cancellationTokenSource = new CancellationTokenSource(2000);
                var start = DateTime.UtcNow;

                Assert.That(
                    async () => await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(60), cancellationToken: cancellationTokenSource.Token),
                    Throws.InstanceOf<TaskCanceledException>());

                await sender.SendMessageAsync(GetMessage());
                var msg = await receiver.ReceiveMessageAsync();
                Assert.AreEqual(1, msg.DeliveryCount);
                var end = DateTime.UtcNow;
                Assert.NotNull(msg);
                Assert.Less(end - start, TimeSpan.FromSeconds(5));
            }
        }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above test fails on the second receive call as we are blocked on the cancelled receive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's why we originally closed the link in the other PR but that has also other drawbacks. I think even StopProcessing can be problematic because the processor is designed to be restarted right?

Copy link
Member

@JoshLove-msft JoshLove-msft Mar 31, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the processor can be restarted - actually StopProcessing just stops receiving rather than closing any links. Close/Dispose would close links. I really think the best way forward is to try to get this integrated into the AMQP lib, so that we can actually end the operations early instead of ignoring them.

Copy link
Member

@JoshLove-msft JoshLove-msft Mar 31, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would vote for option 2.

Copy link
Member

@JoshLove-msft JoshLove-msft Apr 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For 1, I don't think cancelling pending receive calls when totalCredit is 0 is sufficient because there could be concurrent receives occurring on the same link. Even with option 2, we wouldn't be able to correlate receive calls with ReceiveAsyncResults. IMO the cancellation token provides the best user experience.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really have a hard time to understand all the push back against cancellationtoken. Cooperative cancellation is the defacto standard in dotnet for IO bound operations. It is present almost anywhere in moderns async enabled API even in the runtime as well as across the ecosystem. Even the SDK guidance of the whole azure SDK where a lot of people have contributed to and intense user studies have been done adheres to those principles because this is how this ecosystem works. So why so much push back?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danielmarbach the push back is not for cancellation tokens. Its more about supporting the shutdown scenario in a better way that also makes sense to AMQP (I admit that I am influenced more by other AMQP implementations, especially the Apache Qpid products and their JMS implementation). Your PR to the AMQP library (thank you for that) adds cancellation token to the receive method only. It gives a feeling that the library API is created on a needed basis and it was done just to make the shutdown scenario work. To properly support cancellation tokens, we will also need to look at other Task based APIs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. Unfortunately, I'm only a community contributor without corporate backing so the only thing I could commit to in my precious spare time was exactly that. Your comment put things under a different light, and it sounds more like the door is open rather than the door is closed which I have potentially unrightfully experienced or shall I say read into the conversations. I appreciate you taking the time and clarify that.

If there would be some way to openly share on the repo some plans, ideas, directions including things that could be done I'm happy to contribute a few things when I have time, and it fits my small knowledge area that I have of the AMQP lib. For me, it boils down to have this project under some sort of active governance and communication plan to see where things are heading to (or not).

new TaskCompletionSource<IEnumerable<AmqpMessage>>(TaskCreationOptions
.RunContinuationsAsynchronously);

using var registration = cancellationToken.Register(static state =>
{
var tcs = (TaskCompletionSource<IEnumerable<AmqpMessage>>)state;
tcs.TrySetCanceled();
}, receiveMessagesCompletionSource, useSynchronizationContext: false);

// in case BeginReceiveRemoteMessages throws exception will be materialized on the synchronous path
_ = Task.Factory
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first I had an approach following this pattern

var receiveTask = Receive(...)
var completed = await Task.WhenAny(receiveTask, tcs.Task).ConfigureAwait(false)
if(completed == tcs.Task)
   await tcs.Task.ConfigureAwait()

await receiveTask.ConfigureAwait()

Copy link
Contributor Author

@danielmarbach danielmarbach Mar 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then I figure with this approach we always pay the price of the array allocation of WhenAny plus the additional conditions on the path including the state machinery. So I ended up always using the TCS to materialize either the result, exceptions of end or the cancellation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if you prefer the WhenAny approach. While it has more state machine involved and allocates the WhenAny array it wouldn't require us to box the value tuple and might make the code slightly more straightforward to read at the cost of more allocations

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like your implementation better; I sketched out the WhenAny approach in the issue, but had no idea that Register was a thing. While this does introduce a bit more density in the FromAsync machinery, it already had some complexity, With your current approach, I find the flow easier to follow than the "which task completed" juggling that WaitAny would require.

.FromAsync<(ReceivingAmqpLink, int, TimeSpan?, TimeSpan,
TaskCompletionSource<IEnumerable<AmqpMessage>>)>
(
static(arguments, callback, state) =>
{
var (link, maxMessages, maxWaitTime, timeout, receiveMessagesCompletionSource) = arguments;
return link.BeginReceiveRemoteMessages(
maxMessages,
TimeSpan.FromMilliseconds(20),
maxWaitTime ?? timeout,
callback,
(link, receiveMessagesCompletionSource));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comes with the cost of boxing yet I still think it is better than the alternative describe above

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

},
static asyncResult =>
{
var (link, receiveMessagesCompletionSource) =
((ReceivingAmqpLink, TaskCompletionSource<IEnumerable<AmqpMessage>>))asyncResult
.AsyncState;
try
{
bool received =
link.EndReceiveMessages(asyncResult, out IEnumerable<AmqpMessage> amqpMessages);
receiveMessagesCompletionSource.TrySetResult(received
? amqpMessages
: Enumerable.Empty<AmqpMessage>());
}
catch (Exception e)
{
receiveMessagesCompletionSource.TrySetException(e);
}
},
(link, maxMessages, maxWaitTime, timeout, receiveMessagesCompletionSource),
default
).ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
var messagesReceived = await receiveMessagesCompletionSource.Task
.ConfigureAwait(false);

// If event messages were received, then package them for consumption and
// return them.

foreach (AmqpMessage message in messagesReceived)
{
if (_receiveMode == ServiceBusReceiveMode.ReceiveAndDelete)
{
link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome);
}

receivedMessages.Add(AmqpMessageConverter.AmqpMessageToSBMessage(message));
message.Dispose();
}

return receivedMessages;
}
catch (OperationCanceledException)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check if cancellationToken.IsCancellationRequest? And also possibly restrict to TaskCanceledException?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, we can't restrict there because the completion source will throw the OperationCanceledException. We try to normalize everything to TaskCanceledException around the SDK so that is what callers see. The Service Bus and Event Hubs troubleshooting guides attribute a specific meaning to OperationCanceled that indicates service behavior and we wanted to avoid confusion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OperationCanceledException is the base of TaskCanceledException so I figured then when conditions can be removed.

{
throw;
}
catch (Exception exception)
{
ExceptionDispatchInfo.Capture(AmqpExceptionHelper.TranslateException(
exception,
link?.GetTrackingId(),
null,
!cancellationToken.IsCancellationRequested && HasLinkCommunicationError(link)))
.Throw();
exception,
link?.GetTrackingId(),
null,
!cancellationToken.IsCancellationRequested && HasLinkCommunicationError(link)))
.Throw();

throw; // will never be reached
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ public virtual async Task StopProcessingAsync(CancellationToken cancellationToke
{
await ActiveReceiveTask.ConfigureAwait(false);
}
catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException)
catch (OperationCanceledException)
{
// Nothing to do here. These exceptions are expected.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -513,6 +514,45 @@ Task ProcessMessage(ProcessMessageEventArgs args)
}
}

[Test]
public async Task StopProcessingAdheresToTokenSLA()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(
enablePartitioning: false,
enableSession: false))
{
// very long timeout
await using var client = CreateClient(tryTimeout: 120);
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage());
await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
{
AutoCompleteMessages = true,
});
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

Task ProcessMessage(ProcessMessageEventArgs args)
{
tcs.TrySetResult(true);
return Task.CompletedTask;
}
processor.ProcessMessageAsync += ProcessMessage;
processor.ProcessErrorAsync += ExceptionHandler;

await processor.StartProcessingAsync();
await tcs.Task;
await Task.Delay(10000); // wait long enough to be hanging in the next receive on the empty queue
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsquire I tested the other proposed approach and unfortunately those tests passed even when I reverted my cancellation changes in the receive method. They passed because it wasn't guaranteed that the code was hanging in another receive attempt


using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3));

var start = DateTime.UtcNow;
await processor.StopProcessingAsync(cancellationTokenSource.Token);
Copy link
Member

@JoshLove-msft JoshLove-msft Mar 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also have a test that doesn't pass a token here (or update an existing test to assert the time elapsed)? It should still stop processing pretty quickly (or at least as quick as the user handler takes to complete). We would also want a test that verifies that stopping still allows in-flight user handlers to complete.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can also add these tests in a follow up PR.

var stop = DateTime.UtcNow;

Assert.That(stop - start, Is.EqualTo(TimeSpan.FromSeconds(3)).Within(TimeSpan.FromSeconds(3)));
}
}

[Test]
[TestCase("")]
[TestCase("Abandon")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,43 @@ public async Task PeekSingleMessage()
}
}

[Test]
public async Task ReceiveMessagesWhenQueueEmpty()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we also want to prove that cancelling won't increment the delivery count for any messages that were already in the Amqp library's local buffer. This may be hard to do, but maybe we can try sending a message just before we cancel?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can also add these tests in a follow up PR.

{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString, new ServiceBusClientOptions
{
RetryOptions =
{
// very high TryTimeout
TryTimeout = TimeSpan.FromSeconds(120)
}
});

var messageCount = 2;
ServiceBusSender sender = client.CreateSender(scope.QueueName);
using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync();
IEnumerable<ServiceBusMessage> messages = AddMessages(batch, messageCount).AsEnumerable<ServiceBusMessage>();
await sender.SendMessagesAsync(batch);

var receiver = client.CreateReceiver(scope.QueueName);

foreach (var message in await receiver.ReceiveMessagesAsync(2))
{
await receiver.CompleteMessageAsync(message);
}

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3));

var start = DateTime.UtcNow;
Assert.ThrowsAsync<TaskCanceledException>(async () => await receiver.ReceiveMessagesAsync(1, cancellationToken: cancellationTokenSource.Token));
var stop = DateTime.UtcNow;

Assert.That(stop - start, Is.EqualTo(TimeSpan.FromSeconds(3)).Within(TimeSpan.FromSeconds(3)));
}
}

[Test]
public async Task ReceiveMessagesInPeekLockMode()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,49 @@ public async Task RoundRobinSessions()
}
}

[Test]
public async Task ReceiveMessagesWhenQueueEmpty()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString, new ServiceBusClientOptions
{
RetryOptions =
{
// very high TryTimeout
TryTimeout = TimeSpan.FromSeconds(120)
}
});

var messageCount = 2;
var sessionId = "sessionId1";
ServiceBusSender sender = client.CreateSender(scope.QueueName);
using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync();
IEnumerable<ServiceBusMessage> messages = AddMessages(batch, messageCount, sessionId).AsEnumerable<ServiceBusMessage>();
await sender.SendMessagesAsync(batch);

ServiceBusReceiver receiver = await client.AcceptNextSessionAsync(
scope.QueueName,
new ServiceBusSessionReceiverOptions
{
PrefetchCount = 100
});

foreach (var message in await receiver.ReceiveMessagesAsync(2))
{
await receiver.CompleteMessageAsync(message);
}

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3));

var start = DateTime.UtcNow;
Assert.ThrowsAsync<TaskCanceledException>(async () => await receiver.ReceiveMessagesAsync(1, cancellationToken: cancellationTokenSource.Token));
var stop = DateTime.UtcNow;

Assert.That(stop - start, Is.EqualTo(TimeSpan.FromSeconds(3)).Within(TimeSpan.FromSeconds(3)));
}
}

[Test]
public async Task ReceiveMessagesInPeekLockMode()
{
Expand Down