-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUG] Timeout stopping Service Bus Processor #17734
Comments
Thank you for your feedback. Tagging and routing to the team member best able to assist. Please be aware that due to the holidays, responses may be delayed. |
Hi @sireza, unfortunatley this is expected behavior. The underlying AMQP library does not support cancellation tokens and is instead based on timeouts. Therefore, when StopProcessingAsync is called, we have to wait for any Receive calls to timeout before shutting down. The timeout can be tweaked by modifying the TryTimeout in the RetryOptions that can be set in the ServiceBusClientOptions. |
@JoshLove-msft that's unfortunate. Any drawback from setting the |
There is the potential that other operations might timeout unnecessarily, but they would still get retried. I would suggest trying it out to see if is acceptable. Another option would be for us to expose the MaxWaitTime for the receive operations, so that this can be configured in the processor, as suggested in #17581. If we added this option, you wouldn't need to change the TryTimeout, but instead you would set the MaxWaitTime. |
Hi, we're sending this friendly reminder because we haven't heard back from you in 7 days. We need more information about this issue to help address it. Please be sure to give us your input. If we don't hear back from you within 14 days of this comment the issue will be automatically closed. Thank you! |
Using |
I made a minimum reproduction of this assuming I had triggered some fringe corner case with my usage. After debugging for a bit and finding the cause I found this issue (wish I did a better search first 😅). Finding that this was expected behavior was a bit shocking and I hope you all change your view on this from being expected behavior to being a limitation. Regarding the workaround of changing the TryTimeout to 15 seconds, does that mean my services will make 4 times as many AMQP requests compared to the default? The client is using that timeout for long polling right? If so, could that cause any possible issues with placing extra burden on my namespaces? Assuming ReceivingAmqpLink never has support for cancellation, are there any possibilities to work around that limitation? For example could the AmqpReceiver be placed into a state of "stopping" to quickly return to the caller for Close/Dispose? In a stopping state maybe it could reject any new messages so that the link timeout can happen asynchronously and probably even just allow the process to die much quicker. In my current context I have IHostedService background workers that close their processor within IHostedService.StopAsync . Combined with how StopAsync is called in sequence, gracefully terminating these services can now take multiple minutes. I guess I can just take a fire and forget approach to calling StopProcessingAsync/CloseAsync for now. Here is the reproduction I made: using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
namespace ConsoleApp1
{
class Program
{
static async Task Main(string[] args)
{
const string connectionString = @"<CONNECTIONSTRING>";
var ct = CancellationToken.None;
await using var client = new ServiceBusClient(connectionString);
var processorOptions = new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 1,
ReceiveMode = ServiceBusReceiveMode.PeekLock
};
var processor = client.CreateProcessor("test", processorOptions);
processor.ProcessMessageAsync += _ => throw new NotImplementedException();
processor.ProcessErrorAsync += _ => Task.CompletedTask;
var stopwatch = Stopwatch.StartNew();
await processor.StartProcessingAsync(ct);
stopwatch.Stop();
Console.WriteLine($"Started {processor.EntityPath}: {stopwatch.Elapsed}");
// I think this delay is important so execution can get far enough into AmqpReceiver.ReceiveMessagesAsync
await Task.Delay(TimeSpan.FromSeconds(3));
Console.WriteLine($"Stopping {processor.EntityPath}...");
stopwatch = Stopwatch.StartNew();
// This is where the problem seems to be. When this is called ServiceBusProcessor.RunningTaskTokenSource will
// have cancel called on it to try to bring the receivers down. Meanwhile in AmqpReceiver.ReceiveMessagesAsyncInternal
// the call to link.BeginReceiveRemoteMessages does not seem to have any way to be notifed of that cancellation.
// My guess is that an http timeout is required to cause the cancellation to be noticed which is why it always seems
// to wait for one minute.
await processor.StopProcessingAsync(ct);
stopwatch.Stop();
Console.WriteLine($"Stopped {processor.EntityPath}: {stopwatch.Elapsed}");
Console.WriteLine($"Closing {processor.EntityPath}...");
stopwatch = Stopwatch.StartNew();
await processor.CloseAsync(ct);
stopwatch.Stop();
Console.WriteLine($"Closed {processor.EntityPath}: {stopwatch.Elapsed}");
// TODO: dispose when https://github.com/Azure/azure-sdk-for-net/pull/18027 is merged
}
}
} |
Yes, if your queues are often empty, this would mean that the processor would end up doing 4x as many receive requests over a given period of time, which would impact the number of operations you are doing against your namespace and could subject you to throttling if you are doing more than 1000 operations per second (https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-throttling#what-are-the-credit-limits). If the queues generally contain messages, then it wouldn't impact the number of requests, since the receive calls would return as soon as a message was retrieved.
I think this could be possible but it would still require work in the underlying AMQP library which lives here - https://github.com/Azure/azure-amqp. I think we can also consider having special behavior in the Service Bus SDK where we will attempt to shutdown the link early even if a receive call is in process for the specific case of stopping the processor. |
Fixed in #19955 |
I just got hit with this going through the basic samples for code using the Azure.Messaging.ServiceBus library, I was certainly convinced it was a bug. I'm not entirely sure I understand what the fix actually is, is there actually a change to the default behaviour of StopProcessingAsync( ) (which seems to take ~60 seconds to timeout)? |
Yeah there is a fix that will be available in an upcoming beta. The GA fix is still at least a month out. |
Reopening until the GA fix is available. |
It turns out the current fix for this has some unintended side effects. We still plan to fix this once support is added for cancellation tokens in the AMQP library, but may need to revert this change for now. See #21869 |
The side effects seem to be not caused by the fix to this issue. The fix for this is in 7.2.0 stable release. |
The issue is not fixed, I am having the same problem, after : just put a breakpoint, wait for a minute, then continue, you will always get this error. |
@xbaha what error are you referring to? |
@JoshLove-msft the error I got is: |
Add xms-ids for machinelearning, machinelearningcompute, machinelearningexperimentation and machinelearningservices (Azure#17734) * add xmsids for machinelearning rps * update inner properties * address code review comments
Add xms-ids for machinelearning, machinelearningcompute, machinelearningexperimentation and machinelearningservices (Azure#17734) * add xmsids for machinelearning rps * update inner properties * address code review comments
Add xms-ids for machinelearning, machinelearningcompute, machinelearningexperimentation and machinelearningservices (Azure#17734) * add xmsids for machinelearning rps * update inner properties * address code review comments
Hello, I believe I'm still facing this issue despite using Azure.Messaging.ServiceBus 7.7.0 (as well as Azure.Messaging.EventHubs 5.1.0). sample:
|
Because you are passing a cancellation token to StopProcessingAsync, the stopping will be canceled if the token is canceled. Do you know if the token is canceled? |
Given the taskCanceledException, I believe the token is canceled. However, if I run locally without the token being cancelled, it's just stuck at StopProcessingAsync and never gets to the next line. Even after >5 minutes. So currently, if the task is cancelled, it throws the exception, and if it isn't cancelled, it just runs forever. Either way, the StopProcessingAsync() call never completes. |
StopProcessingAsync will wait for any event handlers to finish running. Is it possible that your event handlers are not completing? |
This is happening even if the ServiceBus is empty. |
Description
I discovered this issue when using the
ServiceBusProcessor
withIHostedService
, but I was able to replicate the issue using the quickstart snippetThe processor itself is working fine, it was able to receive messages and successfully 'completes' it
Not sure what it is waiting for, as there is no message in the queue. I am relying on this method to function properly to ensure messages in flight gets put back in the queue if the app is shutting down.
Expected behaviour
When
StopProcessingAsync
is called, the processor should gracefully stop processing any message, then successfully stops the processor before it times outActual behaviour (include Exception or Stack Trace)
Calling
StopProcessingAsync()
freezes the app, then times out after 60 secondsI added event listener to see what's happening, and here's the output
To Reproduce
Steps to reproduce the behaviour (include a code snippet, screenshot, or any additional information that might help us reproduce the issue)
StopProcessingAsync()
Environment:
The text was updated successfully, but these errors were encountered: