Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
[PAN-2372] Consider a world state download stalled after 100 requests…
Browse files Browse the repository at this point in the history
… with no progress (#1007)
  • Loading branch information
ajsutton authored Feb 28, 2019
1 parent 1b37c83 commit 5986da9
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void setUpUnchangedState() throws Exception {
pendingRequests,
syncConfig.getWorldStateHashCountPerRequest(),
syncConfig.getWorldStateRequestParallelism(),
syncConfig.getWorldStateRequestMaxRetries(),
syncConfig.getWorldStateMaxRequestsWithoutProgress(),
metricsSystem);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public static <C> Optional<FastSynchronizer<C>> create(
stateQueue,
syncConfig.getWorldStateHashCountPerRequest(),
syncConfig.getWorldStateRequestParallelism(),
syncConfig.getWorldStateRequestMaxRetries(),
syncConfig.getWorldStateMaxRequestsWithoutProgress(),
metricsSystem);
final FastSyncDownloader<C> fastSyncDownloader =
new FastSyncDownloader<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class SynchronizerConfiguration {
private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofMinutes(5);
private static final int DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST = 384;
private static final int DEFAULT_WORLD_STATE_REQUEST_PARALLELISM = 10;
private static final int DEFAULT_WORLD_STATE_REQUEST_MAX_RETRIES = 25;
private static final int DEFAULT_WORLD_STATE_MAX_REQUESTS_WITHOUT_PROGRESS = 100;

// Fast sync config
private final int fastSyncPivotDistance;
Expand All @@ -38,7 +38,7 @@ public class SynchronizerConfiguration {
private final Duration fastSyncMaximumPeerWaitTime;
private final int worldStateHashCountPerRequest;
private final int worldStateRequestParallelism;
private final int worldStateRequestMaxRetries;
private final int worldStateMaxRequestsWithoutProgress;

// Block propagation config
private final Range<Long> blockPropagationRange;
Expand Down Expand Up @@ -66,7 +66,7 @@ private SynchronizerConfiguration(
final Duration fastSyncMaximumPeerWaitTime,
final int worldStateHashCountPerRequest,
final int worldStateRequestParallelism,
final int worldStateRequestMaxRetries,
final int worldStateMaxRequestsWithoutProgress,
final Range<Long> blockPropagationRange,
final SyncMode syncMode,
final long downloaderChangeTargetThresholdByHeight,
Expand All @@ -86,7 +86,7 @@ private SynchronizerConfiguration(
this.fastSyncMaximumPeerWaitTime = fastSyncMaximumPeerWaitTime;
this.worldStateHashCountPerRequest = worldStateHashCountPerRequest;
this.worldStateRequestParallelism = worldStateRequestParallelism;
this.worldStateRequestMaxRetries = worldStateRequestMaxRetries;
this.worldStateMaxRequestsWithoutProgress = worldStateMaxRequestsWithoutProgress;
this.blockPropagationRange = blockPropagationRange;
this.syncMode = syncMode;
this.downloaderChangeTargetThresholdByHeight = downloaderChangeTargetThresholdByHeight;
Expand Down Expand Up @@ -211,8 +211,8 @@ public int getWorldStateRequestParallelism() {
return worldStateRequestParallelism;
}

public int getWorldStateRequestMaxRetries() {
return worldStateRequestMaxRetries;
public int getWorldStateMaxRequestsWithoutProgress() {
return worldStateMaxRequestsWithoutProgress;
}

public static class Builder {
Expand All @@ -234,7 +234,8 @@ public static class Builder {
private int fastSyncMinimumPeerCount = DEFAULT_FAST_SYNC_MINIMUM_PEERS;
private int worldStateHashCountPerRequest = DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST;
private int worldStateRequestParallelism = DEFAULT_WORLD_STATE_REQUEST_PARALLELISM;
private int worldStateRequestMaxRetries = DEFAULT_WORLD_STATE_REQUEST_MAX_RETRIES;
private int worldStateMaxRequestsWithoutProgress =
DEFAULT_WORLD_STATE_MAX_REQUESTS_WITHOUT_PROGRESS;
private Duration fastSyncMaximumPeerWaitTime = DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME;

public Builder fastSyncPivotDistance(final int distance) {
Expand Down Expand Up @@ -332,8 +333,9 @@ public Builder worldStateRequestParallelism(final int worldStateRequestParalleli
return this;
}

public Builder worldStateRequestMaxRetries(final int worldStateRequestMaxRetries) {
this.worldStateRequestMaxRetries = worldStateRequestMaxRetries;
public Builder worldStateMaxRequestsWithoutProgress(
final int worldStateMaxRequestsWithoutProgress) {
this.worldStateMaxRequestsWithoutProgress = worldStateMaxRequestsWithoutProgress;
return this;
}

Expand All @@ -350,7 +352,7 @@ public SynchronizerConfiguration build() {
fastSyncMaximumPeerWaitTime,
worldStateHashCountPerRequest,
worldStateRequestParallelism,
worldStateRequestMaxRetries,
worldStateMaxRequestsWithoutProgress,
blockPropagationRange,
syncMode,
downloaderChangeTargetThresholdByHeight,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public abstract class NodeDataRequest {
private final RequestType requestType;
private final Hash hash;
private BytesValue data;
private boolean requiresPersisting = true;
private final AtomicInteger failedRequestCount = new AtomicInteger(0);

protected NodeDataRequest(final RequestType requestType, final Hash hash) {
this.requestType = requestType;
Expand Down Expand Up @@ -58,7 +56,6 @@ public static NodeDataRequest deserialize(final BytesValue encoded) {
in.enterList();
final RequestType requestType = RequestType.fromValue(in.readByte());
final Hash hash = Hash.wrap(in.readBytes32());
final int failureCount = in.readIntScalar();
in.leaveList();

final NodeDataRequest deserialized;
Expand All @@ -78,15 +75,13 @@ public static NodeDataRequest deserialize(final BytesValue encoded) {
+ NodeDataRequest.class.getSimpleName());
}

deserialized.setFailureCount(failureCount);
return deserialized;
}

private void writeTo(final RLPOutput out) {
out.startList();
out.writeByte(requestType.getValue());
out.writeBytesValue(hash);
out.writeIntScalar(failedRequestCount.get());
out.endList();
}

Expand All @@ -112,14 +107,6 @@ public NodeDataRequest setRequiresPersisting(final boolean requiresPersisting) {
return this;
}

public int trackFailure() {
return failedRequestCount.incrementAndGet();
}

private void setFailureCount(final int failures) {
failedRequestCount.set(failures);
}

public final void persist(final WorldStateStorage.Updater updater) {
if (requiresPersisting) {
checkNotNull(getData(), "Must set data before node can be persisted.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,27 @@ class WorldDownloadState {
private final TaskQueue<NodeDataRequest> pendingRequests;
private final ArrayBlockingQueue<Task<NodeDataRequest>> requestsToPersist;
private final int maxOutstandingRequests;
private final int maxRequestsWithoutProgress;
private final Set<EthTask<?>> outstandingRequests =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final AtomicBoolean sendingRequests = new AtomicBoolean(false);
private final CompletableFuture<Void> internalFuture;
private final CompletableFuture<Void> downloadFuture;
// Volatile so monitoring can access it without having to synchronize.
private volatile int requestsSinceLastProgress = 0;
private boolean waitingForNewPeer = false;
private BytesValue rootNodeData;
private EthTask<?> persistenceTask;

public WorldDownloadState(
final TaskQueue<NodeDataRequest> pendingRequests,
final ArrayBlockingQueue<Task<NodeDataRequest>> requestsToPersist,
final int maxOutstandingRequests) {
final int maxOutstandingRequests,
final int maxRequestsWithoutProgress) {
this.pendingRequests = pendingRequests;
this.requestsToPersist = requestsToPersist;
this.maxOutstandingRequests = maxOutstandingRequests;
this.maxRequestsWithoutProgress = maxRequestsWithoutProgress;
this.internalFuture = new CompletableFuture<>();
this.downloadFuture = new CompletableFuture<>();
this.internalFuture.whenComplete(this::cleanup);
Expand Down Expand Up @@ -183,11 +188,26 @@ public int getPersistenceQueueSize() {
return requestsToPersist.size();
}

public synchronized void markAsStalled(final int maxNodeRequestRetries) {
public synchronized void requestComplete(final boolean madeProgress) {
if (madeProgress) {
requestsSinceLastProgress = 0;
} else {
requestsSinceLastProgress++;
if (requestsSinceLastProgress >= maxRequestsWithoutProgress) {
markAsStalled(maxRequestsWithoutProgress);
}
}
}

public int getRequestsSinceLastProgress() {
return requestsSinceLastProgress;
}

private synchronized void markAsStalled(final int maxNodeRequestRetries) {
final String message =
"Download stalled due to too many failures to retrieve node data (>"
+ maxNodeRequestRetries
+ " failures)";
+ " requests without making progress)";
final WorldStateDownloaderException e = new StalledDownloadException(message);
internalFuture.completeExceptionally(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -60,13 +59,12 @@ public class WorldStateDownloader {
private final Counter retriedRequestsCounter;
private final Counter existingNodeCounter;
private final MetricsSystem metricsSystem;
private final AtomicInteger highestRetryCount = new AtomicInteger(0);

private final EthContext ethContext;
private final TaskQueue<NodeDataRequest> taskQueue;
private final int hashCountPerRequest;
private final int maxOutstandingRequests;
private final int maxNodeRequestRetries;
private final int maxNodeRequestsWithoutProgress;
private final WorldStateStorage worldStateStorage;

private final AtomicReference<WorldDownloadState> downloadState = new AtomicReference<>();
Expand All @@ -77,14 +75,14 @@ public WorldStateDownloader(
final TaskQueue<NodeDataRequest> taskQueue,
final int hashCountPerRequest,
final int maxOutstandingRequests,
final int maxNodeRequestRetries,
final int maxNodeRequestsWithoutProgress,
final MetricsSystem metricsSystem) {
this.ethContext = ethContext;
this.worldStateStorage = worldStateStorage;
this.taskQueue = taskQueue;
this.hashCountPerRequest = hashCountPerRequest;
this.maxOutstandingRequests = maxOutstandingRequests;
this.maxNodeRequestRetries = maxNodeRequestRetries;
this.maxNodeRequestsWithoutProgress = maxNodeRequestsWithoutProgress;
this.metricsSystem = metricsSystem;

metricsSystem.createLongGauge(
Expand Down Expand Up @@ -112,9 +110,9 @@ public WorldStateDownloader(

metricsSystem.createIntegerGauge(
MetricCategory.SYNCHRONIZER,
"world_state_node_request_failures_max",
"Highest number of times a node data request has been retried in this download",
highestRetryCount::get);
"world_state_node_requests_since_last_progress_current",
"Number of world state requests made since the last time new data was returned",
downloadStateValue(WorldDownloadState::getRequestsSinceLastProgress));

metricsSystem.createIntegerGauge(
MetricCategory.SYNCHRONIZER,
Expand Down Expand Up @@ -158,14 +156,14 @@ public CompletableFuture<Void> run(final BlockHeader header) {
}

// Room for the requests we expect to do in parallel plus some buffer but not unlimited.
final int persistenceQueueCapacity = hashCountPerRequest * maxNodeRequestRetries * 2;
final int persistenceQueueCapacity = hashCountPerRequest * maxOutstandingRequests * 2;
final WorldDownloadState newDownloadState =
new WorldDownloadState(
taskQueue,
new ArrayBlockingQueue<>(persistenceQueueCapacity),
maxOutstandingRequests);
maxOutstandingRequests,
maxNodeRequestsWithoutProgress);
this.downloadState.set(newDownloadState);
highestRetryCount.set(0);

newDownloadState.enqueueRequest(NodeDataRequest.createAccountDataRequest(stateRoot));

Expand Down Expand Up @@ -301,19 +299,15 @@ private void storeData(
final BlockHeader blockHeader,
final Map<Hash, BytesValue> data,
final WorldDownloadState downloadState) {
boolean madeProgress = false;
for (final Task<NodeDataRequest> task : requestTasks) {
final NodeDataRequest request = task.getData();
final BytesValue matchingData = data.get(request.getHash());
if (matchingData == null) {
retriedRequestsCounter.inc();
final int requestFailures = request.trackFailure();
updateHighestRetryCount(requestFailures);
task.markFailed();
if (requestFailures > maxNodeRequestRetries) {
LOG.info("Unavailable node {}", request.getHash());
downloadState.markAsStalled(maxNodeRequestRetries);
}
} else {
madeProgress = true;
request.setData(matchingData);
if (isRootState(blockHeader, request)) {
downloadState.enqueueRequests(request.getChildRequests());
Expand All @@ -324,17 +318,10 @@ private void storeData(
}
}
}
downloadState.requestComplete(madeProgress);
requestNodeData(blockHeader, downloadState);
}

private void updateHighestRetryCount(final int requestFailures) {
int previousHighestRetry = highestRetryCount.get();
while (requestFailures > previousHighestRetry) {
highestRetryCount.compareAndSet(previousHighestRetry, requestFailures);
previousHighestRetry = highestRetryCount.get();
}
}

private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) {
return request.getHash().equals(blockHeader.getStateRoot());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class WorldDownloadStateTest {
private static final BytesValue ROOT_NODE_DATA = BytesValue.of(1, 2, 3, 4);
private static final Hash ROOT_NODE_HASH = Hash.hash(ROOT_NODE_DATA);
private static final int MAX_OUTSTANDING_REQUESTS = 3;
private static final int MAX_REQUESTS_WITHOUT_PROGRESS = 10;

private final WorldStateStorage worldStateStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
Expand All @@ -55,7 +56,11 @@ public class WorldDownloadStateTest {
new ArrayBlockingQueue<>(100);

private final WorldDownloadState downloadState =
new WorldDownloadState(pendingRequests, requestsToPersist, MAX_OUTSTANDING_REQUESTS);
new WorldDownloadState(
pendingRequests,
requestsToPersist,
MAX_OUTSTANDING_REQUESTS,
MAX_REQUESTS_WITHOUT_PROGRESS);

private final CompletableFuture<Void> future = downloadState.getDownloadFuture();

Expand Down Expand Up @@ -181,10 +186,26 @@ public void shouldStopSendingAdditionalRequestsWhenFutureIsCancelled() {
@Test
public void shouldStopSendingAdditionalRequestsWhenDownloadIsMarkedAsStalled() {
pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
final Runnable sendRequest = mockWithAction(() -> downloadState.markAsStalled(1));
final Runnable sendRequest = mockWithAction(() -> downloadState.requestComplete(false));

downloadState.whileAdditionalRequestsCanBeSent(sendRequest);
verify(sendRequest, times(1)).run();
verify(sendRequest, times(MAX_REQUESTS_WITHOUT_PROGRESS)).run();
}

@Test
public void shouldResetRequestsSinceProgressCountWhenProgressIsMade() {
downloadState.requestComplete(false);
downloadState.requestComplete(false);

downloadState.requestComplete(true);

for (int i = 0; i < MAX_REQUESTS_WITHOUT_PROGRESS - 1; i++) {
downloadState.requestComplete(false);
assertThat(downloadState.getDownloadFuture()).isNotDone();
}

downloadState.requestComplete(false);
assertThat(downloadState.getDownloadFuture()).isCompletedExceptionally();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ public void stalledDownloader() {
final WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder().worldStateRequestMaxRetries(10).build();
SynchronizerConfiguration.builder().worldStateMaxRequestsWithoutProgress(10).build();
final WorldStateDownloader downloader =
createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, queue);

Expand Down Expand Up @@ -902,7 +902,7 @@ private WorldStateDownloader createDownloader(
queue,
config.getWorldStateHashCountPerRequest(),
config.getWorldStateRequestParallelism(),
config.getWorldStateRequestMaxRetries(),
config.getWorldStateMaxRequestsWithoutProgress(),
new NoOpMetricsSystem());
}

Expand Down

0 comments on commit 5986da9

Please sign in to comment.