Skip to content

Commit

Permalink
Fix race condition in the RabbitTemplatePublisherCallbacksIntegration…
Browse files Browse the repository at this point in the history
…3Tests

The `CachingConnectionFactory` has an ability to wait for async channels close when it is destroyed.
However, that happens only if an `ApplicationContext` is stopped.

* Supply a mock `ApplicationContext` to the `CachingConnectionFactory` of the test
and emit respective `ContextClosedEvent` in the test before calling `cf.destroy()`
  • Loading branch information
artembilan committed Dec 23, 2024
1 parent db0c274 commit 75a05d7
Showing 1 changed file with 14 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.amqp.rabbit.core;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand All @@ -33,11 +34,15 @@
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
import org.springframework.amqp.utils.test.TestUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.ContextClosedEvent;

import com.rabbitmq.client.Channel;

/**
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.1
*
*/
Expand Down Expand Up @@ -72,15 +77,17 @@ public void testRepublishOnNackThreadNoExchange() throws Exception {

@Test
public void testDeferredChannelCacheNack() throws Exception {
final CachingConnectionFactory cf = new CachingConnectionFactory(
CachingConnectionFactory cf = new CachingConnectionFactory(
RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
cf.setPublisherReturns(true);
cf.setPublisherConfirmType(ConfirmType.CORRELATED);
final RabbitTemplate template = new RabbitTemplate(cf);
final CountDownLatch returnLatch = new CountDownLatch(1);
final CountDownLatch confirmLatch = new CountDownLatch(1);
final AtomicInteger cacheCount = new AtomicInteger();
final AtomicBoolean returnCalledFirst = new AtomicBoolean();
ApplicationContext mockApplicationContext = mock();
cf.setApplicationContext(mockApplicationContext);
RabbitTemplate template = new RabbitTemplate(cf);
CountDownLatch returnLatch = new CountDownLatch(1);
CountDownLatch confirmLatch = new CountDownLatch(1);
AtomicInteger cacheCount = new AtomicInteger();
AtomicBoolean returnCalledFirst = new AtomicBoolean();
template.setConfirmCallback((cd, a, c) -> {
cacheCount.set(TestUtils.getPropertyValue(cf, "cachedChannelsNonTransactional", List.class).size());
returnCalledFirst.set(returnLatch.getCount() == 0);
Expand All @@ -104,6 +111,7 @@ public void testDeferredChannelCacheNack() throws Exception {
assertThat(cacheCount.get()).isEqualTo(1);
assertThat(returnCalledFirst.get()).isTrue();
assertThat(correlationData.getReturned()).isNotNull();
cf.onApplicationEvent(new ContextClosedEvent(mockApplicationContext));
cf.destroy();
}

Expand Down

0 comments on commit 75a05d7

Please sign in to comment.