diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java index 4be7e72939..b853c102fb 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java @@ -58,275 +58,281 @@ import org.apache.logging.log4j.Logger; public class BlockPropagationManager { - private static final Logger LOG = LogManager.getLogger(); - - private final SynchronizerConfiguration config; - private final ProtocolSchedule protocolSchedule; - private final ProtocolContext protocolContext; - private final EthContext ethContext; - private final SyncState syncState; - private final LabelledMetric ethTasksTimer; - private final BlockBroadcaster blockBroadcaster; - - private final AtomicBoolean started = new AtomicBoolean(false); - - private final Set requestedBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final Set importingBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final PendingBlocks pendingBlocks; - - BlockPropagationManager( - final SynchronizerConfiguration config, - final ProtocolSchedule protocolSchedule, - final ProtocolContext protocolContext, - final EthContext ethContext, - final SyncState syncState, - final PendingBlocks pendingBlocks, - final LabelledMetric ethTasksTimer, - final BlockBroadcaster blockBroadcaster) { - this.config = config; - this.protocolSchedule = protocolSchedule; - this.protocolContext = protocolContext; - this.ethContext = ethContext; - this.ethTasksTimer = ethTasksTimer; - this.blockBroadcaster = blockBroadcaster; - this.syncState = syncState; - this.pendingBlocks = pendingBlocks; - } - - public void start() { - if (started.compareAndSet(false, true)) { - setupListeners(); - } else { - throw new IllegalStateException( - "Attempt to start an already started " + this.getClass().getSimpleName() + "."); + private static final Logger LOG = LogManager.getLogger(); + + private final SynchronizerConfiguration config; + private final ProtocolSchedule protocolSchedule; + private final ProtocolContext protocolContext; + private final EthContext ethContext; + private final SyncState syncState; + private final LabelledMetric ethTasksTimer; + private final BlockBroadcaster blockBroadcaster; + + private final AtomicBoolean started = new AtomicBoolean(false); + + private final Set requestedBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set importingBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final PendingBlocks pendingBlocks; + + BlockPropagationManager( + final SynchronizerConfiguration config, + final ProtocolSchedule protocolSchedule, + final ProtocolContext protocolContext, + final EthContext ethContext, + final SyncState syncState, + final PendingBlocks pendingBlocks, + final LabelledMetric ethTasksTimer, + final BlockBroadcaster blockBroadcaster) { + this.config = config; + this.protocolSchedule = protocolSchedule; + this.protocolContext = protocolContext; + this.ethContext = ethContext; + this.ethTasksTimer = ethTasksTimer; + this.blockBroadcaster = blockBroadcaster; + this.syncState = syncState; + this.pendingBlocks = pendingBlocks; } - } - - private void setupListeners() { - protocolContext.getBlockchain().observeBlockAdded(this::onBlockAdded); - ethContext.getEthMessages().subscribe(EthPV62.NEW_BLOCK, this::handleNewBlockFromNetwork); - ethContext - .getEthMessages() - .subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork); - } - - private void validateAndBroadcastBlock(final Block block) { - final ProtocolSpec protocolSpec = - protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); - final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); - final BlockHeader parent = - protocolContext - .getBlockchain() - .getBlockHeader(block.getHeader().getParentHash()) - .orElseThrow( - () -> - new IllegalArgumentException( - "Incapable of retrieving header from non-existent parent of " - + block.getHeader().getNumber() - + ".")); - if (blockHeaderValidator.validateHeader( - block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) { - final UInt256 totalDifficulty = - protocolContext - .getBlockchain() - .getTotalDifficultyByHash(parent.getHash()) - .get() - .plus(block.getHeader().getDifficulty()); - blockBroadcaster.propagate(block, totalDifficulty); - } - } - - private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchain blockchain) { - // Check to see if any of our pending blocks are now ready for import - final Block newBlock = blockAddedEvent.getBlock(); - final List readyForImport; - synchronized (pendingBlocks) { - // Remove block from pendingBlocks list - pendingBlocks.deregisterPendingBlock(newBlock); - - // Import any pending blocks that are children of the newly added block - readyForImport = pendingBlocks.childrenOf(newBlock.getHash()); + public void start() { + if (started.compareAndSet(false, true)) { + setupListeners(); + } else { + throw new IllegalStateException( + "Attempt to start an already started " + this.getClass().getSimpleName() + "."); + } } - if (!readyForImport.isEmpty()) { - final Supplier>> importBlocksTask = - PersistBlockTask.forUnorderedBlocks( - protocolSchedule, - protocolContext, - readyForImport, - HeaderValidationMode.FULL, - ethTasksTimer); - ethContext - .getScheduler() - .scheduleSyncWorkerTask(importBlocksTask) - .whenComplete( - (r, t) -> { - if (r != null) { - LOG.info("Imported {} pending blocks", r.size()); - } - }); + private void setupListeners() { + protocolContext.getBlockchain().observeBlockAdded(this::onBlockAdded); + ethContext.getEthMessages().subscribe(EthPV62.NEW_BLOCK, this::handleNewBlockFromNetwork); + ethContext + .getEthMessages() + .subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork); } - if (blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) { - final long head = blockchain.getChainHeadBlockNumber(); - final long cutoff = head + config.blockPropagationRange().lowerEndpoint(); - pendingBlocks.purgeBlocksOlderThan(cutoff); - } - } - - void handleNewBlockFromNetwork(final EthMessage message) { - final Blockchain blockchain = protocolContext.getBlockchain(); - final NewBlockMessage newBlockMessage = NewBlockMessage.readFrom(message.getData()); - try { - final Block block = newBlockMessage.block(protocolSchedule); - final UInt256 totalDifficulty = newBlockMessage.totalDifficulty(protocolSchedule); - - message.getPeer().chainState().updateForAnnouncedBlock(block.getHeader(), totalDifficulty); - - // Return early if we don't care about this block - final long localChainHeight = protocolContext.getBlockchain().getChainHeadBlockNumber(); - final long bestChainHeight = syncState.bestChainHeight(localChainHeight); - if (!shouldImportBlockAtHeight( - block.getHeader().getNumber(), localChainHeight, bestChainHeight)) { - return; - } - if (pendingBlocks.contains(block.getHash())) { - return; - } - if (blockchain.contains(block.getHash())) { - return; - } - - importOrSavePendingBlock(block); - } catch (final RLPException e) { - message.getPeer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL); - } - } - - private void handleNewBlockHashesFromNetwork(final EthMessage message) { - final Blockchain blockchain = protocolContext.getBlockchain(); - final NewBlockHashesMessage newBlockHashesMessage = - NewBlockHashesMessage.readFrom(message.getData()); - try { - // Register announced blocks - final List announcedBlocks = - Lists.newArrayList(newBlockHashesMessage.getNewHashes()); - for (final NewBlockHash announcedBlock : announcedBlocks) { - message.getPeer().registerKnownBlock(announcedBlock.hash()); - message.getPeer().registerHeight(announcedBlock.hash(), announcedBlock.number()); - } - - // Filter announced blocks for blocks we care to import - final long localChainHeight = protocolContext.getBlockchain().getChainHeadBlockNumber(); - final long bestChainHeight = syncState.bestChainHeight(localChainHeight); - final List relevantAnnouncements = - announcedBlocks.stream() - .filter(a -> shouldImportBlockAtHeight(a.number(), localChainHeight, bestChainHeight)) - .collect(Collectors.toList()); - - // Filter for blocks we don't yet know about - final List newBlocks = new ArrayList<>(); - for (final NewBlockHash announcedBlock : relevantAnnouncements) { - if (requestedBlocks.contains(announcedBlock.hash())) { - continue; + private void validateAndBroadcastBlock(final Block block) { + final ProtocolSpec protocolSpec = protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); + final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); + final BlockHeader parent = protocolContext.getBlockchain() + .getBlockHeader(block.getHeader().getParentHash()) + .orElseThrow( + () -> + new IllegalArgumentException( + "Incapable of retrieving header from non-existent parent of " + + block.getHeader().getNumber() + + ".")); + if (blockHeaderValidator.validateHeader( + block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) { + final UInt256 totalDifficulty = + protocolContext + .getBlockchain() + .getTotalDifficultyByHash(parent.getHash()) + .get() + .plus(block.getHeader().getDifficulty()); + blockBroadcaster.propagate(block, totalDifficulty); } - if (pendingBlocks.contains(announcedBlock.hash())) { - continue; + } + + private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchain blockchain) { + // Check to see if any of our pending blocks are now ready for import + final Block newBlock = blockAddedEvent.getBlock(); + + final List readyForImport; + synchronized (pendingBlocks) { + // Remove block from pendingBlocks list + pendingBlocks.deregisterPendingBlock(newBlock); + + // Import any pending blocks that are children of the newly added block + readyForImport = pendingBlocks.childrenOf(newBlock.getHash()); } - if (importingBlocks.contains(announcedBlock.hash())) { - continue; + + if (!readyForImport.isEmpty()) { + final Supplier>> importBlocksTask = + PersistBlockTask.forUnorderedBlocks( + protocolSchedule, + protocolContext, + readyForImport, + HeaderValidationMode.FULL, + ethTasksTimer); + ethContext + .getScheduler() + .scheduleSyncWorkerTask(importBlocksTask) + .whenComplete( + (r, t) -> { + if (r != null) { + LOG.info("Imported {} pending blocks", r.size()); + } + }); } - if (blockchain.contains(announcedBlock.hash())) { - continue; + + if (blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) { + final long head = blockchain.getChainHeadBlockNumber(); + final long cutoff = head + config.blockPropagationRange().lowerEndpoint(); + pendingBlocks.purgeBlocksOlderThan(cutoff); } - if (requestedBlocks.add(announcedBlock.hash())) { - newBlocks.add(announcedBlock); + } + + void handleNewBlockFromNetwork(final EthMessage message) { + final Blockchain blockchain = protocolContext.getBlockchain(); + final NewBlockMessage newBlockMessage = NewBlockMessage.readFrom(message.getData()); + try { + final Block block = newBlockMessage.block(protocolSchedule); + final UInt256 totalDifficulty = newBlockMessage.totalDifficulty(protocolSchedule); + + message.getPeer().chainState().updateForAnnouncedBlock(block.getHeader(), totalDifficulty); + + // Return early if we don't care about this block + final long localChainHeight = protocolContext.getBlockchain().getChainHeadBlockNumber(); + final long bestChainHeight = syncState.bestChainHeight(localChainHeight); + if (!shouldImportBlockAtHeight( + block.getHeader().getNumber(), localChainHeight, bestChainHeight)) { + return; + } + if (pendingBlocks.contains(block.getHash())) { + return; + } + if (blockchain.contains(block.getHash())) { + return; + } + + importOrSavePendingBlock(block); + } catch (final RLPException e) { + message.getPeer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL); } - } - - // Process known blocks we care about - for (final NewBlockHash newBlock : newBlocks) { - processAnnouncedBlock(message.getPeer(), newBlock) - .whenComplete((r, t) -> requestedBlocks.remove(newBlock.hash())); - } - } catch (final RLPException e) { - message.getPeer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL); } - } - - private CompletableFuture processAnnouncedBlock( - final EthPeer peer, final NewBlockHash newBlock) { - final AbstractPeerTask getBlockTask = - GetBlockFromPeerTask.create(protocolSchedule, ethContext, newBlock.hash(), ethTasksTimer) - .assignPeer(peer); - - return getBlockTask.run().thenCompose((r) -> importOrSavePendingBlock(r.getResult())); - } - - @VisibleForTesting - CompletableFuture importOrSavePendingBlock(final Block block) { - // Synchronize to avoid race condition where block import event fires after the - // blockchain.contains() check and before the block is registered, causing onBlockAdded() to be - // invoked for the parent of this block before we are able to register it. - synchronized (pendingBlocks) { - if (!protocolContext.getBlockchain().contains(block.getHeader().getParentHash())) { - // Block isn't connected to local chain, save it to pending blocks collection - if (pendingBlocks.registerPendingBlock(block)) { - LOG.info( - "Saving announced block {} ({}) for future import", - block.getHeader().getNumber(), - block.getHash()); + + private void handleNewBlockHashesFromNetwork(final EthMessage message) { + final Blockchain blockchain = protocolContext.getBlockchain(); + final NewBlockHashesMessage newBlockHashesMessage = + NewBlockHashesMessage.readFrom(message.getData()); + try { + // Register announced blocks + final List announcedBlocks = + Lists.newArrayList(newBlockHashesMessage.getNewHashes()); + for (final NewBlockHash announcedBlock : announcedBlocks) { + message.getPeer().registerKnownBlock(announcedBlock.hash()); + message.getPeer().registerHeight(announcedBlock.hash(), announcedBlock.number()); + } + + // Filter announced blocks for blocks we care to import + final long localChainHeight = protocolContext.getBlockchain().getChainHeadBlockNumber(); + final long bestChainHeight = syncState.bestChainHeight(localChainHeight); + final List relevantAnnouncements = + announcedBlocks.stream() + .filter(a -> shouldImportBlockAtHeight(a.number(), localChainHeight, bestChainHeight)) + .collect(Collectors.toList()); + + // Filter for blocks we don't yet know about + final List newBlocks = new ArrayList<>(); + for (final NewBlockHash announcedBlock : relevantAnnouncements) { + if (requestedBlocks.contains(announcedBlock.hash())) { + continue; + } + if (pendingBlocks.contains(announcedBlock.hash())) { + continue; + } + if (importingBlocks.contains(announcedBlock.hash())) { + continue; + } + if (blockchain.contains(announcedBlock.hash())) { + continue; + } + if (requestedBlocks.add(announcedBlock.hash())) { + newBlocks.add(announcedBlock); + } + } + + // Process known blocks we care about + for (final NewBlockHash newBlock : newBlocks) { + processAnnouncedBlock(message.getPeer(), newBlock) + .whenComplete((r, t) -> requestedBlocks.remove(newBlock.hash())); + } + } catch (final RLPException e) { + message.getPeer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL); } - return CompletableFuture.completedFuture(block); - } } - if (!importingBlocks.add(block.getHash())) { - // We're already importing this block. - return CompletableFuture.completedFuture(block); + private CompletableFuture processAnnouncedBlock( + final EthPeer peer, final NewBlockHash newBlock) { + final AbstractPeerTask getBlockTask = + GetBlockFromPeerTask.create(protocolSchedule, ethContext, newBlock.hash(), ethTasksTimer) + .assignPeer(peer); + + return getBlockTask.run().thenCompose((r) -> importOrSavePendingBlock(r.getResult())); } - if (protocolContext.getBlockchain().contains(block.getHash())) { - // We've already imported this block. - importingBlocks.remove(block.getHash()); - return CompletableFuture.completedFuture(block); + @VisibleForTesting + CompletableFuture importOrSavePendingBlock(final Block block) { + // Synchronize to avoid race condition where block import event fires after the + // blockchain.contains() check and before the block is registered, causing onBlockAdded() to be + // invoked for the parent of this block before we are able to register it. + synchronized (pendingBlocks) { + if (!protocolContext.getBlockchain().contains(block.getHeader().getParentHash())) { + // Block isn't connected to local chain, save it to pending blocks collection + if (pendingBlocks.registerPendingBlock(block)) { + LOG.info( + "Saving announced block {} ({}) for future import", + block.getHeader().getNumber(), + block.getHash()); + } + return CompletableFuture.completedFuture(block); + } + } + + if (!importingBlocks.add(block.getHash())) { + // We're already importing this block. + return CompletableFuture.completedFuture(block); + } + + if (protocolContext.getBlockchain().contains(block.getHash())) { + // We've already imported this block. + importingBlocks.remove(block.getHash()); + return CompletableFuture.completedFuture(block); + } + + ethContext.getScheduler().scheduleSyncWorkerTask(() -> validateAndBroadcastBlock(block)); + + // Import block + final PersistBlockTask importTask = PersistBlockTask.create(protocolSchedule, protocolContext, block, HeaderValidationMode.NONE, ethTasksTimer); + return ethContext + .getScheduler() + .scheduleSyncWorkerTask(() -> { + + final ProtocolSpec protocolSpec = protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); + final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); + final BlockHeader parent = protocolContext.getBlockchain().getBlockHeader(block.getHeader().getParentHash()).orElseThrow(() -> new IllegalArgumentException("Incapable of retrieving header from non-existent parent of " + block.getHeader().getNumber() + ".")); + + if (blockHeaderValidator.validateHeader(block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) { + final UInt256 totalDifficulty = protocolContext.getBlockchain().getTotalDifficultyByHash(parent.getHash()).get().plus(block.getHeader().getDifficulty()); + blockBroadcaster.propagate(block, totalDifficulty); + + importTask.run(); + } + }).whenComplete( + (r, t) -> { + importingBlocks.remove(block.getHash()); + if (t != null) { + LOG.warn( + "Failed to import announced block {} ({}).", + block.getHeader().getNumber(), + block.getHash()); + } else { + final double timeInMs = importTask.getTaskTimeInSec() * 1000; + LOG.info( + String.format( + "Successfully imported announced block %d (%s) in %01.3fms.", + block.getHeader().getNumber(), block.getHash(), timeInMs)); + } + }); } - ethContext.getScheduler().scheduleSyncWorkerTask(() -> validateAndBroadcastBlock(block)); - - // Import block - final PersistBlockTask importTask = - PersistBlockTask.create( - protocolSchedule, protocolContext, block, HeaderValidationMode.FULL, ethTasksTimer); - return ethContext - .getScheduler() - .scheduleSyncWorkerTask(importTask::run) - .whenComplete( - (r, t) -> { - importingBlocks.remove(block.getHash()); - if (t != null) { - LOG.warn( - "Failed to import announced block {} ({}).", - block.getHeader().getNumber(), - block.getHash()); - } else { - final double timeInMs = importTask.getTaskTimeInSec() * 1000; - LOG.info( - String.format( - "Successfully imported announced block %d (%s) in %01.3fms.", - block.getHeader().getNumber(), block.getHash(), timeInMs)); - } - }); - } - - // Only import blocks within a certain range of our head and sync target - private boolean shouldImportBlockAtHeight( - final long blockNumber, final long localHeight, final long bestChainHeight) { - final long distanceFromLocalHead = blockNumber - localHeight; - final long distanceFromBestPeer = blockNumber - bestChainHeight; - final Range importRange = config.blockPropagationRange(); - return importRange.contains(distanceFromLocalHead) - && importRange.contains(distanceFromBestPeer); - } + // Only import blocks within a certain range of our head and sync target + private boolean shouldImportBlockAtHeight( + final long blockNumber, final long localHeight, final long bestChainHeight) { + final long distanceFromLocalHead = blockNumber - localHeight; + final long distanceFromBestPeer = blockNumber - bestChainHeight; + final Range importRange = config.blockPropagationRange(); + return importRange.contains(distanceFromLocalHead) + && importRange.contains(distanceFromBestPeer); + } }