diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java index 9dbd297cc9..47c52cfae8 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java @@ -109,32 +109,6 @@ private void setupListeners() { .subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork); } - private void validateAndBroadcastBlock(final Block block) { - final ProtocolSpec protocolSpec = - protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); - final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); - final BlockHeader parent = - protocolContext - .getBlockchain() - .getBlockHeader(block.getHeader().getParentHash()) - .orElseThrow( - () -> - new IllegalArgumentException( - "Incapable of retrieving header from non-existent parent of " - + block.getHeader().getNumber() - + ".")); - if (blockHeaderValidator.validateHeader( - block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) { - final UInt256 totalDifficulty = - protocolContext - .getBlockchain() - .getTotalDifficultyByHash(parent.getHash()) - .get() - .plus(block.getHeader().getDifficulty()); - blockBroadcaster.propagate(block, totalDifficulty); - } - } - private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchain blockchain) { // Check to see if any of our pending blocks are now ready for import final Block newBlock = blockAddedEvent.getBlock(); @@ -263,6 +237,16 @@ private CompletableFuture processAnnouncedBlock( return getBlockTask.run().thenCompose((r) -> importOrSavePendingBlock(r.getResult())); } + private void broadcastBlock(final Block block, final BlockHeader parent) { + final UInt256 totalDifficulty = + protocolContext + .getBlockchain() + .getTotalDifficultyByHash(parent.getHash()) + .get() + .plus(block.getHeader().getDifficulty()); + blockBroadcaster.propagate(block, totalDifficulty); + } + @VisibleForTesting CompletableFuture importOrSavePendingBlock(final Block block) { // Synchronize to avoid race condition where block import event fires after the @@ -292,19 +276,54 @@ CompletableFuture importOrSavePendingBlock(final Block block) { return CompletableFuture.completedFuture(block); } - ethContext.getScheduler().scheduleSyncWorkerTask(() -> validateAndBroadcastBlock(block)); + final BlockHeader parent = + protocolContext + .getBlockchain() + .getBlockHeader(block.getHeader().getParentHash()) + .orElseThrow( + () -> + new IllegalArgumentException( + "Incapable of retrieving header from non-existent parent of " + + block.getHeader().getNumber() + + ".")); - // Import block - final PersistBlockTask importTask = - PersistBlockTask.create( - protocolSchedule, protocolContext, block, HeaderValidationMode.FULL, metricsSystem); + final ProtocolSpec protocolSpec = + protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); + final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); return ethContext .getScheduler() - .scheduleSyncWorkerTask(importTask::run) + .scheduleSyncWorkerTask( + () -> validateAndProcessPendingBlock(blockHeaderValidator, block, parent)); + } + + private CompletableFuture validateAndProcessPendingBlock( + final BlockHeaderValidator blockHeaderValidator, + final Block block, + final BlockHeader parent) { + if (blockHeaderValidator.validateHeader( + block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) { + ethContext.getScheduler().scheduleSyncWorkerTask(() -> broadcastBlock(block, parent)); + return runImportTask(block); + } else { + importingBlocks.remove(block.getHash()); + LOG.warn( + "Failed to import announced block {} ({}).", + block.getHeader().getNumber(), + block.getHash()); + return CompletableFuture.completedFuture(block); + } + } + + private CompletableFuture runImportTask(final Block block) { + final PersistBlockTask importTask = + PersistBlockTask.create( + protocolSchedule, protocolContext, block, HeaderValidationMode.NONE, metricsSystem); + return importTask + .run() .whenComplete( - (r, t) -> { + (result, throwable) -> { importingBlocks.remove(block.getHash()); - if (t != null) { + if (throwable != null) { LOG.warn( "Failed to import announced block {} ({}).", block.getHeader().getNumber(),