From bdbf7483ccf5b1eab7003e98eba327651016eec8 Mon Sep 17 00:00:00 2001 From: Rodion Lim Date: Thu, 22 Aug 2024 18:15:04 +0800 Subject: [PATCH] Drop messages that exceeds local message size limit Signed-off-by: Rodion Lim --- CHANGELOG.md | 2 +- .../besu/services/BesuEventsImplTest.java | 3 +- .../besu/ethereum/eth/manager/EthPeer.java | 3 +- .../eth/manager/EthProtocolManager.java | 3 +- .../eth/messages/NewBlockMessage.java | 13 ++++- .../ethereum/eth/sync/BlockBroadcaster.java | 12 ++++- .../eth/messages/NewBlockMessageTest.java | 15 +++++- .../AbstractBlockPropagationManagerTest.java | 51 ++++++++++++++----- .../eth/sync/BlockBroadcasterTest.java | 11 ++-- 9 files changed, 86 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b9fccf29c48..94346a52835 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,7 +24,7 @@ - Fix tracing in precompiled contracts when halting for out of gas [#7318](https://github.com/hyperledger/besu/issues/7318) - Correctly release txpool save and restore lock in case of exceptions [#7473](https://github.com/hyperledger/besu/pull/7473) - Fix for `eth_gasPrice` could not retrieve block error [#7482](https://github.com/hyperledger/besu/pull/7482) - +- Correctly drops messages that exceeds local message size limit [#5455](https://github.com/hyperledger/besu/pull/7507) ## 24.8.0 diff --git a/besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java b/besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java index 3673e78e312..478ce4ebc42 100644 --- a/besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java +++ b/besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java @@ -68,6 +68,7 @@ import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage; import org.hyperledger.besu.testutil.DeterministicEthScheduler; import org.hyperledger.besu.testutil.TestClock; +import org.hyperledger.besu.util.number.ByteUnits; import java.math.BigInteger; import java.time.ZoneId; @@ -154,7 +155,7 @@ public void setUp() { .when(mockWorldStateArchive.getMutable(any(), anyBoolean())) .thenReturn(Optional.of(mockWorldState)); - blockBroadcaster = new BlockBroadcaster(mockEthContext); + blockBroadcaster = new BlockBroadcaster(mockEthContext, 10 * ByteUnits.MEGABYTE); syncState = new SyncState(blockchain, mockEthPeers); TransactionPoolConfiguration txPoolConfig = ImmutableTransactionPoolConfiguration.builder() diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java index 755283b0809..899e7027243 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java @@ -291,12 +291,13 @@ public RequestManager.ResponseStream send( if (messageData.getSize() > maxMessageSize) { // This is a bug or else a misconfiguration of the max message size. LOG.error( - "Sending {} message to peer ({}) which exceeds local message size limit of {} bytes. Message code: {}, Message Size: {}", + "Dropping {} message to peer ({}) which exceeds local message size limit of {} bytes. Message code: {}, Message Size: {}", protocolName, this, maxMessageSize, messageData.getCode(), messageData.getSize()); + return null; } if (requestManagers.containsKey(protocolName)) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java index 5532bed865b..58318f9611e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java @@ -107,7 +107,8 @@ public EthProtocolManager( this.ethMessages = ethMessages; this.ethContext = ethContext; - this.blockBroadcaster = new BlockBroadcaster(ethContext); + this.blockBroadcaster = + new BlockBroadcaster(ethContext, ethereumWireProtocolConfiguration.getMaxMessageSize()); this.supportedCapabilities = calculateCapabilities(synchronizerConfiguration, ethereumWireProtocolConfiguration); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/NewBlockMessage.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/NewBlockMessage.java index 9227e574136..d198cacb36c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/NewBlockMessage.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/NewBlockMessage.java @@ -44,11 +44,20 @@ public int getCode() { return MESSAGE_CODE; } - public static NewBlockMessage create(final Block block, final Difficulty totalDifficulty) { + public static NewBlockMessage create( + final Block block, final Difficulty totalDifficulty, final int maxMessageSize) + throws IllegalArgumentException { final NewBlockMessageData msgData = new NewBlockMessageData(block, totalDifficulty); final BytesValueRLPOutput out = new BytesValueRLPOutput(); msgData.writeTo(out); - return new NewBlockMessage(out.encoded()); + final Bytes data = out.encoded(); + if (data.size() > maxMessageSize) { + throw new IllegalArgumentException( + String.format( + "Block message size %d bytes is larger than allowed message size %d bytes", + data.size(), maxMessageSize)); + } + return new NewBlockMessage(data); } public static NewBlockMessage readFrom(final MessageData message) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockBroadcaster.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockBroadcaster.java index 1bf55b01f81..b81c0448324 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockBroadcaster.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockBroadcaster.java @@ -28,11 +28,13 @@ public class BlockBroadcaster { private static final Logger LOG = LoggerFactory.getLogger(BlockBroadcaster.class); private final EthContext ethContext; + private final int maxMessageSize; private final Subscribers blockPropagatedSubscribers = Subscribers.create(); - public BlockBroadcaster(final EthContext ethContext) { + public BlockBroadcaster(final EthContext ethContext, final int maxMessageSize) { this.ethContext = ethContext; + this.maxMessageSize = maxMessageSize; } public long subscribePropagateNewBlocks(final BlockPropagatedSubscriber callback) { @@ -45,7 +47,13 @@ public void unsubscribePropagateNewBlocks(final long id) { public void propagate(final Block block, final Difficulty totalDifficulty) { blockPropagatedSubscribers.forEach(listener -> listener.accept(block, totalDifficulty)); - final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty); + final NewBlockMessage newBlockMessage; + try { + newBlockMessage = NewBlockMessage.create(block, totalDifficulty, this.maxMessageSize); + } catch (final IllegalArgumentException e) { + LOG.error("Failed to create block", e); + return; + } ethContext .getEthPeers() .streamAvailablePeers() diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/NewBlockMessageTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/NewBlockMessageTest.java index 2d7427bc2a5..4d30e9e93ae 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/NewBlockMessageTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/NewBlockMessageTest.java @@ -24,12 +24,14 @@ import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.RawMessage; import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput; +import org.hyperledger.besu.util.number.ByteUnits; import org.apache.tuweni.bytes.Bytes; import org.junit.jupiter.api.Test; public class NewBlockMessageTest { private static final ProtocolSchedule protocolSchedule = ProtocolScheduleFixture.MAINNET; + private static final int maxMessageSize = 10 * ByteUnits.MEGABYTE; @Test public void roundTripNewBlockMessage() { @@ -37,7 +39,8 @@ public void roundTripNewBlockMessage() { final BlockDataGenerator blockGenerator = new BlockDataGenerator(); final Block blockForInsertion = blockGenerator.block(); - final NewBlockMessage msg = NewBlockMessage.create(blockForInsertion, totalDifficulty); + final NewBlockMessage msg = + NewBlockMessage.create(blockForInsertion, totalDifficulty, maxMessageSize); assertThat(msg.getCode()).isEqualTo(EthPV62.NEW_BLOCK); assertThat(msg.totalDifficulty(protocolSchedule)).isEqualTo(totalDifficulty); final Block extractedBlock = msg.block(protocolSchedule); @@ -73,4 +76,14 @@ public void readFromMessageWithWrongCodeThrows() { assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy(() -> NewBlockMessage.readFrom(rawMsg)); } + + @Test + public void createBlockMessageLargerThanLimitThrows() { + final Difficulty totalDifficulty = Difficulty.of(98765); + final BlockDataGenerator blockGenerator = new BlockDataGenerator(); + final Block newBlock = blockGenerator.block(); + + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> NewBlockMessage.create(newBlock, totalDifficulty, 1)); + } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java index dcc0239f616..c576b7f8e26 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java @@ -63,6 +63,7 @@ import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.storage.DataStorageFormat; import org.hyperledger.besu.testutil.TestClock; +import org.hyperledger.besu.util.number.ByteUnits; import java.util.Collections; import java.util.List; @@ -95,6 +96,7 @@ public abstract class AbstractBlockPropagationManagerTest { protected SyncState syncState; protected final MetricsSystem metricsSystem = new NoOpMetricsSystem(); private final Hash finalizedHash = Hash.fromHexStringLenient("0x1337"); + private final int maxMessageSize = 10 * ByteUnits.MEGABYTE; protected void setup(final DataStorageFormat dataStorageFormat) { blockchainUtil = BlockchainSetupUtil.forTesting(dataStorageFormat); @@ -222,11 +224,14 @@ public void importsAnnouncedNewBlocks_aheadOfChainInOrder() { final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0); final NewBlockMessage nextAnnouncement = NewBlockMessage.create( - nextBlock, getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get()); + nextBlock, + getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get(), + maxMessageSize); final NewBlockMessage nextNextAnnouncement = NewBlockMessage.create( nextNextBlock, - getFullBlockchain().getTotalDifficultyByHash(nextNextBlock.getHash()).get()); + getFullBlockchain().getTotalDifficultyByHash(nextNextBlock.getHash()).get(), + maxMessageSize); final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain()); // Broadcast first message @@ -256,11 +261,14 @@ public void importsAnnouncedNewBlocks_aheadOfChainOutOfOrder() { final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0); final NewBlockMessage nextAnnouncement = NewBlockMessage.create( - nextBlock, getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get()); + nextBlock, + getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get(), + maxMessageSize); final NewBlockMessage nextNextAnnouncement = NewBlockMessage.create( nextNextBlock, - getFullBlockchain().getTotalDifficultyByHash(nextNextBlock.getHash()).get()); + getFullBlockchain().getTotalDifficultyByHash(nextNextBlock.getHash()).get(), + maxMessageSize); final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain()); // Broadcast second message first @@ -299,7 +307,9 @@ public void importsMixedOutOfOrderMessages() { block1.getHash(), block1.getHeader().getNumber()))); final NewBlockMessage block2Msg = NewBlockMessage.create( - block2, getFullBlockchain().getTotalDifficultyByHash(block2.getHash()).get()); + block2, + getFullBlockchain().getTotalDifficultyByHash(block2.getHash()).get(), + maxMessageSize); final NewBlockHashesMessage block3Msg = NewBlockHashesMessage.create( Collections.singletonList( @@ -307,7 +317,9 @@ public void importsMixedOutOfOrderMessages() { block3.getHash(), block3.getHeader().getNumber()))); final NewBlockMessage block4Msg = NewBlockMessage.create( - block4, getFullBlockchain().getTotalDifficultyByHash(block4.getHash()).get()); + block4, + getFullBlockchain().getTotalDifficultyByHash(block4.getHash()).get(), + maxMessageSize); final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain()); // Broadcast older blocks @@ -362,7 +374,9 @@ public void handlesDuplicateAnnouncements() { nextBlock.getHash(), nextBlock.getHeader().getNumber()))); final NewBlockMessage newBlock = NewBlockMessage.create( - nextBlock, getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get()); + nextBlock, + getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get(), + maxMessageSize); final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain()); // Broadcast first message @@ -413,7 +427,9 @@ public void handlesPendingDuplicateAnnouncements() { nextBlock.getHash(), nextBlock.getHeader().getNumber()))); final NewBlockMessage newBlock = NewBlockMessage.create( - nextBlock, getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get()); + nextBlock, + getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get(), + maxMessageSize); // Broadcast messages EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, newBlock); @@ -467,7 +483,9 @@ public void ignoresFutureNewBlockAnnouncement() { final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0); final NewBlockMessage futureAnnouncement = NewBlockMessage.create( - futureBlock, getFullBlockchain().getTotalDifficultyByHash(futureBlock.getHash()).get()); + futureBlock, + getFullBlockchain().getTotalDifficultyByHash(futureBlock.getHash()).get(), + maxMessageSize); // Broadcast EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, futureAnnouncement); @@ -522,7 +540,8 @@ public void ignoresOldNewBlockAnnouncement() { // Setup peer and messages final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0); - final NewBlockMessage oldAnnouncement = NewBlockMessage.create(oldBlock, Difficulty.ZERO); + final NewBlockMessage oldAnnouncement = + NewBlockMessage.create(oldBlock, Difficulty.ZERO, maxMessageSize); // Broadcast EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, oldAnnouncement); @@ -559,7 +578,7 @@ public void purgesOldBlocks() { blockPropagationManager.start(); final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0); final NewBlockMessage blockAnnouncementMsg = - NewBlockMessage.create(blockToPurge, Difficulty.ZERO); + NewBlockMessage.create(blockToPurge, Difficulty.ZERO, maxMessageSize); // Broadcast EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, blockAnnouncementMsg); @@ -597,7 +616,8 @@ public void updatesChainHeadWhenNewBlockMessageReceived() { getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHeader().getParentHash()).get(); final Difficulty totalDifficulty = getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get(); - final NewBlockMessage nextAnnouncement = NewBlockMessage.create(nextBlock, totalDifficulty); + final NewBlockMessage nextAnnouncement = + NewBlockMessage.create(nextBlock, totalDifficulty, maxMessageSize); // Broadcast message EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, nextAnnouncement); @@ -735,7 +755,8 @@ public void verifyBroadcastBlockInvocation() { final Difficulty totalDifficulty = getFullBlockchain().getTotalDifficultyByHash(block.getHash()).get(); - final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty); + final NewBlockMessage newBlockMessage = + NewBlockMessage.create(block, totalDifficulty, maxMessageSize); // Broadcast message EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, newBlockMessage); @@ -933,7 +954,9 @@ public void shouldNotListenToNewBlockAnnouncementsWhenTTDReachedAndFinal() { final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0); final NewBlockMessage nextAnnouncement = NewBlockMessage.create( - nextBlock, getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get()); + nextBlock, + getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get(), + maxMessageSize); final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain()); syncState.setReachedTerminalDifficulty(true); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/BlockBroadcasterTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/BlockBroadcasterTest.java index da8f269b89d..ef1220fd55b 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/BlockBroadcasterTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/BlockBroadcasterTest.java @@ -30,6 +30,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.messages.NewBlockMessage; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; +import org.hyperledger.besu.util.number.ByteUnits; import java.util.Collections; import java.util.stream.Stream; @@ -38,6 +39,8 @@ public class BlockBroadcasterTest { + final int maxMessageSize = 10 * ByteUnits.MEGABYTE; + @Test public void blockPropagationUnitTest() throws PeerConnection.PeerNotConnected { final EthPeer ethPeer = mock(EthPeer.class); @@ -47,10 +50,10 @@ public void blockPropagationUnitTest() throws PeerConnection.PeerNotConnected { final EthContext ethContext = mock(EthContext.class); when(ethContext.getEthPeers()).thenReturn(ethPeers); - final BlockBroadcaster blockBroadcaster = new BlockBroadcaster(ethContext); + final BlockBroadcaster blockBroadcaster = new BlockBroadcaster(ethContext, maxMessageSize); final Block block = generateBlock(); final NewBlockMessage newBlockMessage = - NewBlockMessage.create(block, block.getHeader().getDifficulty()); + NewBlockMessage.create(block, block.getHeader().getDifficulty(), maxMessageSize); blockBroadcaster.propagate(block, Difficulty.ZERO); @@ -70,10 +73,10 @@ public void blockPropagationUnitTestSeenUnseen() throws PeerConnection.PeerNotCo final EthContext ethContext = mock(EthContext.class); when(ethContext.getEthPeers()).thenReturn(ethPeers); - final BlockBroadcaster blockBroadcaster = new BlockBroadcaster(ethContext); + final BlockBroadcaster blockBroadcaster = new BlockBroadcaster(ethContext, maxMessageSize); final Block block = generateBlock(); final NewBlockMessage newBlockMessage = - NewBlockMessage.create(block, block.getHeader().getDifficulty()); + NewBlockMessage.create(block, block.getHeader().getDifficulty(), maxMessageSize); blockBroadcaster.propagate(block, Difficulty.ZERO);