diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java index feab224f2ca7..154ac99ceae0 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java @@ -118,6 +118,7 @@ protected void configure(T factory, ConnectionFactory connectionFactory, } factory.setMissingQueuesFatal(configuration.isMissingQueuesFatal()); factory.setDeBatchingEnabled(configuration.isDeBatchingEnabled()); + factory.setForceStop(configuration.isForceStop()); ListenerRetry retryConfig = configuration.getRetry(); if (retryConfig.isEnabled()) { RetryInterceptorBuilder builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless() diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index 60e076ca56a3..4e95d7346bf9 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -734,6 +734,12 @@ public abstract static class AmqpContainer extends BaseContainer { */ private boolean deBatchingEnabled = true; + /** + * Whether the container (when stopped) should stop immediately after processing + * the current message or stop after processing all pre-fetched messages. + */ + private boolean forceStop; + /** * Optional properties for a retry interceptor. */ @@ -781,6 +787,14 @@ public void setDeBatchingEnabled(boolean deBatchingEnabled) { this.deBatchingEnabled = deBatchingEnabled; } + public boolean isForceStop() { + return this.forceStop; + } + + public void setForceStop(boolean forceStop) { + this.forceStop = forceStop; + } + public ListenerRetry getRetry() { return this.retry; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index a53b482e60ca..363a4153b8d7 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -519,7 +519,8 @@ void testSimpleRabbitListenerContainerFactoryWithCustomSettings() { "spring.rabbitmq.listener.simple.defaultRequeueRejected:false", "spring.rabbitmq.listener.simple.idleEventInterval:5", "spring.rabbitmq.listener.simple.batchSize:20", - "spring.rabbitmq.listener.simple.missingQueuesFatal:false") + "spring.rabbitmq.listener.simple.missingQueuesFatal:false", + "spring.rabbitmq.listener.simple.force-stop:true") .run((context) -> { SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context .getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class); @@ -531,6 +532,17 @@ void testSimpleRabbitListenerContainerFactoryWithCustomSettings() { }); } + @Test + void testSimpleRabbitListenerContainerFactoryWithDefaultForceStop() { + this.contextRunner + .withUserConfiguration(MessageConvertersConfiguration.class, MessageRecoverersConfiguration.class) + .run((context) -> { + SimpleRabbitListenerContainerFactory containerFactory = context + .getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class); + assertThat(containerFactory).hasFieldOrPropertyWithValue("forceStop", false); + }); + } + @Test void testDirectRabbitListenerContainerFactoryWithCustomSettings() { this.contextRunner @@ -547,7 +559,8 @@ void testDirectRabbitListenerContainerFactoryWithCustomSettings() { "spring.rabbitmq.listener.direct.prefetch:40", "spring.rabbitmq.listener.direct.defaultRequeueRejected:false", "spring.rabbitmq.listener.direct.idleEventInterval:5", - "spring.rabbitmq.listener.direct.missingQueuesFatal:true") + "spring.rabbitmq.listener.direct.missingQueuesFatal:true", + "spring.rabbitmq.listener.direct.force-stop:true") .run((context) -> { DirectRabbitListenerContainerFactory rabbitListenerContainerFactory = context .getBean("rabbitListenerContainerFactory", DirectRabbitListenerContainerFactory.class); @@ -557,6 +570,18 @@ void testDirectRabbitListenerContainerFactoryWithCustomSettings() { }); } + @Test + void testDirectRabbitListenerContainerFactoryWithDefaultForceStop() { + this.contextRunner + .withUserConfiguration(MessageConvertersConfiguration.class, MessageRecoverersConfiguration.class) + .withPropertyValues("spring.rabbitmq.listener.type:direct") + .run((context) -> { + DirectRabbitListenerContainerFactory containerFactory = context + .getBean("rabbitListenerContainerFactory", DirectRabbitListenerContainerFactory.class); + assertThat(containerFactory).hasFieldOrPropertyWithValue("forceStop", false); + }); + } + @Test void testSimpleRabbitListenerContainerFactoryRetryWithCustomizer() { this.contextRunner.withUserConfiguration(RabbitRetryTemplateCustomizerConfiguration.class) @@ -662,6 +687,7 @@ private void checkCommonProps(AssertableApplicationContext context, context.getBean("myMessageConverter")); assertThat(containerFactory).hasFieldOrPropertyWithValue("defaultRequeueRejected", Boolean.FALSE); assertThat(containerFactory).hasFieldOrPropertyWithValue("idleEventInterval", 5L); + assertThat(containerFactory).hasFieldOrPropertyWithValue("forceStop", true); Advice[] adviceChain = containerFactory.getAdviceChain(); assertThat(adviceChain).isNotNull(); assertThat(adviceChain).hasSize(1); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitPropertiesTests.java index 998f5a88d9c6..61e89c08222a 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitPropertiesTests.java @@ -322,6 +322,7 @@ void simpleContainerUseConsistentDefaultValues() { assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", simple.isMissingQueuesFatal()); assertThat(container).hasFieldOrPropertyWithValue("deBatchingEnabled", simple.isDeBatchingEnabled()); assertThat(container).hasFieldOrPropertyWithValue("consumerBatchEnabled", simple.isConsumerBatchEnabled()); + assertThat(container).hasFieldOrPropertyWithValue("forceStop", simple.isForceStop()); } @Test @@ -332,6 +333,7 @@ void directContainerUseConsistentDefaultValues() { assertThat(direct.isAutoStartup()).isEqualTo(container.isAutoStartup()); assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", direct.isMissingQueuesFatal()); assertThat(container).hasFieldOrPropertyWithValue("deBatchingEnabled", direct.isDeBatchingEnabled()); + assertThat(container).hasFieldOrPropertyWithValue("forceStop", direct.isForceStop()); } @Test