Ability to cancel receive operations #19955
Thank you for your contribution @danielmarbach! We will review the pull request and get back to you soon.
TimeSpan.FromMilliseconds(20),
maxWaitTime ?? timeout,
callback,
(link, receiveMessagesCompletionSource));
This comes at the cost of boxing, yet I still think it is better than the alternative described above.
Agreed.
}, receiveMessagesCompletionSource, useSynchronizationContext: false);

// in case BeginReceiveRemoteMessages throws exception will be materialized on the synchronous path
_ = Task.Factory
At first I had an approach following this pattern:
var receiveTask = Receive(...);
var completed = await Task.WhenAny(receiveTask, tcs.Task).ConfigureAwait(false);
if (completed == tcs.Task)
    await tcs.Task.ConfigureAwait(false);
await receiveTask.ConfigureAwait(false);
But then I figured that with this approach we always pay the price of the array allocation of WhenAny, plus the additional conditions on the path, including the state machinery. So I ended up always using the TCS to materialize the result, the exceptions from the End call, or the cancellation.
Let me know if you prefer the WhenAny approach. While it involves more state machinery and allocates the WhenAny array, it wouldn't require us to box the value tuple and might make the code slightly more straightforward to read, at the cost of more allocations.
I like your implementation better; I sketched out the WhenAny approach in the issue, but had no idea that Register was a thing. While this does introduce a bit more density in the FromAsync machinery, it already had some complexity. With your current approach, I find the flow easier to follow than the "which task completed" juggling that WhenAny would require.
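For readers following the thread, here is a minimal sketch of the Register-based pattern being discussed. It is not the code from this PR: `BeginFakeReceive` is a hypothetical stand-in for a callback-based receive without cancellation support, and the sketch passes only the completion source as state (the PR passes a value tuple, which is where the boxing mentioned above comes from).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableReceiveSketch
{
    // Hypothetical stand-in for a callback-based receive with no cancellation support.
    // It invokes the callback with a fixed result once the delay has elapsed.
    private static void BeginFakeReceive(TimeSpan delay, Action<string, object> callback, object state)
    {
        _ = Task.Delay(delay).ContinueWith(
            (task, s) => callback("message", s), state, TaskScheduler.Default);
    }

    public static async Task<string> ReceiveAsync(TimeSpan delay, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);

        // The completion source travels as the state argument, so the lambda captures nothing.
        // The registration is disposed when the method completes.
        using CancellationTokenRegistration registration = cancellationToken.Register(
            state => ((TaskCompletionSource<string>)state).TrySetCanceled(),
            tcs,
            useSynchronizationContext: false);

        // Whichever of the callback or the cancellation runs first wins;
        // the later TrySet* call returns false and is ignored.
        BeginFakeReceive(
            delay,
            (result, state) => ((TaskCompletionSource<string>)state).TrySetResult(result),
            tcs);

        return await tcs.Task.ConfigureAwait(false);
    }
}
```

Note that in this sketch cancellation only stops the caller from waiting; the underlying fake receive still runs to completion.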
await processor.StartProcessingAsync();
await tcs.Task;
await Task.Delay(10000); // wait long enough to be hanging in the next receive on the empty queue
@jsquire I tested the other proposed approach and unfortunately those tests passed even when I reverted my cancellation changes in the receive method. They passed because it wasn't guaranteed that the code was hanging in another receive attempt
@@ -99,6 +99,43 @@ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitionin
}
}

[Test]
public async Task ReceiveMessagesWhenQueueEmpty()
I think we also want to prove that cancelling won't increment the delivery count for any messages that were already in the Amqp library's local buffer. This may be hard to do, but maybe we can try sending a message just before we cancel?
I can also add these tests in a follow up PR.
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3));

var start = DateTime.UtcNow;
await processor.StopProcessingAsync(cancellationTokenSource.Token);
Can we also have a test that doesn't pass a token here (or update an existing test to assert the time elapsed)? It should still stop processing pretty quickly (or at least as quickly as the user handler takes to complete). We would also want a test that verifies that stopping still allows in-flight user handlers to complete.
I can also add these tests in a follow up PR.
Addresses #17734
receivedMessages.Add(AmqpMessageConverter.AmqpMessageToSBMessage(message));
message.Dispose();
}

return receivedMessages;
}
catch (OperationCanceledException)
Should we check cancellationToken.IsCancellationRequested? And also possibly restrict to TaskCanceledException?
IIRC, we can't restrict there because the completion source will throw the OperationCanceledException. We try to normalize everything to TaskCanceledException around the SDK so that is what callers see. The Service Bus and Event Hubs troubleshooting guides attribute a specific meaning to OperationCanceled that indicates service behavior and we wanted to avoid confusion.
OperationCanceledException is the base of TaskCanceledException, so I figured the `when` conditions can be removed.
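To illustrate the point about the exception hierarchy, here is a small sketch, assuming a hypothetical `ReceiveAsync` stand-in rather than the SDK's actual receive path. A plain `catch (OperationCanceledException)` also catches `TaskCanceledException`, and rethrowing as `TaskCanceledException` is one way the normalization described above could look.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationNormalizationSketch
{
    // Hypothetical stand-in for a receive call; not the SDK's actual method.
    public static async Task<int> ReceiveAsync(CancellationToken cancellationToken)
    {
        try
        {
            await Task.Delay(TimeSpan.FromMilliseconds(10)).ConfigureAwait(false);

            // Throws OperationCanceledException (the base type) if cancellation was requested.
            cancellationToken.ThrowIfCancellationRequested();
            return 0;
        }
        catch (OperationCanceledException)
        {
            // This handler also catches TaskCanceledException because it derives from
            // OperationCanceledException, so no exception filter is needed.
            // Callers only ever observe TaskCanceledException.
            throw new TaskCanceledException();
        }
    }
}
```

Callers that already catch `OperationCanceledException` keep working, since the normalized exception is still an instance of the base type.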
/azp run net - servicebus - tests
Azure Pipelines successfully started running 1 pipeline(s).
This is awesome! Thanks @danielmarbach
Late to the party, but LGTM.
},
(link, maxMessages, maxWaitTime, timeout),
default
var receiveMessagesCompletionSource =
Unfortunately, I think we may need to revert this. @jsquire pointed out that with this approach we will just be leaving receive operations hanging on the AMQP link, which will cause a backup. I confirmed this with a test that attempts to receive after a previous cancel. I think we will need to either limit the scope of this change to just StopProcessing calls, because in that case it is okay that receive operations are blocked, or better yet, see if we can contribute Cancellation token support to the AMQP library.
/cc @xinchen10
public async Task CancellingDoesNotBlockSubsequentReceives(bool prefetch)
{
    await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
    {
        await using var client = CreateClient();
        ServiceBusSender sender = client.CreateSender(scope.QueueName);
        var receiver = client.CreateReceiver(scope.QueueName, new ServiceBusReceiverOptions { PrefetchCount = prefetch ? 10 : 0 });

        // Cancel the first receive after 2 seconds, long before its 60 second wait time elapses.
        using var cancellationTokenSource = new CancellationTokenSource(2000);
        var start = DateTime.UtcNow;
        Assert.That(
            async () => await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(60), cancellationToken: cancellationTokenSource.Token),
            Throws.InstanceOf<TaskCanceledException>());

        // A receive issued after the cancellation should get the new message promptly.
        await sender.SendMessageAsync(GetMessage());
        var msg = await receiver.ReceiveMessageAsync();
        var end = DateTime.UtcNow;

        // Check for null before reading properties of the message.
        Assert.NotNull(msg);
        Assert.AreEqual(1, msg.DeliveryCount);
        Assert.Less(end - start, TimeSpan.FromSeconds(5));
    }
}
The above test fails on the second receive call as we are blocked on the cancelled receive.
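One way to picture why an abandoned receive can hold up a later one is the toy model below. It is purely illustrative and makes a loud assumption: `ToyLink` is hypothetical and hands each arriving message to the oldest pending receive request. It says nothing about how the AMQP library is actually implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Toy model only (hypothetical): each arriving message goes to the oldest pending receive.
// Not thread-safe; intended for single-threaded illustration.
public sealed class ToyLink
{
    private readonly Queue<TaskCompletionSource<string>> _pending =
        new Queue<TaskCompletionSource<string>>();

    public Task<string> ReceiveAsync()
    {
        var request = new TaskCompletionSource<string>();
        _pending.Enqueue(request);
        return request.Task;
    }

    public void Deliver(string message)
    {
        if (_pending.Count > 0)
        {
            _pending.Dequeue().TrySetResult(message);
        }
    }
}

public static class AbandonedReceiveDemo
{
    public static (bool AbandonedCompleted, bool SecondCompleted) Run()
    {
        var link = new ToyLink();

        // The caller of the first receive stops waiting (for example after a client-side
        // cancellation), but the request itself stays queued on the link.
        Task<string> abandoned = link.ReceiveAsync();

        // A later receive queues up behind the abandoned request.
        Task<string> second = link.ReceiveAsync();

        // The next message is handed to the abandoned request, not to the second caller.
        link.Deliver("m1");

        return (abandoned.IsCompleted, second.IsCompleted);
    }
}
```

In this model the second caller keeps waiting until another message arrives, which is the kind of backup described above when operations are ignored rather than actually ended on the link.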
That's why we originally closed the link in the other PR, but that also has other drawbacks. I think even StopProcessing can be problematic because the processor is designed to be restarted, right?
Yes, the processor can be restarted - actually StopProcessing just stops receiving rather than closing any links. Close/Dispose would close links. I really think the best way forward is to try to get this integrated into the AMQP lib, so that we can actually end the operations early instead of ignoring them.
I would vote for option 2.
For 1, I don't think cancelling pending receive calls when totalCredit is 0 is sufficient because there could be concurrent receives occurring on the same link. Even with option 2, we wouldn't be able to correlate receive calls with ReceiveAsyncResults. IMO the cancellation token provides the best user experience.
I really have a hard time understanding all the push back against cancellation tokens. Cooperative cancellation is the de facto standard in .NET for IO-bound operations. It is present almost everywhere in modern async-enabled APIs, in the runtime as well as across the ecosystem. Even the guidance for the whole Azure SDK, to which a lot of people have contributed and for which intense user studies have been done, adheres to those principles because this is how this ecosystem works. So why so much push back?
@danielmarbach the push back is not against cancellation tokens. It's more about supporting the shutdown scenario in a better way that also makes sense to AMQP (I admit that I am influenced more by other AMQP implementations, especially the Apache Qpid products and their JMS implementation). Your PR to the AMQP library (thank you for that) adds a cancellation token to the receive method only. It gives a feeling that the library API is created on an as-needed basis and that it was done just to make the shutdown scenario work. To properly support cancellation tokens, we will also need to look at other Task-based APIs.
Fair enough. Unfortunately, I'm only a community contributor without corporate backing, so the only thing I could commit to in my precious spare time was exactly that. Your comment puts things in a different light, and it sounds more like the door is open rather than closed, which is what I had, perhaps wrongly, read into the conversations. I appreciate you taking the time to clarify that.
If there were some way to openly share plans, ideas, and directions on the repo, including things that could be done, I'm happy to contribute a few things when I have time and it fits the small area of the AMQP lib that I know. For me, it boils down to having this project under some sort of active governance and communication plan to see where things are heading (or not).
Alternative to #19888
Closes #19306