Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pluggable strategy for dealing with reply to queues. #299

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import com.rabbitmq.client.MetricsCollector;
import com.rabbitmq.jms.client.ConfirmListener;
import com.rabbitmq.jms.client.ConnectionParams;
import com.rabbitmq.jms.client.DefaultReplyToStrategy;
import com.rabbitmq.jms.client.RMQConnection;
import com.rabbitmq.jms.client.RMQMessage;
import com.rabbitmq.jms.client.ReceivingContext;
import com.rabbitmq.jms.client.ReceivingContextConsumer;
import com.rabbitmq.jms.client.ReplyToStrategy;
import com.rabbitmq.jms.client.RmqJmsContext;
import com.rabbitmq.jms.client.SendingContext;
import com.rabbitmq.jms.client.SendingContextConsumer;
Expand Down Expand Up @@ -263,6 +265,14 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S
*/
private boolean validateSubscriptionNames = false;

/**
* The strategy to applied to reply to queue handling. Defaults to handling
* "amq.rabbitmq.reply-to" queues.
*
* @since 2.9.0
*/
private ReplyToStrategy replyToStrategy = DefaultReplyToStrategy.INSTANCE;

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -345,6 +355,7 @@ protected Connection createConnection(String username, String password, Connecti
.setRequeueOnTimeout(this.requeueOnTimeout)
.setKeepTextMessageType(this.keepTextMessageType)
.setValidateSubscriptionNames(this.validateSubscriptionNames)
.setReplyToStrategy(replyToStrategy)
);
logger.debug("Connection {} created.", conn);
return conn;
Expand Down Expand Up @@ -1059,6 +1070,26 @@ public void setDeclareReplyToDestination(boolean declareReplyToDestination) {
this.declareReplyToDestination = declareReplyToDestination;
}

/**
* Sets the strategy to use when receiving messages with a reply to
* specified.
*
* @param replyToStrategy The reply to strategy.
*/
public void setReplyToStrategy(final ReplyToStrategy replyToStrategy) {
this.replyToStrategy = replyToStrategy;
}

/**
* Gets ths reply to strategy to use when receiving messages with a
* reply to specified.
*
* @return The strategy.
*/
public ReplyToStrategy getReplyToStrategy() {
return replyToStrategy;
}

/**
* Set the callback to be notified of publisher confirms.
* <p>
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ public class ConnectionParams {

private boolean validateSubscriptionNames = false;

/**
* The reply to strategy to use when dealing with received messages
* with a reply to specified.
*
* @since 2.9.0
*/
private ReplyToStrategy replyToStrategy = DefaultReplyToStrategy.INSTANCE;

public Connection getRabbitConnection() {
return rabbitConnection;
}
Expand Down Expand Up @@ -271,4 +279,13 @@ public ConnectionParams setValidateSubscriptionNames(boolean validateSubscriptio
public boolean isValidateSubscriptionNames() {
return validateSubscriptionNames;
}

public ConnectionParams setReplyToStrategy(final ReplyToStrategy replyToStrategy) {
this.replyToStrategy = replyToStrategy;
return this;
}

public ReplyToStrategy getReplyToStrategy() {
return replyToStrategy;
}
}
44 changes: 44 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/DefaultReplyToStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2016-2022 VMware, Inc. or its affiliates. All rights reserved.
package com.rabbitmq.jms.client;

import javax.jms.JMSException;

import com.rabbitmq.jms.admin.RMQDestination;

/**
* Default implementation of the reply to strategy.
* <b>
* This will ensure that any reply to queues using the
* amq.rabbitmq.reply-to.<id> are correctly created.
*
* @since 2.9.0
*/
public class DefaultReplyToStrategy implements ReplyToStrategy {

private static final long serialVersionUID = -496756742546656456L;

/** An instance of the strategy to avoid having to create multiple copies of the object. */
public static final DefaultReplyToStrategy INSTANCE = new DefaultReplyToStrategy();

/**
* Handles the reply to on a received message.
*
* @param message The RMQMessage that has been received.
* @param replyTo The reply to queue value received.
* @throws JMSException if there's an issue updating the RMQMessage
*/
@Override
public void handleReplyTo(
final RMQDestination dest,
final RMQMessage message,
final String replyTo) throws JMSException {

if (replyTo != null && replyTo.startsWith(DIRECT_REPLY_TO)) {
message.setJMSReplyTo(new RMQDestination(DIRECT_REPLY_TO, "", replyTo, replyTo));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2016-2022 VMware, Inc. or its affiliates. All rights reserved.
package com.rabbitmq.jms.client;

import javax.jms.JMSException;

import com.rabbitmq.jms.admin.RMQDestination;

/**
* Implementation of the reply to strategy that deals with any reply
* to value received and will use the default, i.e. "", exchange.
* <b>
* If the reply-to starts with "amq.rabbitmq.reply-to", this will
* correctly handle these types of temporary queues.
*
* @since 2.9.0
*/
public class HandleAnyReplyToStrategy implements ReplyToStrategy {

private static final long serialVersionUID = -496756742546656456L;

/** An instance of the strategy to avoid having to create multiple copies of the object. */
public static final HandleAnyReplyToStrategy INSTANCE = new HandleAnyReplyToStrategy();

/**
* Handles the reply to on a received message.
*
* @param message The RMQMessage that has been received.
* @param replyTo The reply to queue value received.
* @throws JMSException if there's an issue updating the RMQMessage
*/
@Override
public void handleReplyTo(
final RMQDestination dest,
final RMQMessage message,
final String replyTo) throws JMSException {

if (replyTo != null && "".equals(replyTo) == false) {
if (replyTo.startsWith(DIRECT_REPLY_TO)) {
message.setJMSReplyTo(new RMQDestination(DIRECT_REPLY_TO, "", replyTo, replyTo));
} else {
message.setJMSReplyTo(new RMQDestination(replyTo, "", replyTo, replyTo));
}
}
}
}
22 changes: 22 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/RMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti

private final DelayedMessageService delayedMessageService;

/**
* The reply to strategy to use when handling reply to properties
* on received messages.
*
* @Since 2.9.0
*/
private final ReplyToStrategy replyToStrategy;

/**
* Creates an RMQConnection object.
* @param connectionParams parameters for this connection
Expand Down Expand Up @@ -192,6 +200,7 @@ public RMQConnection(ConnectionParams connectionParams) {
this.keepTextMessageType = connectionParams.isKeepTextMessageType();
this.validateSubscriptionNames = connectionParams.isValidateSubscriptionNames();
this.delayedMessageService = new DelayedMessageService();
this.replyToStrategy = connectionParams.getReplyToStrategy();
}

/**
Expand Down Expand Up @@ -250,6 +259,7 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS
.setKeepTextMessageType(this.keepTextMessageType)
.setValidateSubscriptionNames(this.validateSubscriptionNames)
.setDelayedMessageService(this.delayedMessageService)
.setReplyToStrategy(this.replyToStrategy)
);
this.sessions.add(session);
return session;
Expand Down Expand Up @@ -564,4 +574,16 @@ public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic,
int maxMessages) {
throw new UnsupportedOperationException();
}

/**
* Gets the reply to strategy that should be followed if as reply to is
* found on a received message.
*
* @return The reply to strategy.
*
* @since 2.9.0
*/
public ReplyToStrategy getReplyToStrategy() {
return replyToStrategy;
}
}
20 changes: 10 additions & 10 deletions src/main/java/com/rabbitmq/jms/client/RMQMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ public abstract class RMQMessage implements Message, Cloneable {
/** Logger shared with derived classes */
protected final Logger logger = LoggerFactory.getLogger(RMQMessage.class);

private static final String DIRECT_REPLY_TO = "amq.rabbitmq.reply-to";

static final String JMS_TYPE_HEADER = "JMSType";

static final String TEXT_MESSAGE_HEADER_VALUE = "TextMessage";
Expand Down Expand Up @@ -834,11 +832,11 @@ static RMQMessage convertMessage(RMQSession session, RMQDestination dest, GetRes
if (dest.isAmqp()) {
return convertAmqpMessage(session, dest, response, receivingContextConsumer);
} else {
return convertJmsMessage(session, response, receivingContextConsumer);
return convertJmsMessage(session, dest, response, receivingContextConsumer);
}
}

static RMQMessage convertJmsMessage(RMQSession session, GetResponse response, ReceivingContextConsumer receivingContextConsumer) throws JMSException {
static RMQMessage convertJmsMessage(RMQSession session, RMQDestination dest, GetResponse response, ReceivingContextConsumer receivingContextConsumer) throws JMSException {
// Deserialize the message payload from the byte[] body
RMQMessage message = fromMessage(response.getBody(), session.getTrustedPackages());

Expand All @@ -849,7 +847,7 @@ static RMQMessage convertJmsMessage(RMQSession session, GetResponse response, Re
// JMSProperties already set
message.setReadonly(true); // Set readOnly - mandatory for received messages

maybeSetupDirectReplyTo(message, response.getProps().getReplyTo());
maybeSetupDirectReplyTo(session, dest, message, response.getProps().getReplyTo());
receivingContextConsumer.accept(new ReceivingContext(message));

return message;
Expand All @@ -869,7 +867,7 @@ private static RMQMessage convertAmqpMessage(RMQSession session, RMQDestination
message.setJMSPropertiesFromAmqpProperties(props);
message.setReadonly(true); // Set readOnly - mandatory for received messages

maybeSetupDirectReplyTo(message, response.getProps().getReplyTo());
maybeSetupDirectReplyTo(session, dest, message, response.getProps().getReplyTo());
receivingContextConsumer.accept(new ReceivingContext(message));

return message;
Expand Down Expand Up @@ -912,10 +910,12 @@ private static RMQMessage handleJmsRedeliveredAndDeliveryCount(GetResponse respo
* @throws JMSException
* @since 1.11.0
*/
private static void maybeSetupDirectReplyTo(RMQMessage message, String replyTo) throws JMSException {
if (replyTo != null && replyTo.startsWith(DIRECT_REPLY_TO)) {
RMQDestination replyToDestination = new RMQDestination(DIRECT_REPLY_TO, "", replyTo, replyTo);
message.setJMSReplyTo(replyToDestination);
private static void maybeSetupDirectReplyTo(RMQSession session, RMQDestination dest, RMQMessage message, String replyTo) throws JMSException {
if (session.getReplyToStrategy() != null) {
session.getReplyToStrategy().handleReplyTo(dest, message, replyTo);
} else {
// Ensure that we alway apply the default strategy if one is missing:
DefaultReplyToStrategy.INSTANCE.handleReplyTo(dest, message, replyTo);
}
}

Expand Down
25 changes: 25 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/RMQSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,15 @@ public class RMQSession implements Session, QueueSession, TopicSession {

private final DelayedMessageService delayedMessageService;

/**
* The reply to strategy to use when dealing with received messages
* with a reply to specified.
*
* @since 2.9.0
*/
private final ReplyToStrategy replyToStrategy;


static boolean validateSessionMode(int sessionMode) {
return sessionMode >= 0 && sessionMode <= CLIENT_INDIVIDUAL_ACKNOWLEDGE;
}
Expand Down Expand Up @@ -266,6 +275,8 @@ public RMQSession(SessionParams sessionParams) throws JMSException {
}
this.delayedMessageService = sessionParams.getDelayedMessageService();

this.replyToStrategy = sessionParams.getReplyToStrategy();

if (transacted) {
this.acknowledgeMode = Session.SESSION_TRANSACTED;
this.isIndividualAck = false;
Expand Down Expand Up @@ -310,6 +321,7 @@ public RMQSession(RMQConnection connection, boolean transacted, int onMessageTim
.setMode(mode)
.setSubscriptions(subscriptions)
.setDelayedMessageService(delayedMessageService)
.setReplyToStrategy(connection.getReplyToStrategy())
);
}

Expand Down Expand Up @@ -1468,4 +1480,17 @@ private interface SubscriptionNameValidator {
void validate(String name) throws JMSException;

}

/**
* Gets the reply to strategy that should be followed if as reply to is
* found on a received message.
*
* @return The reply to strategy.
*
* @since 2.9.0
*/
public ReplyToStrategy getReplyToStrategy() {
return replyToStrategy;
}

}
35 changes: 35 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/ReplyToStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2016-2022 VMware, Inc. or its affiliates. All rights reserved.
package com.rabbitmq.jms.client;

import java.io.Serializable;

import javax.jms.JMSException;

import com.rabbitmq.jms.admin.RMQDestination;

/**
* Interface to provide a plugable mechanism for dealing with messages
* received with a reply to queue specified.
* <b>
* Implementations of this interface should update the message's JMSReplyTo
* property directly.
*
* @since 2.9.0
*/
public interface ReplyToStrategy extends Serializable {

public static final String DIRECT_REPLY_TO = "amq.rabbitmq.reply-to";

/**
* Handles the reply to on a received message.
*
* @param message The RMQMessage that has been received.
* @param replyTo The reply to queue value received.
* @throws JMSException if there's an issue updating the RMQMessage
*/
void handleReplyTo(RMQDestination dest, RMQMessage message, String replyTo) throws JMSException;
}
Loading