Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop messages that exceeds local message size limit #7507

Merged
merged 7 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
- Fix tracing in precompiled contracts when halting for out of gas [#7318](https://github.com/hyperledger/besu/issues/7318)
- Correctly release txpool save and restore lock in case of exceptions [#7473](https://github.com/hyperledger/besu/pull/7473)
- Fix for `eth_gasPrice` could not retrieve block error [#7482](https://github.com/hyperledger/besu/pull/7482)
- Correctly drops messages that exceeds local message size limit [#5455](https://github.com/hyperledger/besu/pull/7507)
- **DebugMetrics**: Fixed a `ClassCastException` occurring in `DebugMetrics` when handling nested metric structures. Previously, `Double` values within these structures were incorrectly cast to `Map` objects, leading to errors. This update allows for proper handling of both direct values and nested structures at the same level. Issue# [#7383](https://github.com/hyperledger/besu/pull/7383)
- `evmtool` was not respecting the `--genesis` setting, resulting in unexpected trace results. [#7433](https://github.com/hyperledger/besu/pull/7433)


## 24.8.0

### Upcoming Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.testutil.DeterministicEthScheduler;
import org.hyperledger.besu.testutil.TestClock;
import org.hyperledger.besu.util.number.ByteUnits;

import java.math.BigInteger;
import java.time.ZoneId;
Expand Down Expand Up @@ -154,7 +155,7 @@ public void setUp() {
.when(mockWorldStateArchive.getMutable(any(), anyBoolean()))
.thenReturn(Optional.of(mockWorldState));

blockBroadcaster = new BlockBroadcaster(mockEthContext);
blockBroadcaster = new BlockBroadcaster(mockEthContext, 10 * ByteUnits.MEGABYTE);
syncState = new SyncState(blockchain, mockEthPeers);
TransactionPoolConfiguration txPoolConfig =
ImmutableTransactionPoolConfiguration.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,13 @@ public RequestManager.ResponseStream send(
if (messageData.getSize() > maxMessageSize) {
// This is a bug or else a misconfiguration of the max message size.
LOG.error(
"Sending {} message to peer ({}) which exceeds local message size limit of {} bytes. Message code: {}, Message Size: {}",
"Dropping {} message to peer ({}) which exceeds local message size limit of {} bytes. Message code: {}, Message Size: {}",
protocolName,
this,
maxMessageSize,
messageData.getCode(),
messageData.getSize());
return null;
}

if (requestManagers.containsKey(protocolName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ public EthProtocolManager(
this.ethMessages = ethMessages;
this.ethContext = ethContext;

this.blockBroadcaster = new BlockBroadcaster(ethContext);
this.blockBroadcaster =
new BlockBroadcaster(ethContext, ethereumWireProtocolConfiguration.getMaxMessageSize());

this.supportedCapabilities =
calculateCapabilities(synchronizerConfiguration, ethereumWireProtocolConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,20 @@ public int getCode() {
return MESSAGE_CODE;
}

public static NewBlockMessage create(final Block block, final Difficulty totalDifficulty) {
public static NewBlockMessage create(
final Block block, final Difficulty totalDifficulty, final int maxMessageSize)
throws IllegalArgumentException {
final NewBlockMessageData msgData = new NewBlockMessageData(block, totalDifficulty);
final BytesValueRLPOutput out = new BytesValueRLPOutput();
msgData.writeTo(out);
return new NewBlockMessage(out.encoded());
final Bytes data = out.encoded();
if (data.size() > maxMessageSize) {
throw new IllegalArgumentException(
String.format(
"Block message size %d bytes is larger than allowed message size %d bytes",
data.size(), maxMessageSize));
}
return new NewBlockMessage(data);
}

public static NewBlockMessage readFrom(final MessageData message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ public class BlockBroadcaster {
private static final Logger LOG = LoggerFactory.getLogger(BlockBroadcaster.class);

private final EthContext ethContext;
private final int maxMessageSize;
private final Subscribers<BlockPropagatedSubscriber> blockPropagatedSubscribers =
Subscribers.create();

public BlockBroadcaster(final EthContext ethContext) {
public BlockBroadcaster(final EthContext ethContext, final int maxMessageSize) {
this.ethContext = ethContext;
this.maxMessageSize = maxMessageSize;
}

public long subscribePropagateNewBlocks(final BlockPropagatedSubscriber callback) {
Expand All @@ -45,7 +47,13 @@ public void unsubscribePropagateNewBlocks(final long id) {

public void propagate(final Block block, final Difficulty totalDifficulty) {
blockPropagatedSubscribers.forEach(listener -> listener.accept(block, totalDifficulty));
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty);
final NewBlockMessage newBlockMessage;
try {
newBlockMessage = NewBlockMessage.create(block, totalDifficulty, this.maxMessageSize);
} catch (final IllegalArgumentException e) {
LOG.error("Failed to create block", e);
return;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are the implications of catching this here

Copy link

@rodion-lim-partior rodion-lim-partior Aug 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@macfarla Original send in EthPeers catches the error (when message size > limit) instead of throwing it up. Would it better to throw the error? I wasn't too sure on the original intention when they caught the error instead of throwing it.

ethContext
.getEthPeers()
.streamAvailablePeers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,23 @@
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.RawMessage;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.util.number.ByteUnits;

import org.apache.tuweni.bytes.Bytes;
import org.junit.jupiter.api.Test;

public class NewBlockMessageTest {
private static final ProtocolSchedule protocolSchedule = ProtocolScheduleFixture.MAINNET;
private static final int maxMessageSize = 10 * ByteUnits.MEGABYTE;

@Test
public void roundTripNewBlockMessage() {
final Difficulty totalDifficulty = Difficulty.of(98765);
final BlockDataGenerator blockGenerator = new BlockDataGenerator();
final Block blockForInsertion = blockGenerator.block();

final NewBlockMessage msg = NewBlockMessage.create(blockForInsertion, totalDifficulty);
final NewBlockMessage msg =
NewBlockMessage.create(blockForInsertion, totalDifficulty, maxMessageSize);
assertThat(msg.getCode()).isEqualTo(EthPV62.NEW_BLOCK);
assertThat(msg.totalDifficulty(protocolSchedule)).isEqualTo(totalDifficulty);
final Block extractedBlock = msg.block(protocolSchedule);
Expand Down Expand Up @@ -73,4 +76,14 @@ public void readFromMessageWithWrongCodeThrows() {
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> NewBlockMessage.readFrom(rawMsg));
}

@Test
public void createBlockMessageLargerThanLimitThrows() {
final Difficulty totalDifficulty = Difficulty.of(98765);
final BlockDataGenerator blockGenerator = new BlockDataGenerator();
final Block newBlock = blockGenerator.block();

assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> NewBlockMessage.create(newBlock, totalDifficulty, 1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.storage.DataStorageFormat;
import org.hyperledger.besu.testutil.TestClock;
import org.hyperledger.besu.util.number.ByteUnits;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -95,6 +96,7 @@ public abstract class AbstractBlockPropagationManagerTest {
protected SyncState syncState;
protected final MetricsSystem metricsSystem = new NoOpMetricsSystem();
private final Hash finalizedHash = Hash.fromHexStringLenient("0x1337");
private final int maxMessageSize = 10 * ByteUnits.MEGABYTE;

protected void setup(final DataStorageFormat dataStorageFormat) {
blockchainUtil = BlockchainSetupUtil.forTesting(dataStorageFormat);
Expand Down Expand Up @@ -222,11 +224,14 @@ public void importsAnnouncedNewBlocks_aheadOfChainInOrder() {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final NewBlockMessage nextAnnouncement =
NewBlockMessage.create(
nextBlock, getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get());
nextBlock,
getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get(),
maxMessageSize);
final NewBlockMessage nextNextAnnouncement =
NewBlockMessage.create(
nextNextBlock,
getFullBlockchain().getTotalDifficultyByHash(nextNextBlock.getHash()).get());
getFullBlockchain().getTotalDifficultyByHash(nextNextBlock.getHash()).get(),
maxMessageSize);
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());

// Broadcast first message
Expand Down Expand Up @@ -256,11 +261,14 @@ public void importsAnnouncedNewBlocks_aheadOfChainOutOfOrder() {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final NewBlockMessage nextAnnouncement =
NewBlockMessage.create(
nextBlock, getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get());
nextBlock,
getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get(),
maxMessageSize);
final NewBlockMessage nextNextAnnouncement =
NewBlockMessage.create(
nextNextBlock,
getFullBlockchain().getTotalDifficultyByHash(nextNextBlock.getHash()).get());
getFullBlockchain().getTotalDifficultyByHash(nextNextBlock.getHash()).get(),
maxMessageSize);
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());

// Broadcast second message first
Expand Down Expand Up @@ -299,15 +307,19 @@ public void importsMixedOutOfOrderMessages() {
block1.getHash(), block1.getHeader().getNumber())));
final NewBlockMessage block2Msg =
NewBlockMessage.create(
block2, getFullBlockchain().getTotalDifficultyByHash(block2.getHash()).get());
block2,
getFullBlockchain().getTotalDifficultyByHash(block2.getHash()).get(),
maxMessageSize);
final NewBlockHashesMessage block3Msg =
NewBlockHashesMessage.create(
Collections.singletonList(
new NewBlockHashesMessage.NewBlockHash(
block3.getHash(), block3.getHeader().getNumber())));
final NewBlockMessage block4Msg =
NewBlockMessage.create(
block4, getFullBlockchain().getTotalDifficultyByHash(block4.getHash()).get());
block4,
getFullBlockchain().getTotalDifficultyByHash(block4.getHash()).get(),
maxMessageSize);
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());

// Broadcast older blocks
Expand Down Expand Up @@ -362,7 +374,9 @@ public void handlesDuplicateAnnouncements() {
nextBlock.getHash(), nextBlock.getHeader().getNumber())));
final NewBlockMessage newBlock =
NewBlockMessage.create(
nextBlock, getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get());
nextBlock,
getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get(),
maxMessageSize);
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());

// Broadcast first message
Expand Down Expand Up @@ -413,7 +427,9 @@ public void handlesPendingDuplicateAnnouncements() {
nextBlock.getHash(), nextBlock.getHeader().getNumber())));
final NewBlockMessage newBlock =
NewBlockMessage.create(
nextBlock, getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get());
nextBlock,
getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get(),
maxMessageSize);

// Broadcast messages
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, newBlock);
Expand Down Expand Up @@ -467,7 +483,9 @@ public void ignoresFutureNewBlockAnnouncement() {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final NewBlockMessage futureAnnouncement =
NewBlockMessage.create(
futureBlock, getFullBlockchain().getTotalDifficultyByHash(futureBlock.getHash()).get());
futureBlock,
getFullBlockchain().getTotalDifficultyByHash(futureBlock.getHash()).get(),
maxMessageSize);

// Broadcast
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, futureAnnouncement);
Expand Down Expand Up @@ -522,7 +540,8 @@ public void ignoresOldNewBlockAnnouncement() {

// Setup peer and messages
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final NewBlockMessage oldAnnouncement = NewBlockMessage.create(oldBlock, Difficulty.ZERO);
final NewBlockMessage oldAnnouncement =
NewBlockMessage.create(oldBlock, Difficulty.ZERO, maxMessageSize);

// Broadcast
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, oldAnnouncement);
Expand Down Expand Up @@ -559,7 +578,7 @@ public void purgesOldBlocks() {
blockPropagationManager.start();
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final NewBlockMessage blockAnnouncementMsg =
NewBlockMessage.create(blockToPurge, Difficulty.ZERO);
NewBlockMessage.create(blockToPurge, Difficulty.ZERO, maxMessageSize);

// Broadcast
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, blockAnnouncementMsg);
Expand Down Expand Up @@ -597,7 +616,8 @@ public void updatesChainHeadWhenNewBlockMessageReceived() {
getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHeader().getParentHash()).get();
final Difficulty totalDifficulty =
getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get();
final NewBlockMessage nextAnnouncement = NewBlockMessage.create(nextBlock, totalDifficulty);
final NewBlockMessage nextAnnouncement =
NewBlockMessage.create(nextBlock, totalDifficulty, maxMessageSize);

// Broadcast message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, nextAnnouncement);
Expand Down Expand Up @@ -735,7 +755,8 @@ public void verifyBroadcastBlockInvocation() {

final Difficulty totalDifficulty =
getFullBlockchain().getTotalDifficultyByHash(block.getHash()).get();
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty);
final NewBlockMessage newBlockMessage =
NewBlockMessage.create(block, totalDifficulty, maxMessageSize);

// Broadcast message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, newBlockMessage);
Expand Down Expand Up @@ -933,7 +954,9 @@ public void shouldNotListenToNewBlockAnnouncementsWhenTTDReachedAndFinal() {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final NewBlockMessage nextAnnouncement =
NewBlockMessage.create(
nextBlock, getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get());
nextBlock,
getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get(),
maxMessageSize);
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());

syncState.setReachedTerminalDifficulty(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.messages.NewBlockMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.util.number.ByteUnits;

import java.util.Collections;
import java.util.stream.Stream;
Expand All @@ -38,6 +39,8 @@

public class BlockBroadcasterTest {

final int maxMessageSize = 10 * ByteUnits.MEGABYTE;

@Test
public void blockPropagationUnitTest() throws PeerConnection.PeerNotConnected {
final EthPeer ethPeer = mock(EthPeer.class);
Expand All @@ -47,10 +50,10 @@ public void blockPropagationUnitTest() throws PeerConnection.PeerNotConnected {
final EthContext ethContext = mock(EthContext.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);

final BlockBroadcaster blockBroadcaster = new BlockBroadcaster(ethContext);
final BlockBroadcaster blockBroadcaster = new BlockBroadcaster(ethContext, maxMessageSize);
final Block block = generateBlock();
final NewBlockMessage newBlockMessage =
NewBlockMessage.create(block, block.getHeader().getDifficulty());
NewBlockMessage.create(block, block.getHeader().getDifficulty(), maxMessageSize);

blockBroadcaster.propagate(block, Difficulty.ZERO);

Expand All @@ -70,10 +73,10 @@ public void blockPropagationUnitTestSeenUnseen() throws PeerConnection.PeerNotCo
final EthContext ethContext = mock(EthContext.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);

final BlockBroadcaster blockBroadcaster = new BlockBroadcaster(ethContext);
final BlockBroadcaster blockBroadcaster = new BlockBroadcaster(ethContext, maxMessageSize);
final Block block = generateBlock();
final NewBlockMessage newBlockMessage =
NewBlockMessage.create(block, block.getHeader().getDifficulty());
NewBlockMessage.create(block, block.getHeader().getDifficulty(), maxMessageSize);

blockBroadcaster.propagate(block, Difficulty.ZERO);

Expand Down
Loading