From 0f4982913fff32516a1eb50aae71b05e19debe99 Mon Sep 17 00:00:00 2001 From: a181321 Date: Mon, 27 Feb 2023 21:06:24 +0100 Subject: [PATCH] ARTEMIS-4185 Resending compressed message uncompressed throws exception in consumer --- .../core/client/impl/ClientProducerImpl.java | 4 ++ .../client/LargeMessageCompressTest.java | 54 +++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index f3b01a7eca7..c0a238bf2ac 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -443,6 +443,10 @@ private void largeMessageSendStreamed(final boolean sendBlocking, deflaterReader = new DeflaterReader(inputStreamParameter, messageSize); deflaterReader.setLevel(session.getCompressionLevel()); input = deflaterReader; + } else if (msgI.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) { + //This needs to be false if we do not intend to compress the message + //and the header already exists + msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false); } long totalSize = 0; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java index 2d48f8639ae..a3271e739dd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java @@ -31,6 +31,7 @@ import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; @@ -513,6 +514,59 @@ public void testLargeMessageCompressionRestartAndCheckSize() throws Exception { validateNoFilesOnLargeDir(); } + @Test + public void testPreviouslyCompressedMessageCleanup() throws Exception { + final int messageSize = 1024 * 1024; + + ActiveMQServer server = createServer(true, isNetty()); + server.start(); + + ClientSessionFactory sf1 = createSessionFactory(locator); + ClientSession session1 = addClientSession(sf1.createSession(false, true, true)); + session1.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST)); + ClientProducer producer = session1.createProducer(ADDRESS); + + ServerLocator locator2 = ActiveMQClient.createServerLocator("vm://0"); + locator2.setCompressLargeMessage(false); + ClientSessionFactory sf2 = locator2.createSessionFactory(); + ClientSession session2 = sf2.createSession(false, true, true); + ClientConsumer consumer = session2.createConsumer(ADDRESS); + ClientProducer producer2 = session2.createProducer(ADDRESS); + session2.start(); + + byte[] payload = new byte[messageSize]; + byte[] response = new byte[messageSize]; + + for (int i = 0; i < payload.length; i++) { + payload[i] = RandomUtil.randomByte(); + } + + ClientMessage message = session1.createMessage(true); + message.getBodyBuffer().writeBytes(payload); + producer.send(message); + + message = consumer.receive(); + assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)); + + message.getBodyBuffer().readBytes(response); + message.getBodyBuffer().writeBytes(response); + producer2.send(message); + + message = consumer.receive(); + assertFalse(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)); + + message.getBodyBuffer().readBytes(payload); + message.getBodySize(); + assertTrue(Arrays.equals(payload, response)); + + session1.close(); + session2.close(); + sf1.close(); + locator.close(); + sf2.close(); + locator2.close(); + } + @Test public void testLargeMessageCompressionLevel() throws Exception {