[BUG] ServiceBusProcessorClient stop consuming messages when maxConcurrentCalls > 1 and the network is down for a short period of time #33493
Comments
There are two questions from the issue comments, and I give my answers below:
This could be a known issue we have already addressed. Please check client-hangs-or-stalls-with-a-high-prefetch-and-maxconcurrentcall-value for the solution. I assume you are running the processor client on a VM with at most 1 CPU core; in that case, your default thread pool size is 10, and you can increase it by adding a VM option (see the sketch after these answers).
Instead of adding a timeout option for these calls, I think a better change would be to provide async counterparts of these methods. However, for some reason, we didn't provide such async methods before. I don't know the original design rationale; one reason may be that we wanted to prevent users from using them incorrectly, for example by forgetting to add block(). Another important reason is that inside these methods we already have a retry timeout while waiting for the service response, which can be configured via RetryOptions. If we want to add an additional timeout to complete()/abandon()/defer(), we need to investigate its side effects. For now, we need to confirm whether the root cause of your issue is problem #1. Please try increasing the default thread pool size and let me know if this issue is still happening.
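For illustration, a minimal sketch of raising that default pool size follows. It assumes the pool in question is Reactor's boundedElastic scheduler (whose default size is 10 threads per CPU core, hence 10 on a single-core VM); the comment above does not spell out the exact VM option, so the property used here is an assumption on my part:
public class SchedulerSizing
{
    public static void main(String[] args)
    {
        // Equivalent to passing the VM option -Dreactor.schedulers.defaultBoundedElasticSize=30
        // (assumed to be the pool being referred to). Must be set before any Reactor scheduler
        // is created, i.e. before building the ServiceBusProcessorClient.
        System.setProperty("reactor.schedulers.defaultBoundedElasticSize", "30");
        // ... build and start the ServiceBusProcessorClient afterwards ...
    }
}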
Hi, thanks a lot for splitting the issue and commenting back!
No, in fact I can reproduce this issue fairly reliably within a few tries locally on my machine with 4 physical / 8 hyper-threaded CPUs and a setting of prefetch=0 and concurrent=2. To me it really seems that any concurrent value > 1 has a chance of triggering it; it just gets more frequent in some threads the higher the number is. As can be seen, there are nine pods shown while we actually have ten replicas, so one is already doing absolutely nothing anymore; of the remaining nine, seven are still working on all 5 threads, one on only 3 threads, and one on only 2 threads. After restarting them we are back to all ten running 5 threads each.
I'm also perfectly fine with this, since for all intents and purposes it would achieve the same thing for us and allow us to call it with some timeout (currently we are considering the max lock renew time), so the threads would at least recover without the application needing a restart, even if I also feel this would basically just hide the underlying root issue.
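For what it's worth, a minimal sketch of the kind of application-side timeout wrapper being described here (an editorial illustration, not an SDK API; the dedicated executor and the choice of timeout are assumptions, with the timeout standing in for the max lock renew duration mentioned above):
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
public class TimedDisposition
{
    private static final ExecutorService DISPOSITION_POOL = Executors.newCachedThreadPool();
    // Runs context.complete() on a separate thread and gives up after the timeout, so the
    // processing thread is never blocked forever; an abandoned lock simply expires on the
    // service side and the message is redelivered.
    static boolean completeWithTimeout(ServiceBusReceivedMessageContext context, long timeout, TimeUnit unit)
    {
        Future<?> future = DISPOSITION_POOL.submit(context::complete);
        try
        {
            future.get(timeout, unit);
            return true;
        }
        catch (TimeoutException e)
        {
            future.cancel(true); // the hung call is abandoned so the worker thread stays usable
            return false;
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            return false;
        }
        catch (ExecutionException e)
        {
            return false;
        }
    }
}
As the comment above notes, this only hides the underlying issue: the stuck call still occupies a thread in the helper pool until it eventually returns.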
Oh, I see. Let me try to reproduce your issue locally to understand the root cause. Could you provide some more information to help me with the repro:
For deployment, I think you are running the processor client on 10 pods and consuming messages from one queue; please correct me if I am wrong. How much memory is allocated to each pod?
Hi again, sorry for the delay. I was trying to make as small a program as possible to reproduce it and finally got something reasonable enough to post here. Note that it might take a few attempts at disconnecting before it happens; it's not 100% for me, at least. As for versions, the following pom.xml is used for version management:
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>at.test</groupId>
<artifactId>azure-test3</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>Azure Test Application</name>
<description>A test project to show issues with azure hanging threads</description>
<properties>
<!-- Basic Stuff -->
<java.version>11</java.version>
<compileSource>11</compileSource>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.36.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.13.1</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
And the following main Java class, AzureTestMain.java:
package at.test;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusMessageBatch;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
import com.azure.messaging.servicebus.models.CreateMessageBatchOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
public class AzureTestMain
{
// replace with valid url, also ensure topic and subscription exist as required
private static final String CON_URL =
"Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxx";
private static final String TOPIC = "azure-test";
private static final String SUBSCRIPTION = "TEST";
private static final int MESSAGES = 100;
private static final Logger LOGGER = LoggerFactory.getLogger(AzureTestMain.class);
// Use a synchronized set: processMessage callbacks run concurrently when maxConcurrentCalls > 1
private static final Set<Integer> PROCESSED_MESSAGES = Collections.synchronizedSet(new HashSet<>());
public static void main(String[] args) throws InterruptedException
{
// create the topic client
// create a Service Bus Sender client for the topic and send messages, note the topic + subscription have to exist in advance
ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
.connectionString(CON_URL)
.sender()
.topicName(TOPIC)
.buildClient();
LOGGER.info("Going to send {} messages to the topic '{}'", MESSAGES, TOPIC);
ServiceBusMessageBatch currentBatch = senderClient.createMessageBatch(
new CreateMessageBatchOptions().setMaximumSizeInBytes(256 * 1024));
int count = 0;
for (int j = 1; j <= MESSAGES; j++)
{
// this example assumes the topic already exists
ServiceBusMessage message = new ServiceBusMessage("Some Body: " + j);
message.getApplicationProperties().put("message-id", j);
if (currentBatch.tryAddMessage(message))
{
count++;
continue;
}
// The batch is full, so we create a new batch and send the batch.
senderClient.sendMessages(currentBatch);
LOGGER.info("finished sending message batch with size {}", count);
count = 0;
currentBatch =
senderClient.createMessageBatch(new CreateMessageBatchOptions().setMaximumSizeInBytes(256 * 1024));
// Add that message that we couldn't before.
if (!currentBatch.tryAddMessage(message))
{
LOGGER.error("Message is too large for an empty batch. Skipping. Max size: {}. Message: {}",
currentBatch.getMaxSizeInBytes(), message.getBody());
}
else
{
count++;
}
}
senderClient.sendMessages(currentBatch);
LOGGER.info("finished sending message batch with size {}", count);
senderClient.close();
PROCESSED_MESSAGES.clear();
// Start message processing with a defined number of concurrent calls
ServiceBusClientBuilder.ServiceBusProcessorClientBuilder builder = new ServiceBusClientBuilder()
.connectionString(CON_URL)
.processor()
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.topicName(TOPIC)
.subscriptionName(SUBSCRIPTION)
.prefetchCount(0)
.maxConcurrentCalls(10)
.maxAutoLockRenewDuration(Duration.of(10L, ChronoUnit.MINUTES))
.disableAutoComplete()
.processMessage(AzureTestMain::receiveMessage)
.processError(AzureTestMain::handleError);
ServiceBusProcessorClient serviceBusProcessorClient = builder.buildProcessorClient();
serviceBusProcessorClient.start();
// Just wait for all messages to be received
while (PROCESSED_MESSAGES.size() < MESSAGES)
{
LOGGER.info("Not all messages received yet only {}, sleeping for 10 seconds", PROCESSED_MESSAGES.size());
Thread.sleep(10000);
}
LOGGER.info("All messages processed, completing");
serviceBusProcessorClient.close();
System.exit(0);
}
private static void receiveMessage(ServiceBusReceivedMessageContext paramServiceBusReceivedMessageContext)
{
int messageId =
(int) paramServiceBusReceivedMessageContext.getMessage().getApplicationProperties().get("message-id");
PROCESSED_MESSAGES.add(messageId);
LOGGER.info("Got message {} in thread {}", messageId, Thread.currentThread().getName());
paramServiceBusReceivedMessageContext.complete();
}
private static void handleError(ServiceBusErrorContext paramServiceBusErrorContext)
{
LOGGER.error("Error in message processing in thread {}", Thread.currentThread().getName(),
paramServiceBusErrorContext.getException());
}
}
With this running on Java 11 right now (though I previously also had it in a Java 17 app), I can usually reproduce the issue after a few attempts by simply disconnecting my network during message processing. The number of messages, concurrent consumers, the final sleep time and so on can be adjusted as needed. log.txt
What can be seen there is that initially the boundedElastic threads 3 to 12 were processing messages without any break in between, for a total of 10 threads as configured. After I pulled my network cable following message 17 and plugged it back in, only 8 threads recovered: 3, 4, 5, 6, 7, 10, 11 and 12. Threads 8 and 9 never did anything again, and no other thread took over to bring the total back to 10. Checking jconsole while the program is running shows threads 8 and 9 once again waiting in complete().block() (see the small diagnostic sketch after the answers below). I think the example should answer most questions, but to make sure all are answered:
There are some slight variances between the real apps, ranging roughly between 7.12.x and 7.13.x, but all are fairly recent releases.
This actually fluctuates a lot between apps: some send very small single messages of a few bytes, while others send 1 kB+ messages with a large "batch" body in our logic. In the example there is practically no significant body, and increasing it does not seem to change how frequently the issue happens either.
I feel like I'm repeating myself, but that's just how it is: in real apps anything from <100 ms to upwards of ~2 minutes for some processings. It really depends a lot on what is being done, but we have observed the issue for both. The example right now has essentially zero processing time other than writing one log line; I could easily add some sleep there to make it longer.
No, we are running an on-prem OpenShift cluster for now. The cluster is located in Europe, as is our Service Bus subscription.
The application from my Kibana screenshot above, which shows that some threads are no longer logging, has one consumer client for 1 subscription on 1 topic, as does my code example above. This is the most common case for our applications; however, we have a few applications that consume from more than one subscription, each with a different client object in the same JVM process, and we see the same behavior for all of the individual client objects' consumer threads.
Our applications range from around 1 GB Java heap with around 2 GB pod memory up to 9 GB Java heap with 12 GB pod memory. We also monitor for long garbage collections and OOM issues, and pretty much all of them run stable small GC cycles with only very infrequent larger ones. As more and more customers come on board, we are slowly but steadily increasing the pod count whenever we notice GCs becoming more frequent. But we are still just a normal human-run project, so I won't deny there's the odd long GC pause during unexpected traffic spikes; it is rare, though, and I could not spot any correlation between those and the times when threads seemed to stop processing (judging by when they stopped producing logs). Hope this helps; if anything is missing, just ask!
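As referenced above, here is a small diagnostic sketch (an editorial addition, not part of the original repro) that prints the name, state and stack of every Reactor boundedElastic worker thread; threads hung in complete().block() typically show a blocking-wait frame, matching what jconsole shows for threads 8 and 9:
import java.util.Map;
public class StuckThreadDump
{
    // Walks all live threads and dumps only the boundedElastic workers, so the ones
    // stuck in complete().block() can be spotted without attaching jconsole.
    public static void dumpBoundedElasticThreads()
    {
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet())
        {
            if (!entry.getKey().getName().contains("boundedElastic"))
            {
                continue;
            }
            System.out.println(entry.getKey().getName() + " (" + entry.getKey().getState() + ")");
            for (StackTraceElement frame : entry.getValue())
            {
                System.out.println("    at " + frame);
            }
        }
    }
}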
One more thing I've actually noticed for the first time with this newly created test program is that often (but weirdly not always) on the final
My previous local setup was a small Spring Boot application similar to our real application, and the Eclipse termination action for apps just ruthlessly kills them, so there never was a clean shutdown with a close; that explains why I never saw this before.
Hi @TheDevOps |
I found the root cause is that the internal timeout we set for the
This is why, if you add a timeout option to
To give more internal context for this issue:
I'll continue to work on a fix for this issue.
Great to hear you already managed to identify the root cause, and thanks for the detailed explanation; everything makes a lot of sense now!
@TheDevOps We've merged the fix for this issue, which was root-caused using the sample and the logs you shared. It will be released toward the second-to-last week of March. Thank you for collaborating on this!
This issue comes from #26465 (comment)
Describe the bug
The user encountered a problem where their ServiceBusProcessorClient stops consuming messages when they set maxConcurrentCalls to 10 and randomly cut the network connection for a short period of time (< 5 seconds). The threads stop working, stay in the state "waiting", and never start working again. But if they set maxConcurrentCalls to 1, everything works well and the processing of messages never stops.
They suspected that the .block() calls in ServiceBusReceivedMessageContext didn't have a timeout and that, due to this missing timeout, the threads are blocked forever, so they added timeout options for complete()/abandon()/defer(). After local testing, they could see the threads recover after the timeout expired and process messages again as normal.
Exception or Stack Trace
The thread dump state for complete() calls when they encounter this issue:
Code Snippet