diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java index 23506e834c..bd5c0ca6bc 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java @@ -20,6 +20,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -52,6 +53,20 @@ public final CompletableFuture run() { return result.get(); } + @Override + public final CompletableFuture runAsync(final ExecutorService executor) { + if (result.compareAndSet(null, new CompletableFuture<>())) { + executor.submit(this::executeTaskTimed); + result + .get() + .whenComplete( + (r, t) -> { + cleanup(); + }); + } + return result.get(); + } + @Override public final void cancel() { synchronized (result) { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPipelinedPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPipelinedPeerTask.java new file mode 100644 index 0000000000..805ae26475 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPipelinedPeerTask.java @@ -0,0 +1,105 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.manager; + +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public abstract class AbstractPipelinedPeerTask extends AbstractPeerTask> { + private static final Logger LOG = LogManager.getLogger(); + + static final int TIMEOUT_MS = 1000; + + private BlockingQueue inboundQueue; + private BlockingQueue outboundQueue; + private List results; + + private boolean shuttingDown = false; + private AtomicReference processingException = new AtomicReference<>(null); + + protected AbstractPipelinedPeerTask( + final BlockingQueue inboundQueue, + final int outboundBacklogSize, + final EthContext ethContext, + final LabelledMetric ethTasksTimer) { + super(ethContext, ethTasksTimer); + this.inboundQueue = inboundQueue; + outboundQueue = new LinkedBlockingQueue<>(outboundBacklogSize); + results = new ArrayList<>(); + } + + @Override + protected void executeTaskWithPeer(final EthPeer peer) { + Optional previousInput = Optional.empty(); + while (!isDone() && processingException.get() == null) { + if (shuttingDown && inboundQueue.isEmpty()) { + break; + } + final I input; + try { + input = inboundQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (input == null) { + // timed out waiting for a result + continue; + } + } catch (final InterruptedException e) { + // this is expected + continue; + } + final Optional output = processStep(input, previousInput, peer); + output.ifPresent( + o -> { + try { + outboundQueue.put(o); + } catch (final InterruptedException e) { + processingException.compareAndSet(null, e); + } + results.add(o); + }); + previousInput = Optional.of(input); + } + if (processingException.get() == null) { + result.get().complete(new PeerTaskResult<>(peer, results)); + } else { + result.get().completeExceptionally(processingException.get()); + } + } + + public BlockingQueue getOutboundQueue() { + return outboundQueue; + } + + public void shutdown() { + this.shuttingDown = true; + } + + protected void failExceptionally(final Throwable t) { + LOG.error("Task Failure", t); + processingException.compareAndSet(null, t); + result.get().completeExceptionally(t); + cancel(); + } + + protected abstract Optional processStep(I input, Optional previousInput, EthPeer peer); +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java index 18dfb9cdda..95317a11ca 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java @@ -47,7 +47,7 @@ public class EthScheduler { private final ExecutorService servicesExecutor; private final ExecutorService computationExecutor; - EthScheduler( + public EthScheduler( final int syncWorkerCount, final int txWorkerCount, final int computationWorkerCount) { this( Executors.newFixedThreadPool( @@ -125,11 +125,11 @@ public void scheduleTxWorkerTask(final Runnable command) { txWorkerExecutor.submit(command); } - CompletableFuture scheduleServiceTask(final Runnable service) { - return CompletableFuture.runAsync(service, servicesExecutor); + public CompletableFuture scheduleServiceTask(final EthTask task) { + return task.runAsync(servicesExecutor); } - CompletableFuture scheduleComputationTask(final Supplier computation) { + public CompletableFuture scheduleComputationTask(final Supplier computation) { return CompletableFuture.supplyAsync(computation, computationExecutor); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthTask.java index 25fbba2d4d..43cee1cd64 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthTask.java @@ -13,10 +13,13 @@ package tech.pegasys.pantheon.ethereum.eth.manager; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; public interface EthTask { CompletableFuture run(); + CompletableFuture runAsync(ExecutorService executor); + void cancel(); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockHandler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockHandler.java new file mode 100644 index 0000000000..b02e2089ea --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockHandler.java @@ -0,0 +1,26 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync; + +import tech.pegasys.pantheon.ethereum.core.BlockHeader; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public interface BlockHandler { + CompletableFuture> downloadBlocks(final List headers); + + CompletableFuture> validateAndImportBlocks(final List blocks); + + long extractBlockNumber(final B block); +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java index f8a4a3cc95..5fed6be83e 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java @@ -275,7 +275,7 @@ public static class Builder { private int downloaderHeaderRequestSize = 10; private int downloaderCheckpointTimeoutsPermitted = 5; private int downloaderChainSegmentTimeoutsPermitted = 5; - private int downloaderChainSegmentSize = 20; + private int downloaderChainSegmentSize = 200; private long trailingPeerBlocksBehindThreshold; private int maxTrailingPeers = Integer.MAX_VALUE; private int downloaderParallelism = 2; diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java index 6e29dacb03..decd2040e3 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java @@ -20,10 +20,10 @@ import tech.pegasys.pantheon.ethereum.core.BlockImporter; import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler; import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask; -import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask.BlockHandler; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec; @@ -125,4 +125,9 @@ private BlockImporter getBlockImporter(final BlockWithReceipts blockWithRecei return protocolSpec.getBlockImporter(); } + + @Override + public long extractBlockNumber(final BlockWithReceipts blockWithReceipt) { + return blockWithReceipt.getHeader().getNumber(); + } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java index 1e8da58a54..9c4c8d0464 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java @@ -21,7 +21,7 @@ import tech.pegasys.pantheon.ethereum.eth.sync.ChainDownloader; import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; -import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask; +import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ParallelImportChainSegmentTask; import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.metrics.Counter; @@ -103,8 +103,8 @@ private CompletableFuture> importBlocksForCheckpoints( HeaderValidationMode.DETACHED_ONLY, fastSyncValidationCounter); - final PipelinedImportChainSegmentTask importTask = - PipelinedImportChainSegmentTask.forCheckpoints( + final ParallelImportChainSegmentTask importTask = + ParallelImportChainSegmentTask.forCheckpoints( protocolSchedule, protocolContext, ethContext, diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockHandler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockHandler.java index 8e2f920a5e..5c9571af44 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockHandler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockHandler.java @@ -17,9 +17,9 @@ import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.Transaction; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PersistBlockTask; -import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask.BlockHandler; import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.metrics.LabelledMetric; @@ -72,6 +72,11 @@ public CompletableFuture> downloadBlocks(final List hea .thenCompose(this::extractTransactionSenders); } + @Override + public long extractBlockNumber(final Block block) { + return block.getHeader().getNumber(); + } + private CompletableFuture> extractTransactionSenders(final List blocks) { LOG.debug( "Extracting sender {} to {}", diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloader.java index 0df90f491d..bd4cfffd7e 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloader.java @@ -22,7 +22,7 @@ import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ImportBlocksTask; -import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask; +import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ParallelImportChainSegmentTask; import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.metrics.LabelledMetric; @@ -93,8 +93,8 @@ private CompletableFuture> importBlocksForCheckpoints( ethTasksTimer); importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult); } else { - final PipelinedImportChainSegmentTask importTask = - PipelinedImportChainSegmentTask.forCheckpoints( + final ParallelImportChainSegmentTask importTask = + ParallelImportChainSegmentTask.forCheckpoints( protocolSchedule, protocolContext, ethContext, diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java index 032772ba55..fcad141a26 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java @@ -120,7 +120,7 @@ public static DownloadHeaderSequenceTask endingAtHeader( protected CompletableFuture> executePeerTask( final Optional assignedPeer) { LOG.debug( - "Downloading headers from {} to {}.", startingBlockNumber, referenceHeader.getNumber() - 1); + "Downloading headers from {} to {}.", startingBlockNumber, referenceHeader.getNumber()); final CompletableFuture> task = downloadHeaders(assignedPeer).thenCompose(this::processHeaders); return task.whenComplete( @@ -129,8 +129,8 @@ protected CompletableFuture> executePeerTask( if (lastFilledHeaderIndex == 0) { LOG.debug( "Finished downloading headers from {} to {}.", - startingBlockNumber, - referenceHeader.getNumber() - 1); + headers[0].getNumber(), + headers[segmentLength - 1].getNumber()); result.get().complete(Arrays.asList(headers)); } }); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadBodiesTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadBodiesTask.java new file mode 100644 index 0000000000..9350c74b73 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadBodiesTask.java @@ -0,0 +1,69 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.tasks; + +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPipelinedPeerTask; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ParallelDownloadBodiesTask + extends AbstractPipelinedPeerTask, List> { + private static final Logger LOG = LogManager.getLogger(); + + private final BlockHandler blockHandler; + + ParallelDownloadBodiesTask( + final BlockHandler blockHandler, + final BlockingQueue> inboundQueue, + final int outboundBacklogSize, + final EthContext ethContext, + final LabelledMetric ethTasksTimer) { + super(inboundQueue, outboundBacklogSize, ethContext, ethTasksTimer); + + this.blockHandler = blockHandler; + } + + @Override + protected Optional> processStep( + final List headers, + final Optional> previousHeaders, + final EthPeer peer) { + LOG.trace( + "Downloading bodies {} to {}", + headers.get(0).getNumber(), + headers.get(headers.size() - 1).getNumber()); + try { + final List blocks = blockHandler.downloadBlocks(headers).get(); + LOG.debug( + "Downloaded bodies {} to {}", + headers.get(0).getNumber(), + headers.get(headers.size() - 1).getNumber()); + return Optional.of(blocks); + } catch (final InterruptedException | ExecutionException e) { + failExceptionally(e); + return Optional.empty(); + } + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadHeadersTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadHeadersTask.java new file mode 100644 index 0000000000..c134a54d6e --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadHeadersTask.java @@ -0,0 +1,95 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.tasks; + +import tech.pegasys.pantheon.ethereum.ProtocolContext; +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPipelinedPeerTask; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ParallelDownloadHeadersTask + extends AbstractPipelinedPeerTask> { + private static final Logger LOG = LogManager.getLogger(); + + private final ProtocolSchedule protocolSchedule; + private final ProtocolContext protocolContext; + + ParallelDownloadHeadersTask( + final BlockingQueue inboundQueue, + final int outboundBacklogSize, + final ProtocolSchedule protocolSchedule, + final ProtocolContext protocolContext, + final EthContext ethContext, + final LabelledMetric ethTasksTimer) { + super(inboundQueue, outboundBacklogSize, ethContext, ethTasksTimer); + + this.protocolSchedule = protocolSchedule; + this.protocolContext = protocolContext; + } + + @Override + protected Optional> processStep( + final BlockHeader nextCheckpointHeader, + final Optional previousCheckpointHeader, + final EthPeer peer) { + if (!previousCheckpointHeader.isPresent()) { + return Optional.empty(); + } + final int segmentLength = + (int) (nextCheckpointHeader.getNumber() - previousCheckpointHeader.get().getNumber()) - 1; + LOG.trace( + "Requesting download of {} blocks ending at {}", + segmentLength, + nextCheckpointHeader.getHash()); + final DownloadHeaderSequenceTask downloadTask = + DownloadHeaderSequenceTask.endingAtHeader( + protocolSchedule, + protocolContext, + ethContext, + nextCheckpointHeader, + segmentLength, + ethTasksTimer); + downloadTask.assignPeer(peer); + final CompletableFuture> headerFuture = executeSubTask(downloadTask::run); + + final List headers = Lists.newArrayList(previousCheckpointHeader.get()); + try { + headers.addAll(headerFuture.get()); + } catch (final InterruptedException | ExecutionException e) { + result.get().completeExceptionally(e); + return Optional.empty(); + } + headers.add(nextCheckpointHeader); + if (headers.size() > 2) { + LOG.debug( + "Downloaded headers {} to {}", + headers.get(1).getNumber(), + headers.get(headers.size() - 1).getNumber()); + } + return Optional.of(headers); + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java new file mode 100644 index 0000000000..f9f5ff0377 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java @@ -0,0 +1,192 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.tasks; + +import tech.pegasys.pantheon.ethereum.ProtocolContext; +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.eth.manager.AbstractEthTask; +import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler; +import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler; +import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy; +import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ParallelImportChainSegmentTask extends AbstractEthTask> { + private static final Logger LOG = LogManager.getLogger(); + + private final EthContext ethContext; + private final ProtocolSchedule protocolSchedule; + private final ProtocolContext protocolContext; + + private final ArrayBlockingQueue checkpointHeaders; + private final int maxActiveChunks; + private final long firstHeaderNumber; + private final long lastHeaderNumber; + + private final BlockHandler blockHandler; + private final ValidationPolicy validationPolicy; + + private ParallelImportChainSegmentTask( + final ProtocolSchedule protocolSchedule, + final ProtocolContext protocolContext, + final EthContext ethContext, + final int maxActiveChunks, + final List checkpointHeaders, + final LabelledMetric ethTasksTimer, + final BlockHandler blockHandler, + final ValidationPolicy validationPolicy) { + super(ethTasksTimer); + this.ethContext = ethContext; + this.protocolSchedule = protocolSchedule; + this.protocolContext = protocolContext; + this.maxActiveChunks = maxActiveChunks; + + if (checkpointHeaders.size() > 1) { + this.firstHeaderNumber = checkpointHeaders.get(0).getNumber(); + this.lastHeaderNumber = checkpointHeaders.get(checkpointHeaders.size() - 1).getNumber(); + } else { + this.firstHeaderNumber = -1; + this.lastHeaderNumber = -1; + } + this.checkpointHeaders = + new ArrayBlockingQueue<>(checkpointHeaders.size(), false, checkpointHeaders); + this.blockHandler = blockHandler; + this.validationPolicy = validationPolicy; + } + + public static ParallelImportChainSegmentTask forCheckpoints( + final ProtocolSchedule protocolSchedule, + final ProtocolContext protocolContext, + final EthContext ethContext, + final int maxActiveChunks, + final LabelledMetric ethTasksTimer, + final BlockHandler blockHandler, + final ValidationPolicy validationPolicy, + final List checkpointHeaders) { + return new ParallelImportChainSegmentTask<>( + protocolSchedule, + protocolContext, + ethContext, + maxActiveChunks, + checkpointHeaders, + ethTasksTimer, + blockHandler, + validationPolicy); + } + + @Override + protected void executeTask() { + if (firstHeaderNumber >= 0) { + LOG.debug("Importing chain segment from {} to {}.", firstHeaderNumber, lastHeaderNumber); + + // build pipeline + final ParallelDownloadHeadersTask downloadHeadersTask = + new ParallelDownloadHeadersTask<>( + checkpointHeaders, + maxActiveChunks, + protocolSchedule, + protocolContext, + ethContext, + ethTasksTimer); + final ParallelValidateHeadersTask validateHeadersTask = + new ParallelValidateHeadersTask<>( + validationPolicy, + downloadHeadersTask.getOutboundQueue(), + maxActiveChunks, + protocolSchedule, + protocolContext, + ethContext, + ethTasksTimer); + final ParallelDownloadBodiesTask downloadBodiesTask = + new ParallelDownloadBodiesTask<>( + blockHandler, + validateHeadersTask.getOutboundQueue(), + maxActiveChunks, + ethContext, + ethTasksTimer); + final ParallelValidateAndImportBodiesTask validateAndImportBodiesTask = + new ParallelValidateAndImportBodiesTask<>( + blockHandler, + downloadBodiesTask.getOutboundQueue(), + Integer.MAX_VALUE, + ethContext, + ethTasksTimer); + + // Start the pipeline. + final EthScheduler scheduler = ethContext.getScheduler(); + final CompletableFuture downloadHeaderFuture = + scheduler.scheduleServiceTask(downloadHeadersTask); + final CompletableFuture validateHeaderFuture = + scheduler.scheduleServiceTask(validateHeadersTask); + final CompletableFuture downloadBodiesFuture = + scheduler.scheduleServiceTask(downloadBodiesTask); + final CompletableFuture>>> validateBodiesFuture = + scheduler.scheduleServiceTask(validateAndImportBodiesTask); + + // Hook in pipeline completion signaling. + downloadHeadersTask.shutdown(); + downloadHeaderFuture.thenRun(() -> validateHeadersTask.shutdown()); + validateHeaderFuture.thenRun(() -> downloadBodiesTask.shutdown()); + downloadBodiesFuture.thenRun(() -> validateAndImportBodiesTask.shutdown()); + + final BiConsumer cancelOnException = + (s, e) -> { + if (e != null && !(e instanceof CancellationException)) { + downloadHeadersTask.cancel(); + validateHeadersTask.cancel(); + downloadBodiesTask.cancel(); + validateAndImportBodiesTask.cancel(); + result.get().completeExceptionally(e); + } + }; + + downloadHeaderFuture.whenComplete(cancelOnException); + validateHeaderFuture.whenComplete(cancelOnException); + downloadBodiesFuture.whenComplete(cancelOnException); + validateBodiesFuture.whenComplete( + (r, e) -> { + if (e != null) { + cancelOnException.accept(null, e); + } else if (r != null) { + try { + final List importedBlocks = + validateBodiesFuture.get().getResult().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + result.get().complete(importedBlocks); + } catch (final InterruptedException | ExecutionException ex) { + result.get().completeExceptionally(ex); + } + } + }); + + } else { + LOG.warn("Import task requested with no checkpoint headers."); + } + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateAndImportBodiesTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateAndImportBodiesTask.java new file mode 100644 index 0000000000..ccc79be3c0 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateAndImportBodiesTask.java @@ -0,0 +1,65 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.tasks; + +import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPipelinedPeerTask; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ParallelValidateAndImportBodiesTask + extends AbstractPipelinedPeerTask, List> { + private static final Logger LOG = LogManager.getLogger(); + + private final BlockHandler blockHandler; + + ParallelValidateAndImportBodiesTask( + final BlockHandler blockHandler, + final BlockingQueue> inboundQueue, + final int outboundBacklogSize, + final EthContext ethContext, + final LabelledMetric ethTasksTimer) { + super(inboundQueue, outboundBacklogSize, ethContext, ethTasksTimer); + + this.blockHandler = blockHandler; + } + + @Override + protected Optional> processStep( + final List blocks, final Optional> previousBlocks, final EthPeer peer) { + final long firstBlock = blockHandler.extractBlockNumber(blocks.get(0)); + final long lastBlock = blockHandler.extractBlockNumber(blocks.get(blocks.size() - 1)); + LOG.debug("Starting import of chain segment {} to {}", firstBlock, lastBlock); + final CompletableFuture> importedBlocksFuture = + blockHandler.validateAndImportBlocks(blocks); + try { + final List downloadedBlocks = importedBlocksFuture.get(); + LOG.info("Completed importing chain segment {} to {}", firstBlock, lastBlock); + return Optional.of(downloadedBlocks); + } catch (final InterruptedException | ExecutionException e) { + failExceptionally(e); + return Optional.empty(); + } + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateHeadersTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateHeadersTask.java new file mode 100644 index 0000000000..27d1cc13de --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateHeadersTask.java @@ -0,0 +1,97 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.tasks; + +import tech.pegasys.pantheon.ethereum.ProtocolContext; +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPipelinedPeerTask; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy; +import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException; +import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator; +import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ParallelValidateHeadersTask + extends AbstractPipelinedPeerTask, List> { + private static final Logger LOG = LogManager.getLogger(); + + private final ProtocolSchedule protocolSchedule; + private final ProtocolContext protocolContext; + private final ValidationPolicy validationPolicy; + + ParallelValidateHeadersTask( + final ValidationPolicy validationPolicy, + final BlockingQueue> inboundQueue, + final int outboundBacklogSize, + final ProtocolSchedule protocolSchedule, + final ProtocolContext protocolContext, + final EthContext ethContext, + final LabelledMetric ethTasksTimer) { + super(inboundQueue, outboundBacklogSize, ethContext, ethTasksTimer); + + this.protocolSchedule = protocolSchedule; + this.protocolContext = protocolContext; + this.validationPolicy = validationPolicy; + } + + @Override + protected Optional> processStep( + final List headers, + final Optional> previousHeaders, + final EthPeer peer) { + LOG.debug( + "Validating Headers {} to {}", + headers.get(0).getNumber(), + headers.get(headers.size() - 1).getNumber()); + + final BlockHeader parentHeader = headers.get(0); + final BlockHeader childHeader = headers.get(1); + final ProtocolSpec protocolSpec = protocolSchedule.getByBlockNumber(childHeader.getNumber()); + final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); + if (blockHeaderValidator.validateHeader( + childHeader, + parentHeader, + protocolContext, + validationPolicy.getValidationModeForNextBlock())) { + LOG.debug( + "Validated Headers {} to {}", + headers.get(0).getNumber(), + headers.get(headers.size() - 1).getNumber()); + // The first header will be imported by the previous request range. + return Optional.of(headers.subList(1, headers.size())); + } else { + LOG.debug( + "Could not validate Headers {} to {}", + headers.get(0).getNumber(), + headers.get(headers.size() - 1).getNumber()); + // ignore the value, we only want the first exception to be there + failExceptionally( + new InvalidBlockException( + "Provided first header does not connect to last header.", + parentHeader.getNumber(), + parentHeader.getHash())); + return Optional.empty(); + } + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTask.java index 0b234cbb1a..e0f6f5b1af 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTask.java @@ -16,6 +16,7 @@ import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.eth.manager.AbstractEthTask; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler; import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException; import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator; @@ -295,10 +296,4 @@ private CompletableFuture> lastValidateAndImportBlocksTasks() { return validateAndImportBlocksTasks.getLast(); } } - - public interface BlockHandler { - CompletableFuture> downloadBlocks(final List headers); - - CompletableFuture> validateAndImportBlocks(final List blocks); - } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java index ff523b04f8..6a2680a157 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java @@ -34,8 +34,16 @@ public static EthProtocolManager create( final Blockchain blockchain, final WorldStateArchive worldStateArchive, final TimeoutPolicy timeoutPolicy) { + return create( + blockchain, worldStateArchive, timeoutPolicy, new DeterministicEthScheduler(timeoutPolicy)); + } + + public static EthProtocolManager create( + final Blockchain blockchain, + final WorldStateArchive worldStateArchive, + final TimeoutPolicy timeoutPolicy, + final EthScheduler ethScheduler) { final int networkId = 1; - final EthScheduler ethScheduler = new DeterministicEthScheduler(timeoutPolicy); return new EthProtocolManager( blockchain, worldStateArchive, diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerShutdownTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerShutdownTest.java index 18e264e3db..1defda8129 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerShutdownTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerShutdownTest.java @@ -102,7 +102,7 @@ public void shutdown_txWorkerShutsDown() throws InterruptedException { public void shutdown_servicesShutsDown() throws InterruptedException { final MockEthTask task = new MockEthTask(1); - ethScheduler.scheduleServiceTask(task::executeTask); + ethScheduler.scheduleServiceTask(task); ethScheduler.stop(); assertThat(servicesExecutor.isShutdown()).isTrue(); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java index f78f755bbe..dbefd235d5 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java @@ -36,15 +36,16 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.uint.UInt256; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Optional; -import java.util.Queue; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import com.google.common.collect.Lists; @@ -53,7 +54,7 @@ public class RespondingEthPeer { private static final BlockDataGenerator gen = new BlockDataGenerator(); private static final int DEFAULT_ESTIMATED_HEIGHT = 1000; private final EthPeer ethPeer; - private final Queue outgoingMessages; + private final BlockingQueue outgoingMessages; private final EthProtocolManager ethProtocolManager; private final MockPeerConnection peerConnection; @@ -61,7 +62,7 @@ private RespondingEthPeer( final EthProtocolManager ethProtocolManager, final MockPeerConnection peerConnection, final EthPeer ethPeer, - final Queue outgoingMessages) { + final BlockingQueue outgoingMessages) { this.ethProtocolManager = ethProtocolManager; this.peerConnection = peerConnection; this.ethPeer = ethPeer; @@ -112,7 +113,7 @@ public static RespondingEthPeer create( final EthPeers ethPeers = ethProtocolManager.ethContext().getEthPeers(); final Set caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63)); - final Queue outgoingMessages = new ArrayDeque<>(); + final BlockingQueue outgoingMessages = new ArrayBlockingQueue<>(1000); final MockPeerConnection peerConnection = new MockPeerConnection( caps, (cap, msg, conn) -> outgoingMessages.add(new OutgoingMessage(cap, msg))); @@ -142,6 +143,27 @@ public void respondWhile(final Responder responder, final RespondWhileCondition } } + public void respondWhileOtherThreadsWork( + final Responder responder, final RespondWhileCondition condition) { + int counter = 0; + while (condition.shouldRespond()) { + try { + final OutgoingMessage message = outgoingMessages.poll(1, TimeUnit.SECONDS); + if (message != null) { + respondToMessage(responder, message); + counter++; + if (counter > 10_000) { + // Limit applied to avoid tests hanging forever which is hard to track down. + throw new IllegalStateException( + "Responded 10,000 times and stop condition still not reached."); + } + } + } catch (final InterruptedException e) { + // Ignore and recheck condition. + } + } + } + public void respondTimes(final Responder responder, final int maxCycles) { // Respond repeatedly, as each round may produce new outgoing messages int count = 0; @@ -157,19 +179,22 @@ public void respondTimes(final Responder responder, final int maxCycles) { /** @return True if any requests were processed */ public boolean respond(final Responder responder) { // Respond to queued messages - final List currentMessages = new ArrayList<>(outgoingMessages); - outgoingMessages.clear(); + final List currentMessages = new ArrayList<>(); + outgoingMessages.drainTo(currentMessages); for (final OutgoingMessage msg : currentMessages) { - final Optional maybeResponse = - responder.respond(msg.capability, msg.messageData); - maybeResponse.ifPresent( - (response) -> - ethProtocolManager.processMessage( - msg.capability, new DefaultMessage(peerConnection, response))); + respondToMessage(responder, msg); } return currentMessages.size() > 0; } + private void respondToMessage(final Responder responder, final OutgoingMessage msg) { + final Optional maybeResponse = responder.respond(msg.capability, msg.messageData); + maybeResponse.ifPresent( + (response) -> + ethProtocolManager.processMessage( + msg.capability, new DefaultMessage(peerConnection, response))); + } + public Optional peekNextOutgoingRequest() { if (outgoingMessages.isEmpty()) { return Optional.empty(); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java index f60c6567c1..d67da74e9c 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java @@ -21,9 +21,11 @@ import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.chain.Blockchain; import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain; +import tech.pegasys.pantheon.ethereum.eth.manager.DeterministicEthScheduler; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; +import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil; @@ -33,6 +35,7 @@ import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.junit.Before; import org.junit.Test; @@ -62,7 +65,11 @@ public void setup() { protocolSchedule = localBlockchainSetup.getProtocolSchedule(); protocolContext = localBlockchainSetup.getProtocolContext(); ethProtocolManager = - EthProtocolManagerTestUtil.create(localBlockchain, localBlockchainSetup.getWorldArchive()); + EthProtocolManagerTestUtil.create( + localBlockchain, + localBlockchainSetup.getWorldArchive(), + DeterministicEthScheduler.TimeoutPolicy.NEVER, + new EthScheduler(1, 1, 1)); ethContext = ethProtocolManager.ethContext(); syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers()); } @@ -81,7 +88,8 @@ private FastSyncChainDownloader downloader( } @Test - public void shouldSyncToPivotBlockInMultipleSegments() { + public void shouldSyncToPivotBlockInMultipleSegments() + throws ExecutionException, InterruptedException { otherBlockchainSetup.importFirstBlocks(30); final RespondingEthPeer peer = @@ -98,7 +106,7 @@ public void shouldSyncToPivotBlockInMultipleSegments() { final FastSyncChainDownloader downloader = downloader(syncConfig, pivotBlockNumber); final CompletableFuture result = downloader.start(); - peer.respondWhile(responder, () -> !result.isDone()); + peer.respondWhileOtherThreadsWork(responder, () -> !result.isDone()); assertThat(result).isCompleted(); assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(pivotBlockNumber); @@ -120,7 +128,7 @@ public void shouldSyncToPivotBlockInSingleSegment() { final FastSyncChainDownloader downloader = downloader(syncConfig, pivotBlockNumber); final CompletableFuture result = downloader.start(); - peer.respondWhile(responder, () -> !result.isDone()); + peer.respondWhileOtherThreadsWork(responder, () -> !result.isDone()); assertThat(result).isCompleted(); assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(pivotBlockNumber); @@ -163,7 +171,7 @@ public void recoversFromSyncTargetDisconnect() { ethProtocolManager.handleDisconnect(bestPeer.getPeerConnection(), TOO_MANY_PEERS, true); - secondBestPeer.respondWhile(shorterResponder, () -> !result.isDone()); + secondBestPeer.respondWhileOtherThreadsWork(shorterResponder, () -> !result.isDone()); assertThat(result).isCompleted(); assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(pivotBlockNumber); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java index 2eddc406fc..de4fe452c5 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java @@ -28,9 +28,11 @@ import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; +import tech.pegasys.pantheon.ethereum.eth.manager.DeterministicEthScheduler; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; +import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil; @@ -82,7 +84,11 @@ public void setupTest() { protocolSchedule = localBlockchainSetup.getProtocolSchedule(); protocolContext = localBlockchainSetup.getProtocolContext(); ethProtocolManager = - EthProtocolManagerTestUtil.create(localBlockchain, localBlockchainSetup.getWorldArchive()); + EthProtocolManagerTestUtil.create( + localBlockchain, + localBlockchainSetup.getWorldArchive(), + DeterministicEthScheduler.TimeoutPolicy.NEVER, + new EthScheduler(1, 1, 1)); ethContext = ethProtocolManager.ethContext(); syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers()); @@ -541,9 +547,10 @@ public void recoversFromSyncTargetDisconnect() { ethProtocolManager.handleDisconnect( bestPeer.getPeerConnection(), DisconnectReason.TOO_MANY_PEERS, true); - // Downloader should recover and sync to next best peer + // Downloader should recover and sync to next best peer, but it may stall + // for 10 seconds first (by design). await() - .atMost(10, TimeUnit.SECONDS) + .atMost(20, TimeUnit.SECONDS) .untilAsserted( () -> { secondBestPeer.respond(secondBestResponder);