Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
[NC-2236] Parallel Block importer (#774)
Browse files Browse the repository at this point in the history
ParallelImportChainSegmentTask, an explicitly parallel re-implementation
of PipelinedImportChainSegmentTask.  Data is passed between stages
via BlockingQueues.

Pipeline stages are implemented in AbstractPipelinePeerTask and the
parent task assembles and initiates the pipeline execution.

Other changes to support this:
* Move ethTaskTimer to abstract root
* Don't use deterministic scheduler for downloader tests, this depends on explicit parallelism
* Change download segment size = 200
* Increase timeout in recoversFromSyncTargetDisconnect, the chain downloader
may stall for 10 seconds looking for an alternative target.
* Use a blocking queue instead of fixed wait period for the test peers
  • Loading branch information
shemnon authored Feb 8, 2019
1 parent 539c4ad commit c05576c
Show file tree
Hide file tree
Showing 22 changed files with 765 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

Expand Down Expand Up @@ -52,6 +53,20 @@ public final CompletableFuture<T> run() {
return result.get();
}

@Override
public final CompletableFuture<T> runAsync(final ExecutorService executor) {
if (result.compareAndSet(null, new CompletableFuture<>())) {
executor.submit(this::executeTaskTimed);
result
.get()
.whenComplete(
(r, t) -> {
cleanup();
});
}
return result.get();
}

@Override
public final void cancel() {
synchronized (result) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.manager;

import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public abstract class AbstractPipelinedPeerTask<I, O> extends AbstractPeerTask<List<O>> {
private static final Logger LOG = LogManager.getLogger();

static final int TIMEOUT_MS = 1000;

private BlockingQueue<I> inboundQueue;
private BlockingQueue<O> outboundQueue;
private List<O> results;

private boolean shuttingDown = false;
private AtomicReference<Throwable> processingException = new AtomicReference<>(null);

protected AbstractPipelinedPeerTask(
final BlockingQueue<I> inboundQueue,
final int outboundBacklogSize,
final EthContext ethContext,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethContext, ethTasksTimer);
this.inboundQueue = inboundQueue;
outboundQueue = new LinkedBlockingQueue<>(outboundBacklogSize);
results = new ArrayList<>();
}

@Override
protected void executeTaskWithPeer(final EthPeer peer) {
Optional<I> previousInput = Optional.empty();
while (!isDone() && processingException.get() == null) {
if (shuttingDown && inboundQueue.isEmpty()) {
break;
}
final I input;
try {
input = inboundQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (input == null) {
// timed out waiting for a result
continue;
}
} catch (final InterruptedException e) {
// this is expected
continue;
}
final Optional<O> output = processStep(input, previousInput, peer);
output.ifPresent(
o -> {
try {
outboundQueue.put(o);
} catch (final InterruptedException e) {
processingException.compareAndSet(null, e);
}
results.add(o);
});
previousInput = Optional.of(input);
}
if (processingException.get() == null) {
result.get().complete(new PeerTaskResult<>(peer, results));
} else {
result.get().completeExceptionally(processingException.get());
}
}

public BlockingQueue<O> getOutboundQueue() {
return outboundQueue;
}

public void shutdown() {
this.shuttingDown = true;
}

protected void failExceptionally(final Throwable t) {
LOG.error("Task Failure", t);
processingException.compareAndSet(null, t);
result.get().completeExceptionally(t);
cancel();
}

protected abstract Optional<O> processStep(I input, Optional<I> previousInput, EthPeer peer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class EthScheduler {
private final ExecutorService servicesExecutor;
private final ExecutorService computationExecutor;

EthScheduler(
public EthScheduler(
final int syncWorkerCount, final int txWorkerCount, final int computationWorkerCount) {
this(
Executors.newFixedThreadPool(
Expand Down Expand Up @@ -125,11 +125,11 @@ public void scheduleTxWorkerTask(final Runnable command) {
txWorkerExecutor.submit(command);
}

CompletableFuture<Void> scheduleServiceTask(final Runnable service) {
return CompletableFuture.runAsync(service, servicesExecutor);
public <T> CompletableFuture<T> scheduleServiceTask(final EthTask<T> task) {
return task.runAsync(servicesExecutor);
}

<T> CompletableFuture<T> scheduleComputationTask(final Supplier<T> computation) {
public <T> CompletableFuture<T> scheduleComputationTask(final Supplier<T> computation) {
return CompletableFuture.supplyAsync(computation, computationExecutor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
package tech.pegasys.pantheon.ethereum.eth.manager;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

public interface EthTask<T> {

CompletableFuture<T> run();

CompletableFuture<T> runAsync(ExecutorService executor);

void cancel();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;

import java.util.List;
import java.util.concurrent.CompletableFuture;

public interface BlockHandler<B> {
CompletableFuture<List<B>> downloadBlocks(final List<BlockHeader> headers);

CompletableFuture<List<B>> validateAndImportBlocks(final List<B> blocks);

long extractBlockNumber(final B block);
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public static class Builder {
private int downloaderHeaderRequestSize = 10;
private int downloaderCheckpointTimeoutsPermitted = 5;
private int downloaderChainSegmentTimeoutsPermitted = 5;
private int downloaderChainSegmentSize = 20;
private int downloaderChainSegmentSize = 200;
private long trailingPeerBlocksBehindThreshold;
private int maxTrailingPeers = Integer.MAX_VALUE;
private int downloaderParallelism = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import tech.pegasys.pantheon.ethereum.core.BlockImporter;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask.BlockHandler;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
Expand Down Expand Up @@ -125,4 +125,9 @@ private BlockImporter<C> getBlockImporter(final BlockWithReceipts blockWithRecei

return protocolSpec.getBlockImporter();
}

@Override
public long extractBlockNumber(final BlockWithReceipts blockWithReceipt) {
return blockWithReceipt.getHeader().getNumber();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import tech.pegasys.pantheon.ethereum.eth.sync.ChainDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ParallelImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.Counter;
Expand Down Expand Up @@ -103,8 +103,8 @@ private CompletableFuture<List<Block>> importBlocksForCheckpoints(
HeaderValidationMode.DETACHED_ONLY,
fastSyncValidationCounter);

final PipelinedImportChainSegmentTask<C, BlockWithReceipts> importTask =
PipelinedImportChainSegmentTask.forCheckpoints(
final ParallelImportChainSegmentTask<C, BlockWithReceipts> importTask =
ParallelImportChainSegmentTask.forCheckpoints(
protocolSchedule,
protocolContext,
ethContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PersistBlockTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask.BlockHandler;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
Expand Down Expand Up @@ -72,6 +72,11 @@ public CompletableFuture<List<Block>> downloadBlocks(final List<BlockHeader> hea
.thenCompose(this::extractTransactionSenders);
}

@Override
public long extractBlockNumber(final Block block) {
return block.getHeader().getNumber();
}

private CompletableFuture<List<Block>> extractTransactionSenders(final List<Block> blocks) {
LOG.debug(
"Extracting sender {} to {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ImportBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ParallelImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
Expand Down Expand Up @@ -93,8 +93,8 @@ private CompletableFuture<List<Block>> importBlocksForCheckpoints(
ethTasksTimer);
importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult);
} else {
final PipelinedImportChainSegmentTask<C, Block> importTask =
PipelinedImportChainSegmentTask.forCheckpoints(
final ParallelImportChainSegmentTask<C, Block> importTask =
ParallelImportChainSegmentTask.forCheckpoints(
protocolSchedule,
protocolContext,
ethContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public static <C> DownloadHeaderSequenceTask<C> endingAtHeader(
protected CompletableFuture<List<BlockHeader>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
LOG.debug(
"Downloading headers from {} to {}.", startingBlockNumber, referenceHeader.getNumber() - 1);
"Downloading headers from {} to {}.", startingBlockNumber, referenceHeader.getNumber());
final CompletableFuture<List<BlockHeader>> task =
downloadHeaders(assignedPeer).thenCompose(this::processHeaders);
return task.whenComplete(
Expand All @@ -129,8 +129,8 @@ protected CompletableFuture<List<BlockHeader>> executePeerTask(
if (lastFilledHeaderIndex == 0) {
LOG.debug(
"Finished downloading headers from {} to {}.",
startingBlockNumber,
referenceHeader.getNumber() - 1);
headers[0].getNumber(),
headers[segmentLength - 1].getNumber());
result.get().complete(Arrays.asList(headers));
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPipelinedPeerTask;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ParallelDownloadBodiesTask<B>
extends AbstractPipelinedPeerTask<List<BlockHeader>, List<B>> {
private static final Logger LOG = LogManager.getLogger();

private final BlockHandler<B> blockHandler;

ParallelDownloadBodiesTask(
final BlockHandler<B> blockHandler,
final BlockingQueue<List<BlockHeader>> inboundQueue,
final int outboundBacklogSize,
final EthContext ethContext,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(inboundQueue, outboundBacklogSize, ethContext, ethTasksTimer);

this.blockHandler = blockHandler;
}

@Override
protected Optional<List<B>> processStep(
final List<BlockHeader> headers,
final Optional<List<BlockHeader>> previousHeaders,
final EthPeer peer) {
LOG.trace(
"Downloading bodies {} to {}",
headers.get(0).getNumber(),
headers.get(headers.size() - 1).getNumber());
try {
final List<B> blocks = blockHandler.downloadBlocks(headers).get();
LOG.debug(
"Downloaded bodies {} to {}",
headers.get(0).getNumber(),
headers.get(headers.size() - 1).getNumber());
return Optional.of(blocks);
} catch (final InterruptedException | ExecutionException e) {
failExceptionally(e);
return Optional.empty();
}
}
}
Loading

0 comments on commit c05576c

Please sign in to comment.