Skip to content

Commit

Permalink
[BESU-194] Remove max pivot block resets during fast sync (#427)
Browse files Browse the repository at this point in the history
* remove max pivot block resets during fast sync

* increase max retry number and fix test

* change logs in the handleFailure method

* change logs related to suspicious number of retries
Signed-off-by: Karim TAAM <[email protected]>
  • Loading branch information
matkt authored Feb 27, 2020
1 parent 2f6c58e commit 8ced45c
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -88,6 +89,11 @@ public CompletableFuture<FastSyncState> waitForSuitablePeers(final FastSyncState
.thenApply(successfulWaitResult -> fastSyncState);
}

/**
 * Schedules a future-producing task by delegating to the scheduler obtained from
 * {@code ethContext}.
 *
 * @param future supplier of the {@link CompletableFuture} to run; passed through unchanged
 * @param duration passed through unchanged to the scheduler; presumably the delay before the
 *     supplier is invoked (TODO confirm against the scheduler's contract)
 * @param <T> result type of the scheduled future
 * @return whatever the scheduler's {@code scheduleFutureTask} returns for these arguments
 */
public <T> CompletableFuture<T> scheduleFutureTask(
final Supplier<CompletableFuture<T>> future, final Duration duration) {
return ethContext.getScheduler().scheduleFutureTask(future, duration);
}

private CompletableFuture<Void> waitForAnyPeer() {
final CompletableFuture<Void> waitForPeerResult =
ethContext.getScheduler().timeout(WaitForPeersTask.create(ethContext, 1, metricsSystem));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
Expand All @@ -38,6 +39,9 @@
import org.apache.logging.log4j.Logger;

public class FastSyncDownloader<C> {

private static final Duration FAST_SYNC_RETRY_DELAY = Duration.ofSeconds(5);

private static final Logger LOG = LogManager.getLogger();
private final FastSyncActions<C> fastSyncActions;
private final WorldStateDownloader worldStateDownloader;
Expand Down Expand Up @@ -80,17 +84,25 @@ private CompletableFuture<FastSyncState> start(final FastSyncState fastSyncState
.thenApply(this::updateMaxTrailingPeers)
.thenApply(this::storeState)
.thenCompose(this::downloadChainAndWorldState),
this::handleWorldStateUnavailable);
this::handleFailure);
}

private CompletableFuture<FastSyncState> handleWorldStateUnavailable(final Throwable error) {
private CompletableFuture<FastSyncState> handleFailure(final Throwable error) {
trailingPeerRequirements = Optional.empty();
if (ExceptionUtils.rootCause(error) instanceof StalledDownloadException) {
if (ExceptionUtils.rootCause(error) instanceof FastSyncException) {
return completedExceptionally(error);
} else if (ExceptionUtils.rootCause(error) instanceof StalledDownloadException) {
LOG.warn(
"Fast sync was unable to download the world state. Retrying with a new pivot block.");
return start(FastSyncState.EMPTY_SYNC_STATE);
} else {
return completedExceptionally(error);
LOG.error(
"Encountered an unexpected error during fast sync. Restarting fast sync in "
+ FAST_SYNC_RETRY_DELAY
+ " seconds.",
error);
return fastSyncActions.scheduleFutureTask(
() -> start(FastSyncState.EMPTY_SYNC_STATE), FAST_SYNC_RETRY_DELAY);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class PivotBlockRetriever<C> {

private static final Logger LOG = LogManager.getLogger();
public static final int MAX_QUERY_RETRIES_PER_PEER = 3;
private static final int DEFAULT_MAX_PIVOT_BLOCK_RESETS = 5;
private static final int DEFAULT_MAX_PIVOT_BLOCK_RESETS = 50;
private static final int SUSPICIOUS_NUMBER_OF_RETRIES = 5;

private final EthContext ethContext;
private final MetricsSystem metricsSystem;
Expand Down Expand Up @@ -148,6 +149,11 @@ private void handleContestedPivotBlock(final long contestedBlockNumber) {
LOG.info("Received conflicting pivot blocks for {}.", contestedBlockNumber);

final int retryCount = confirmationTasks.size();

if ((retryCount % SUSPICIOUS_NUMBER_OF_RETRIES) == 0) {
LOG.warn("{} attempts have failed to find a fast sync pivot block", retryCount);
}

if (retryCount > maxPivotBlockResets
|| pivotBlockNumber.get() <= BlockHeader.GENESIS_BLOCK_NUMBER) {
LOG.info("Max retries reached, cancel pivot block download.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -34,6 +35,7 @@
import java.nio.file.Path;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.assertj.core.api.Assertions;
import org.junit.Test;
Expand Down Expand Up @@ -358,6 +360,77 @@ public void shouldResetFastSyncStateAndRestartProcessIfWorldStateIsUnavailable()
assertThat(result).isCompletedWithValue(secondDownloadPivotBlockHeaderState);
}

/**
 * Checks that a failure which is not a fast sync exception (here a plain
 * {@link RuntimeException} from the first world state download) causes the downloader to
 * schedule a restart through {@code fastSyncActions.scheduleFutureTask} and run a second,
 * successful attempt with a different pivot block (90 instead of 50).
 */
@SuppressWarnings("unchecked")
@Test
public void shouldResetFastSyncStateAndRestartProcessIfANonFastSyncExceptionOccurs() {
// Futures left incomplete so the test controls when each download stage finishes or fails.
final CompletableFuture<Void> firstWorldStateFuture = new CompletableFuture<>();
final CompletableFuture<Void> secondWorldStateFuture = new CompletableFuture<>();
final CompletableFuture<Void> chainFuture = new CompletableFuture<>();
final ChainDownloader secondChainDownloader = mock(ChainDownloader.class);
// The first attempt uses pivot block 50, the second attempt pivot block 90.
final FastSyncState selectPivotBlockState = new FastSyncState(50);
final FastSyncState secondSelectPivotBlockState = new FastSyncState(90);
final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(50).buildHeader();
final BlockHeader secondPivotBlockHeader =
new BlockHeaderTestFixture().number(90).buildHeader();
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
final FastSyncState secondDownloadPivotBlockHeaderState =
new FastSyncState(secondPivotBlockHeader);
// First attempt
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
// Consecutive stubbing: the first call yields pivot 50, the second call yields pivot 90.
when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(
completedFuture(selectPivotBlockState), completedFuture(secondSelectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
.thenReturn(completedFuture(downloadPivotBlockHeaderState));
when(fastSyncActions.createChainDownloader(downloadPivotBlockHeaderState))
.thenReturn(chainDownloader);
when(chainDownloader.start()).thenReturn(chainFuture);
when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(firstWorldStateFuture);
// Run the scheduled supplier immediately instead of waiting for the retry delay.
when(fastSyncActions.scheduleFutureTask(any(), any()))
.thenAnswer(invocation -> ((Supplier) invocation.getArgument(0)).get());

// Second attempt
when(fastSyncActions.downloadPivotBlockHeader(secondSelectPivotBlockState))
.thenReturn(completedFuture(secondDownloadPivotBlockHeaderState));

when(fastSyncActions.createChainDownloader(secondDownloadPivotBlockHeaderState))
.thenReturn(secondChainDownloader);
when(secondChainDownloader.start()).thenReturn(completedFuture(null));
when(worldStateDownloader.run(secondPivotBlockHeader)).thenReturn(secondWorldStateFuture);

final CompletableFuture<FastSyncState> result = downloader.start();

// Only the first attempt's interactions should have happened so far.
verify(fastSyncActions).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState);
verify(storage).storeState(downloadPivotBlockHeaderState);
verify(fastSyncActions).createChainDownloader(downloadPivotBlockHeaderState);
verify(worldStateDownloader).run(pivotBlockHeader);
verifyNoMoreInteractions(fastSyncActions, worldStateDownloader, storage);

assertThat(result).isNotDone();

// Fail the first world state download with an exception that is not a fast sync exception.
firstWorldStateFuture.completeExceptionally(new RuntimeException("Test"));

assertThat(result).isNotDone();
verify(chainDownloader).cancel();
// A real chain downloader would cause the chainFuture to complete when cancel is called.
chainFuture.completeExceptionally(new CancellationException());

// The failure must have gone through the scheduled retry and started a full second attempt.
verify(fastSyncActions).scheduleFutureTask(any(), any());
verify(fastSyncActions, times(2)).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions, times(2)).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).downloadPivotBlockHeader(secondSelectPivotBlockState);
verify(storage).storeState(secondDownloadPivotBlockHeaderState);
verify(fastSyncActions).createChainDownloader(secondDownloadPivotBlockHeaderState);
verify(worldStateDownloader).run(secondPivotBlockHeader);
verifyNoMoreInteractions(fastSyncActions, worldStateDownloader, storage);

// Completing the second world state download lets the overall result finish successfully.
secondWorldStateFuture.complete(null);

assertThat(result).isCompletedWithValue(secondDownloadPivotBlockHeaderState);
}

@Test
public void shouldNotHaveTrailingPeerRequirementsBeforePivotBlockSelected() {
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE))
Expand Down

0 comments on commit 8ced45c

Please sign in to comment.