Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[NC-2236] Parallel Block importer #774

Merged
merged 20 commits into from
Feb 8, 2019
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

Expand Down Expand Up @@ -52,6 +53,20 @@ public final CompletableFuture<T> run() {
return result.get();
}

@Override
public final CompletableFuture<T> runAsync(final ExecutorService executor) {
if (result.compareAndSet(null, new CompletableFuture<>())) {
executor.submit(this::executeTaskTimed);
result
.get()
.whenComplete(
(r, t) -> {
cleanup();
});
}
return result.get();
}

@Override
public final void cancel() {
synchronized (result) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.manager;

import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public abstract class AbstractPipelinedPeerTask<I, O> extends AbstractPeerTask<List<O>> {
private static final Logger LOG = LogManager.getLogger();

static final int TIMEOUT_MS = 1000;

private BlockingQueue<I> inboundQueue;
private BlockingQueue<O> outboundQueue;
private List<O> results;

private boolean lameDuckMode = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we name this something a little more descriptive? Maybe just stopWhenInboundQueueEmpty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lame duck is google server speak, keep processing but accept no new connections and then stop when you're done. Like a Lame Duck session in the US congress. But this is Java, shutdown is better for the method and shuttingDown for the var.

private AtomicReference<Throwable> processingException = new AtomicReference<>(null);

protected AbstractPipelinedPeerTask(
final BlockingQueue<I> inboundQueue,
final int outboundBacklogSize,
final EthContext ethContext,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethContext, ethTasksTimer);
this.inboundQueue = inboundQueue;
outboundQueue = new LinkedBlockingQueue<>(outboundBacklogSize);
results = new ArrayList<>();
}

@Override
protected void executeTaskWithPeer(final EthPeer peer) {
Optional<I> previousInput = Optional.empty();
while (!isDone() && processingException.get() == null) {
if (lameDuckMode && inboundQueue.isEmpty()) {
break;
}
final I input;
try {
input = inboundQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (input == null) {
// timed out waiting for a result
continue;
}
} catch (final InterruptedException e) {
// this is expected
continue;
}
final Optional<O> output = processStep(input, previousInput, peer);
output.ifPresent(
o -> {
try {
outboundQueue.put(o);
} catch (final InterruptedException e) {
processingException.compareAndSet(null, e);
}
results.add(o);
});
previousInput = Optional.of(input);
}
if (processingException.get() == null) {
result.get().complete(new PeerTaskResult<>(peer, results));
} else {
result.get().completeExceptionally(processingException.get());
}
}

public BlockingQueue<O> getOutboundQueue() {
return outboundQueue;
}

public boolean isLameDuckMode() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be unused so can be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return lameDuckMode;
}

public void setLameDuckMode(final boolean lameDuckMode) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming-wise this seems quite similar to Executor.shutdown and may read better with that kind of action name rather than a setter style name - especially since we never need to setLameDuckMode(false)

this.lameDuckMode = lameDuckMode;
}

protected void failExceptionally(final Throwable t) {
LOG.error("Task Failure", t);
processingException.compareAndSet(null, t);
result.get().completeExceptionally(t);
cancel();
}

protected abstract Optional<O> processStep(I input, Optional<I> previousInput, EthPeer peer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class EthScheduler {
private final ExecutorService servicesExecutor;
private final ExecutorService computationExecutor;

EthScheduler(
public EthScheduler(
final int syncWorkerCount, final int txWorkerCount, final int computationWorkerCount) {
this(
Executors.newFixedThreadPool(
Expand Down Expand Up @@ -125,11 +125,11 @@ public void scheduleTxWorkerTask(final Runnable command) {
txWorkerExecutor.submit(command);
}

CompletableFuture<Void> scheduleServiceTask(final Runnable service) {
return CompletableFuture.runAsync(service, servicesExecutor);
public <T> CompletableFuture<T> scheduleServiceTask(final EthTask<T> task) {
return task.runAsync(servicesExecutor);
}

<T> CompletableFuture<T> scheduleComputationTask(final Supplier<T> computation) {
public <T> CompletableFuture<T> scheduleComputationTask(final Supplier<T> computation) {
return CompletableFuture.supplyAsync(computation, computationExecutor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
package tech.pegasys.pantheon.ethereum.eth.manager;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

public interface EthTask<T> {

CompletableFuture<T> run();

CompletableFuture<T> runAsync(ExecutorService executor);

void cancel();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;

import java.util.List;
import java.util.concurrent.CompletableFuture;

public interface BlockHandler<B> {
CompletableFuture<List<B>> downloadBlocks(final List<BlockHeader> headers);

CompletableFuture<List<B>> validateAndImportBlocks(final List<B> blocks);

long extractBlockNumber(final B block);
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public static class Builder {
private int downloaderHeaderRequestSize = 10;
private int downloaderCheckpointTimeoutsPermitted = 5;
private int downloaderChainSegmentTimeoutsPermitted = 5;
private int downloaderChainSegmentSize = 20;
private int downloaderChainSegmentSize = 200;
private long trailingPeerBlocksBehindThreshold;
private int maxTrailingPeers = Integer.MAX_VALUE;
private int downloaderParallelism = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import tech.pegasys.pantheon.ethereum.core.BlockImporter;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask.BlockHandler;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
Expand Down Expand Up @@ -125,4 +125,9 @@ private BlockImporter<C> getBlockImporter(final BlockWithReceipts blockWithRecei

return protocolSpec.getBlockImporter();
}

@Override
public long extractBlockNumber(final BlockWithReceipts blockWithReceipt) {
return blockWithReceipt.getHeader().getNumber();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import tech.pegasys.pantheon.ethereum.eth.sync.ChainDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ParallelImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.Counter;
Expand Down Expand Up @@ -103,8 +103,8 @@ private CompletableFuture<List<Block>> importBlocksForCheckpoints(
HeaderValidationMode.DETACHED_ONLY,
fastSyncValidationCounter);

final PipelinedImportChainSegmentTask<C, BlockWithReceipts> importTask =
PipelinedImportChainSegmentTask.forCheckpoints(
final ParallelImportChainSegmentTask<C, BlockWithReceipts> importTask =
ParallelImportChainSegmentTask.forCheckpoints(
protocolSchedule,
protocolContext,
ethContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PersistBlockTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask.BlockHandler;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
Expand Down Expand Up @@ -72,6 +72,11 @@ public CompletableFuture<List<Block>> downloadBlocks(final List<BlockHeader> hea
.thenCompose(this::extractTransactionSenders);
}

@Override
public long extractBlockNumber(final Block block) {
return block.getHeader().getNumber();
}

private CompletableFuture<List<Block>> extractTransactionSenders(final List<Block> blocks) {
LOG.debug(
"Extracting sender {} to {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ImportBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ParallelImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
Expand Down Expand Up @@ -93,8 +93,8 @@ private CompletableFuture<List<Block>> importBlocksForCheckpoints(
ethTasksTimer);
importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult);
} else {
final PipelinedImportChainSegmentTask<C, Block> importTask =
PipelinedImportChainSegmentTask.forCheckpoints(
final ParallelImportChainSegmentTask<C, Block> importTask =
ParallelImportChainSegmentTask.forCheckpoints(
protocolSchedule,
protocolContext,
ethContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public static <C> DownloadHeaderSequenceTask<C> endingAtHeader(
protected CompletableFuture<List<BlockHeader>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
LOG.debug(
"Downloading headers from {} to {}.", startingBlockNumber, referenceHeader.getNumber() - 1);
"Downloading headers from {} to {}.", startingBlockNumber, referenceHeader.getNumber());
final CompletableFuture<List<BlockHeader>> task =
downloadHeaders(assignedPeer).thenCompose(this::processHeaders);
return task.whenComplete(
Expand All @@ -129,8 +129,8 @@ protected CompletableFuture<List<BlockHeader>> executePeerTask(
if (lastFilledHeaderIndex == 0) {
LOG.debug(
"Finished downloading headers from {} to {}.",
startingBlockNumber,
referenceHeader.getNumber() - 1);
headers[0].getNumber(),
headers[segmentLength - 1].getNumber());
result.get().complete(Arrays.asList(headers));
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPipelinedPeerTask;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ParallelDownloadBodiesTask<B>
extends AbstractPipelinedPeerTask<List<BlockHeader>, List<B>> {
private static final Logger LOG = LogManager.getLogger();

private final BlockHandler<B> blockHandler;

ParallelDownloadBodiesTask(
final BlockHandler<B> blockHandler,
final BlockingQueue<List<BlockHeader>> inboundQueue,
final int outboundBacklogSize,
final EthContext ethContext,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(inboundQueue, outboundBacklogSize, ethContext, ethTasksTimer);

this.blockHandler = blockHandler;
}

@Override
protected Optional<List<B>> processStep(
final List<BlockHeader> headers,
final Optional<List<BlockHeader>> previousHeaders,
final EthPeer peer) {
LOG.trace(
"Downloading bodies {} to {}",
headers.get(0).getNumber(),
headers.get(headers.size() - 1).getNumber());
try {
final List<B> blocks = blockHandler.downloadBlocks(headers).get();
LOG.debug(
"Downloaded bodies {} to {}",
headers.get(0).getNumber(),
headers.get(headers.size() - 1).getNumber());
return Optional.of(blocks);
} catch (final InterruptedException | ExecutionException e) {
failExceptionally(e);
return Optional.empty();
}
}
}
Loading