Skip to content

Commit

Permalink
Merge pull request #9 from adrienlauer/fix-8-sessions-cleaning
Browse files Browse the repository at this point in the history
Cleanup session tracking after close, fixes #8
  • Loading branch information
adrienlauer authored Feb 23, 2017
2 parents f173d8a + 18acddb commit d601481
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 99 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Version 3.0.1 (2017-02-23)

* [fix] JMS sessions were still tracked after being closed leading to trying to reconnect all sessions used in the past.

# Version 3.0.0 (2016-12-13)

* [brk] Update to SeedStack 16.11 new configuration system.
Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

<groupId>org.seedstack.addons.jms</groupId>
<artifactId>jms</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>3.0.1-SNAPSHOT</version>

<properties>
<seed.version>3.0.0</seed.version>
<seed.version>3.1.0</seed.version>

<compatibility.skip>true</compatibility.skip>
<compatibility.version>3.0.0</compatibility.version>

<bintray.package>jms-addon</bintray.package>
</properties>
Expand Down
1 change: 1 addition & 0 deletions src/it/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#

logging: WARN
jms:
connectionFactories:
connectionFactory1:
Expand Down
50 changes: 28 additions & 22 deletions src/main/java/org/seedstack/jms/internal/ManagedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.seedstack.jms.internal;

import com.google.common.collect.Sets;
import org.seedstack.jms.spi.ConnectionDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -20,9 +21,8 @@
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -35,15 +35,12 @@
*/
class ManagedConnection implements Connection, ExceptionListener {
private static final Logger LOGGER = LoggerFactory.getLogger(ManagedConnection.class);

private final List<ManagedSession> sessions = new ArrayList<>();
private final Set<ManagedSession> sessions = Sets.newConcurrentHashSet();
private final AtomicBoolean needToStart = new AtomicBoolean(false);

private final ConnectionDefinition connectionDefinition;
private final JmsFactoryImpl jmsFactoryImpl;
private final AtomicBoolean scheduleInProgress;
private final ReentrantReadWriteLock connectionLock = new ReentrantReadWriteLock();

private Connection connection;
private ExceptionListener exceptionListener;

Expand All @@ -57,15 +54,15 @@ class ManagedConnection implements Connection, ExceptionListener {
}

private Connection createConnection() throws JMSException {
LOGGER.debug("Initializing managed JMS connection {}", connectionDefinition.getName());

Connection newConnection = jmsFactoryImpl.createRawConnection(connectionDefinition);

// Set the exception listener to ourselves so we can monitor the underlying connection
if (!connectionDefinition.isJeeMode()) {
newConnection.setExceptionListener(this);
}

LOGGER.debug("Initialized JMS connection {}", connectionDefinition.getName());

return newConnection;
}

Expand All @@ -77,8 +74,8 @@ public void run() {

try {
// Recreate the connection
LOGGER.info("Recreating managed JMS connection {}", connectionDefinition.getName());
connection = createConnection();
LOGGER.info("Recreated JMS connection {}", connectionDefinition.getName());

// Refresh sessions
for (ManagedSession session : sessions) {
Expand All @@ -87,12 +84,12 @@ public void run() {

// Start the new connection if needed
if (needToStart.get()) {
LOGGER.info("Restarting managed JMS connection {}", connectionDefinition.getName());
connection.start();
scheduleInProgress.set(false);
LOGGER.info("Restarted JMS connection {}", connectionDefinition.getName());
}
} catch (Exception e) {
LOGGER.error("Failed to restart JMS connection {}, next attempt in {} ms", connectionDefinition.getName(), connectionDefinition.getReconnectionDelay());
LOGGER.error("Failed to restart managed JMS connection {}, next attempt in {} ms", connectionDefinition.getName(), connectionDefinition.getReconnectionDelay());
scheduleReconnection();
} finally {
connectionLock.writeLock().unlock();
Expand All @@ -110,7 +107,7 @@ private Connection getConnection() throws JMSException {
connectionLock.readLock().lock();
try {
if (connection == null) {
throw new JMSException("Connection " + connectionDefinition.getName() + " is not ready");
throw new JMSException("Managed JMS connection " + connectionDefinition.getName() + " is not ready");
}

return connection;
Expand All @@ -121,7 +118,7 @@ private Connection getConnection() throws JMSException {

@Override
public void onException(JMSException exception) {
LOGGER.error("An exception occurred on JMS connection {}", connectionDefinition.getName());
LOGGER.error("An exception occurred on managed JMS connection {}", connectionDefinition.getName());
if (exception != null && LOGGER.isDebugEnabled()) {
LOGGER.debug("Original exception below", exception);
}
Expand All @@ -135,10 +132,10 @@ public void onException(JMSException exception) {

private void reset() {
if (scheduleInProgress.getAndSet(true)) {
LOGGER.debug("JMS connection {} already scheduled for restart", connectionDefinition.getName());
LOGGER.debug("Managed JMS connection {} already scheduled for restart", connectionDefinition.getName());
} else {
// reset the connection
LOGGER.warn("Closing JMS connection {} and scheduling restart in {} ms", connectionDefinition.getName(), connectionDefinition.getReconnectionDelay());
LOGGER.warn("Resetting managed JMS connection {} and scheduling restart in {} ms", connectionDefinition.getName(), connectionDefinition.getReconnectionDelay());

connectionLock.writeLock().lock();
try {
Expand All @@ -152,7 +149,7 @@ private void reset() {
try {
connection.close();
} catch (JMSException e) {
LOGGER.warn("Unable to cleanly close the JMS connection {}", connectionDefinition.getName());
LOGGER.warn("Unable to cleanly close the managed JMS connection {}", connectionDefinition.getName());
}
}
connection = null;
Expand All @@ -171,7 +168,12 @@ private void reset() {
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
connectionLock.readLock().lock();
try {
ManagedSession managedSession = new ManagedSession(getConnection().createSession(transacted, acknowledgeMode), transacted, acknowledgeMode, connectionDefinition.isJeeMode());
ManagedSession managedSession = new ManagedSession(
getConnection().createSession(transacted, acknowledgeMode),
transacted,
acknowledgeMode,
connectionDefinition.isJeeMode(),
this);
sessions.add(managedSession);
return managedSession;
} finally {
Expand All @@ -191,22 +193,22 @@ public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String su

@Override
public void close() throws JMSException {
LOGGER.info("Closing managed JMS connection {}", connectionDefinition.getName());
getConnection().close();
LOGGER.info("Closed JMS connection {}", connectionDefinition.getName());
}

@Override
public void start() throws JMSException {
LOGGER.info("Starting managed JMS connection {}", connectionDefinition.getName());
getConnection().start();
needToStart.set(true);
LOGGER.info("Started JMS connection {}", connectionDefinition.getName());
}

@Override
public void stop() throws JMSException {
LOGGER.info("Stopping managed JMS connection {}", connectionDefinition.getName());
getConnection().stop();
needToStart.set(false);
LOGGER.info("Stopped JMS connection {}", connectionDefinition.getName());
}

@Override
Expand All @@ -226,11 +228,15 @@ public ExceptionListener getExceptionListener() throws JMSException {

@Override
public void setClientID(String clientID) throws JMSException {
throw new IllegalStateException("Client id cannot be changed on managed connections");
throw new IllegalStateException("Client ID cannot be changed on managed connections");
}

@Override
public String getClientID() throws JMSException {
throw new IllegalStateException("Client id cannot be retrieved on managed connections");
throw new IllegalStateException("Client ID cannot be retrieved on managed connections");
}

void removeSession(ManagedSession managedSession) {
sessions.remove(managedSession);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,35 @@
* This session is a facade of a jms messageConsumer. It allows the reconnection mechanism.
*/
class ManagedMessageConsumer implements MessageConsumer {
private static final Logger logger = LoggerFactory.getLogger(ManagedMessageConsumer.class);

private MessageListener messageListener;
private MessageConsumer messageConsumer;

private static final Logger LOGGER = LoggerFactory.getLogger(ManagedMessageConsumer.class);
private final Destination destination;
private final String messageSelector;
private final boolean noLocal;
private final boolean polling;
protected final ReentrantReadWriteLock messageConsumerLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock messageConsumerLock = new ReentrantReadWriteLock();
private final ManagedSession managedSession;
private MessageListener messageListener;
private MessageConsumer messageConsumer;

ManagedMessageConsumer(MessageConsumer messageConsumer, Destination destination, @Nullable String messageSelector, boolean noLocal, boolean polling) {
ManagedMessageConsumer(MessageConsumer messageConsumer, Destination destination, @Nullable String messageSelector, boolean noLocal, boolean polling, ManagedSession managedSession) {
checkNotNull(messageConsumer);
checkNotNull(destination);

LOGGER.debug("Creating managed JMS message consumer {}", this);

this.messageConsumer = messageConsumer;
this.messageSelector = messageSelector;
this.destination = destination;
this.noLocal = noLocal;
this.polling = polling;
this.managedSession = managedSession;
}

void refresh(Session session) throws JMSException {
messageConsumerLock.writeLock().lock();
try {
// Create a new messageConsumer
LOGGER.debug("Refreshing managed JMS message consumer {}", this);
if (this.noLocal) {
messageConsumer = session.createConsumer(destination, messageSelector, true);
} else if (messageSelector != null && !"".equals(messageSelector)) {
Expand All @@ -71,7 +74,7 @@ void refresh(Session session) throws JMSException {
void reset() {
messageConsumerLock.writeLock().lock();
try {
logger.trace("Resetting message consumer");
LOGGER.debug("Resetting managed JMS message consumer {}", this);
messageConsumer = null;
} finally {
messageConsumerLock.writeLock().unlock();
Expand Down Expand Up @@ -107,7 +110,12 @@ public MessageListener getMessageListener() throws JMSException {

@Override
public void close() throws JMSException {
getMessageConsumer().close();
try {
LOGGER.debug("Closing managed JMS message consumer {}", this);
getMessageConsumer().close();
} finally {
managedSession.removeMessageConsumer(this);
}
}

@Override
Expand Down
Loading

0 comments on commit d601481

Please sign in to comment.