Skip to content

Commit

Permalink
Make the retrying snap tasks switching (#7307)
Browse files Browse the repository at this point in the history
* make snap tasks switching

Signed-off-by: [email protected] <[email protected]>
  • Loading branch information
pinges authored Jul 12, 2024
1 parent 812dc74 commit d35c6d7
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.tuweni.bytes.Bytes32;

public class RetryingGetAccountRangeFromPeerTask
extends AbstractRetryingPeerTask<AccountRangeMessage.AccountRangeData> {
extends AbstractRetryingSwitchingPeerTask<AccountRangeMessage.AccountRangeData> {

public static final int MAX_RETRIES = 4;

Expand All @@ -46,9 +45,9 @@ private RetryingGetAccountRangeFromPeerTask(
final MetricsSystem metricsSystem) {
super(
ethContext,
MAX_RETRIES,
metricsSystem,
data -> data.accounts().isEmpty() && data.proofs().isEmpty(),
metricsSystem);
MAX_RETRIES);
this.ethContext = ethContext;
this.startKeyHash = startKeyHash;
this.endKeyHash = endKeyHash;
Expand All @@ -67,12 +66,12 @@ public static EthTask<AccountRangeMessage.AccountRangeData> forAccountRange(
}

@Override
protected CompletableFuture<AccountRangeMessage.AccountRangeData> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<AccountRangeMessage.AccountRangeData> executeTaskOnCurrentPeer(
final EthPeer peer) {
final GetAccountRangeFromPeerTask task =
GetAccountRangeFromPeerTask.forAccountRange(
ethContext, startKeyHash, endKeyHash, blockHeader, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
task.assignPeer(peer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,21 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;

public class RetryingGetBytecodeFromPeerTask extends AbstractRetryingPeerTask<Map<Bytes32, Bytes>> {
public class RetryingGetBytecodeFromPeerTask
extends AbstractRetryingSwitchingPeerTask<Map<Bytes32, Bytes>> {

public static final int MAX_RETRIES = 4;

private final EthContext ethContext;
private final List<Bytes32> codeHashes;
Expand All @@ -41,7 +43,7 @@ private RetryingGetBytecodeFromPeerTask(
final List<Bytes32> codeHashes,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
super(ethContext, 4, Map::isEmpty, metricsSystem);
super(ethContext, metricsSystem, Map::isEmpty, MAX_RETRIES);
this.ethContext = ethContext;
this.codeHashes = codeHashes;
this.blockHeader = blockHeader;
Expand All @@ -57,11 +59,10 @@ public static EthTask<Map<Bytes32, Bytes>> forByteCode(
}

@Override
protected CompletableFuture<Map<Bytes32, Bytes>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<Map<Bytes32, Bytes>> executeTaskOnCurrentPeer(final EthPeer peer) {
final GetBytecodeFromPeerTask task =
GetBytecodeFromPeerTask.forBytecode(ethContext, codeHashes, blockHeader, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
task.assignPeer(peer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.tuweni.bytes.Bytes32;

public class RetryingGetStorageRangeFromPeerTask
extends AbstractRetryingPeerTask<StorageRangeMessage.SlotRangeData> {
extends AbstractRetryingSwitchingPeerTask<StorageRangeMessage.SlotRangeData> {

public static final int MAX_RETRIES = 4;

private final EthContext ethContext;
private final List<Bytes32> accountHashes;
Expand All @@ -45,7 +46,11 @@ private RetryingGetStorageRangeFromPeerTask(
final Bytes32 endKeyHash,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
super(ethContext, 4, data -> data.proofs().isEmpty() && data.slots().isEmpty(), metricsSystem);
super(
ethContext,
metricsSystem,
data -> data.proofs().isEmpty() && data.slots().isEmpty(),
MAX_RETRIES);
this.ethContext = ethContext;
this.accountHashes = accountHashes;
this.startKeyHash = startKeyHash;
Expand All @@ -66,12 +71,12 @@ public static EthTask<StorageRangeMessage.SlotRangeData> forStorageRange(
}

@Override
protected CompletableFuture<StorageRangeMessage.SlotRangeData> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<StorageRangeMessage.SlotRangeData> executeTaskOnCurrentPeer(
final EthPeer peer) {
final GetStorageRangeFromPeerTask task =
GetStorageRangeFromPeerTask.forStorageRange(
ethContext, accountHashes, startKeyHash, endKeyHash, blockHeader, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
task.assignPeer(peer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.tuweni.bytes.Bytes;

public class RetryingGetTrieNodeFromPeerTask extends AbstractRetryingPeerTask<Map<Bytes, Bytes>> {
public class RetryingGetTrieNodeFromPeerTask
extends AbstractRetryingSwitchingPeerTask<Map<Bytes, Bytes>> {

public static final int MAX_RETRIES = 4;

private final EthContext ethContext;
private final Map<Bytes, List<Bytes>> paths;
Expand All @@ -40,7 +42,7 @@ private RetryingGetTrieNodeFromPeerTask(
final Map<Bytes, List<Bytes>> paths,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
super(ethContext, 4, Map::isEmpty, metricsSystem);
super(ethContext, metricsSystem, Map::isEmpty, MAX_RETRIES);
this.ethContext = ethContext;
this.paths = paths;
this.blockHeader = blockHeader;
Expand All @@ -56,11 +58,10 @@ public static EthTask<Map<Bytes, Bytes>> forTrieNodes(
}

@Override
protected CompletableFuture<Map<Bytes, Bytes>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<Map<Bytes, Bytes>> executeTaskOnCurrentPeer(final EthPeer peer) {
final GetTrieNodeFromPeerTask task =
GetTrieNodeFromPeerTask.forTrieNodes(ethContext, paths, blockHeader, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
task.assignPeer(peer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ private void outputNextCompletedTask(final WritePipe<O> outputPipe) {
waitForAnyFutureToComplete();
outputCompletedTasks(outputPipe);
} catch (final InterruptedException e) {
LOG.trace("Interrupted while waiting for processing to complete", e.getMessage());
LOG.atTrace()
.setMessage("Interrupted while waiting for processing to complete: Message=({})")
.addArgument(e.getMessage())
.setCause(e)
.log();
} catch (final ExecutionException e) {
throw new AsyncOperationException("Async operation failed. " + e.getMessage(), e);
} catch (final TimeoutException e) {
Expand Down

0 comments on commit d35c6d7

Please sign in to comment.