Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
[NC-2058] Improve block propagation time (#808)
Browse files Browse the repository at this point in the history
* o

* add test

* clean up

* scaffolding

* update

* update

* comments

* add test

* update

* update ii

* format

* update ii

* fix

* verifyBroadcastBlockInvocation

* test

* update

* update to difficulty calculation

* remove BlockBroadcasterTest from this pr

* update

* update

* update II
  • Loading branch information
smatthewenglish authored Feb 18, 2019
1 parent 6c05c21 commit 7171172
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public ResponseStream send(final MessageData messageData) throws PeerNotConnecte
}

public void propagateBlock(final Block block, final UInt256 totalDifficulty) {
registerKnownBlock(block.getHash());
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty);
try {
connection.sendForProtocol(protocolName, newBlockMessage);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.util.uint.UInt256;

class BlockBroadcaster {

private final EthContext ethContext;

BlockBroadcaster(final EthContext ethContext) {
this.ethContext = ethContext;
}

void propagate(final Block block, final UInt256 difficulty) {
ethContext
.getEthPeers()
.availablePeers()
.filter(ethPeer -> !ethPeer.hasSeenBlock(block.getHash()))
.forEach(ethPeer -> ethPeer.propagateBlock(block, difficulty));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent.EventType;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
Expand All @@ -30,8 +31,10 @@
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetBlockFromPeerTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PersistBlockTask;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.rlp.RLPException;
import tech.pegasys.pantheon.metrics.LabelledMetric;
Expand Down Expand Up @@ -63,6 +66,7 @@ public class BlockPropagationManager<C> {
private final EthContext ethContext;
private final SyncState syncState;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final BlockBroadcaster blockBroadcaster;

private final AtomicBoolean started = new AtomicBoolean(false);

Expand All @@ -77,13 +81,14 @@ public class BlockPropagationManager<C> {
final EthContext ethContext,
final SyncState syncState,
final PendingBlocks pendingBlocks,
final LabelledMetric<OperationTimer> ethTasksTimer) {
final LabelledMetric<OperationTimer> ethTasksTimer,
final BlockBroadcaster blockBroadcaster) {
this.config = config;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.ethTasksTimer = ethTasksTimer;

this.blockBroadcaster = blockBroadcaster;
this.syncState = syncState;
this.pendingBlocks = pendingBlocks;
}
Expand All @@ -105,6 +110,32 @@ private void setupListeners() {
.subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork);
}

protected void validateAndBroadcastBlock(final Block block) {
final ProtocolSpec<C> protocolSpec =
protocolSchedule.getByBlockNumber(block.getHeader().getNumber());
final BlockHeaderValidator<C> blockHeaderValidator = protocolSpec.getBlockHeaderValidator();
final BlockHeader parent =
protocolContext
.getBlockchain()
.getBlockHeader(block.getHeader().getParentHash())
.orElseThrow(
() ->
new IllegalArgumentException(
"Incapable of retrieving header from non-existent parent of "
+ block.getHeader().getNumber()
+ "."));
if (blockHeaderValidator.validateHeader(
block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) {
final UInt256 totalDifficulty =
protocolContext
.getBlockchain()
.getTotalDifficultyByHash(parent.getHash())
.get()
.plus(block.getHeader().getDifficulty());
blockBroadcaster.propagate(block, totalDifficulty);
}
}

private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchain blockchain) {
// Check to see if any of our pending blocks are now ready for import
final Block newBlock = blockAddedEvent.getBlock();
Expand Down Expand Up @@ -144,23 +175,13 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchai
}
}

void broadcastBlock(final Block block, final UInt256 difficulty) {
ethContext
.getEthPeers()
.availablePeers()
.forEach(ethPeer -> ethPeer.propagateBlock(block, difficulty));
}

void handleNewBlockFromNetwork(final EthMessage message) {
final Blockchain blockchain = protocolContext.getBlockchain();
final NewBlockMessage newBlockMessage = NewBlockMessage.readFrom(message.getData());
try {
final Block block = newBlockMessage.block(protocolSchedule);
final UInt256 totalDifficulty = newBlockMessage.totalDifficulty(protocolSchedule);

// TODO: Extract broadcast functionality to independent class.
// broadcastBlock(block, totalDifficulty);

message.getPeer().chainState().updateForAnnouncedBlock(block.getHeader(), totalDifficulty);

// Return early if we don't care about this block
Expand Down Expand Up @@ -272,6 +293,8 @@ CompletableFuture<Block> importOrSavePendingBlock(final Block block) {
return CompletableFuture.completedFuture(block);
}

validateAndBroadcastBlock(block);

// Import block
final PersistBlockTask<C> importTask =
PersistBlockTask.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public DefaultSynchronizer(
ethContext,
syncState,
new PendingBlocks(),
ethTasksTimer);
ethTasksTimer,
new BlockBroadcaster(ethContext));

ChainHeadTracker.trackChainHeadForPeers(
ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, ethTasksTimer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class BlockPropagationManagerTest {
private ProtocolSchedule<Void> protocolSchedule;
private ProtocolContext<Void> protocolContext;
private MutableBlockchain blockchain;
private BlockBroadcaster blockBroadcaster;
private EthProtocolManager ethProtocolManager;
private BlockPropagationManager<Void> blockPropagationManager;
private SynchronizerConfiguration syncConfig;
Expand Down Expand Up @@ -90,6 +91,7 @@ public void setup() {
EthProtocolManagerTestUtil.create(blockchain, blockchainUtil.getWorldArchive());
syncConfig = SynchronizerConfiguration.builder().blockPropagationRange(-3, 5).build();
syncState = new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers());
blockBroadcaster = mock(BlockBroadcaster.class);
blockPropagationManager =
new BlockPropagationManager<>(
syncConfig,
Expand All @@ -98,7 +100,8 @@ public void setup() {
ethProtocolManager.ethContext(),
syncState,
pendingBlocks,
ethTasksTimer);
ethTasksTimer,
blockBroadcaster);
}

@Test
Expand Down Expand Up @@ -471,7 +474,8 @@ public void purgesOldBlocks() {
ethProtocolManager.ethContext(),
syncState,
pendingBlocks,
ethTasksTimer);
ethTasksTimer,
blockBroadcaster);

final BlockDataGenerator gen = new BlockDataGenerator();
// Import some blocks
Expand Down Expand Up @@ -551,7 +555,8 @@ public void shouldNotImportBlocksThatAreAlreadyBeingImported() {
ethContext,
syncState,
pendingBlocks,
ethTasksTimer);
ethTasksTimer,
blockBroadcaster);

blockchainUtil.importFirstBlocks(2);
final Block nextBlock = blockchainUtil.getBlock(2);
Expand All @@ -561,4 +566,25 @@ public void shouldNotImportBlocksThatAreAlreadyBeingImported() {

verify(ethScheduler, times(1)).scheduleSyncWorkerTask(any(Supplier.class));
}

@Test
public void verifyBroadcastBlockInvocation() {
blockchainUtil.importFirstBlocks(2);
final Block block = blockchainUtil.getBlock(2);
blockPropagationManager.start();

// Setup peer and messages
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);

final UInt256 totalDifficulty = fullBlockchain.getTotalDifficultyByHash(block.getHash()).get();
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty);

// Broadcast message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, newBlockMessage);

final Responder responder = RespondingEthPeer.blockchainResponder(fullBlockchain);
peer.respondWhile(responder, peer::hasOutstandingRequests);

verify(blockBroadcaster, times(1)).propagate(block, totalDifficulty);
}
}

0 comments on commit 7171172

Please sign in to comment.