Read timed out / TIMEOUT WAITING FOR ACK when sending/receiving messages in parallel #2896

Closed
nils-christian opened this issue Nov 15, 2024 · 5 comments

Comments

@nils-christian

Hi,

In what version(s) of Spring AMQP are you seeing this issue?

3.1.7

Describe the bug

We noticed that our application sometimes fails to send messages with "TIMEOUT WAITING FOR ACK". We were able to create a minimal example which fails to send messages once we send them in parallel.

To Reproduce

Please take a look at the following example:

public class TestDemoApplication {

	private static final String MESSAGE = Stream
			.generate( ( ) -> "A" )
			.limit( 1 * 1024 * 1024 )
			.collect( Collectors.joining( ) );

	public static void main( String[] args ) {
		final SpringApplication.Running running = SpringApplication
				.from( DemoApplication::main )
				.with( RabbitMQConfiguration.class )
				.run( args );

		final ConfigurableApplicationContext applicationContext = running.getApplicationContext( );
		final RabbitTemplate rabbitTemplate = applicationContext.getBean( RabbitTemplate.class );

		final ExecutorService executorService = Executors.newFixedThreadPool( 2 );

		for ( int i = 0; i < 1000; i++ ) {
			final String messageId = "id-" + i;
			executorService.submit( ( ) -> sendMessage( rabbitTemplate, messageId ) );
		}
	}

	private static void sendMessage( final RabbitTemplate rabbitTemplate, final String messageId ) {
		try {
			final CorrelationData correlationData = new CorrelationData( );
			rabbitTemplate.convertAndSend( Constants.EXCHANGE_NAME, "key", MESSAGE, correlationData );
			System.out.println( "Message " + messageId + " has been sent." );

			final CorrelationData.Confirm confirm = correlationData.getFuture( ).get( 10, TimeUnit.SECONDS );
			System.out.println( "Confirm Ack was " + confirm.isAck( ) );
		} catch ( Exception ex ) {
			throw new RuntimeException( "An exception occurred.", ex );
		}
	}

}

The configuration is:

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

The example sends 1000 messages, each with a payload of approximately 1 MiB, from two threads. After some messages, it fails with:

2024-11-15T15:05:15.705+01:00 ERROR 8536 --- [demo] [127.0.0.1:33303] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occurred

java.net.SocketTimeoutException: Read timed out
at java.base/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:278) ~[na:na]
at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:304) ~[na:na]
at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:346) ~[na:na]
at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:796) ~[na:na]
at java.base/java.net.Socket$SocketInputStream.read(Socket.java:1099) ~[na:na]
at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:345) ~[na:na]
at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:420) ~[na:na]
at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:399) ~[na:na]
at java.base/java.io.DataInputStream.readFully(DataInputStream.java:208) ~[na:na]
at java.base/java.io.DataInputStream.readFully(DataInputStream.java:179) ~[na:na]
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:121) ~[amqp-client-5.21.0.jar:5.21.0]
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:199) ~[amqp-client-5.21.0.jar:5.21.0]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:687) ~[amqp-client-5.21.0.jar:5.21.0]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

2024-11-15T15:05:15.709+01:00 ERROR 8536 --- [demo] [nectionFactory4] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: clean channel shutdown; protocol method: #method<channel.close>(reply-code=406, reply-text=TIMEOUT WAITING FOR ACK, class-id=0, method-id=0)

If we add a synchronized block around sendMessage, everything works.
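
The synchronized workaround mentioned above can be sketched without a broker as follows. This is only a minimal sketch, not the code from the reproducer: `Sender`, `SEND_LOCK`, `sendSerialized`, and `maxOverlappingSends` are hypothetical names, and the fake sender merely stands in for the `rabbitTemplate.convertAndSend(...)` call plus the confirm wait. It shows that the lock limits the pool to one send at a time; it does not explain the root cause of the timeout.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SerializedSendSketch {

	/** Hypothetical stand-in for convertAndSend(...) plus waiting for the confirm. */
	interface Sender {
		void send(String messageId) throws Exception;
	}

	private static final Object SEND_LOCK = new Object();

	/** Mirrors a synchronized block around sendMessage: one send at a time. */
	static void sendSerialized(Sender sender, String messageId) {
		synchronized (SEND_LOCK) {
			try {
				sender.send(messageId);
			}
			catch (Exception ex) {
				throw new RuntimeException("An exception occurred.", ex);
			}
		}
	}

	/** Runs the sends on a fixed pool and returns the highest number of overlapping sends observed. */
	static int maxOverlappingSends(int threads, int messages) throws InterruptedException {
		AtomicInteger inFlight = new AtomicInteger();
		AtomicInteger maxInFlight = new AtomicInteger();
		Sender fakeSender = id -> {
			int now = inFlight.incrementAndGet();
			maxInFlight.accumulateAndGet(now, Math::max);
			Thread.sleep(1); // simulate a slow publish
			inFlight.decrementAndGet();
		};
		ExecutorService pool = Executors.newFixedThreadPool(threads);
		for (int i = 0; i < messages; i++) {
			String messageId = "id-" + i;
			pool.submit(() -> sendSerialized(fakeSender, messageId));
		}
		pool.shutdown(); // stop accepting tasks, then wait for the queued ones
		pool.awaitTermination(30, TimeUnit.SECONDS);
		return maxInFlight.get();
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println("Max overlapping sends: " + maxOverlappingSends(2, 100));
	}
}
```

With the lock in place, the second pool thread adds no publishing parallelism, so this is better seen as a diagnostic aid than as a fix.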

Expected behavior

The messages are sent without any ack-related issues.

Thank you and best regards,

Nils

@artembilan
Member

artembilan commented Nov 15, 2024

Please share a simple project with us so we can reproduce this on our side.
You probably have something else that causes this effect.
For example, we don't know what that RabbitMQConfiguration is.

I just created one like this:

@SpringBootApplication
public class SpringAmqpIssue2896Application {

	private static final String MESSAGE = Stream
			.generate(() -> "A")
			.limit(1024 * 1024)
			.collect(Collectors.joining());

	public static void main(String[] args) {
		ConfigurableApplicationContext applicationContext = SpringApplication.run(SpringAmqpIssue2896Application.class);
		final RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

		final ExecutorService executorService = Executors.newFixedThreadPool(2);

		for (int i = 0; i < 1000; i++) {
			final String messageId = "id-" + i;
			executorService.submit(() -> sendMessage(rabbitTemplate, messageId));
		}
	}

	private static void sendMessage(final RabbitTemplate rabbitTemplate, final String messageId) {
		try {
			final CorrelationData correlationData = new CorrelationData();
			rabbitTemplate.convertAndSend("", "someQueue", MESSAGE, correlationData);
			System.out.println("Message " + messageId + " has been sent.");

			final CorrelationData.Confirm confirm = correlationData.getFuture().get(10, TimeUnit.SECONDS);
			System.out.println("Confirm Ack was " + confirm.isAck());
		}
		catch (Exception ex) {
			throw new RuntimeException("An exception occurred.", ex);
		}
	}

	@Bean
	Queue someQueue() {
		return new Queue("someQueue");
	}

}

And it looks like the result is totally OK:

...
Message id-993 has been sent.
Message id-994 has been sent.
Confirm Ack was true
Confirm Ack was true
Message id-995 has been sent.
Confirm Ack was true
Message id-996 has been sent.
Confirm Ack was true
Message id-997 has been sent.
Confirm Ack was true
Message id-998 has been sent.
Confirm Ack was true
Message id-999 has been sent.
Confirm Ack was true
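
One thing to keep in mind when comparing runs of the two harnesses above: both discard the `Future` returned by `executorService.submit(...)`, so a `RuntimeException` thrown from `sendMessage` (for example, when the 10-second confirm wait times out) is stored in that `Future` and never printed. A minimal sketch of a harness that surfaces such failures could look like this. `SendTask` and `runAndCountFailures` are hypothetical names, and the failing task in `main` is a fake that stands in for the real send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SurfaceTaskFailuresSketch {

	/** Hypothetical stand-in for sendMessage(rabbitTemplate, messageId). */
	interface SendTask {
		void send(String messageId);
	}

	/** Submits all sends, waits for each one, and returns how many of them threw. */
	static int runAndCountFailures(SendTask task, int threads, int messages) throws InterruptedException {
		ExecutorService pool = Executors.newFixedThreadPool(threads);
		List<Future<?>> futures = new ArrayList<>();
		for (int i = 0; i < messages; i++) {
			String messageId = "id-" + i;
			futures.add(pool.submit(() -> task.send(messageId)));
		}
		pool.shutdown(); // no more tasks; worker threads end once the queue is drained
		int failures = 0;
		for (Future<?> future : futures) {
			try {
				future.get(); // rethrows the task's exception wrapped in ExecutionException
			}
			catch (ExecutionException ex) {
				failures++;
				System.err.println("Send failed: " + ex.getCause());
			}
		}
		return failures;
	}

	public static void main(String[] args) throws InterruptedException {
		// Fake task: fails for every id that ends in "0".
		SendTask fake = id -> {
			if (id.endsWith("0")) {
				throw new IllegalStateException("simulated failure for " + id);
			}
		};
		System.out.println("Failed sends: " + runAndCountFailures(fake, 2, 100));
	}
}
```

Keeping the futures makes it possible to tell a run where every confirm arrived from one where some sends failed silently.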

@nils-christian
Author

Hi @artembilan,

Sure. Here you go: https://github.com/nils-christian/spring-amqp-issue-2896

The relevant method can be found in TestDemoApplication.java.

Best regards,

Nils

@artembilan
Member

Thanks.
Running your application as is (I mean TestDemoApplication from the test scope), it does not fail for me:

2024-11-15T14:10:39.780-05:00  WARN 28292 --- [demo] [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate      : Returned message but no callback available
Message id-995 has been sent.
Confirm Ack was true
2024-11-15T14:10:39.800-05:00  WARN 28292 --- [demo] [nectionFactory3] o.s.amqp.rabbit.core.RabbitTemplate      : Returned message but no callback available
Message id-996 has been sent.
Confirm Ack was true
2024-11-15T14:10:39.816-05:00  WARN 28292 --- [demo] [nectionFactory1] o.s.amqp.rabbit.core.RabbitTemplate      : Returned message but no callback available
Message id-997 has been sent.
Confirm Ack was true
2024-11-15T14:10:39.837-05:00  WARN 28292 --- [demo] [nectionFactory4] o.s.amqp.rabbit.core.RabbitTemplate      : Returned message but no callback available
Message id-998 has been sent.
Confirm Ack was true
2024-11-15T14:10:39.855-05:00  WARN 28292 --- [demo] [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate      : Returned message but no callback available
Message id-999 has been sent.
Confirm Ack was true
2024-11-15T14:10:39.872-05:00  WARN 28292 --- [demo] [nectionFactory3] o.s.amqp.rabbit.core.RabbitTemplate      : Returned message but no callback available
Confirm Ack was true
2024-11-15T14:10:39.882-05:00  WARN 28292 --- [demo] [nectionFactory3] o.s.amqp.rabbit.core.RabbitTemplate      : Returned message but no callback available

And those warnings are expected because there is no queue bound to that key.

When I add a queue and bind it to your exchange:

	@Bean
	Queue testQueue() {
		return new Queue("testQueue");
	}

	@Bean
	Binding testBinding(Queue testQueue, TopicExchange testTopicExchange) {
		return BindingBuilder.bind(testQueue).to(testTopicExchange).with("key");
	}

I get the same result as with my own test:

Message id-994 has been sent.
Confirm Ack was true
Confirm Ack was true
Message id-995 has been sent.
Message id-996 has been sent.
Confirm Ack was true
Confirm Ack was true
Message id-997 has been sent.
Message id-998 has been sent.
Confirm Ack was true
Confirm Ack was true
Message id-999 has been sent.
Confirm Ack was true

I'm not sure how to reproduce your TIMEOUT WAITING FOR ACK error.

Is there anything else you can share to elaborate on the problem?

@nils-christian
Author

Interesting. The timeout happens on my system every time I execute the code without the queues. I will try to reproduce the example with the binding and queues.

In the meantime: Do you have any (other) idea what might trigger a "TIMEOUT WAITING FOR ACK"? This occurs in our environment again and again, but clears up by itself after some time.

@artembilan
Member

Duplicate of #2907.
It looks like that one has more analysis and actually states what the problem is.

Thank you, and let's follow up in that issue from now on!

@artembilan closed this as not planned on Nov 18, 2024