Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert historical and recent block fetching for coupled blobs #6915

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
import org.hyperledger.besu.plugin.services.MetricsSystem;
import tech.pegasys.teku.beacon.sync.events.SyncStateProvider;
import tech.pegasys.teku.beacon.sync.events.SyncStateTracker;
import tech.pegasys.teku.beacon.sync.fetch.FetchBlockTaskFactory;
import tech.pegasys.teku.beacon.sync.fetch.MilestoneBasedFetchBlockTaskFactory;
import tech.pegasys.teku.beacon.sync.fetch.DefaultFetchTaskFactory;
import tech.pegasys.teku.beacon.sync.fetch.FetchTaskFactory;
import tech.pegasys.teku.beacon.sync.forward.ForwardSync;
import tech.pegasys.teku.beacon.sync.forward.ForwardSyncService;
import tech.pegasys.teku.beacon.sync.forward.multipeer.MultipeerSyncService;
import tech.pegasys.teku.beacon.sync.forward.singlepeer.SinglePeerSyncServiceFactory;
import tech.pegasys.teku.beacon.sync.gossip.FetchRecentBlocksService;
import tech.pegasys.teku.beacon.sync.historical.HistoricalBlockSyncService;
import tech.pegasys.teku.ethereum.events.SlotEventsChannel;
import tech.pegasys.teku.ethereum.executionclient.events.ExecutionClientEventsChannel;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.AsyncRunnerFactory;
Expand Down Expand Up @@ -117,27 +116,25 @@ public SyncService create(final EventChannels eventChannels) {

final ForwardSyncService forwardSyncService = createForwardSyncService();

final FetchBlockTaskFactory fetchBlockTaskFactory =
new MilestoneBasedFetchBlockTaskFactory(spec, p2pNetwork);
final FetchTaskFactory fetchTaskFactory = new DefaultFetchTaskFactory(p2pNetwork);

final FetchRecentBlocksService recentBlockFetcher =
FetchRecentBlocksService.create(
asyncRunner, pendingBlocks, forwardSyncService, fetchBlockTaskFactory);

eventChannels.subscribe(SlotEventsChannel.class, recentBlockFetcher);
asyncRunner, pendingBlocks, forwardSyncService, fetchTaskFactory);

final SyncStateTracker syncStateTracker = createSyncStateTracker(forwardSyncService);

eventChannels.subscribe(ExecutionClientEventsChannel.class, syncStateTracker);

final HistoricalBlockSyncService historicalBlockSyncService =
createHistoricalSyncService(syncStateTracker, fetchBlockTaskFactory);
createHistoricalSyncService(syncStateTracker);

return new DefaultSyncService(
forwardSyncService, recentBlockFetcher, syncStateTracker, historicalBlockSyncService);
}

protected HistoricalBlockSyncService createHistoricalSyncService(
final SyncStateProvider syncStateProvider,
final FetchBlockTaskFactory fetchBlockTaskFactory) {
final SyncStateProvider syncStateProvider) {
final AsyncRunner asyncRunner =
asyncRunnerFactory.create(HistoricalBlockSyncService.class.getSimpleName(), 1);
return HistoricalBlockSyncService.create(
Expand All @@ -152,9 +149,7 @@ protected HistoricalBlockSyncService createHistoricalSyncService(
syncStateProvider,
syncConfig.isReconstructHistoricStatesEnabled(),
genesisStateResource,
syncConfig.fetchAllHistoricBlocks(),
fetchBlockTaskFactory,
blobsSidecarManager);
syncConfig.fetchAllHistoricBlocks());
}

protected SyncStateTracker createSyncStateTracker(final ForwardSync forwardSync) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright ConsenSys Software Inc., 2023
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.beacon.sync.fetch;

import java.util.Collections;
import java.util.Comparator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
import tech.pegasys.teku.networking.p2p.network.P2PNetwork;
import tech.pegasys.teku.networking.p2p.peer.NodeId;

public abstract class AbstractFetchTask {

private static final Comparator<Eth2Peer> SHUFFLING_COMPARATOR =
Comparator.comparing(p -> Math.random());

private final Set<NodeId> queriedPeers = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final AtomicBoolean cancelled = new AtomicBoolean(false);

private final P2PNetwork<Eth2Peer> eth2Network;

protected AbstractFetchTask(final P2PNetwork<Eth2Peer> eth2Network) {
this.eth2Network = eth2Network;
}

protected Optional<Eth2Peer> findRandomPeer() {
return eth2Network
.streamPeers()
.filter(p -> !queriedPeers.contains(p.getId()))
.min(
Comparator.comparing(Eth2Peer::getOutstandingRequests)
.thenComparing(SHUFFLING_COMPARATOR));
}

protected void trackQueriedPeer(final Eth2Peer peer) {
queriedPeers.add(peer.getId());
}

public void cancel() {
cancelled.set(true);
}

protected boolean isCancelled() {
return cancelled.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright ConsenSys Software Inc., 2023
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.beacon.sync.fetch;

import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
import tech.pegasys.teku.networking.p2p.network.P2PNetwork;

public class DefaultFetchTaskFactory implements FetchTaskFactory {

private final P2PNetwork<Eth2Peer> eth2Network;

public DefaultFetchTaskFactory(final P2PNetwork<Eth2Peer> eth2Network) {
this.eth2Network = eth2Network;
}

@Override
public FetchBlockTask createFetchBlockTask(final Bytes32 blockRoot) {
return new FetchBlockTask(eth2Network, blockRoot);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import java.util.Optional;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBeaconBlockAndBlobsSidecar;
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar;

public final class FetchBlockResult {

Expand All @@ -29,31 +27,18 @@ public enum Status {

private final Status status;
private final Optional<SignedBeaconBlock> block;
private final Optional<BlobsSidecar> blobsSidecar;

private FetchBlockResult(
final Status status,
final Optional<SignedBeaconBlock> block,
final Optional<BlobsSidecar> blobsSidecar) {
private FetchBlockResult(final Status status, final Optional<SignedBeaconBlock> block) {
this.status = status;
this.block = block;
this.blobsSidecar = blobsSidecar;
}

public static FetchBlockResult createSuccessful(final SignedBeaconBlock block) {
return new FetchBlockResult(Status.SUCCESSFUL, Optional.of(block), Optional.empty());
}

public static FetchBlockResult createSuccessful(
final SignedBeaconBlockAndBlobsSidecar blockAndBlobsSidecar) {
return new FetchBlockResult(
Status.SUCCESSFUL,
Optional.of(blockAndBlobsSidecar.getSignedBeaconBlock()),
Optional.of(blockAndBlobsSidecar.getBlobsSidecar()));
return new FetchBlockResult(Status.SUCCESSFUL, Optional.of(block));
}

public static FetchBlockResult createFailed(final Status failureStatus) {
return new FetchBlockResult(failureStatus, Optional.empty(), Optional.empty());
return new FetchBlockResult(failureStatus, Optional.empty());
}

public boolean isSuccessful() {
Expand All @@ -64,10 +49,6 @@ public Optional<SignedBeaconBlock> getBlock() {
return block;
}

public Optional<BlobsSidecar> getBlobsSidecar() {
return blobsSidecar;
}

public FetchBlockResult.Status getStatus() {
return status;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,7 @@

package tech.pegasys.teku.beacon.sync.fetch;

import java.util.Collections;
import java.util.Comparator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -27,31 +22,19 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
import tech.pegasys.teku.networking.p2p.network.P2PNetwork;
import tech.pegasys.teku.networking.p2p.peer.NodeId;

public class FetchBlockTask {
public class FetchBlockTask extends AbstractFetchTask {
private static final Logger LOG = LogManager.getLogger();
private static final Comparator<Eth2Peer> SHUFFLING_COMPARATOR =
Comparator.comparing(p -> Math.random());

private final P2PNetwork<Eth2Peer> eth2Network;

protected final Bytes32 blockRoot;

private final Set<NodeId> queriedPeers = Collections.newSetFromMap(new ConcurrentHashMap<>());

private final AtomicInteger numberOfRuns = new AtomicInteger(0);
private final AtomicBoolean cancelled = new AtomicBoolean(false);

public FetchBlockTask(final P2PNetwork<Eth2Peer> eth2Network, final Bytes32 blockRoot) {
this.eth2Network = eth2Network;
super(eth2Network);
this.blockRoot = blockRoot;
}

public void cancel() {
cancelled.set(true);
}

public Bytes32 getBlockRoot() {
return blockRoot;
}
Expand All @@ -61,37 +44,28 @@ public int getNumberOfRetries() {
}

/**
* Selects random {@link Eth2Peer} from the network and fetches a block by root using the
* implementation of {@link #fetchBlock(Eth2Peer)}. It also tracks the number of runs and the
* already queried peers.
* Selects random {@link Eth2Peer} from the network and fetches a block by root. It also tracks
* the number of runs and the already queried peers.
*/
public SafeFuture<FetchBlockResult> run() {
if (cancelled.get()) {
if (isCancelled()) {
return SafeFuture.completedFuture(FetchBlockResult.createFailed(Status.CANCELLED));
}

final Optional<Eth2Peer> maybePeer =
eth2Network
.streamPeers()
.filter(p -> !queriedPeers.contains(p.getId()))
.min(
Comparator.comparing(Eth2Peer::getOutstandingRequests)
.thenComparing(SHUFFLING_COMPARATOR));
final Optional<Eth2Peer> maybePeer = findRandomPeer();

if (maybePeer.isEmpty()) {
return SafeFuture.completedFuture(FetchBlockResult.createFailed(Status.NO_AVAILABLE_PEERS));
}
final Eth2Peer peer = maybePeer.get();

numberOfRuns.incrementAndGet();
queriedPeers.add(peer.getId());
trackQueriedPeer(peer);

return fetchBlock(peer);
}

/** Fetch block by root from an {@link Eth2Peer} */
public SafeFuture<FetchBlockResult> fetchBlock(final Eth2Peer peer) {

private SafeFuture<FetchBlockResult> fetchBlock(final Eth2Peer peer) {
return peer.requestBlockByRoot(blockRoot)
.thenApply(
maybeBlock ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
package tech.pegasys.teku.beacon.sync.fetch;

import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;

public interface FetchBlockTaskFactory {
public interface FetchTaskFactory {

FetchBlockTask create(UInt64 slot, Bytes32 blockRoot);
FetchBlockTask createFetchBlockTask(Bytes32 blockRoot);
}
Loading