Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop cancellation spike #19888

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,19 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
{
if (!_receiveLink.TryGetOpenedObject(out link))
{
link = await _receiveLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)).ConfigureAwait(false);
link = await _receiveLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout))
.ConfigureAwait(false);
}

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

using var registration = cancellationToken.Register(static state =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I.... did not know that Register was a thing. So much nicer than what my initial set of thoughts looked like for this!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be await using since we're already in an asynchronous method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it doesn't implement IAsyncDisposable

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bah. I got suckered into looking at netstandard2.1 again by the docs site. Pay me no mind here.

{
ReceivingAmqpLink receiveLink = (ReceivingAmqpLink)state;
// deliberate fire & forget since this is a best effort and we are not interested in any exceptions
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably register a continuation and capture any exceptions as event source logs, even if verbose, to help with insight if we ever need it.

_ = receiveLink.CloseAsync(TimeSpan.Zero);
}, link, useSynchronizationContext: false);

var messagesReceived = await Task.Factory.FromAsync
(
(callback, state) => link.BeginReceiveRemoteMessages(
Expand All @@ -312,13 +321,18 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
{
link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome);
}

receivedMessages.Add(AmqpMessageConverter.AmqpMessageToSBMessage(message));
message.Dispose();
}
}

return receivedMessages;
}
catch (OperationCanceledException) when(cancellationToken.IsCancellationRequested)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
catch (OperationCanceledException) when(cancellationToken.IsCancellationRequested)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah sorry it was really just a sloppy spike to see what you think about this approach

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All good. I just can't turn off the OCD when reading things. 😛

{
throw;
}
catch (Exception exception)
{
ExceptionDispatchInfo.Capture(AmqpExceptionHelper.TranslateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ public virtual async Task StopProcessingAsync(CancellationToken cancellationToke
{
await ActiveReceiveTask.ConfigureAwait(false);
}
catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException)
catch (OperationCanceledException)
{
// Nothing to do here. These exceptions are expected.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -513,6 +514,45 @@ Task ProcessMessage(ProcessMessageEventArgs args)
}
}

[Test]
public async Task StopProcessingAdheresToTokenSLA()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(
enablePartitioning: false,
enableSession: false))
{
// very long timeout
await using var client = CreateClient(tryTimeout: 60);
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage());
await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
{
AutoCompleteMessages = true,
});
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

Task ProcessMessage(ProcessMessageEventArgs args)
{
tcs.TrySetResult(true);
return Task.CompletedTask;
}
processor.ProcessMessageAsync += ProcessMessage;
processor.ProcessErrorAsync += ExceptionHandler;

await processor.StartProcessingAsync();
await tcs.Task;
await Task.Delay(2000); // better way to do this?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could publish a couple of messages to the queue and then have your message handler set a TCS when all published messages were received. Here, you could await that TCS which would give you a deterministic point to know that a message had been received and, therefore, the processor tasks were active.

It would also ensure that we know the queue is empty, since you've already seen all of the messages that you had published. Stopping under those conditions would have previously taken the TryTimeout but should behave differently with these changes.


using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would set this for something longer, like 10 minutes or so. That would help avoid flaky behavior in Live pipeline runs where we see some longer pauses in async calls. Something as short as 3 seconds would potentially cause intermittent failures if it takes too long to acquire the semaphore and the call aborts.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... Don't know I understand. The interval here is to be less than the TryTimeout. So why would you set it to 10 min?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would seem to be a bad assumption on my part. I assumed the 60 for TryTimeout was 60 minutes to be unreasonably long. I was advocating for using the cancellation as a sanity check to prevent the test from stalling for 60 minutes but still allowing for a good amount of time variance, which we often see in the pipeline runs. That's the pattern that we often take to try and keep maximum test stability.

In that case, I'd likely push the TryTimeout to something much longer than 60 seconds for the same reaon. The concurrency and management plane use for the test suite tends to cause some long pauses in the pipelines when running.


var start = DateTime.UtcNow;
await processor.StopProcessingAsync(cancellationTokenSource.Token);
var stop = DateTime.UtcNow;

Assert.That(stop - start, Is.EqualTo(TimeSpan.FromSeconds(3)).Within(TimeSpan.FromSeconds(3)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, does this test pass? The issue with stopping the processor is that we await the receive call that the receiver managers are doing. So when you call StopProcessing, it can still take up to 60s (the TryTimeout default value from the RetryOptions) for it to actually stop. The behavior we want is that when a user calls StopProcessing (or Close/Dispose) any ongoing Receive operations are immediately cancelled, but any ongoing user handlers that are running are allowed to complete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

In a recent conversation with the AMQP library team, it was suggested that force-closing an active AMQP link could be safely used to cancel an operation in-flight. This would potentially allow the client types to respect cancellation while also gracefully shutting down and cleaning up.

This is what this code does.

The flow is the following:

  • Once StopProcessingAsync is called the cancellation token that signals the event handlers things are about to shutdown is triggered (as before).
  • This leaves handlers the chance to complete either indefinitely (when no cancellation token provided to StopProcessingAsync) or up "SLA" enforced by the token passed into the StopProcessingAsync. Once the token triggers the link is forcefully closed which then makes the Task.Factory.FromAsync part return and throw right next due to the token being canceled.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be overlooking something, but the gap that I see here is that the token that you're passing into StopProcessingAsync is only used in the scope of that method to request that the stop operation be aborted. It doesn't get propagated to the handlers, so it really only allows for us to abort when the semaphore is taking too long to acquire. No?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the cooperative cancellation approach of .NET the cancellation token only applied to the flow of the method you pass it in. So from the perspective of the caller it really means stop processing and here is the SLA that I'm going to give you to do so. And you are correct that the token is not "lifted" to the handlers. The token in the handlers is immediately flagged to give the handler methods a possibility to stop doing what they are doing to gracefully stop things (and yes graceful can mean throwing because of cancellation). Once the SLA of the stop is reached the stop processing method returns and from that stand point it is like switching the lights off.

to the handlers, so it really only allows for us to abort when the semaphore is taking too long to acquire. No?

Not sure what you mean here. Which semaphore do you mean? There are multiple semaphores at play. Once is to make sure a single caller can stop and another one is to make sure the concurrency settings are guaranteed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for the confusion. The point that I was making is that cancellation in StopProcessingAsync is best effort. We intentionally don't honor it if it would result in corruption or inconsistent state. If you make it past the check on Line 626, that's the point of no return. In the case that the processing task takes too long to stop, it will and should) ignore the token passed in.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. I followed the proposal in the original issue description in this spike. Maybe I find a few minutes to do the other approach as a community PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JoshLove-msft @jsquire Quick question. I think I have a good proposal. Just wanted to clarify something. My plan is to implement it in the AMQP abstractions of the service bus library as close to the bare metal as possible so that regular receives can also properly benefit from the cancellation token and not just the processor. What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this makes sense. Ideally it would be in the AMQP library itself.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I strongly agree with both of those statements. I don't see much opportunity for this to move into the AMQP library, given the history of attempts there.... the next best thing would be to go with Daniel's proposal. Assuming that the "close it and things don't blow up" works, the form that is used here is quite solid.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we go gents #19955

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than asserting on the timing here, I'd recommend asserting that the stop completed and that the token you passed to StopProcessingAsync has not been signaled. Since we know that the TryTimeout is set to a super-long interval, if the stop completes before your cancellation token, we know that the ActiveReceiveTask was torn down based on the RunningTaskTokenSource cancellation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well technically if we would rethrow the OperationCancelledException we could pass the token that triggered the cancellation to the exception and then verify that. But then we would need to catch the exception at other stages so that the user code still behaves the same on stop.

I need to digest this input. Don't know if I understand it yet

}
}

[Test]
[TestCase("")]
[TestCase("Abandon")]
Expand Down