Stop cancellation spike #19888
Thank you for your contribution @danielmarbach! We will review the pull request and get back to you soon.
await processor.StopProcessingAsync(cancellationTokenSource.Token);
var stop = DateTime.UtcNow;

Assert.That(stop - start, Is.EqualTo(TimeSpan.FromSeconds(3)).Within(TimeSpan.FromSeconds(3)));
Hmm, does this test pass? The issue with stopping the processor is that we await the receive call that the receiver managers are doing. So when you call StopProcessing, it can still take up to 60s (the TryTimeout default value from the RetryOptions) for it to actually stop. The behavior we want is that when a user calls StopProcessing (or Close/Dispose) any ongoing Receive operations are immediately cancelled, but any ongoing user handlers that are running are allowed to complete.
In a recent conversation with the AMQP library team, it was suggested that force-closing an active AMQP link could be safely used to cancel an operation in-flight. This would potentially allow the client types to respect cancellation while also gracefully shutting down and cleaning up.
This is what this code does.
The flow is the following:
- Once `StopProcessingAsync` is called, the cancellation token that signals the event handlers that things are about to shut down is triggered (as before).
- This leaves handlers the chance to complete either indefinitely (when no cancellation token is provided to `StopProcessingAsync`) or up to the "SLA" enforced by the token passed into `StopProcessingAsync`. Once the token triggers, the link is forcefully closed, which makes the `Task.Factory.FromAsync` part return and then throw immediately because the token has been canceled.
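The flow above can be sketched in isolation. This is a minimal illustration, not the code from this PR: `FakeLink`, `ReceiveWithCancellationAsync`, and `LinkCloseSketch` are hypothetical names, and the pending receive is simulated with a `TaskCompletionSource` instead of a real AMQP link. Only `CancellationToken.Register` and `ThrowIfCancellationRequested` are actual .NET APIs here.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an AMQP receive link; not the real ReceivingAmqpLink API.
sealed class FakeLink
{
    private readonly TaskCompletionSource<bool> pending =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Simulates a receive that stays pending until the link is closed.
    // The result is false when the receive ended without a message.
    public Task<bool> ReceiveAsync() => pending.Task;

    // Closing the link completes the pending receive with "no message".
    public void Close() => pending.TrySetResult(false);
}

static class LinkCloseSketch
{
    public static async Task<bool> ReceiveWithCancellationAsync(
        FakeLink link, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // When the token fires, force-close the link so the pending receive returns.
        using var registration = cancellationToken.Register(
            static state => ((FakeLink)state!).Close(), link);

        bool gotMessage = await link.ReceiveAsync().ConfigureAwait(false);

        // If the receive returned because the link was closed, surface it as cancellation.
        cancellationToken.ThrowIfCancellationRequested();
        return gotMessage;
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));
        try
        {
            await ReceiveWithCancellationAsync(new FakeLink(), cts.Token);
            Console.WriteLine("completed");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("canceled");
        }
    }
}
```

Passing the link as the `state` argument lets the callback be a `static` lambda, so registering it does not allocate a closure.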
I may be overlooking something, but the gap that I see here is that the token that you're passing into `StopProcessingAsync` is only used in the scope of that method to request that the stop operation be aborted. It doesn't get propagated to the handlers, so it really only allows for us to abort when the semaphore is taking too long to acquire. No?
With the cooperative cancellation approach of .NET, the cancellation token only applies to the flow of the method you pass it into. So from the perspective of the caller, it really means "stop processing, and here is the SLA that I'm going to give you to do so." And you are correct that the token is not "lifted" to the handlers. The token in the handlers is flagged immediately to give the handler methods a chance to stop what they are doing and shut down gracefully (and yes, graceful can mean throwing because of cancellation). Once the SLA of the stop is reached, the stop processing method returns, and from that standpoint it is like switching the lights off.
> to the handlers, so it really only allows for us to abort when the semaphore is taking too long to acquire. No?
Not sure what you mean here. Which semaphore do you mean? There are multiple semaphores at play. One is to make sure a single caller can stop, and another one is to make sure the concurrency settings are guaranteed.
Apologies for the confusion. The point that I was making is that cancellation in `StopProcessingAsync` is best effort. We intentionally don't honor it if it would result in corruption or inconsistent state. If you make it past the check on Line 626, that's the point of no return. In the case that the processing task takes too long to stop, it will (and should) ignore the token passed in.
Fair enough. I followed the proposal in the original issue description in this spike. Maybe I'll find a few minutes to do the other approach as a community PR.
@JoshLove-msft @jsquire Quick question. I think I have a good proposal. Just wanted to clarify something. My plan is to implement it in the AMQP abstractions of the service bus library as close to the bare metal as possible so that regular receives can also properly benefit from the cancellation token and not just the processor. What do you think?
I think this makes sense. Ideally it would be in the AMQP library itself.
I strongly agree with both of those statements. I don't see much opportunity for this to move into the AMQP library, given the history of attempts there.... the next best thing would be to go with Daniel's proposal. Assuming that the "close it and things don't blow up" works, the form that is used here is quite solid.
Here we go gents #19955
using var registration = cancellationToken.Register(static state =>
{
    ReceivingAmqpLink receiveLink = (ReceivingAmqpLink)state;
    // deliberate fire & forget since this is a best effort and we are not interested in any exceptions
We should probably register a continuation and capture any exceptions as event source logs, even if `verbose`, to help with insight if we ever need it.
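One way the continuation idea might look is sketched below. This is an assumption-laden illustration: `LogVerbose` and `CloseAsync` are hypothetical stand-ins (a real client would write to its event source and close an actual link), and `FireAndForgetLoggingSketch` is not part of the library. The `ContinueWith` overload and `TaskContinuationOptions.OnlyOnFaulted` are standard .NET.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

static class FireAndForgetLoggingSketch
{
    // Hypothetical logger; the real client would write to its event source instead.
    static void LogVerbose(string message) => Console.WriteLine($"[verbose] {message}");

    // Hypothetical close operation that fails, standing in for closing a link.
    static Task CloseAsync() =>
        Task.FromException(new InvalidOperationException("link already closed"));

    public static Task CloseAndLogAsync() =>
        // The continuation runs only if the close faulted. It observes the exception
        // so it is logged rather than silently dropped.
        CloseAsync().ContinueWith(
            static t => LogVerbose($"close failed: {t.Exception!.GetBaseException().Message}"),
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);

    public static async Task Main()
    {
        // Awaited here only so the demo does not exit early; production code would
        // leave the returned task unawaited to keep the fire-and-forget behavior.
        await CloseAndLogAsync();
    }
}
```

Note that with `OnlyOnFaulted`, the continuation task ends up canceled when the close succeeds, so callers that do await it need to expect that.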
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

using var registration = cancellationToken.Register(static state =>
I.... did not know that `Register` was a thing. So much nicer than what my initial set of thoughts looked like for this!
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

using var registration = cancellationToken.Register(static state =>
Should this be `await using` since we're already in an asynchronous method?
I think it doesn't implement IAsyncDisposable
Bah. I got suckered into looking at `netstandard2.1` again by the docs site. Pay me no mind here.
            receivedMessages.Add(AmqpMessageConverter.AmqpMessageToSBMessage(message));
            message.Dispose();
        }
    }

    return receivedMessages;
}
catch (OperationCanceledException) when(cancellationToken.IsCancellationRequested)
- catch (OperationCanceledException) when(cancellationToken.IsCancellationRequested)
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
Yeah, sorry, it was really just a sloppy spike to see what you think about this approach.
All good. I just can't turn off the OCD when reading things. 😛
await processor.StartProcessingAsync();
await tcs.Task;
await Task.Delay(2000); // better way to do this?
You could publish a couple of messages to the queue and then have your message handler set a TCS when all published messages were received. Here, you could await that TCS which would give you a deterministic point to know that a message had been received and, therefore, the processor tasks were active.
It would also ensure that we know the queue is empty, since you've already seen all of the messages that you had published. Stopping under those conditions would have previously taken the `TryTimeout` but should behave differently with these changes.
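The suggestion to replace the fixed delay with a TCS could be sketched roughly as follows. This is a standalone illustration under stated assumptions: `HandleMessageAsync` is a hypothetical handler, `publishedCount` is an assumed number of published messages, and the processor's dispatch is simulated with `Task.Run` rather than a real Service Bus processor.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class DeterministicSignalSketch
{
    public static async Task<int> RunAsync(int publishedCount)
    {
        int receivedCount = 0;
        var allReceived = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // Hypothetical message handler; in a real test this would be attached to the processor.
        Task HandleMessageAsync(string body)
        {
            // Signal once the last published message has been seen.
            if (Interlocked.Increment(ref receivedCount) == publishedCount)
            {
                allReceived.TrySetResult(true);
            }
            return Task.CompletedTask;
        }

        // Simulate the processor dispatching the published messages concurrently.
        var dispatch = new Task[publishedCount];
        for (int i = 0; i < publishedCount; i++)
        {
            string body = $"message-{i}";
            dispatch[i] = Task.Run(() => HandleMessageAsync(body));
        }

        // Deterministic wait point: no Task.Delay needed.
        await allReceived.Task;
        await Task.WhenAll(dispatch);
        return receivedCount;
    }

    public static async Task Main()
    {
        int received = await RunAsync(publishedCount: 3);
        Console.WriteLine($"received {received} of 3");
    }
}
```

At the point where `allReceived.Task` completes, the test knows both that the processor is active and that the queue has been drained, which is the condition under which a stop would previously have waited for the `TryTimeout`.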
await tcs.Task;
await Task.Delay(2000); // better way to do this?

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3));
I would set this for something longer, like 10 minutes or so. That would help avoid flaky behavior in Live pipeline runs where we see some longer pauses in async calls. Something as short as 3 seconds would potentially cause intermittent failures if it takes too long to acquire the semaphore and the call aborts.
Hmmm... I don't know that I understand. The interval here is meant to be less than the TryTimeout. So why would you set it to 10 minutes?
That would seem to be a bad assumption on my part. I assumed the `60` for TryTimeout was 60 minutes, to be unreasonably long. I was advocating for using the cancellation as a sanity check to prevent the test from stalling for 60 minutes but still allowing for a good amount of time variance, which we often see in the pipeline runs. That's the pattern that we often take to try and keep maximum test stability.
In that case, I'd likely push the `TryTimeout` to something much longer than 60 seconds for the same reason. The concurrency and management plane use for the test suite tends to cause some long pauses in the pipelines when running.
await processor.StopProcessingAsync(cancellationTokenSource.Token);
var stop = DateTime.UtcNow;

Assert.That(stop - start, Is.EqualTo(TimeSpan.FromSeconds(3)).Within(TimeSpan.FromSeconds(3)));
Rather than asserting on the timing here, I'd recommend asserting that the stop completed and that the token you passed to `StopProcessingAsync` has not been signaled. Since we know that the `TryTimeout` is set to a super-long interval, if the stop completes before your cancellation token, we know that the `ActiveReceiveTask` was torn down based on the `RunningTaskTokenSource` cancellation.
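The shape of that assertion might look like the sketch below. `FakeProcessor` is a hypothetical stand-in that only mimics the idea of an active receive task torn down by an internal token source; it is not the real processor. In the actual NUnit test, the final check would presumably be an `Assert.That(cancellationTokenSource.IsCancellationRequested, Is.False)` rather than a console line.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the processor; only the stop mechanics are modeled.
sealed class FakeProcessor
{
    private readonly CancellationTokenSource runningTaskTokenSource = new CancellationTokenSource();
    private Task activeReceiveTask = Task.CompletedTask;

    // Simulates a receive with a very long timeout that only ends via the internal token.
    public void Start() =>
        activeReceiveTask = Task.Delay(Timeout.Infinite, runningTaskTokenSource.Token);

    public async Task StopAsync(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // Cancel the internal token, then wait for the receive task to unwind.
        runningTaskTokenSource.Cancel();
        try
        {
            await activeReceiveTask.ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Expected: the receive was torn down by the internal cancellation.
        }
    }
}

static class StopAssertionSketch
{
    public static async Task<bool> StopsBeforeCallerTokenAsync()
    {
        var processor = new FakeProcessor();
        processor.Start();

        // A generous caller-side limit acts as a sanity check, not as the thing under test.
        using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromMinutes(10));
        await processor.StopAsync(cancellationTokenSource.Token);

        // The stop completed; verify the caller's token was never signaled.
        return !cancellationTokenSource.IsCancellationRequested;
    }

    public static async Task Main()
    {
        bool ok = await StopsBeforeCallerTokenAsync();
        Console.WriteLine($"stopped before the caller's token fired: {ok}");
    }
}
```

Asserting on token state instead of elapsed time avoids the flakiness that pipeline pauses introduce into timing-based checks.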
Well, technically, if we rethrew the OperationCanceledException, we could pass the token that triggered the cancellation to the exception and then verify that. But then we would need to catch the exception at other stages so that the user code still behaves the same on stop.
I need to digest this input. I don't know if I understand it yet.
I think that we may want an additional Live test that follows the general flow of Daniel's new test, but then publishes a few messages and restarts the processor. That would help us prove that the abort doesn't cause issues in the AMQP library state. Ideally, maybe doing that with the receiver would also be a good safety check.