From 18acddb2ba305619f9036cb979547d18676ca1cc Mon Sep 17 00:00:00 2001 From: Adrien LAUER Date: Thu, 23 Feb 2017 09:48:23 +0100 Subject: [PATCH] Cleanup session tracking after close, fixes #8 --- CHANGELOG.md | 4 ++ pom.xml | 6 +- src/it/resources/application.yaml | 1 + .../jms/internal/ManagedConnection.java | 50 ++++++++------- .../jms/internal/ManagedMessageConsumer.java | 26 +++++--- .../jms/internal/ManagedSession.java | 63 +++++++++++-------- .../jms/internal/MessageListenerAdapter.java | 1 - .../jms/pollers/SimpleMessagePoller.java | 13 ++-- .../jms/internal/ManagedConnectionTest.java | 29 ++++----- .../internal/ManagedMessageConsumerTest.java | 14 +++-- .../jms/internal/ManagedSessionTest.java | 37 +++++++---- 11 files changed, 145 insertions(+), 99 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a5c4c5b..5ffbc51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# Version 3.0.1 (2017-02-23) + +* [fix] JMS sessions were still tracked after being closed leading to trying to reconnect all sessions used in the past. + # Version 3.0.0 (2016-12-13) * [brk] Update to SeedStack 16.11 new configuration system. diff --git a/pom.xml b/pom.xml index dd52824..7c0c7bc 100644 --- a/pom.xml +++ b/pom.xml @@ -19,12 +19,12 @@ org.seedstack.addons.jms jms - 3.0.0-SNAPSHOT + 3.0.1-SNAPSHOT - 3.0.0 + 3.1.0 - true + 3.0.0 jms-addon diff --git a/src/it/resources/application.yaml b/src/it/resources/application.yaml index 6b61812..15f0034 100644 --- a/src/it/resources/application.yaml +++ b/src/it/resources/application.yaml @@ -6,6 +6,7 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. # +logging: WARN jms: connectionFactories: connectionFactory1: diff --git a/src/main/java/org/seedstack/jms/internal/ManagedConnection.java b/src/main/java/org/seedstack/jms/internal/ManagedConnection.java index 3f68523..3846b1c 100644 --- a/src/main/java/org/seedstack/jms/internal/ManagedConnection.java +++ b/src/main/java/org/seedstack/jms/internal/ManagedConnection.java @@ -7,6 +7,7 @@ */ package org.seedstack.jms.internal; +import com.google.common.collect.Sets; import org.seedstack.jms.spi.ConnectionDefinition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,9 +21,8 @@ import javax.jms.ServerSessionPool; import javax.jms.Session; import javax.jms.Topic; -import java.util.ArrayList; import java.util.Calendar; -import java.util.List; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,15 +35,12 @@ */ class ManagedConnection implements Connection, ExceptionListener { private static final Logger LOGGER = LoggerFactory.getLogger(ManagedConnection.class); - - private final List sessions = new ArrayList<>(); + private final Set sessions = Sets.newConcurrentHashSet(); private final AtomicBoolean needToStart = new AtomicBoolean(false); - private final ConnectionDefinition connectionDefinition; private final JmsFactoryImpl jmsFactoryImpl; private final AtomicBoolean scheduleInProgress; private final ReentrantReadWriteLock connectionLock = new ReentrantReadWriteLock(); - private Connection connection; private ExceptionListener exceptionListener; @@ -57,6 +54,8 @@ class ManagedConnection implements Connection, ExceptionListener { } private Connection createConnection() throws JMSException { + LOGGER.debug("Initializing managed JMS connection {}", connectionDefinition.getName()); + Connection newConnection = jmsFactoryImpl.createRawConnection(connectionDefinition); // Set the exception listener to ourselves so we can monitor the underlying connection @@ -64,8 +63,6 @@ private Connection createConnection() throws JMSException { newConnection.setExceptionListener(this); } - LOGGER.debug("Initialized JMS connection {}", connectionDefinition.getName()); - return newConnection; } @@ -77,8 +74,8 @@ public void run() { try { // Recreate the connection + LOGGER.info("Recreating managed JMS connection {}", connectionDefinition.getName()); connection = createConnection(); - LOGGER.info("Recreated JMS connection {}", connectionDefinition.getName()); // Refresh sessions for (ManagedSession session : sessions) { @@ -87,12 +84,12 @@ public void run() { // Start the new connection if needed if (needToStart.get()) { + LOGGER.info("Restarting managed JMS connection {}", connectionDefinition.getName()); connection.start(); scheduleInProgress.set(false); - LOGGER.info("Restarted JMS connection {}", connectionDefinition.getName()); } } catch (Exception e) { - LOGGER.error("Failed to restart JMS connection {}, next attempt in {} ms", connectionDefinition.getName(), connectionDefinition.getReconnectionDelay()); + LOGGER.error("Failed to restart managed JMS connection {}, next attempt in {} ms", connectionDefinition.getName(), connectionDefinition.getReconnectionDelay()); scheduleReconnection(); } finally { connectionLock.writeLock().unlock(); @@ -110,7 +107,7 @@ private Connection getConnection() throws JMSException { connectionLock.readLock().lock(); try { if (connection == null) { - throw new JMSException("Connection " + connectionDefinition.getName() + " is not ready"); + throw new JMSException("Managed JMS connection " + connectionDefinition.getName() + " is not ready"); } return connection; @@ -121,7 +118,7 @@ private Connection getConnection() throws JMSException { @Override public void onException(JMSException exception) { - LOGGER.error("An exception occurred on JMS connection {}", connectionDefinition.getName()); + LOGGER.error("An exception occurred on managed JMS connection {}", connectionDefinition.getName()); if (exception != null && LOGGER.isDebugEnabled()) { LOGGER.debug("Original exception below", exception); } @@ -135,10 +132,10 @@ public void onException(JMSException exception) { private void reset() { if (scheduleInProgress.getAndSet(true)) { - LOGGER.debug("JMS connection {} already scheduled for restart", connectionDefinition.getName()); + LOGGER.debug("Managed JMS connection {} already scheduled for restart", connectionDefinition.getName()); } else { // reset the connection - LOGGER.warn("Closing JMS connection {} and scheduling restart in {} ms", connectionDefinition.getName(), connectionDefinition.getReconnectionDelay()); + LOGGER.warn("Resetting managed JMS connection {} and scheduling restart in {} ms", connectionDefinition.getName(), connectionDefinition.getReconnectionDelay()); connectionLock.writeLock().lock(); try { @@ -152,7 +149,7 @@ private void reset() { try { connection.close(); } catch (JMSException e) { - LOGGER.warn("Unable to cleanly close the JMS connection {}", connectionDefinition.getName()); + LOGGER.warn("Unable to cleanly close the managed JMS connection {}", connectionDefinition.getName()); } } connection = null; @@ -171,7 +168,12 @@ private void reset() { public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { connectionLock.readLock().lock(); try { - ManagedSession managedSession = new ManagedSession(getConnection().createSession(transacted, acknowledgeMode), transacted, acknowledgeMode, connectionDefinition.isJeeMode()); + ManagedSession managedSession = new ManagedSession( + getConnection().createSession(transacted, acknowledgeMode), + transacted, + acknowledgeMode, + connectionDefinition.isJeeMode(), + this); sessions.add(managedSession); return managedSession; } finally { @@ -191,22 +193,22 @@ public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String su @Override public void close() throws JMSException { + LOGGER.info("Closing managed JMS connection {}", connectionDefinition.getName()); getConnection().close(); - LOGGER.info("Closed JMS connection {}", connectionDefinition.getName()); } @Override public void start() throws JMSException { + LOGGER.info("Starting managed JMS connection {}", connectionDefinition.getName()); getConnection().start(); needToStart.set(true); - LOGGER.info("Started JMS connection {}", connectionDefinition.getName()); } @Override public void stop() throws JMSException { + LOGGER.info("Stopping managed JMS connection {}", connectionDefinition.getName()); getConnection().stop(); needToStart.set(false); - LOGGER.info("Stopped JMS connection {}", connectionDefinition.getName()); } @Override @@ -226,11 +228,15 @@ public ExceptionListener getExceptionListener() throws JMSException { @Override public void setClientID(String clientID) throws JMSException { - throw new IllegalStateException("Client id cannot be changed on managed connections"); + throw new IllegalStateException("Client ID cannot be changed on managed connections"); } @Override public String getClientID() throws JMSException { - throw new IllegalStateException("Client id cannot be retrieved on managed connections"); + throw new IllegalStateException("Client ID cannot be retrieved on managed connections"); + } + + void removeSession(ManagedSession managedSession) { + sessions.remove(managedSession); } } diff --git a/src/main/java/org/seedstack/jms/internal/ManagedMessageConsumer.java b/src/main/java/org/seedstack/jms/internal/ManagedMessageConsumer.java index 4f25711..39e945f 100644 --- a/src/main/java/org/seedstack/jms/internal/ManagedMessageConsumer.java +++ b/src/main/java/org/seedstack/jms/internal/ManagedMessageConsumer.java @@ -25,32 +25,35 @@ * This session is a facade of a jms messageConsumer. It allows the reconnection mechanism. */ class ManagedMessageConsumer implements MessageConsumer { - private static final Logger logger = LoggerFactory.getLogger(ManagedMessageConsumer.class); - - private MessageListener messageListener; - private MessageConsumer messageConsumer; - + private static final Logger LOGGER = LoggerFactory.getLogger(ManagedMessageConsumer.class); private final Destination destination; private final String messageSelector; private final boolean noLocal; private final boolean polling; - protected final ReentrantReadWriteLock messageConsumerLock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock messageConsumerLock = new ReentrantReadWriteLock(); + private final ManagedSession managedSession; + private MessageListener messageListener; + private MessageConsumer messageConsumer; - ManagedMessageConsumer(MessageConsumer messageConsumer, Destination destination, @Nullable String messageSelector, boolean noLocal, boolean polling) { + ManagedMessageConsumer(MessageConsumer messageConsumer, Destination destination, @Nullable String messageSelector, boolean noLocal, boolean polling, ManagedSession managedSession) { checkNotNull(messageConsumer); checkNotNull(destination); + LOGGER.debug("Creating managed JMS message consumer {}", this); + this.messageConsumer = messageConsumer; this.messageSelector = messageSelector; this.destination = destination; this.noLocal = noLocal; this.polling = polling; + this.managedSession = managedSession; } void refresh(Session session) throws JMSException { messageConsumerLock.writeLock().lock(); try { // Create a new messageConsumer + LOGGER.debug("Refreshing managed JMS message consumer {}", this); if (this.noLocal) { messageConsumer = session.createConsumer(destination, messageSelector, true); } else if (messageSelector != null && !"".equals(messageSelector)) { @@ -71,7 +74,7 @@ void refresh(Session session) throws JMSException { void reset() { messageConsumerLock.writeLock().lock(); try { - logger.trace("Resetting message consumer"); + LOGGER.debug("Resetting managed JMS message consumer {}", this); messageConsumer = null; } finally { messageConsumerLock.writeLock().unlock(); @@ -107,7 +110,12 @@ public MessageListener getMessageListener() throws JMSException { @Override public void close() throws JMSException { - getMessageConsumer().close(); + try { + LOGGER.debug("Closing managed JMS message consumer {}", this); + getMessageConsumer().close(); + } finally { + managedSession.removeMessageConsumer(this); + } } @Override diff --git a/src/main/java/org/seedstack/jms/internal/ManagedSession.java b/src/main/java/org/seedstack/jms/internal/ManagedSession.java index 4f0479a..457a1b6 100644 --- a/src/main/java/org/seedstack/jms/internal/ManagedSession.java +++ b/src/main/java/org/seedstack/jms/internal/ManagedSession.java @@ -30,47 +30,44 @@ import javax.jms.Topic; import javax.jms.TopicSubscriber; import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import static com.google.common.base.Preconditions.checkNotNull; /** - * This session is a facade of a jms session. It allows the reconnection mechanism. + * This session is a facade of a JMS session. It allows the reconnection mechanism. */ class ManagedSession implements Session { - - private static final Logger logger = LoggerFactory.getLogger(ManagedSession.class); - + private static final Logger LOGGER = LoggerFactory.getLogger(ManagedSession.class); private final Boolean transacted; - private final Integer acknowledgeMode; - - private Session session; - private final boolean polling; + private final Set messageConsumers = ConcurrentHashMap.newKeySet(); + private final ReentrantReadWriteLock sessionLock = new ReentrantReadWriteLock(); + private final ManagedConnection managedConnection; + private Session session; - private List managedMessageConsumers = new ArrayList<>(); + ManagedSession(Session session, boolean transacted, int acknowledgeMode, boolean polling, ManagedConnection managedConnection) { + checkNotNull(session); - private ReentrantReadWriteLock sessionLock = new ReentrantReadWriteLock(); + LOGGER.debug("Creating managed JMS session {}", this); - ManagedSession(Session session, boolean transacted, int acknowledgeMode, boolean polling) { - checkNotNull(session); - this.session = session; this.transacted = transacted; this.acknowledgeMode = acknowledgeMode; this.polling = polling; + this.managedConnection = managedConnection; } void refresh(Connection connection) throws JMSException { sessionLock.writeLock().lock(); try { + LOGGER.debug("Refreshing managed JMS session {}", this); session = connection.createSession(this.transacted, this.acknowledgeMode); - - for (ManagedMessageConsumer managedMessageConsumer : managedMessageConsumers) { - managedMessageConsumer.refresh(session); + for (ManagedMessageConsumer messageConsumer : messageConsumers) { + messageConsumer.refresh(session); } } finally { sessionLock.writeLock().unlock(); @@ -78,14 +75,14 @@ void refresh(Connection connection) throws JMSException { } /** - * Reset the session and the message consumers on cascade. + * Reset the session and the message consumers in cascade. */ void reset() { sessionLock.writeLock().lock(); try { - logger.trace("Resetting session"); + LOGGER.debug("Resetting managed JMS session {}", this); session = null; - for (ManagedMessageConsumer managedMessageConsumer : managedMessageConsumers) { + for (ManagedMessageConsumer managedMessageConsumer : messageConsumers) { managedMessageConsumer.reset(); } } finally { @@ -99,7 +96,6 @@ private Session getSession() throws JMSException { if (session == null) { throw new JMSException("Attempt to use a session during connection refresh"); } - return session; } finally { sessionLock.readLock().unlock(); @@ -168,7 +164,12 @@ public void rollback() throws JMSException { @Override public void close() throws JMSException { - getSession().close(); + try { + LOGGER.debug("Closing managed JMS session {}", this); + getSession().close(); + } finally { + managedConnection.removeSession(this); + } } @Override @@ -215,9 +216,15 @@ public MessageConsumer createConsumer(Destination destination, String messageSel } @Override - public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) throws JMSException { - ManagedMessageConsumer consumer = new ManagedMessageConsumer(getSession().createConsumer(destination, messageSelector, NoLocal), destination, messageSelector, NoLocal, polling); - managedMessageConsumers.add(consumer); + public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { + ManagedMessageConsumer consumer = new ManagedMessageConsumer( + getSession().createConsumer(destination, messageSelector, noLocal), + destination, + messageSelector, + noLocal, + polling, + this); + messageConsumers.add(consumer); return consumer; } @@ -265,4 +272,8 @@ public TemporaryTopic createTemporaryTopic() throws JMSException { public void unsubscribe(String name) throws JMSException { getSession().unsubscribe(name); } + + void removeMessageConsumer(ManagedMessageConsumer managedMessageConsumer) { + messageConsumers.remove(managedMessageConsumer); + } } diff --git a/src/main/java/org/seedstack/jms/internal/MessageListenerAdapter.java b/src/main/java/org/seedstack/jms/internal/MessageListenerAdapter.java index 0728a05..12295b5 100644 --- a/src/main/java/org/seedstack/jms/internal/MessageListenerAdapter.java +++ b/src/main/java/org/seedstack/jms/internal/MessageListenerAdapter.java @@ -18,7 +18,6 @@ class MessageListenerAdapter implements MessageListener { @Inject private static Injector injector; - private final Key key; private final String name; diff --git a/src/main/java/org/seedstack/jms/pollers/SimpleMessagePoller.java b/src/main/java/org/seedstack/jms/pollers/SimpleMessagePoller.java index 6c61500..653d1ec 100644 --- a/src/main/java/org/seedstack/jms/pollers/SimpleMessagePoller.java +++ b/src/main/java/org/seedstack/jms/pollers/SimpleMessagePoller.java @@ -31,19 +31,16 @@ */ public class SimpleMessagePoller implements MessagePoller, Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(SimpleMessagePoller.class); - private final AtomicBoolean active = new AtomicBoolean(false); private final Timer timer = new Timer(); + private long receiveTimeout = 30000; + private int restartDelay = 10000; private Thread thread; - private Session session; private ExceptionListener exceptionListener; private MessageListener messageListener; private MessageConsumer messageConsumer; - private long receiveTimeout = 30000; - private int restartDelay = 10000; - @Override public void setSession(Session session) { this.session = session; @@ -85,7 +82,7 @@ public synchronized void stop() { @Override public void run() { - LOGGER.info("Starting to poll messages for JMS listener {}", messageListener); + LOGGER.debug("Starting to poll messages for JMS listener {}", messageListener); while (active.get()) { try { @@ -120,7 +117,7 @@ public void run() { LOGGER.error("Unable to schedule polling restart for JMS listener {}, consider restarting the poller manually if possible", messageListener); } } else { - LOGGER.info("Stopping to poll messages for JMS listener {}", messageListener, restartDelay); + LOGGER.debug("Stopping to poll messages for JMS listener {}", messageListener, restartDelay); } } @@ -133,7 +130,7 @@ private void startThread() { private class MyTimerTask extends TimerTask { private final SimpleMessagePoller poller; - public MyTimerTask(SimpleMessagePoller poller) { + MyTimerTask(SimpleMessagePoller poller) { this.poller = poller; } diff --git a/src/test/java/org/seedstack/jms/internal/ManagedConnectionTest.java b/src/test/java/org/seedstack/jms/internal/ManagedConnectionTest.java index 8072436..7c416fd 100644 --- a/src/test/java/org/seedstack/jms/internal/ManagedConnectionTest.java +++ b/src/test/java/org/seedstack/jms/internal/ManagedConnectionTest.java @@ -8,7 +8,7 @@ package org.seedstack.jms.internal; import com.google.common.collect.Lists; - +import com.google.common.collect.Sets; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Test; @@ -24,6 +24,9 @@ import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Session; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; @RunWith(MockitoJUnitRunner.class) public class ManagedConnectionTest { @@ -57,18 +60,13 @@ public void connection_should_be_refreshed_on_failure() throws JMSException, Int Assertions.assertThat(connection).isNotNull(); // Mock the created session ManagedSession session = Mockito.mock(ManagedSession.class); - Whitebox.setInternalState(underTest, "sessions", Lists.newArrayList(session)); + Whitebox.setInternalState(underTest, "sessions", Sets.newConcurrentHashSet(Lists.newArrayList(session))); // Failure Object jmsFactoryImpl = Whitebox.getInternalState(underTest, "jmsFactoryImpl"); Whitebox.setInternalState(underTest, "jmsFactoryImpl", new FakeConnectionFactoryImpl()); - underTest.setExceptionListener(new javax.jms.ExceptionListener() { - - @Override - public void onException(JMSException exception) { - - } - }); + underTest.setExceptionListener(exception -> { + }); underTest.onException(new JMSException("Connection closed")); // Reset @@ -90,11 +88,7 @@ public void onException(JMSException exception) { @Test public void test_that_wraped_exceptionlistener_from_managedConnection_differs_from_jmsbroker_connection() throws InterruptedException, JMSException { Whitebox.setInternalState(underTest, "jmsFactoryImpl", new FakeConnectionFactoryImpl()); - javax.jms.ExceptionListener exceptionListener = new javax.jms.ExceptionListener() { - @Override - public void onException(JMSException e) { - - } + javax.jms.ExceptionListener exceptionListener = e -> { }; underTest.setExceptionListener(exceptionListener); Connection jmsConnection = (Connection) Whitebox.getInternalState(underTest, "connection"); @@ -102,6 +96,13 @@ public void onException(JMSException e) { javax.jms.ExceptionListener exceptionListenerMC = (ExceptionListener) Whitebox.getInternalState(underTest, "exceptionListener"); Assertions.assertThat(exceptionListenerAQ).isNotEqualTo(exceptionListenerMC); Assertions.assertThat(exceptionListenerMC).isEqualTo(exceptionListener); + } + @Test + public void consumerIsRemovedFromSessionAfterClose() throws Exception { + Session session = underTest.createSession(true, Session.AUTO_ACKNOWLEDGE); + assertThat((Set) Whitebox.getInternalState(underTest, "sessions")).containsExactly((ManagedSession) session); + session.close(); + assertThat((Set) Whitebox.getInternalState(underTest, "sessions")).isEmpty(); } } diff --git a/src/test/java/org/seedstack/jms/internal/ManagedMessageConsumerTest.java b/src/test/java/org/seedstack/jms/internal/ManagedMessageConsumerTest.java index f117c5a..ee19902 100644 --- a/src/test/java/org/seedstack/jms/internal/ManagedMessageConsumerTest.java +++ b/src/test/java/org/seedstack/jms/internal/ManagedMessageConsumerTest.java @@ -15,9 +15,15 @@ import org.mockito.internal.util.reflection.Whitebox; import org.mockito.runners.MockitoJUnitRunner; -import javax.jms.*; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class ManagedMessageConsumerTest { @@ -28,13 +34,13 @@ public class ManagedMessageConsumerTest { @Mock private Destination destination; @Mock - private Session session; + private ManagedSession session; private MyMessageListener messageListener = new MyMessageListener(); @Before public void setUp() throws JMSException { - underTest = new ManagedMessageConsumer(messageConsumer, destination, null, false, false); + underTest = new ManagedMessageConsumer(messageConsumer, destination, null, false, false, session); when(session.createConsumer(destination)).thenReturn(messageConsumer); } diff --git a/src/test/java/org/seedstack/jms/internal/ManagedSessionTest.java b/src/test/java/org/seedstack/jms/internal/ManagedSessionTest.java index 5a4c913..b51e867 100644 --- a/src/test/java/org/seedstack/jms/internal/ManagedSessionTest.java +++ b/src/test/java/org/seedstack/jms/internal/ManagedSessionTest.java @@ -8,7 +8,7 @@ package org.seedstack.jms.internal; import com.google.common.collect.Lists; -import org.assertj.core.api.Assertions; +import com.google.common.collect.Sets; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -17,15 +17,20 @@ import org.mockito.internal.util.reflection.Whitebox; import org.mockito.runners.MockitoJUnitRunner; -import javax.jms.*; -import java.util.List; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; @RunWith(MockitoJUnitRunner.class) public class ManagedSessionTest { private ManagedSession underTest; @Mock - private Connection connection; + private ManagedConnection connection; @Mock private Session session; @Mock @@ -33,7 +38,7 @@ public class ManagedSessionTest { @Before public void setUp() throws JMSException { - underTest = new ManagedSession(session, true, Session.AUTO_ACKNOWLEDGE, false); + underTest = new ManagedSession(session, true, Session.AUTO_ACKNOWLEDGE, false, connection); Mockito.when(session.createConsumer(destination, null, false)).thenReturn(Mockito.mock(MessageConsumer.class)); } @@ -41,23 +46,23 @@ public void setUp() throws JMSException { public void session_is_reset() throws JMSException { // Check the session state Session actualSession = (Session) Whitebox.getInternalState(underTest, "session"); - Assertions.assertThat(actualSession).isNotNull(); + assertThat(actualSession).isNotNull(); // Create two consumers underTest.createConsumer(destination); underTest.createConsumer(destination); - List managedMessageConsumers = (List) Whitebox.getInternalState(underTest, "managedMessageConsumers"); - Assertions.assertThat(managedMessageConsumers).hasSize(2); + Set managedMessageConsumers = (Set) Whitebox.getInternalState(underTest, "messageConsumers"); + assertThat(managedMessageConsumers).hasSize(2); // Mock the message consumers ManagedMessageConsumer messageConsumer1 = Mockito.mock(ManagedMessageConsumer.class); ManagedMessageConsumer messageConsumer2 = Mockito.mock(ManagedMessageConsumer.class); - Whitebox.setInternalState(underTest, "managedMessageConsumers", Lists.newArrayList(messageConsumer1, messageConsumer2)); + Whitebox.setInternalState(underTest, "messageConsumers", Sets.newConcurrentHashSet(Lists.newArrayList(messageConsumer1, messageConsumer2))); // reset the connection and the message consumers on cascade underTest.reset(); actualSession = (Session) Whitebox.getInternalState(underTest, "session"); - Assertions.assertThat(actualSession).isNull(); + assertThat(actualSession).isNull(); Mockito.verify(messageConsumer1, Mockito.times(1)).reset(); Mockito.verify(messageConsumer2, Mockito.times(1)).reset(); } @@ -67,7 +72,7 @@ public void session_is_refreshed() throws JMSException { // Mock ManagedMessageConsumer messageConsumer1 = Mockito.mock(ManagedMessageConsumer.class); ManagedMessageConsumer messageConsumer2 = Mockito.mock(ManagedMessageConsumer.class); - Whitebox.setInternalState(underTest, "managedMessageConsumers", Lists.newArrayList(messageConsumer1, messageConsumer2)); + Whitebox.setInternalState(underTest, "messageConsumers", Sets.newConcurrentHashSet(Lists.newArrayList(messageConsumer1, messageConsumer2))); Mockito.when(connection.createSession(true, Session.AUTO_ACKNOWLEDGE)).thenReturn(session); // Create two consumers @@ -80,10 +85,18 @@ public void session_is_refreshed() throws JMSException { // refresh session underTest.refresh(connection); Session actualSession = (Session) Whitebox.getInternalState(underTest, "session"); - Assertions.assertThat(actualSession).isNotNull(); + assertThat(actualSession).isNotNull(); // refresh consumers on cascade Mockito.verify(messageConsumer1, Mockito.times(1)).refresh(actualSession); Mockito.verify(messageConsumer2, Mockito.times(1)).refresh(actualSession); } + + @Test + public void consumerIsRemovedFromSessionAfterClose() throws Exception { + MessageConsumer consumer = underTest.createConsumer(destination); + assertThat((Set) Whitebox.getInternalState(underTest, "messageConsumers")).containsExactly((ManagedMessageConsumer) consumer); + consumer.close(); + assertThat((Set) Whitebox.getInternalState(underTest, "messageConsumers")).isEmpty(); + } }