Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[NC-2058] Improve block propagation time #808

Merged
merged 25 commits into from
Feb 18, 2019
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.util.uint.UInt256;

class BlockBroadcaster {

private final EthContext ethContext;

BlockBroadcaster(final EthContext ethContext) {
this.ethContext = ethContext;
}

void propagate(final Block block, final UInt256 difficulty) {
ethContext
.getEthPeers()
.availablePeers()
.forEach(ethPeer -> ethPeer.propagateBlock(block, difficulty));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to filter out peers that have already seen this block:
.filter(ethPeer -> ~ethPeer.hasSeenBlock(block.getHash()))
and then should add a test for that as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And in EthPeer.propagateBlock we should add the block to the knownBlocks set.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent.EventType;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
Expand All @@ -30,8 +31,10 @@
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetBlockFromPeerTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PersistBlockTask;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.rlp.RLPException;
import tech.pegasys.pantheon.metrics.LabelledMetric;
Expand Down Expand Up @@ -63,6 +66,7 @@ public class BlockPropagationManager<C> {
private final EthContext ethContext;
private final SyncState syncState;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final BlockBroadcaster blockBroadcaster;

private final AtomicBoolean started = new AtomicBoolean(false);

Expand All @@ -77,13 +81,14 @@ public class BlockPropagationManager<C> {
final EthContext ethContext,
final SyncState syncState,
final PendingBlocks pendingBlocks,
final LabelledMetric<OperationTimer> ethTasksTimer) {
final LabelledMetric<OperationTimer> ethTasksTimer,
final BlockBroadcaster blockBroadcaster) {
this.config = config;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.ethTasksTimer = ethTasksTimer;

this.blockBroadcaster = blockBroadcaster;
this.syncState = syncState;
this.pendingBlocks = pendingBlocks;
}
Expand All @@ -105,6 +110,32 @@ private void setupListeners() {
.subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork);
}

protected void validateAndBroadcastBlock(final Block block) {
final ProtocolSpec<C> protocolSpec =
protocolSchedule.getByBlockNumber(block.getHeader().getNumber());
final BlockHeaderValidator<C> blockHeaderValidator = protocolSpec.getBlockHeaderValidator();
final BlockHeader parent =
protocolContext
.getBlockchain()
.getBlockHeader(block.getHeader().getParentHash())
.orElseThrow(
() ->
new IllegalArgumentException(
"Incapable of retrieving header from non-existent parent of "
+ block.getHeader().getNumber()
+ "."));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is ok for this PR, but could you follow up with a PR to change importOrSavePendingBlock so that instead of just using protocolContext.getBlockchain().contains(block.getHeader().getParentHash()) it actually gets the parent header and checks if it's present. Then you can pass that header into broadcastBlock and not need this exception. Just be careful that the lookup of the parent header stays inside the synchronized block.

if (blockHeaderValidator.validateHeader(
block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) {
final UInt256 totalDifficulty =
protocolContext
.getBlockchain()
.getTotalDifficultyByHash(parent.getHash())
.get()
.plus(block.getHeader().getDifficulty());
blockBroadcaster.propagate(block, totalDifficulty);
}
}

private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchain blockchain) {
// Check to see if any of our pending blocks are now ready for import
final Block newBlock = blockAddedEvent.getBlock();
Expand Down Expand Up @@ -144,23 +175,13 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchai
}
}

void broadcastBlock(final Block block, final UInt256 difficulty) {
ethContext
.getEthPeers()
.availablePeers()
.forEach(ethPeer -> ethPeer.propagateBlock(block, difficulty));
}

void handleNewBlockFromNetwork(final EthMessage message) {
final Blockchain blockchain = protocolContext.getBlockchain();
final NewBlockMessage newBlockMessage = NewBlockMessage.readFrom(message.getData());
try {
final Block block = newBlockMessage.block(protocolSchedule);
final UInt256 totalDifficulty = newBlockMessage.totalDifficulty(protocolSchedule);

// TODO: Extract broadcast functionality to independent class.
// broadcastBlock(block, totalDifficulty);

message.getPeer().chainState().updateForAnnouncedBlock(block.getHeader(), totalDifficulty);

// Return early if we don't care about this block
Expand Down Expand Up @@ -272,6 +293,8 @@ CompletableFuture<Block> importOrSavePendingBlock(final Block block) {
return CompletableFuture.completedFuture(block);
}

validateAndBroadcastBlock(block);

// Import block
final PersistBlockTask<C> importTask =
PersistBlockTask.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public DefaultSynchronizer(
ethContext,
syncState,
new PendingBlocks(),
ethTasksTimer);
ethTasksTimer,
new BlockBroadcaster(ethContext));

ChainHeadTracker.trackChainHeadForPeers(
ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, ethTasksTimer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class BlockPropagationManagerTest {
private ProtocolSchedule<Void> protocolSchedule;
private ProtocolContext<Void> protocolContext;
private MutableBlockchain blockchain;
private BlockBroadcaster blockBroadcaster;
private EthProtocolManager ethProtocolManager;
private BlockPropagationManager<Void> blockPropagationManager;
private SynchronizerConfiguration syncConfig;
Expand Down Expand Up @@ -90,6 +91,7 @@ public void setup() {
EthProtocolManagerTestUtil.create(blockchain, blockchainUtil.getWorldArchive());
syncConfig = SynchronizerConfiguration.builder().blockPropagationRange(-3, 5).build();
syncState = new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers());
blockBroadcaster = mock(BlockBroadcaster.class);
blockPropagationManager =
new BlockPropagationManager<>(
syncConfig,
Expand All @@ -98,7 +100,8 @@ public void setup() {
ethProtocolManager.ethContext(),
syncState,
pendingBlocks,
ethTasksTimer);
ethTasksTimer,
blockBroadcaster);
}

@Test
Expand Down Expand Up @@ -471,7 +474,8 @@ public void purgesOldBlocks() {
ethProtocolManager.ethContext(),
syncState,
pendingBlocks,
ethTasksTimer);
ethTasksTimer,
blockBroadcaster);

final BlockDataGenerator gen = new BlockDataGenerator();
// Import some blocks
Expand Down Expand Up @@ -551,7 +555,8 @@ public void shouldNotImportBlocksThatAreAlreadyBeingImported() {
ethContext,
syncState,
pendingBlocks,
ethTasksTimer);
ethTasksTimer,
blockBroadcaster);

blockchainUtil.importFirstBlocks(2);
final Block nextBlock = blockchainUtil.getBlock(2);
Expand All @@ -561,4 +566,25 @@ public void shouldNotImportBlocksThatAreAlreadyBeingImported() {

verify(ethScheduler, times(1)).scheduleSyncWorkerTask(any(Supplier.class));
}

@Test
public void verifyBroadcastBlockInvocation() {
blockchainUtil.importFirstBlocks(2);
final Block block = blockchainUtil.getBlock(2);
blockPropagationManager.start();

// Setup peer and messages
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);

final UInt256 totalDifficulty = fullBlockchain.getTotalDifficultyByHash(block.getHash()).get();
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty);

// Broadcast message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, newBlockMessage);

final Responder responder = RespondingEthPeer.blockchainResponder(fullBlockchain);
peer.respondWhile(responder, peer::hasOutstandingRequests);

verify(blockBroadcaster, times(1)).propagate(block, totalDifficulty);
}
}