From 02db80bc8a5095927701a3b296b30aee6c8435bb Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 9 Jan 2024 11:21:11 -0500 Subject: [PATCH] GH-2593: Reliably shutdown SMLC Fixes: #2593 * Add `activeObjectCounter` release into the `BlockingQueueConsumer.handleCancelOk()` in reply to the `basicCancel()` call * Adjust `BlockingQueueConsumer.basicCancel()` to call `RabbitUtils.closeMessageConsumer()` to setisfy transactional context * Adjust `SimpleMessageListenerContainerIntegrationTests` to eventually setisfy to the transaction rollback when container is shuted down * Add new tests into the `ContainerShutDownTests` to verify the listener containers are not blocked waiting on the `cancelationLock` **Cherry-pick to `3.0.x`** (cherry picked from commit 70ba65f2c2b8c32ea11a7257d64e05f9a8389ca2) --- .../AbstractMessageListenerContainer.java | 4 +- .../listener/BlockingQueueConsumer.java | 12 ++--- .../listener/ContainerShutDownTests.java | 47 +++++++++++++++++-- ...sageListenerContainerIntegrationTests.java | 5 +- .../SimpleMessageListenerContainerTests.java | 11 +++-- 5 files changed, 60 insertions(+), 19 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java index 7392fdab3a..17d5c6255a 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -1169,7 +1169,7 @@ protected boolean isForceStop() { /** * Set to true to stop the container after the current message(s) are processed and * requeue any prefetched. Useful when using exclusive or single-active consumers. - * @param forceStop true to stop when current messsage(s) are processed. + * @param forceStop true to stop when current message(s) are processed. * @since 2.4.14 */ public void setForceStop(boolean forceStop) { diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java index c6a95816af..84ff336be9 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -455,11 +455,10 @@ protected void basicCancel() { protected void basicCancel(boolean expected) { this.normalCancel = expected; - getConsumerTags().forEach(consumerTag -> { - if (this.channel.isOpen()) { - RabbitUtils.cancel(this.channel, consumerTag); - } - }); + Collection consumerTags = getConsumerTags(); + if (!CollectionUtils.isEmpty(consumerTags)) { + RabbitUtils.closeMessageConsumer(this.channel, consumerTags, this.transactional); + } this.cancelled.set(true); this.abortStarted = System.currentTimeMillis(); } @@ -989,6 +988,7 @@ public void handleCancelOk(String consumerTag) { + "); " + BlockingQueueConsumer.this); } this.canceled = true; + BlockingQueueConsumer.this.activeObjectCounter.release(BlockingQueueConsumer.this); } @Override diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ContainerShutDownTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ContainerShutDownTests.java index 6156cb534d..db23cde289 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ContainerShutDownTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ContainerShutDownTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,11 +30,13 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.junit.RabbitAvailable; import org.springframework.amqp.utils.test.TestUtils; +import org.springframework.util.StopWatch; import com.rabbitmq.client.AMQP.BasicProperties; /** * @author Gary Russell + * @author Artem Bilan * @since 2.0 * */ @@ -56,7 +58,6 @@ public void testUninterruptibleListenerDMLC() throws Exception { public void testUninterruptibleListener(AbstractMessageListenerContainer container) throws Exception { CachingConnectionFactory cf = new CachingConnectionFactory("localhost"); container.setConnectionFactory(cf); - container.setShutdownTimeout(500); container.setQueueNames("test.shutdown"); final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch testEnded = new CountDownLatch(1); @@ -91,11 +92,49 @@ public void testUninterruptibleListener(AbstractMessageListenerContainer contain assertThat(channels).hasSize(2); } finally { + testEnded.countDown(); container.stop(); - assertThat(channels).hasSize(1); + cf.destroy(); + } + } + + @Test + public void consumersCorrectlyCancelledOnShutdownSMLC() throws Exception { + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); + consumersCorrectlyCancelledOnShutdown(container); + } + @Test + public void consumersCorrectlyCancelledOnShutdownDMLC() throws Exception { + DirectMessageListenerContainer container = new DirectMessageListenerContainer(); + consumersCorrectlyCancelledOnShutdown(container); + } + + private void consumersCorrectlyCancelledOnShutdown(AbstractMessageListenerContainer container) + throws InterruptedException { + + CachingConnectionFactory cf = new CachingConnectionFactory("localhost"); + container.setConnectionFactory(cf); + container.setQueueNames("test.shutdown"); + container.setMessageListener(m -> { + }); + final CountDownLatch startLatch = new CountDownLatch(1); + container.setApplicationEventPublisher(e -> { + if (e instanceof AsyncConsumerStartedEvent) { + startLatch.countDown(); + } + }); + container.start(); + try { + assertThat(startLatch.await(30, TimeUnit.SECONDS)).isTrue(); + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + container.shutdown(); + stopWatch.stop(); + assertThat(stopWatch.getTotalTimeMillis()).isLessThan(3000); + } + finally { cf.destroy(); - testEnded.countDown(); } } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java index ad19a5526d..64fee8a554 100755 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.awaitility.Awaitility.await; import java.util.ArrayList; import java.util.Arrays; @@ -314,7 +315,7 @@ private void doListenerWithExceptionTest(CountDownLatch latch, MessageListener l container.shutdown(); } if (acknowledgeMode.isTransactionAllowed()) { - assertThat(template.receiveAndConvert(queue.getName())).isNotNull(); + await().untilAsserted(() -> assertThat(template.receiveAndConvert(queue.getName())).isNotNull()); } else { assertThat(template.receiveAndConvert(queue.getName())).isNull(); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java index 204df12abb..7044f5ee6c 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,6 +50,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -517,7 +518,7 @@ public void testWithConnectionPerListenerThread() throws Exception { waitForConsumersToStop(consumers); Set allocatedConnections = TestUtils.getPropertyValue(ccf, "allocatedConnections", Set.class); assertThat(allocatedConnections).hasSize(2); - assertThat(ccf.getCacheProperties().get("openConnections")).isEqualTo("1"); + assertThat(ccf.getCacheProperties().get("openConnections")).isEqualTo("2"); } @Test @@ -807,15 +808,15 @@ private Answer messageToConsumer(final Channel mockChannel, final Simple } - private void waitForConsumersToStop(Set consumers) throws Exception { + private void waitForConsumersToStop(Set consumers) { with().pollInterval(Duration.ofMillis(10)).atMost(Duration.ofSeconds(10)) .until(() -> consumers.stream() .map(consumer -> TestUtils.getPropertyValue(consumer, "consumer")) - .allMatch(c -> c == null)); + .allMatch(Objects::isNull)); } @SuppressWarnings("serial") - private class TestTransactionManager extends AbstractPlatformTransactionManager { + private static class TestTransactionManager extends AbstractPlatformTransactionManager { @Override protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {