Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
review updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
shemnon committed Feb 8, 2019
1 parent 6db15d3 commit 6126116
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public abstract class AbstractPipelinedPeerTask<I, O> extends AbstractPeerTask<L
private BlockingQueue<O> outboundQueue;
private List<O> results;

private boolean lameDuckMode = false;
private boolean shuttingDown = false;
private AtomicReference<Throwable> processingException = new AtomicReference<>(null);

protected AbstractPipelinedPeerTask(
Expand All @@ -53,7 +53,7 @@ protected AbstractPipelinedPeerTask(
protected void executeTaskWithPeer(final EthPeer peer) {
Optional<I> previousInput = Optional.empty();
while (!isDone() && processingException.get() == null) {
if (lameDuckMode && inboundQueue.isEmpty()) {
if (shuttingDown && inboundQueue.isEmpty()) {
break;
}
final I input;
Expand Down Expand Up @@ -90,12 +90,8 @@ public BlockingQueue<O> getOutboundQueue() {
return outboundQueue;
}

public boolean isLameDuckMode() {
return lameDuckMode;
}

public void setLameDuckMode(final boolean lameDuckMode) {
this.lameDuckMode = lameDuckMode;
public void shutdown() {
this.shuttingDown = true;
}

protected void failExceptionally(final Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ protected Optional<List<BlockHeader>> processStep(
}
final int segmentLength =
(int) (nextCheckpointHeader.getNumber() - previousCheckpointHeader.get().getNumber()) - 1;
// body
LOG.trace(
"Requesting download of {} blocks ending at {}",
segmentLength,
Expand All @@ -84,7 +83,7 @@ protected Optional<List<BlockHeader>> processStep(
result.get().completeExceptionally(e);
return Optional.empty();
}
headers.add(nextCheckpointHeader);
// FIXME headers.add(nextCheckpointHeader);
if (headers.size() > 2) {
LOG.debug(
"Downloaded headers {} to {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ private ParallelImportChainSegmentTask(
}
this.checkpointHeaders =
new ArrayBlockingQueue<>(checkpointHeaders.size(), false, checkpointHeaders);
// this.chunksInTotal = checkpointHeaders.size() - 1;
this.blockHandler = blockHandler;
this.validationPolicy = validationPolicy;
}
Expand Down Expand Up @@ -150,10 +149,10 @@ protected void executeTask() {
scheduler.scheduleServiceTask(validateAndImportBodiesTask);

// Hook in pipeline completion signaling.
downloadHeadersTask.setLameDuckMode(true);
downloadHeaderFuture.thenRun(() -> validateHeadersTask.setLameDuckMode(true));
validateHeaderFuture.thenRun(() -> downloadBodiesTask.setLameDuckMode(true));
downloadBodiesFuture.thenRun(() -> validateAndImportBodiesTask.setLameDuckMode(true));
downloadHeadersTask.shutdown();
downloadHeaderFuture.thenRun(() -> validateHeadersTask.shutdown());
validateHeaderFuture.thenRun(() -> downloadBodiesTask.shutdown());
downloadBodiesFuture.thenRun(() -> validateAndImportBodiesTask.shutdown());

final BiConsumer<? super Object, ? super Throwable> cancelOnException =
(s, e) -> {
Expand All @@ -171,8 +170,9 @@ protected void executeTask() {
downloadBodiesFuture.whenComplete(cancelOnException);
validateBodiesFuture.whenComplete(
(r, e) -> {
cancelOnException.accept(r, e);
if (r != null) {
if (e != null) {
cancelOnException.accept(null, e);
} else if (r != null) {
try {
final List<B> importedBlocks =
validateBodiesFuture.get().getResult().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class ParallelValidateAndImportBodiesTask<B>

@Override
protected Optional<List<B>> processStep(
final List<B> blocks, final Optional<List<B>> previousHeaders, final EthPeer peer) {
final List<B> blocks, final Optional<List<B>> previousBlocks, final EthPeer peer) {
final long firstBlock = blockHandler.extractBlockNumber(blocks.get(0));
final long lastBlock = blockHandler.extractBlockNumber(blocks.get(blocks.size() - 1));
LOG.debug("Starting import of chain segment {} to {}", firstBlock, lastBlock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public void shouldSyncToPivotBlockInSingleSegment() {
final FastSyncChainDownloader<?> downloader = downloader(syncConfig, pivotBlockNumber);
final CompletableFuture<Void> result = downloader.start();

peer.respondWhileOtherThreadsWork(responder, () -> !result.isDone());;
peer.respondWhileOtherThreadsWork(responder, () -> !result.isDone());
;

This comment has been minimized.

Copy link
@ajsutton

ajsutton Feb 8, 2019

Contributor

This is my fault, but there's an extra semi-colon here and on line 175... Sorry. :)


assertThat(result).isCompleted();
assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(pivotBlockNumber);
Expand Down Expand Up @@ -171,7 +172,8 @@ public void recoversFromSyncTargetDisconnect() {

ethProtocolManager.handleDisconnect(bestPeer.getPeerConnection(), TOO_MANY_PEERS, true);

secondBestPeer.respondWhileOtherThreadsWork(shorterResponder, () -> !result.isDone());;
secondBestPeer.respondWhileOtherThreadsWork(shorterResponder, () -> !result.isDone());
;

assertThat(result).isCompleted();
assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(pivotBlockNumber);
Expand Down

0 comments on commit 6126116

Please sign in to comment.