[BUG] ServiceBusSessionProcessor fails to stop in a timely manner #42410
Comments
Hi @paul-datatech911. Thanks for reaching out and we regret that you're experiencing difficulties.
One thing to note is that the processor will wait for your message processing handler to complete before processing stops. If your code chooses not to honor the cancellation token in the EventArgs when signaled, then the processor will remain running without dispatching additional messages until all outstanding handler calls complete. As you did not include your handlers in your repro sample, I can't say whether or not that is the cause of what you're seeing.
I'm unable to reproduce the behavior that you're reporting with the latest release (7.17.4), using a basic set of handlers with the same client pattern.
await using var client = new ServiceBusClient("<< CONNECTION STRING >>");
await using var processor = client.CreateSessionProcessor("<< TOPIC >>", "<< SUBSCRIPTION >>");
processor.ProcessMessageAsync += async args =>
{
args.Message.Body.ToString().Dump();
await args.CompleteMessageAsync(args.Message);
};
processor.ProcessErrorAsync += args =>
{
args.Exception.Dump();
return Task.CompletedTask;
};
await processor.StartProcessingAsync();
await Task.Delay(TimeSpan.FromSeconds(10));
var stopWatch = Stopwatch.StartNew();
await processor.StopProcessingAsync();
stopWatch.Stop();
stopWatch.Elapsed.Dump();
With that snippet, I'm seeing a number of messages processed while the delay is active, and then stopping completes in milliseconds:
... SNIP ...
Message-80
Message-81
Message-82
Message-83
Message-84
Message-85
00:00:00.0362204
If you are able to reproduce with the latest version of the library and you're sure that your handler is not blocking shutdown, please unresolve and provide a small, self-contained minimal repro and we'd be happy to take another look.
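To make the point about handlers and the cancellation token concrete, here is a minimal, self-contained simulation. It does not use the Service Bus SDK and is not how the processor is implemented; `StopSimulation`, `TimeStopAsync`, and `RunHandlerAsync` are hypothetical names, and `Task.Delay` stands in for handler work. It only shows that a stop which waits for outstanding handlers returns quickly when the handlers observe the token and slowly when they ignore it.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class StopSimulation
{
    // Hypothetical stand-in for a stop operation: start the handlers,
    // signal cancellation, then wait for every outstanding handler to finish.
    public static async Task<TimeSpan> TimeStopAsync(
        Func<CancellationToken, Task> handler, int handlerCount)
    {
        using var cts = new CancellationTokenSource();

        // ToArray forces the lazy Select so all handlers are started now.
        var running = Enumerable.Range(0, handlerCount)
            .Select(_ => RunHandlerAsync(handler, cts.Token))
            .ToArray();

        var stopWatch = Stopwatch.StartNew();
        cts.Cancel();                // request cancellation
        await Task.WhenAll(running); // wait for handlers to return
        return stopWatch.Elapsed;
    }

    private static async Task RunHandlerAsync(
        Func<CancellationToken, Task> handler, CancellationToken token)
    {
        try
        {
            await handler(token);
        }
        catch (OperationCanceledException)
        {
            // The handler observed the token and ended early.
        }
    }

    public static async Task Main()
    {
        // Handler that ignores the token: the simulated stop waits out the full delay.
        var ignoring = await TimeStopAsync(_ => Task.Delay(TimeSpan.FromSeconds(2)), 4);

        // Handler that passes the token along: the delay is canceled promptly.
        var honoring = await TimeStopAsync(token => Task.Delay(TimeSpan.FromSeconds(2), token), 4);

        Console.WriteLine($"Ignoring token: {ignoring.TotalSeconds:F2}s");
        Console.WriteLine($"Honoring token: {honoring.TotalSeconds:F2}s");
    }
}
```

In a real handler, the equivalent would be to pass the token exposed on the event args to any awaited work that accepts one.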
Hi @paul-datatech911. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text "/unresolve" to remove the "issue-addressed" label and continue the conversation.
The handlers are empty in this case. They do no work.
/unresolve
Yes, I do realize that CloseAsync() and DisposeAsync() don't both need to be called, but they are all included for completeness.
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using ConsoleDump;
using System.Diagnostics;
var connStr = "<<CONNSTRING>>";
var topic = "testtopic";
var subscription = "testSubscription";
var adminClient = new ServiceBusAdministrationClient(connStr);
if(!(await adminClient.TopicExistsAsync(topic)))
await adminClient.CreateTopicAsync(topic);
if(!(await adminClient.SubscriptionExistsAsync(topic, subscription)))
await adminClient.CreateSubscriptionAsync(new CreateSubscriptionOptions(topic, subscription)
{
RequiresSession = true
});
await using var client = new ServiceBusClient(connStr);
await using var processor = client.CreateSessionProcessor("testtopic", "testsubscription", new ServiceBusSessionProcessorOptions()
{
AutoCompleteMessages = false,
MaxConcurrentCallsPerSession = 1,
ReceiveMode = ServiceBusReceiveMode.PeekLock
//,MaxConcurrentSessions = 16
});
processor.ProcessMessageAsync += async args =>
{
await args.CompleteMessageAsync(args.Message);
args.Message.Body.ToString().Dump();
await args.CompleteMessageAsync(args.Message);
};
processor.ProcessErrorAsync += args =>
{
args.Exception.Dump();
return Task.CompletedTask;
};
await processor.StartProcessingAsync();
await Task.Delay(TimeSpan.FromSeconds(5));
TimeAction("StopProcessingAsync", async () => await processor.StopProcessingAsync());
TimeAction("CloseAsync",async () => await processor.CloseAsync());
TimeAction("DisposeAsync", async () => await processor.DisposeAsync());
Console.Read();
static void TimeAction(string name, Action toTime)
{
try
{
var sw = Stopwatch.StartNew();
toTime();
$"'{name ?? "UNKNOWN"}' took {sw.Elapsed.TotalSeconds}s".Dump();
}
catch { }
}
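A note on the timing helper in the sample above: `TimeAction` takes an `Action`, so each `async () => await ...` lambda is compiled as `async void`. The call to `toTime()` returns at the first incomplete `await`, which means the stopwatch covers only the synchronous part of the call, the three operations can overlap instead of running in sequence, and the `try`/`catch` does not observe exceptions thrown after that first `await`. The sketch below is one possible alternative, not part of the original report: `Timing` and `TimeActionAsync` are hypothetical names, `Console.WriteLine` replaces `Dump()`, and `Task.Delay` stands in for the processor calls so that the block runs on its own.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class Timing
{
    // Hypothetical helper: accepts Func<Task> and awaits it, so the elapsed
    // time covers the whole asynchronous operation, not just its start.
    public static async Task<TimeSpan> TimeActionAsync(string name, Func<Task> toTime)
    {
        var label = name ?? "UNKNOWN";
        var stopWatch = Stopwatch.StartNew();
        try
        {
            await toTime();
        }
        catch (Exception ex)
        {
            // Report failures instead of swallowing them silently.
            Console.WriteLine($"'{label}' failed: {ex.GetType().Name}: {ex.Message}");
        }
        stopWatch.Stop();
        Console.WriteLine($"'{label}' took {stopWatch.Elapsed.TotalSeconds}s");
        return stopWatch.Elapsed;
    }

    public static async Task Main()
    {
        // Task.Delay is a placeholder for an awaited call such as stopping the processor.
        await TimeActionAsync("Simulated stop", () => Task.Delay(TimeSpan.FromMilliseconds(200)));
    }
}
```

Applied to the sample, the calls would look like `await TimeActionAsync("StopProcessingAsync", () => processor.StopProcessingAsync());`, each awaited before the next begins. Since `DisposeAsync()` returns a `ValueTask`, it would need `.AsTask()` to fit the `Func<Task>` parameter.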
@paul-datatech911: The results that you're seeing are normal behavior under high degrees of concurrency. Cancellation in the AMQP transport is best effort and is possible only at points in the protocol where it can be safely performed. When stopping, the processor will immediately request cancellation of all active network operations and then must wait for the transport to complete.
Each network operation is asynchronous, and cancellation or completion requires the async continuations to be scheduled and run. As your host is unlikely to have 128+ CPU cores available to run that amount of work in parallel, the .NET thread pool has to wait for resources to become available to schedule completion of those network operations and return control to the processor.
I'd suggest reading through the "Processor appears to hang or have latency issues when using high concurrency" section of the Troubleshooting Guide for additional context and discussion.
/unresolve
I read it, thanks for the link, although it doesn't give any good reason for why high concurrency should be an issue (in the absence of blocking sync or async-over-async); it just says that it is. That just means that there is still some sort of blocking behavior in the transport itself, correct? I mean, it doesn't matter if I have 1 core and need to honor joined cancellation on 1000 threads; it can be and should be fast if there are no blocking synchronous calls.
Which team/repo would you suggest an improvement request be created in?
In case you're curious, the use case is a pretty simple multi-tenant setup with the sessionId being the tenantId, which doesn't seem exotic.
Also, if I set the concurrency to the number of cores on my system, I don't get the same performance as you do:
Paul
The TSG doesn't go into deep concurrency discussion because it is not an issue related to the Azure SDK - it is a general development concept. The relevant information from the SDK perspective is that each degree of concurrency is managed as an independent background task in the thread pool.
As mentioned, the performance characteristics here will depend on the host system, its network, and at what point the cancellation is honored by the transport. It is within what would be a reasonable range, and isn't something that indicates an issue with the Service Bus library.
You're welcome to open an issue in the Microsoft.Azure.Amqp repository.
Thanks for explaining concurrency as a general development concept... I think you are correct in hinting at "at which point the cancellation is honored in the transport", and I'll pass the buck over to someone else that may be willing to take a look!
Paul
Hi @paul-datatech911, since you haven’t asked that we |
Library name and version
Azure.Messaging.ServiceBus 7.17.3
Describe the bug
Calling processor.StopProcessingAsync(), processor.CloseAsync() or processor.DisposeAsync() takes up to 60 seconds.
This has been documented in multiple threads (now closed), and marked as merged/fix for EventHubs (#21242).
The issue remains for ServiceBusSessionProcessor.
#17734
#19306
#21869
Expected behavior
Calling processor.StopProcessingAsync(), processor.CloseAsync() or processor.DisposeAsync() should stop the processing of messages and close the underlying transport immediately.
Actual behavior
Calling processor.StopProcessingAsync(), processor.CloseAsync() or processor.DisposeAsync() takes up to the default 60s polling period of the underlying AMQP transport to return.
Reproduction Steps
Environment
No response