Skip to content

Commit

Permalink
ARTEMIS-4924 Do not allow sending messages directly to store-and-forw…
Browse files Browse the repository at this point in the history
…ard queues
  • Loading branch information
howardgao committed Jul 16, 2024
1 parent 615f249 commit 161c9a1
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener {

String SN_PREFIX = "sf.";

SimpleString getName();

String getNodeID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private static final String SN_PREFIX = "sf.";
/**
* When getting member on node-up and down we have to remove the name from the transport config
* as the setting we build here doesn't need to consider the name, so use the same name on all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ExecutorFactory;
Expand Down Expand Up @@ -94,13 +95,22 @@ public Queue createQueueWith(final QueueConfiguration config, PagingManager pagi
PageSubscription pageSubscription = getPageSubscription(config, pagingManager, filter);
if (lastValueKey(config) != null) {
queue = new LastValueQueue(config.setLastValueKey(lastValueKey(config)), filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
} else if (isSnf(config)) {
queue = new StoreAndForwardQueue(config, filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
} else {
queue = new QueueImpl(config, filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
}
server.getCriticalAnalyzer().add(queue);
return queue;
}

private boolean isSnf(QueueConfiguration config) {
if (config.isInternal()) {
return config.getName().toString().startsWith(server.getInternalNamingPrefix() + ClusterConnection.SN_PREFIX);
}
return false;
}

@Deprecated
@Override
public Queue createQueue(final long persistenceID,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.core.server.impl;

import java.util.concurrent.ScheduledExecutorService;

import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;

public class StoreAndForwardQueue extends QueueImpl {

public StoreAndForwardQueue(QueueConfiguration queueConfiguration,
Filter filter,
PagingStore pagingStore,
PageSubscription pageSubscription,
ScheduledExecutorService scheduledExecutor,
PostOffice postOffice,
StorageManager storageManager,
HierarchicalRepository<AddressSettings> addressSettingsRepository,
ArtemisExecutor executor,
ActiveMQServer server,
QueueFactory factory) {
super(queueConfiguration, filter, pagingStore, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
}

@Override
public void route(final Message message, final RoutingContext context) throws Exception {
if (!isValidMessage(message)) {
throw ActiveMQExceptionType.ROUTING_EXCEPTION.createException("Invalid message being routed to store and forward queue");
}
super.route(message, context);
}

private boolean isValidMessage(final Message message) {
for (SimpleString propName : message.getPropertyNames()) {
if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.ArrayList;
import java.util.UUID;
Expand All @@ -28,6 +29,8 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQRoutingException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
Expand Down Expand Up @@ -305,6 +308,56 @@ public void testClusterBridgeAddRemoteBinding() throws Exception {
stopServers(0, 1);
}

@Test
public void testBadClientSendMessagesToSnFQueue() throws Exception {
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());

setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);

setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);

startServers(0, 1);

setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());

ClientSession session0 = sfs[0].createSession();
ClientSession session1 = sfs[1].createSession();

session0.start();
session1.start();

//sending directly to snf queues
SimpleString nodeId0 = servers[0].getNodeID();
SimpleString nodeId1 = servers[1].getNodeID();
ClusterConnectionImpl cc0 = (ClusterConnectionImpl) servers[0].getClusterManager().getClusterConnection("cluster0");
SimpleString snfQueue0 = cc0.getSfQueueName(nodeId1.toString());
ClusterConnectionImpl cc1 = (ClusterConnectionImpl) servers[1].getClusterManager().getClusterConnection("cluster1");
SimpleString snfQueue1 = cc1.getSfQueueName(nodeId0.toString());

Wait.assertTrue(()->cc0.isStarted());
Wait.assertTrue(()->cc1.isStarted());

try {
ClientProducer badProducer0 = session0.createProducer(snfQueue0);
Message normalMessage = session0.createMessage(false);
badProducer0.send(normalMessage);
fail("Message sent directly to snf queue should be rejected " + snfQueue0);
} catch (ActiveMQRoutingException e) {
// ok
}
try {
ClientProducer badProducer1 = session1.createProducer(snfQueue1);
Message normalMessage = session1.createMessage(false);
badProducer1.send(normalMessage);
fail("Message sent directly to snf queue should be rejected " + snfQueue1);
} catch (Exception e) {
// ok
}

stopServers(0, 1);
}

@Override
@AfterEach
Expand Down

0 comments on commit 161c9a1

Please sign in to comment.