Skip to content

Commit

Permalink
ARTEMIS-2922 artemis-cli consumer on large message results in a Class…
Browse files Browse the repository at this point in the history
…CastException

(cherry picked from commit f1004c8)
  • Loading branch information
jbertram authored and clebertsuconic committed Jan 24, 2022
1 parent d2f5ef7 commit 1b6ce30
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.reader.TextMessageUtil;
Expand Down Expand Up @@ -58,7 +59,7 @@ public void printMessageBody(Message message, boolean encodeTextMessageUTF8) thr
xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);

if (message.isLargeMessage()) {
printLargeMessageBody((LargeServerMessage) message);
printLargeMessageBody(message);
} else {
if (encodeTextMessageUTF8 && message.toCore().getType() == Message.TEXT_TYPE) {
xmlWriter.writeCData(TextMessageUtil.readBodyText(message.toCore().getReadOnlyBodyBuffer()).toString());
Expand All @@ -78,12 +79,18 @@ private static ByteBuffer acquireHeapBodyBuffer(ByteBuffer chunkBytes, int requi
return chunkBytes;
}

public void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
public void printLargeMessageBody(Message message) throws XMLStreamException {
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
LargeBodyReader encoder = null;

try {
encoder = message.toMessage().toCore().getLargeBodyReader();
if (message instanceof LargeServerMessage) {
encoder = ((LargeServerMessage)message).toMessage().toCore().getLargeBodyReader();
} else if (message instanceof ClientLargeMessageImpl) {
encoder = ((ClientLargeMessageImpl)message).getLargeBodyReader();
} else {
throw new RuntimeException("Unrecognized message implementation: " + message.getClass().getName());
}
encoder.open();
long totalBytesWritten = 0;
int bufferSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.UUID;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.cli.commands.address.CreateAddress;
import org.apache.activemq.artemis.cli.commands.messages.Consumer;
import org.apache.activemq.artemis.cli.commands.messages.Producer;
Expand Down Expand Up @@ -93,6 +94,17 @@ private List<Message> generateTextMessages(Session session, Destination destinat
return messages;
}

private List<Message> generateLargeTextMessages(Session session, Destination destination) throws Exception {
List<Message> messages = new ArrayList<>(TEST_MESSAGE_COUNT);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
messages.add(session.createTextMessage(new String(RandomUtil.randomBytes(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 2))));
}

sendMessages(session, destination, messages);

return messages;
}

private void checkSentMessages(Session session, List<Message> messages, String address) throws Exception {
checkSentMessages(session, messages, address, null);
}
Expand Down Expand Up @@ -188,6 +200,20 @@ public void testTextMessageImportExport() throws Exception {
checkSentMessages(session, sent, address);
}

@Test
public void testLargeMessageExport() throws Exception {
String address = "test";
File file = createMessageFile();

Session session = createSession(connection);

generateLargeTextMessages(session, getDestination(address));

exportMessages(address, file);

Wait.assertTrue(() -> verifyMessageCount(address, 0), 2000, 100);
}

@Test
public void testObjectMessageImportExport() throws Exception {
String address = "test";
Expand Down

0 comments on commit 1b6ce30

Please sign in to comment.