From 6126116e7f3a1bcf4a2747a302caafbe34e7fc83 Mon Sep 17 00:00:00 2001 From: shemnon Date: Thu, 7 Feb 2019 20:31:15 -0700 Subject: [PATCH] review updates. --- .../eth/manager/AbstractPipelinedPeerTask.java | 12 ++++-------- .../sync/tasks/ParallelDownloadHeadersTask.java | 3 +-- .../sync/tasks/ParallelImportChainSegmentTask.java | 14 +++++++------- .../tasks/ParallelValidateAndImportBodiesTask.java | 2 +- .../sync/fastsync/FastSyncChainDownloaderTest.java | 6 ++++-- 5 files changed, 17 insertions(+), 20 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPipelinedPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPipelinedPeerTask.java index 15d6c90caf..805ae26475 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPipelinedPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPipelinedPeerTask.java @@ -35,7 +35,7 @@ public abstract class AbstractPipelinedPeerTask extends AbstractPeerTask outboundQueue; private List results; - private boolean lameDuckMode = false; + private boolean shuttingDown = false; private AtomicReference processingException = new AtomicReference<>(null); protected AbstractPipelinedPeerTask( @@ -53,7 +53,7 @@ protected AbstractPipelinedPeerTask( protected void executeTaskWithPeer(final EthPeer peer) { Optional previousInput = Optional.empty(); while (!isDone() && processingException.get() == null) { - if (lameDuckMode && inboundQueue.isEmpty()) { + if (shuttingDown && inboundQueue.isEmpty()) { break; } final I input; @@ -90,12 +90,8 @@ public BlockingQueue getOutboundQueue() { return outboundQueue; } - public boolean isLameDuckMode() { - return lameDuckMode; - } - - public void setLameDuckMode(final boolean lameDuckMode) { - this.lameDuckMode = lameDuckMode; + public void shutdown() { + this.shuttingDown = true; } protected void failExceptionally(final Throwable t) { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadHeadersTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadHeadersTask.java index cc97d34f7e..9762646ddc 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadHeadersTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadHeadersTask.java @@ -61,7 +61,6 @@ protected Optional> processStep( } final int segmentLength = (int) (nextCheckpointHeader.getNumber() - previousCheckpointHeader.get().getNumber()) - 1; - // body LOG.trace( "Requesting download of {} blocks ending at {}", segmentLength, @@ -84,7 +83,7 @@ protected Optional> processStep( result.get().completeExceptionally(e); return Optional.empty(); } - headers.add(nextCheckpointHeader); + // FIXME headers.add(nextCheckpointHeader); if (headers.size() > 2) { LOG.debug( "Downloaded headers {} to {}", diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java index f29e52568c..f9f5ff0377 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java @@ -75,7 +75,6 @@ private ParallelImportChainSegmentTask( } this.checkpointHeaders = new ArrayBlockingQueue<>(checkpointHeaders.size(), false, checkpointHeaders); - // this.chunksInTotal = checkpointHeaders.size() - 1; this.blockHandler = blockHandler; this.validationPolicy = validationPolicy; } @@ -150,10 +149,10 @@ protected void executeTask() { scheduler.scheduleServiceTask(validateAndImportBodiesTask); // Hook in pipeline completion signaling. - downloadHeadersTask.setLameDuckMode(true); - downloadHeaderFuture.thenRun(() -> validateHeadersTask.setLameDuckMode(true)); - validateHeaderFuture.thenRun(() -> downloadBodiesTask.setLameDuckMode(true)); - downloadBodiesFuture.thenRun(() -> validateAndImportBodiesTask.setLameDuckMode(true)); + downloadHeadersTask.shutdown(); + downloadHeaderFuture.thenRun(() -> validateHeadersTask.shutdown()); + validateHeaderFuture.thenRun(() -> downloadBodiesTask.shutdown()); + downloadBodiesFuture.thenRun(() -> validateAndImportBodiesTask.shutdown()); final BiConsumer cancelOnException = (s, e) -> { @@ -171,8 +170,9 @@ protected void executeTask() { downloadBodiesFuture.whenComplete(cancelOnException); validateBodiesFuture.whenComplete( (r, e) -> { - cancelOnException.accept(r, e); - if (r != null) { + if (e != null) { + cancelOnException.accept(null, e); + } else if (r != null) { try { final List importedBlocks = validateBodiesFuture.get().getResult().stream() diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateAndImportBodiesTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateAndImportBodiesTask.java index f79cd46f18..ccc79be3c0 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateAndImportBodiesTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateAndImportBodiesTask.java @@ -47,7 +47,7 @@ public class ParallelValidateAndImportBodiesTask @Override protected Optional> processStep( - final List blocks, final Optional> previousHeaders, final EthPeer peer) { + final List blocks, final Optional> previousBlocks, final EthPeer peer) { final long firstBlock = blockHandler.extractBlockNumber(blocks.get(0)); final long lastBlock = blockHandler.extractBlockNumber(blocks.get(blocks.size() - 1)); LOG.debug("Starting import of chain segment {} to {}", firstBlock, lastBlock); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java index ceeb356c65..0b2b93c615 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java @@ -128,7 +128,8 @@ public void shouldSyncToPivotBlockInSingleSegment() { final FastSyncChainDownloader downloader = downloader(syncConfig, pivotBlockNumber); final CompletableFuture result = downloader.start(); - peer.respondWhileOtherThreadsWork(responder, () -> !result.isDone());; + peer.respondWhileOtherThreadsWork(responder, () -> !result.isDone()); + ; assertThat(result).isCompleted(); assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(pivotBlockNumber); @@ -171,7 +172,8 @@ public void recoversFromSyncTargetDisconnect() { ethProtocolManager.handleDisconnect(bestPeer.getPeerConnection(), TOO_MANY_PEERS, true); - secondBestPeer.respondWhileOtherThreadsWork(shorterResponder, () -> !result.isDone());; + secondBestPeer.respondWhileOtherThreadsWork(shorterResponder, () -> !result.isDone()); + ; assertThat(result).isCompleted(); assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(pivotBlockNumber);