-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stop cancellation spike #19888
Stop cancellation spike #19888
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -284,10 +284,19 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn | |||||
{ | ||||||
if (!_receiveLink.TryGetOpenedObject(out link)) | ||||||
{ | ||||||
link = await _receiveLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)).ConfigureAwait(false); | ||||||
link = await _receiveLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)) | ||||||
.ConfigureAwait(false); | ||||||
} | ||||||
|
||||||
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); | ||||||
|
||||||
using var registration = cancellationToken.Register(static state => | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it doesn't implement There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. bah. I got suckered into looking at |
||||||
{ | ||||||
ReceivingAmqpLink receiveLink = (ReceivingAmqpLink)state; | ||||||
// deliberate fire & forget since this is a best effort and we are not interested in any exceptions | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should probably register a continuation and capture any exceptions as event source logs, even if |
||||||
_ = receiveLink.CloseAsync(TimeSpan.Zero); | ||||||
}, link, useSynchronizationContext: false); | ||||||
|
||||||
var messagesReceived = await Task.Factory.FromAsync | ||||||
( | ||||||
(callback, state) => link.BeginReceiveRemoteMessages( | ||||||
|
@@ -312,13 +321,18 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn | |||||
{ | ||||||
link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome); | ||||||
} | ||||||
|
||||||
receivedMessages.Add(AmqpMessageConverter.AmqpMessageToSBMessage(message)); | ||||||
message.Dispose(); | ||||||
} | ||||||
} | ||||||
|
||||||
return receivedMessages; | ||||||
} | ||||||
catch (OperationCanceledException) when(cancellationToken.IsCancellationRequested) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah sorry it was really just a sloppy spike to see what you think about this approach There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All good. I just can't turn off the OCD when reading things. 😛 |
||||||
{ | ||||||
throw; | ||||||
} | ||||||
catch (Exception exception) | ||||||
{ | ||||||
ExceptionDispatchInfo.Capture(AmqpExceptionHelper.TranslateException( | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ | |
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Diagnostics; | ||
using System.Linq; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
@@ -513,6 +514,45 @@ Task ProcessMessage(ProcessMessageEventArgs args) | |
} | ||
} | ||
|
||
[Test] | ||
public async Task StopProcessingAdheresToTokenSLA() | ||
{ | ||
await using (var scope = await ServiceBusScope.CreateWithQueue( | ||
enablePartitioning: false, | ||
enableSession: false)) | ||
{ | ||
// very long timeout | ||
await using var client = CreateClient(tryTimeout: 60); | ||
var sender = client.CreateSender(scope.QueueName); | ||
await sender.SendMessageAsync(GetMessage()); | ||
await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions | ||
{ | ||
AutoCompleteMessages = true, | ||
}); | ||
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); | ||
|
||
Task ProcessMessage(ProcessMessageEventArgs args) | ||
{ | ||
tcs.TrySetResult(true); | ||
return Task.CompletedTask; | ||
} | ||
processor.ProcessMessageAsync += ProcessMessage; | ||
processor.ProcessErrorAsync += ExceptionHandler; | ||
|
||
await processor.StartProcessingAsync(); | ||
await tcs.Task; | ||
await Task.Delay(2000); // better way to do this? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could publish a couple of messages to the queue and then have your message handler set a TCS when all published messages were received. Here, you could await that TCS which would give you a deterministic point to know that a message had been received and, therefore, the processor tasks were active. It would also ensure that we know the queue is empty, since you've already seen all of the messages that you had published. Stopping under those conditions would have previously taken the |
||
|
||
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would set this for something longer, like 10 minutes or so. That would help avoid flaky behavior in Live pipeline runs where we see some longer pauses in async calls. Something as short as 3 seconds would potentially cause intermittent failures if it takes too long to acquire the semaphore and the call aborts. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmmm... Don't know I understand. The interval here is to be less than the TryTimeout. So why would you set it to 10 min? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That would seem to be a bad assumption on my part. I assumed the In that case, I'd likely push the |
||
|
||
var start = DateTime.UtcNow; | ||
await processor.StopProcessingAsync(cancellationTokenSource.Token); | ||
var stop = DateTime.UtcNow; | ||
|
||
Assert.That(stop - start, Is.EqualTo(TimeSpan.FromSeconds(3)).Within(TimeSpan.FromSeconds(3))); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, does this test pass? The issue with stopping the processor is that we await the receive call that the receiver managers are doing. So when you call StopProcessing, it can still take up to 60s (the TryTimeout default value from the RetryOptions) for it to actually stop. The behavior we want is that when a user calls StopProcessing (or Close/Dispose) any ongoing Receive operations are immediately cancelled, but any ongoing user handlers that are running are allowed to complete. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is what this code does. The flow is the following:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I may be overlooking something, but the gap that I see here is that the token that you're passing into There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the cooperative cancellation approach of .NET the cancellation token only applied to the flow of the method you pass it in. So from the perspective of the caller it really means stop processing and here is the SLA that I'm going to give you to do so. And you are correct that the token is not "lifted" to the handlers. The token in the handlers is immediately flagged to give the handler methods a possibility to stop doing what they are doing to gracefully stop things (and yes graceful can mean throwing because of cancellation). Once the SLA of the stop is reached the stop processing method returns and from that stand point it is like switching the lights off.
Not sure what you mean here. Which semaphore do you mean? There are multiple semaphores at play. Once is to make sure a single caller can stop and another one is to make sure the concurrency settings are guaranteed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Apologies for the confusion. The point that I was making is that cancellation in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fair enough. I followed the proposal in the original issue description in this spike. Maybe I find a few minutes to do the other approach as a community PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @JoshLove-msft @jsquire Quick question. I think I have a good proposal. Just wanted to clarify something. My plan is to implement it in the AMQP abstractions of the service bus library as close to the bare metal as possible so that regular receives can also properly benefit from the cancellation token and not just the processor. What do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this makes sense. Ideally it would be in the AMQP library itself. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I strongly agree with both of those statements. I don't see much opportunity for this to move into the AMQP library, given the history of attempts there.... the next best thing would be to go with Daniel's proposal. Assuming that the "close it and things don't blow up" works, the form that is used here is quite solid. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here we go gents #19955 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rather than asserting on the timing here, I'd recommend asserting that the stop completed and that the token you passed to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well technically if we would rethrow the OperationCancelledException we could pass the token that triggered the cancellation to the exception and then verify that. But then we would need to catch the exception at other stages so that the user code still behaves the same on stop. I need to digest this input. Don't know if I understand it yet |
||
} | ||
} | ||
|
||
[Test] | ||
[TestCase("")] | ||
[TestCase("Abandon")] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I.... did not know that
Register
was a thing. So much nicer than what my initial set of thoughts looked like for this!