
Incorrect Recover commands during shutdown with spring-rabbit >= 3.0.11 #2941

Open

marcogramy opened this issue Jan 16, 2025 · 14 comments

@marcogramy commented Jan 16, 2025

I'm using Spring Boot 3.2.12 (which includes spring-rabbit 3.1.8 and spring-amqp 3.1.8) with several SimpleMessageListenerContainer consumers.

During consumer shutdown, specifically during connection closure, I noticed unusual Recover commands that I hadn't observed before. By analyzing the network traffic (using tcpdump) on port 5673, I captured the following message exchange, which is unexpected:
[Image: tcpdump capture showing unexpected Basic.Recover commands during connection closure]

After conducting several tests with previous versions of the spring-rabbit and spring-amqp libraries, I found that downgrading to spring-rabbit-3.0.10 resolves the issue. The network traffic in this scenario shows the expected behavior:
[Image: tcpdump capture showing the expected shutdown sequence with spring-rabbit 3.0.10]

Starting from spring-rabbit-3.0.11 onwards, I observe the problematic Recover commands again.
My RabbitMQ server version is 4.0.5.

I suspect this behavior is related to changes introduced in spring-rabbit-3.0.11 concerning shutdown procedures, as documented in issue #2594 and the corresponding commit: 02db80b.

Can you help me understand why this behavior occurs?
Thank you in advance for your assistance.

@artembilan
Member

I'm not very good at reading TCP dumps, but could you please try upgrading your solution to the latest Spring Boot 3.4.1, which brings in Spring AMQP 3.2.1?

Any chance you could provide a simple project where we can reproduce this?

I don't mean something TCP dump-related, but a Java application that I can run in debug mode to catch that Recover command.
Perhaps your SimpleMessageListenerContainer instances are not beans, or something similar.
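
For reference, here is a minimal sketch (the queue name and listener body are hypothetical) of declaring such a container as a bean, so that the application context manages its lifecycle and shuts it down in an orderly way:

    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class ListenerConfig {

        // Declared as a bean, the container participates in the context's
        // ordered shutdown instead of being closed ad hoc.
        @Bean
        SimpleMessageListenerContainer demoContainer(ConnectionFactory connectionFactory) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueueNames("demo.queue"); // hypothetical queue
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            container.setMessageListener(message -> {
                // process the message
            });
            return container;
        }
    }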

@artembilan
Member

OK. So, here is the JavaDoc of the command:

    /**
     * Ask the broker to resend unacknowledged messages.  In 0-8
     * basic.recover is asynchronous; in 0-9-1 it is synchronous, and
     * the new, deprecated method basic.recover_async is asynchronous.
     * @param requeue If true, messages will be requeued and possibly
     * delivered to a different consumer. If false, messages will be
     * redelivered to the same consumer.
     */
    Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

Apparently, that really is the correct thing to do when we close consumers and have in-flight unacked messages.
Here is the logic from the framework:

	public static void closeMessageConsumer(Channel channel, Collection<String> consumerTags, boolean transactional) {
		if (!channel.isOpen() && !(channel instanceof ChannelProxy proxy
					&& proxy.getTargetChannel() instanceof AutorecoveringChannel)
				&& !(channel instanceof AutorecoveringChannel)) {
			return;
		}
		try {
			for (String consumerTag : consumerTags) {
				cancel(channel, consumerTag);
			}
			if (transactional) {
				/*
				 * Re-queue in-flight messages if any (after the consumer is cancelled to prevent the broker from simply
				 * sending them back to us). Does not require a tx.commit.
				 */
				channel.basicRecover(true);
			}
			/*
			 * If not transactional then we are auto-acking (at least as of 1.0.0.M2) so there is nothing to recover.
			 * Messages are going to be lost in general.
			 */
		}
		catch (Exception ex) {
			throw RabbitExceptionTranslator.convertRabbitAccessException(ex);
		}
	}

That is the only place in Spring AMQP and the AMQP Java Client where this command is explicitly called.

@davidemarrone

I think you have found the root cause. It appears that the "issue" was introduced by this commit:

70ba65f

Adjust BlockingQueueConsumer.basicCancel() to call RabbitUtils.closeMessageConsumer()

This change corresponds exactly to when it stopped working for us, as shown in the difference between v3.1.1 and v3.1.2:
v3.1.1...v3.1.2

After picking up this change, our process no longer stopped cleanly: the supervisor killed the process after 10 seconds of waiting for the shutdown to complete. We will try to reproduce the problem in an isolated environment, or try to understand why the shutdown takes so long with this change.
We will let you know.
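
As a side note, if the extra shutdown time is legitimate and only the supervisor's 10-second limit is the problem, one option is to raise the containers' shutdown timeout. A sketch, assuming Spring Boot's support for a ContainerCustomizer bean (the 30-second value is illustrative):

    import org.springframework.amqp.rabbit.config.ContainerCustomizer;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class ShutdownTimeoutConfig {

        // Give in-flight deliveries more time to be settled before the
        // container forcibly stops its consumers.
        @Bean
        ContainerCustomizer<SimpleMessageListenerContainer> shutdownTimeoutCustomizer() {
            return container -> container.setShutdownTimeout(30_000L);
        }
    }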

@artembilan
Member

Hold on.
Are you still claiming that RabbitUtils.closeMessageConsumer() is not correct?
Why do you treat such an improvement as an issue?

I think it is totally OK that this recovery of unacked messages may take some time.
But we will still be in a state where we don't lose our messages.

Therefore, I don't understand why your team looks at this change in the shutdown behavior as a problem.
The fact that you got by with the previous behavior does not mean that that behavior was correct.

Please advise.

@davidemarrone

No, I'm not saying that RabbitUtils.closeMessageConsumer() is incorrect. I was only saying that you pointed out the right area to investigate, nothing more. We’ve noticed a change in behavior since that modification, and we’re simply trying to understand why it happens and determine the best way to handle it.

@marcogramy
Author

Hi,
We've conducted further tests in our high-load production environment.
As mentioned, we've recently migrated to Spring Boot 3.2, which includes spring-amqp 3.1.8 and the change made to the shutdown procedure.

While shutting down our process, we observed several error logs of this kind:

2025-01-23T09:17:37,734GMT ERROR v10.2.1 [AMQP Connection 172.31.10.87:5674](AbstractConnectionFactory.java:779) org.springframework.amqp.rabbit.connection.CachingConnectionFactory: Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 970, class-id=60, method-id=80)
2025-01-23T09:17:37,746GMT ERROR v10.2.1 [AMQP Connection 172.31.10.87:5674](AbstractConnectionFactory.java:779) org.springframework.amqp.rabbit.connection.CachingConnectionFactory: Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1056, class-id=60, method-id=80)
2025-01-23T09:17:37,748GMT ERROR v10.2.1 [AMQP Connection 172.31.10.87:5674](AbstractConnectionFactory.java:779) org.springframework.amqp.rabbit.connection.CachingConnectionFactory: Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1056, class-id=60, method-id=80)
2025-01-23T09:17:37,749GMT ERROR v10.2.1 [AMQP Connection 172.31.10.87:5674](AbstractConnectionFactory.java:779) org.springframework.amqp.rabbit.connection.CachingConnectionFactory: Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 627, class-id=60, method-id=80)

Similar errors are repeated in the RabbitMQ server logs:

2025-01-23 09:17:37.720617+00:00 [error] <0.5703.1221> Channel error on connection <0.19615.1357> (172.31.80.131:53552 -> 172.31.3.235:5674, vhost: '/', user: 'javamt'), channel 21:
2025-01-23 09:17:37.720617+00:00 [error] <0.5703.1221> operation basic.ack caused a channel exception precondition_failed: unknown delivery tag 970
2025-01-23 09:17:37.720751+00:00 [error] <0.11578.1291> Channel error on connection <0.19615.1357> (172.31.80.131:53552 -> 172.31.3.235:5674, vhost: '/', user: 'javamt'), channel 17:
2025-01-23 09:17:37.720751+00:00 [error] <0.11578.1291> operation basic.ack caused a channel exception precondition_failed: unknown delivery tag 1056
2025-01-23 09:17:37.721006+00:00 [error] <0.14850.779> Channel error on connection <0.19615.1357> (172.31.80.131:53552 -> 172.31.3.235:5674, vhost: '/', user: 'javamt'), channel 15:
2025-01-23 09:17:37.721006+00:00 [error] <0.14850.779> operation basic.ack caused a channel exception precondition_failed: unknown delivery tag 1056
2025-01-23 09:17:37.722063+00:00 [error] <0.18064.1330> Channel error on connection <0.19615.1357> (172.31.80.131:53552 -> 172.31.3.235:5674, vhost: '/', user: 'javamt'), channel 26:
2025-01-23 09:17:37.722063+00:00 [error] <0.18064.1330> operation basic.ack caused a channel exception precondition_failed: unknown delivery tag 627

Therefore, we cannot use this version at the moment.
However, if we downgrade to spring-amqp 3.1.1, the issue does not occur.

We haven't yet been able to create a demo project with the latest versions to reproduce the issue, but perhaps you could help us understand the cause of these errors?
The Recover command during shutdown is, as you mentioned, an improvement, but why are we seeing these errors during shutdown? They seem related in some way to the introduced change, but we don't understand why.

Thank you in advance!

@artembilan
Member

That is strange, because the unknown delivery tag error happens when we try to ack a message twice, or on a different channel.

You might mitigate the problem if you don't use transactions or don't use manual acks.
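
For reference, a minimal sketch of that mitigation, assuming a custom SimpleRabbitListenerContainerFactory bean (names are illustrative):

    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class AckModeConfig {

        @Bean
        SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            // Let the container ack automatically after the listener returns,
            // instead of using manual acks on a transacted channel.
            factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
            factory.setChannelTransacted(false);
            return factory;
        }
    }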

The sample to reproduce would be great.

Thanks

@marcogramy
Author

Yes, the behavior is strange.
Furthermore, this issue occurs exclusively in our production environment, and ONLY when we observe high load.

Our configuration parameters are as follows:

spring.rabbitmq.listener.simple.concurrency=16
spring.rabbitmq.listener.simple.max-concurrency=400
spring.rabbitmq.listener.simple.prefetch=15

Given the high traffic, could it be a concurrency issue?

To be able to reproduce and debug, we are currently developing a sample project, but it will take us a few days.

@marcogramy
Author

Hi,
we've put together a demo project replicating the issue we're encountering: https://github.com/marcogramy/rabbitmqdemo.

This project mirrors the configuration of our production environment (prefetch count, consumer concurrency, batch size, etc.).
The error occurs when a significant number of messages remain in the Unacked state within a queue, so we've simulated this scenario. Upon execution, the project throws the error during shutdown.
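
(Not the demo's actual code, but as a rough illustration of the scenario: with manual acks, a high prefetch, and a slow listener, deliveries accumulate in the Unacked state, and shutting down at that point exercises the recover/ack race. The queue name and timing below are hypothetical.)

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class SlowListener {

        // Slow processing keeps prefetched deliveries Unacked; a shutdown
        // while many are outstanding reproduces the reported errors.
        @RabbitListener(queues = "demo.queue", ackMode = "MANUAL")
        public void onMessage(Message message, Channel channel) throws Exception {
            Thread.sleep(1_000); // simulate slow work
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }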

Could you please run this demo on your end and confirm if you observe the same behavior?

Thanks a lot!

@artembilan
Member

@marcogramy ,

I have a lot on my plate, so help me recollect: what exactly would you like me to look into?

I guess we are no longer talking about Basic.Recover?

Now your concern is that there are some unknown delivery tag errors.

Correct me if I'm wrong.

Thanks

@marcogramy
Author

I believe the two things are related.
But the unknown delivery tag error is perhaps the easiest to identify and investigate at this stage.

@artembilan artembilan added this to the 4.0.0-M1 milestone Feb 7, 2025
artembilan added a commit that referenced this issue Feb 7, 2025
Fixes: #2941
Issue link: #2941

Now `BlockingQueueConsumer.basicCancel()` performs `RabbitUtils.closeMessageConsumer()`
to initiate `basicRecover` on the transactional consumer and re-queue all the unacked messages.
However, there is a race condition where one in-flight message may still be delivered
to the listener, after which a TX commit is initiated.
There, a `basicAck()` is issued.
However, that delivery tag might already have been discarded because of the previous `basicRecover`.
Therefore, adjust `BlockingQueueConsumer.commitIfNecessary()` to skip `basicAck()` if locally transacted and already cancelled.
This may lead to duplicate delivery, but in an abnormal shutdown situation we cannot guarantee that the message
to commit has been processed properly.
Also, adjust `BlockingQueueConsumer.nextMessage()` to roll back a message if the consumer is cancelled, instead of
going through the loop via the listener.

* Increase `replyTimeout` in the `EnableRabbitReturnTypesTests` for resource-sensitive builds
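
(A condensed sketch, not the actual framework code, of the guard the commit describes:)

    import java.io.IOException;

    import com.rabbitmq.client.Channel;

    final class CommitGuardSketch {

        static void commitIfNecessary(Channel channel, long deliveryTag,
                boolean locallyTransacted, boolean cancelled) throws IOException {
            if (locallyTransacted && cancelled) {
                // A prior basicRecover may already have discarded this delivery
                // tag; acking now would fail with "unknown delivery tag". The
                // message will be redelivered instead.
                return;
            }
            channel.basicAck(deliveryTag, false);
            if (locallyTransacted) {
                channel.txCommit();
            }
        }
    }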
@artembilan
Member

Hey, @marcogramy !

Sorry for the delay: I was busy with the migration to JSpecify in main for this project.

So, today I had a chance to look into your application and investigate the problem.

I have pushed this fix into 3.2.x: a800b31.

It would be great if you could take a look over there, and even better if you could run your solution against 3.2.3-SNAPSHOT to see if that fixes things so far.

Thank you for your time and all the help around this issue!

@marcogramy
Author

Hi,
I'm happy to test version 3.2.3-SNAPSHOT, but I cannot find it.
How can I import it into my project?

Thanks.

@artembilan
Member

The SNAPSHOT is here: https://repo.spring.io/ui/native/snapshot/org/springframework/amqp/spring-rabbit/3.2.3-SNAPSHOT/

So, you need to add https://repo.spring.io/snapshot to your repository management:

    <repositories>
      <repository>
        <id>spring-snapshots</id>
        <name>Spring Snapshots</name>
        <url>https://repo.spring.io/snapshot</url>
        <releases>
          <enabled>false</enabled>
        </releases>
      </repository>
    </repositories>

And use 3.2.3-SNAPSHOT for Spring AMQP dependencies.
Unfortunately, the project you shared with me does not use Spring Boot dependency management, so I had to replace the spring-boot-starter-amqp dependency with:

		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit</artifactId>
			<version>3.2.3-SNAPSHOT</version>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-api</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

If you use Spring Boot dependency management, similar to what https://start.spring.io/ does for us when we generate a Maven project:

    <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>3.4.2</version>
      <relativePath/> <!-- lookup parent from repository -->
    </parent>

Then you can use special properties to override versions managed by Spring Boot: https://docs.spring.io/spring-boot/maven-plugin/using.html#using.parent-pom.
For Spring AMQP, that property is spring-amqp.version.
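
For example (the version shown is the snapshot under discussion):

    <properties>
      <spring-amqp.version>3.2.3-SNAPSHOT</spring-amqp.version>
    </properties>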

Please share your feedback before next week, when we are planning to release the next version, which should include the fix for this problem as well.

Thanks
