Skip to content

Commit

Permalink
ARTEMIS-4185 Resending compressed message uncompressed throws excepti…
Browse files Browse the repository at this point in the history
…on in consumer
  • Loading branch information
AntonRoskvist authored and clebertsuconic committed Jun 14, 2023
1 parent ef3a91b commit 0f49829
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,10 @@ private void largeMessageSendStreamed(final boolean sendBlocking,
deflaterReader = new DeflaterReader(inputStreamParameter, messageSize);
deflaterReader.setLevel(session.getCompressionLevel());
input = deflaterReader;
} else if (msgI.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
//This needs to be false if we do not intend to compress the message
//and the header already exists
msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
}

long totalSize = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
Expand Down Expand Up @@ -513,6 +514,59 @@ public void testLargeMessageCompressionRestartAndCheckSize() throws Exception {
validateNoFilesOnLargeDir();
}

@Test
public void testPreviouslyCompressedMessageCleanup() throws Exception {
final int messageSize = 1024 * 1024;

ActiveMQServer server = createServer(true, isNetty());
server.start();

ClientSessionFactory sf1 = createSessionFactory(locator);
ClientSession session1 = addClientSession(sf1.createSession(false, true, true));
session1.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
ClientProducer producer = session1.createProducer(ADDRESS);

ServerLocator locator2 = ActiveMQClient.createServerLocator("vm://0");
locator2.setCompressLargeMessage(false);
ClientSessionFactory sf2 = locator2.createSessionFactory();
ClientSession session2 = sf2.createSession(false, true, true);
ClientConsumer consumer = session2.createConsumer(ADDRESS);
ClientProducer producer2 = session2.createProducer(ADDRESS);
session2.start();

byte[] payload = new byte[messageSize];
byte[] response = new byte[messageSize];

for (int i = 0; i < payload.length; i++) {
payload[i] = RandomUtil.randomByte();
}

ClientMessage message = session1.createMessage(true);
message.getBodyBuffer().writeBytes(payload);
producer.send(message);

message = consumer.receive();
assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));

message.getBodyBuffer().readBytes(response);
message.getBodyBuffer().writeBytes(response);
producer2.send(message);

message = consumer.receive();
assertFalse(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));

message.getBodyBuffer().readBytes(payload);
message.getBodySize();
assertTrue(Arrays.equals(payload, response));

session1.close();
session2.close();
sf1.close();
locator.close();
sf2.close();
locator2.close();
}

@Test
public void testLargeMessageCompressionLevel() throws Exception {

Expand Down

0 comments on commit 0f49829

Please sign in to comment.