Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BESU-194] Remove max pivot block resets during fast sync #427

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -88,6 +89,11 @@ public CompletableFuture<FastSyncState> waitForSuitablePeers(final FastSyncState
.thenApply(successfulWaitResult -> fastSyncState);
}

public <T> CompletableFuture<T> scheduleFutureTask(
final Supplier<CompletableFuture<T>> future, final Duration duration) {
return ethContext.getScheduler().scheduleFutureTask(future, duration);
}

private CompletableFuture<Void> waitForAnyPeer() {
final CompletableFuture<Void> waitForPeerResult =
ethContext.getScheduler().timeout(WaitForPeersTask.create(ethContext, 1, metricsSystem));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
Expand All @@ -38,6 +39,9 @@
import org.apache.logging.log4j.Logger;

public class FastSyncDownloader<C> {

private static final Duration FAST_SYNC_RETRY_DELAY = Duration.ofSeconds(5);

private static final Logger LOG = LogManager.getLogger();
private final FastSyncActions<C> fastSyncActions;
private final WorldStateDownloader worldStateDownloader;
Expand Down Expand Up @@ -80,17 +84,25 @@ private CompletableFuture<FastSyncState> start(final FastSyncState fastSyncState
.thenApply(this::updateMaxTrailingPeers)
.thenApply(this::storeState)
.thenCompose(this::downloadChainAndWorldState),
this::handleWorldStateUnavailable);
this::handleFailure);
}

private CompletableFuture<FastSyncState> handleWorldStateUnavailable(final Throwable error) {
private CompletableFuture<FastSyncState> handleFailure(final Throwable error) {
trailingPeerRequirements = Optional.empty();
if (ExceptionUtils.rootCause(error) instanceof StalledDownloadException) {
if (ExceptionUtils.rootCause(error) instanceof FastSyncException) {
return completedExceptionally(error);
} else if (ExceptionUtils.rootCause(error) instanceof StalledDownloadException) {
LOG.warn(
"Fast sync was unable to download the world state. Retrying with a new pivot block.");
return start(FastSyncState.EMPTY_SYNC_STATE);
} else {
return completedExceptionally(error);
LOG.error(
"Encountered an unexpected error during fast sync. Restarting fast sync in "
+ FAST_SYNC_RETRY_DELAY
+ " seconds.",
error);
return fastSyncActions.scheduleFutureTask(
() -> start(FastSyncState.EMPTY_SYNC_STATE), FAST_SYNC_RETRY_DELAY);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class PivotBlockRetriever<C> {

private static final Logger LOG = LogManager.getLogger();
public static final int MAX_QUERY_RETRIES_PER_PEER = 3;
private static final int DEFAULT_MAX_PIVOT_BLOCK_RESETS = 5;
private static final int DEFAULT_MAX_PIVOT_BLOCK_RESETS = 50;
private static final int SUSPICIOUS_NUMBER_OF_RETRIES = 5;

private final EthContext ethContext;
private final MetricsSystem metricsSystem;
Expand Down Expand Up @@ -148,6 +149,11 @@ private void handleContestedPivotBlock(final long contestedBlockNumber) {
LOG.info("Received conflicting pivot blocks for {}.", contestedBlockNumber);

final int retryCount = confirmationTasks.size();

if ((retryCount % SUSPICIOUS_NUMBER_OF_RETRIES) == 0) {
LOG.warn("{} attempts have failed to find a fast sync pivot block", retryCount);
}

if (retryCount > maxPivotBlockResets
|| pivotBlockNumber.get() <= BlockHeader.GENESIS_BLOCK_NUMBER) {
LOG.info("Max retries reached, cancel pivot block download.");
matkt marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -34,6 +35,7 @@
import java.nio.file.Path;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.assertj.core.api.Assertions;
import org.junit.Test;
Expand Down Expand Up @@ -358,6 +360,77 @@ public void shouldResetFastSyncStateAndRestartProcessIfWorldStateIsUnavailable()
assertThat(result).isCompletedWithValue(secondDownloadPivotBlockHeaderState);
}

@SuppressWarnings("unchecked")
@Test
public void shouldResetFastSyncStateAndRestartProcessIfANonFastSyncExceptionOccurs() {
final CompletableFuture<Void> firstWorldStateFuture = new CompletableFuture<>();
final CompletableFuture<Void> secondWorldStateFuture = new CompletableFuture<>();
final CompletableFuture<Void> chainFuture = new CompletableFuture<>();
final ChainDownloader secondChainDownloader = mock(ChainDownloader.class);
final FastSyncState selectPivotBlockState = new FastSyncState(50);
final FastSyncState secondSelectPivotBlockState = new FastSyncState(90);
final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(50).buildHeader();
final BlockHeader secondPivotBlockHeader =
new BlockHeaderTestFixture().number(90).buildHeader();
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
final FastSyncState secondDownloadPivotBlockHeaderState =
new FastSyncState(secondPivotBlockHeader);
// First attempt
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(
completedFuture(selectPivotBlockState), completedFuture(secondSelectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
.thenReturn(completedFuture(downloadPivotBlockHeaderState));
when(fastSyncActions.createChainDownloader(downloadPivotBlockHeaderState))
.thenReturn(chainDownloader);
when(chainDownloader.start()).thenReturn(chainFuture);
when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(firstWorldStateFuture);
when(fastSyncActions.scheduleFutureTask(any(), any()))
.thenAnswer(invocation -> ((Supplier) invocation.getArgument(0)).get());

// Second attempt
when(fastSyncActions.downloadPivotBlockHeader(secondSelectPivotBlockState))
.thenReturn(completedFuture(secondDownloadPivotBlockHeaderState));

when(fastSyncActions.createChainDownloader(secondDownloadPivotBlockHeaderState))
.thenReturn(secondChainDownloader);
when(secondChainDownloader.start()).thenReturn(completedFuture(null));
when(worldStateDownloader.run(secondPivotBlockHeader)).thenReturn(secondWorldStateFuture);

final CompletableFuture<FastSyncState> result = downloader.start();

verify(fastSyncActions).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState);
verify(storage).storeState(downloadPivotBlockHeaderState);
verify(fastSyncActions).createChainDownloader(downloadPivotBlockHeaderState);
verify(worldStateDownloader).run(pivotBlockHeader);
verifyNoMoreInteractions(fastSyncActions, worldStateDownloader, storage);

assertThat(result).isNotDone();

firstWorldStateFuture.completeExceptionally(new RuntimeException("Test"));

assertThat(result).isNotDone();
verify(chainDownloader).cancel();
// A real chain downloader would cause the chainFuture to complete when cancel is called.
chainFuture.completeExceptionally(new CancellationException());

verify(fastSyncActions).scheduleFutureTask(any(), any());
verify(fastSyncActions, times(2)).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions, times(2)).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).downloadPivotBlockHeader(secondSelectPivotBlockState);
verify(storage).storeState(secondDownloadPivotBlockHeaderState);
verify(fastSyncActions).createChainDownloader(secondDownloadPivotBlockHeaderState);
verify(worldStateDownloader).run(secondPivotBlockHeader);
verifyNoMoreInteractions(fastSyncActions, worldStateDownloader, storage);

secondWorldStateFuture.complete(null);

assertThat(result).isCompletedWithValue(secondDownloadPivotBlockHeaderState);
}

@Test
public void shouldNotHaveTrailingPeerRequirementsBeforePivotBlockSelected() {
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE))
Expand Down