From 25822b534945add4ff3c5ab8538dc2c549d31381 Mon Sep 17 00:00:00 2001 From: zhaoyuhan Date: Mon, 18 Nov 2024 10:02:24 +0800 Subject: [PATCH 1/3] feat dledger support lmq --- .../org/apache/rocketmq/store/CommitLog.java | 69 +--------------- .../rocketmq/store/MessageExtEncoder.java | 6 +- .../store/dledger/DLedgerCommitLog.java | 30 +++++-- .../store/queue/MultiDispatchUtils.java | 81 +++++++++++++++++++ 4 files changed, 111 insertions(+), 75 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 378518d249d..9176dc539ac 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -21,6 +21,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -64,6 +65,7 @@ import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService; import org.apache.rocketmq.store.lock.AdaptiveBackOffSpinLockImpl; import org.apache.rocketmq.store.logfile.MappedFile; +import org.apache.rocketmq.store.queue.MultiDispatchUtils; import org.apache.rocketmq.store.util.LibC; import org.rocksdb.RocksDBException; @@ -624,6 +626,7 @@ public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, return dispatchRequest; } catch (Exception e) { + e.printStackTrace(); } return new DispatchRequest(-1, false /* success */); @@ -1850,70 +1853,6 @@ class DefaultAppendMessageCallback implements AppendMessageCallback { this.crc32ReservedLength = messageStoreConfig.isEnabledAppendPropCRC() ? CommitLog.CRC32_RESERVED_LEN : 0; } - public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer, - final MessageExtBrokerInner msgInner) { - if (msgInner.isEncodeCompleted()) { - return null; - } - - try { - LmqDispatch.wrapLmqDispatch(defaultMessageStore, msgInner); - } catch (ConsumeQueueException e) { - if (e.getCause() instanceof RocksDBException) { - log.error("Failed to wrap multi-dispatch", e); - return new AppendMessageResult(AppendMessageStatus.ROCKSDB_ERROR); - } - log.error("Failed to wrap multi-dispatch", e); - return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); - } - - msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); - - final byte[] propertiesData = - msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); - - boolean needAppendLastPropertySeparator = enabledAppendPropCRC && propertiesData != null && propertiesData.length > 0 - && propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR; - - final int propertiesLength = (propertiesData == null ? 0 : propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + crc32ReservedLength; - - if (propertiesLength > Short.MAX_VALUE) { - log.warn("putMessage message properties length too long. length={}", propertiesData.length); - return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED); - } - - int msgLenWithoutProperties = preEncodeBuffer.getInt(0); - - int msgLen = msgLenWithoutProperties + 2 + propertiesLength; - - // Exceeds the maximum message - if (msgLen > this.messageStoreConfig.getMaxMessageSize()) { - log.warn("message size exceeded, msg total size: " + msgLen + ", maxMessageSize: " + this.messageStoreConfig.getMaxMessageSize()); - return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); - } - - // Back filling total message length - preEncodeBuffer.putInt(0, msgLen); - // Modify position to msgLenWithoutProperties - preEncodeBuffer.position(msgLenWithoutProperties); - - preEncodeBuffer.putShort((short) propertiesLength); - - if (propertiesLength > crc32ReservedLength) { - preEncodeBuffer.put(propertiesData); - } - - if (needAppendLastPropertySeparator) { - preEncodeBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR); - } - // 18 CRC32 - preEncodeBuffer.position(preEncodeBuffer.position() + crc32ReservedLength); - - msgInner.setEncodeCompleted(true); - - return null; - } - public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
@@ -1921,7 +1860,7 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); boolean isMultiDispatchMsg = messageStoreConfig.isEnableLmq() && msgInner.needDispatchLMQ(); if (isMultiDispatchMsg) { - AppendMessageResult appendMessageResult = handlePropertiesForLmqMsg(preEncodeBuffer, msgInner); + AppendMessageResult appendMessageResult = MultiDispatchUtils.handlePropertiesForLmqMsg(preEncodeBuffer, msgInner, defaultMessageStore); if (appendMessageResult != null) { return appendMessageResult; } diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java index 7531c96d119..51a225c3a40 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java @@ -132,8 +132,8 @@ public PutMessageResult encodeWithoutProperties(MessageExtBrokerInner msgInner) this.byteBuf.writeInt(msgInner.getQueueId()); // 5 FLAG this.byteBuf.writeInt(msgInner.getFlag()); - // 6 QUEUEOFFSET, need update later - this.byteBuf.writeLong(0); + // 6 QUEUEOFFSET + this.byteBuf.writeLong(msgInner.getQueueOffset()); // 7 PHYSICALOFFSET, need update later this.byteBuf.writeLong(0); // 8 SYSFLAG @@ -416,7 +416,7 @@ public void updateEncoderBufferCapacity(int newMaxMessageBodySize) { this.byteBuf.capacity(this.maxMessageSize); } - static class PutMessageThreadLocal { + public static class PutMessageThreadLocal { private final MessageExtEncoder encoder; private final StringBuilder keyBuilder; diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index e617343f9ad..fea8db33102 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -35,12 +35,14 @@ import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.store.MessageExtEncoder; +import org.apache.rocketmq.store.MessageExtEncoder.PutMessageThreadLocal; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.StoreStatsService; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.logfile.MappedFile; +import org.apache.rocketmq.store.queue.MultiDispatchUtils; import org.rocksdb.RocksDBException; import io.openmessaging.storage.dledger.AppendFuture; @@ -566,28 +568,42 @@ public CompletableFuture asyncPutMessage(MessageExtBrokerInner // Back to Results AppendMessageResult appendResult; AppendFuture dledgerFuture; - EncodeResult encodeResult; + PutMessageThreadLocal putMessageThreadLocal = this.getPutMessageThreadLocal().get(); + final boolean isMultiDispatchMsg = this.getMessageStore().getMessageStoreConfig().isEnableLmq() && msg.needDispatchLMQ(); String topicQueueKey = msg.getTopic() + "-" + msg.getQueueId(); topicQueueLock.lock(topicQueueKey); try { defaultMessageStore.assignOffset(msg); - encodeResult = this.messageSerializer.serialize(msg); - if (encodeResult.status != AppendMessageStatus.PUT_OK) { - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status))); + PutMessageResult putMessageResult = putMessageThreadLocal.getEncoder().encode(msg); + if (putMessageResult != null) { + return CompletableFuture.completedFuture(putMessageResult); } + msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer()); putMessageLock.lock(); //spin or ReentrantLock ,depending on store config long elapsedTimeInLock; long queueOffset; + ByteBuffer preEncoder = msg.getEncodedBuff(); + if (isMultiDispatchMsg) { + AppendMessageResult appendMessageResult = MultiDispatchUtils.handlePropertiesForLmqMsg( + preEncoder, msg, defaultMessageStore); + if (appendMessageResult != null) { + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, appendMessageResult)); + } + } + final int msgLen = preEncoder.getInt(0); + preEncoder.position(0); + preEncoder.limit(msgLen); try { beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); queueOffset = getQueueOffsetByKey(msg, tranType); - encodeResult.setQueueOffsetKey(queueOffset, false); AppendEntryRequest request = new AppendEntryRequest(); request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); - request.setBody(encodeResult.getData()); + byte[] data = new byte[msgLen]; + preEncoder.get(data); + request.setBody(data); dledgerFuture = (AppendFuture) dLedgerServer.handleAppend(request); if (dledgerFuture.getPos() == -1) { return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); @@ -599,7 +615,7 @@ public CompletableFuture asyncPutMessage(MessageExtBrokerInner String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; - appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock); + appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, data.length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock); } finally { beginTimeInDledgerLock = 0; putMessageLock.unlock(); diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java index 44397a2fce1..897046d0bb7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java @@ -16,16 +16,30 @@ */ package org.apache.rocketmq.store.queue; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.store.AppendMessageResult; +import org.apache.rocketmq.store.AppendMessageStatus; +import org.apache.rocketmq.store.CommitLog; +import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DispatchRequest; +import org.apache.rocketmq.store.LmqDispatch; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.exception.ConsumeQueueException; +import org.rocksdb.RocksDBException; public class MultiDispatchUtils { + protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); public static String lmqQueueKey(String queueName) { StringBuilder keyBuilder = new StringBuilder(); @@ -58,4 +72,71 @@ public static boolean checkMultiDispatchQueue(MessageStoreConfig messageStoreCon } return true; } + + public static AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer, final MessageExtBrokerInner msgInner, + DefaultMessageStore defaultMessageStore) { + if (msgInner.isEncodeCompleted()) { + return null; + } + + boolean enabledAppendPropCRC = defaultMessageStore.getMessageStoreConfig().isEnabledAppendPropCRC(); + int crc32ReservedLength = defaultMessageStore.getMessageStoreConfig().isEnabledAppendPropCRC() ? CommitLog.CRC32_RESERVED_LEN : 0; + + try { + LmqDispatch.wrapLmqDispatch(defaultMessageStore, msgInner); + } catch (ConsumeQueueException e) { + if (e.getCause() instanceof RocksDBException) { + log.error("Failed to wrap multi-dispatch", e); + return new AppendMessageResult(AppendMessageStatus.ROCKSDB_ERROR); + } + log.error("Failed to wrap multi-dispatch", e); + return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); + } + + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + + final byte[] propertiesData = + msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); + + boolean needAppendLastPropertySeparator = enabledAppendPropCRC && propertiesData != null && propertiesData.length > 0 + && propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR; + + final int propertiesLength = (propertiesData == null ? 0 : propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + crc32ReservedLength; + + if (propertiesLength > Short.MAX_VALUE) { + log.warn("putMessage message properties length too long. length={}", propertiesData.length); + return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED); + } + + int msgLenWithoutProperties = preEncodeBuffer.getInt(0); + + int msgLen = msgLenWithoutProperties + 2 + propertiesLength; + + // Exceeds the maximum message + if (msgLen > defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()) { + log.warn("message size exceeded, msg total size: " + msgLen + ", maxMessageSize: " + defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); + return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); + } + + // Back filling total message length + preEncodeBuffer.putInt(0, msgLen); + // Modify position to msgLenWithoutProperties + preEncodeBuffer.position(msgLenWithoutProperties); + + preEncodeBuffer.putShort((short) propertiesLength); + + if (propertiesLength > crc32ReservedLength) { + preEncodeBuffer.put(propertiesData); + } + + if (needAppendLastPropertySeparator) { + preEncodeBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR); + } + // 18 CRC32 + preEncodeBuffer.position(preEncodeBuffer.position() + crc32ReservedLength); + + msgInner.setEncodeCompleted(true); + + return null; + } } From 09389bf1d4fc7b989994e4c229e09a28ef27c18e Mon Sep 17 00:00:00 2001 From: zhaoyuhan Date: Mon, 18 Nov 2024 10:40:58 +0800 Subject: [PATCH 2/3] add ut --- .../store/dledger/DLedgerCommitLog.java | 11 ++++ .../store/dledger/DLedgerCommitlogTest.java | 50 +++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index fea8db33102..430c295ec72 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -34,6 +34,7 @@ import org.apache.rocketmq.store.CommitLog; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DispatchRequest; +import org.apache.rocketmq.store.LmqDispatch; import org.apache.rocketmq.store.MessageExtEncoder; import org.apache.rocketmq.store.MessageExtEncoder.PutMessageThreadLocal; import org.apache.rocketmq.store.PutMessageResult; @@ -41,6 +42,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.StoreStatsService; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.exception.ConsumeQueueException; import org.apache.rocketmq.store.logfile.MappedFile; import org.apache.rocketmq.store.queue.MultiDispatchUtils; import org.rocksdb.RocksDBException; @@ -616,6 +618,15 @@ public CompletableFuture asyncPutMessage(MessageExtBrokerInner String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, data.length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock); + if (isMultiDispatchMsg) { + try { + LmqDispatch.updateLmqOffsets(defaultMessageStore, msg); + } catch (ConsumeQueueException e) { + // Increase in-memory max offset of the queue should not fail. + log.error("[BUG] DLedger update lmq offset failed"); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + } + } } finally { beginTimeInDledgerLock = 0; putMessageLock.unlock(); diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java index 386cb1f6787..fe724849042 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java @@ -29,6 +29,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; + +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; @@ -436,6 +438,54 @@ public void testIPv6HostMsgCommittedPos() throws Exception { followerStore.shutdown(); } + @Test + public void testMultiDispatch() throws Exception { + String base = createBaseDir(); + String peers = String.format("n0-localhost:%d", nextPort()); + String group = UUID.randomUUID().toString(); + DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); + messageStore.getMessageStoreConfig().setEnableLmq(true); + DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog(); + Boolean success = await().atMost(Duration.ofSeconds(4)).until(() -> dLedgerCommitLog.getdLedgerServer().getMemberState().isLeader(), item -> item); + Assert.assertTrue(success); + String topic = UUID.randomUUID().toString(); + + List results = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + MessageExtBrokerInner msgInner = buildMessage(); + msgInner.setTopic(topic); + msgInner.setQueueId(0); + msgInner.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, "%LMQ%123,%LMQ%456"); + PutMessageResult putMessageResult = messageStore.putMessage(msgInner); + results.add(putMessageResult); + Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus()); + Assert.assertEquals(i, putMessageResult.getAppendMessageResult().getLogicsOffset()); + } + await().atMost(Duration.ofSeconds(10)).until(() -> 10 == messageStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(10, messageStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, messageStore.dispatchBehindBytes()); + GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null); + Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus()); + Assert.assertEquals(10, getMessageResult.getMessageBufferList().size()); + Assert.assertEquals(10, getMessageResult.getMessageMapedList().size()); + + for (int i = 0; i < results.size(); i++) { + ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i); + MessageExt messageExt = MessageDecoder.decode(buffer); + Assert.assertEquals(i, messageExt.getQueueOffset()); + Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId(), messageExt.getMsgId()); + Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset()); + } + + Assert.assertEquals(10, messageStore.getConsumeQueueStore().getLmqQueueOffset("%LMQ%123", 0)); + Assert.assertEquals(10, messageStore.getConsumeQueueStore().getLmqQueueOffset("%LMQ%456", 0)); + + messageStore.destroy(); + messageStore.shutdown(); + } + + private Callable followerCatchesUp(DefaultMessageStore followerStore, String topic) { return () -> followerStore.getMaxOffsetInQueue(topic, 0) == 1; } From 1efbd7f0fa4ac185bd78d885893dd8837a31344e Mon Sep 17 00:00:00 2001 From: zhaoyuhan Date: Mon, 18 Nov 2024 11:12:32 +0800 Subject: [PATCH 3/3] fix import --- store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 1 - 1 file changed, 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 9176dc539ac..889c14993c1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -21,7 +21,6 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; import java.util.List;