Skip to content

Commit

Permalink
Merge pull request #36539 from garyrussell
Browse files Browse the repository at this point in the history
* pr/36539:
  Polish "Add RabbitMQ container forceStop property"
  Add RabbitMQ container forceStop property

Closes gh-36539
  • Loading branch information
snicoll committed Jul 26, 2023
2 parents e8dd775 + 5406679 commit b8e8b1d
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ protected void configure(T factory, ConnectionFactory connectionFactory,
}
factory.setMissingQueuesFatal(configuration.isMissingQueuesFatal());
factory.setDeBatchingEnabled(configuration.isDeBatchingEnabled());
factory.setForceStop(configuration.isForceStop());
ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) {
RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,12 @@ public abstract static class AmqpContainer extends BaseContainer {
*/
private boolean deBatchingEnabled = true;

/**
* Whether the container (when stopped) should stop immediately after processing
* the current message or stop after processing all pre-fetched messages.
*/
private boolean forceStop;

/**
* Optional properties for a retry interceptor.
*/
Expand Down Expand Up @@ -781,6 +787,14 @@ public void setDeBatchingEnabled(boolean deBatchingEnabled) {
this.deBatchingEnabled = deBatchingEnabled;
}

public boolean isForceStop() {
return this.forceStop;
}

public void setForceStop(boolean forceStop) {
this.forceStop = forceStop;
}

public ListenerRetry getRetry() {
return this.retry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,8 @@ void testSimpleRabbitListenerContainerFactoryWithCustomSettings() {
"spring.rabbitmq.listener.simple.defaultRequeueRejected:false",
"spring.rabbitmq.listener.simple.idleEventInterval:5",
"spring.rabbitmq.listener.simple.batchSize:20",
"spring.rabbitmq.listener.simple.missingQueuesFatal:false")
"spring.rabbitmq.listener.simple.missingQueuesFatal:false",
"spring.rabbitmq.listener.simple.force-stop:true")
.run((context) -> {
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
Expand All @@ -531,6 +532,17 @@ void testSimpleRabbitListenerContainerFactoryWithCustomSettings() {
});
}

@Test
void testSimpleRabbitListenerContainerFactoryWithDefaultForceStop() {
this.contextRunner
.withUserConfiguration(MessageConvertersConfiguration.class, MessageRecoverersConfiguration.class)
.run((context) -> {
SimpleRabbitListenerContainerFactory containerFactory = context
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
assertThat(containerFactory).hasFieldOrPropertyWithValue("forceStop", false);
});
}

@Test
void testDirectRabbitListenerContainerFactoryWithCustomSettings() {
this.contextRunner
Expand All @@ -547,7 +559,8 @@ void testDirectRabbitListenerContainerFactoryWithCustomSettings() {
"spring.rabbitmq.listener.direct.prefetch:40",
"spring.rabbitmq.listener.direct.defaultRequeueRejected:false",
"spring.rabbitmq.listener.direct.idleEventInterval:5",
"spring.rabbitmq.listener.direct.missingQueuesFatal:true")
"spring.rabbitmq.listener.direct.missingQueuesFatal:true",
"spring.rabbitmq.listener.direct.force-stop:true")
.run((context) -> {
DirectRabbitListenerContainerFactory rabbitListenerContainerFactory = context
.getBean("rabbitListenerContainerFactory", DirectRabbitListenerContainerFactory.class);
Expand All @@ -557,6 +570,18 @@ void testDirectRabbitListenerContainerFactoryWithCustomSettings() {
});
}

@Test
void testDirectRabbitListenerContainerFactoryWithDefaultForceStop() {
this.contextRunner
.withUserConfiguration(MessageConvertersConfiguration.class, MessageRecoverersConfiguration.class)
.withPropertyValues("spring.rabbitmq.listener.type:direct")
.run((context) -> {
DirectRabbitListenerContainerFactory containerFactory = context
.getBean("rabbitListenerContainerFactory", DirectRabbitListenerContainerFactory.class);
assertThat(containerFactory).hasFieldOrPropertyWithValue("forceStop", false);
});
}

@Test
void testSimpleRabbitListenerContainerFactoryRetryWithCustomizer() {
this.contextRunner.withUserConfiguration(RabbitRetryTemplateCustomizerConfiguration.class)
Expand Down Expand Up @@ -662,6 +687,7 @@ private void checkCommonProps(AssertableApplicationContext context,
context.getBean("myMessageConverter"));
assertThat(containerFactory).hasFieldOrPropertyWithValue("defaultRequeueRejected", Boolean.FALSE);
assertThat(containerFactory).hasFieldOrPropertyWithValue("idleEventInterval", 5L);
assertThat(containerFactory).hasFieldOrPropertyWithValue("forceStop", true);
Advice[] adviceChain = containerFactory.getAdviceChain();
assertThat(adviceChain).isNotNull();
assertThat(adviceChain).hasSize(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ void simpleContainerUseConsistentDefaultValues() {
assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", simple.isMissingQueuesFatal());
assertThat(container).hasFieldOrPropertyWithValue("deBatchingEnabled", simple.isDeBatchingEnabled());
assertThat(container).hasFieldOrPropertyWithValue("consumerBatchEnabled", simple.isConsumerBatchEnabled());
assertThat(container).hasFieldOrPropertyWithValue("forceStop", simple.isForceStop());
}

@Test
Expand All @@ -332,6 +333,7 @@ void directContainerUseConsistentDefaultValues() {
assertThat(direct.isAutoStartup()).isEqualTo(container.isAutoStartup());
assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", direct.isMissingQueuesFatal());
assertThat(container).hasFieldOrPropertyWithValue("deBatchingEnabled", direct.isDeBatchingEnabled());
assertThat(container).hasFieldOrPropertyWithValue("forceStop", direct.isForceStop());
}

@Test
Expand Down

0 comments on commit b8e8b1d

Please sign in to comment.