Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
async run forkchoice::onBlock in blockImporter
Browse files Browse the repository at this point in the history
tbenr committed Nov 6, 2024

Verified

This commit was signed with the committer’s verified signature. The key has expired.
vespian Paweł Rozlach
1 parent d813213 commit b2289eb
Showing 8 changed files with 75 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -146,6 +146,7 @@ public static SyncingNodeManager create(

final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
Original file line number Diff line number Diff line change
@@ -36,6 +36,8 @@
import tech.pegasys.teku.benchmarks.util.CustomRunner;
import tech.pegasys.teku.bls.BLSKeyPair;
import tech.pegasys.teku.bls.BLSSignatureVerifier;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.DelayedExecutorAsyncRunner;
import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.ssz.collections.SszMutableUInt64List;
@@ -72,6 +74,7 @@
@Threads(1)
@Fork(1)
public class EpochTransitionBenchmark {
AsyncRunner asyncRunner;
Spec spec;
WeakSubjectivityValidator wsValidator;
RecentChainData recentChainData;
@@ -100,6 +103,7 @@ public void init() throws Exception {
AbstractBlockProcessor.depositSignatureVerifier = BLSSignatureVerifier.NO_OP;

spec = TestSpecFactory.createMainnetAltair();
asyncRunner = DelayedExecutorAsyncRunner.create();
String blocksFile =
"/blocks/blocks_epoch_"
+ spec.getSlotsPerEpoch(UInt64.ZERO)
@@ -131,6 +135,7 @@ public void init() throws Exception {

blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
Original file line number Diff line number Diff line change
@@ -31,6 +31,8 @@
import tech.pegasys.teku.bls.BLSPublicKey;
import tech.pegasys.teku.bls.BLSSignatureVerifier;
import tech.pegasys.teku.bls.BLSTestUtil;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.DelayedExecutorAsyncRunner;
import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
@@ -63,6 +65,7 @@ public class ProfilingRun {
private Spec spec = TestSpecFactory.createMainnetPhase0();

private final MetricsSystem metricsSystem = new StubMetricsSystem();
private final AsyncRunner asyncRunner = DelayedExecutorAsyncRunner.create();

@Disabled
@Test
@@ -111,6 +114,7 @@ public void importBlocks() throws Exception {
BeaconChainUtil.create(spec, recentChainData, validatorKeys, false);
BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
@@ -203,6 +207,7 @@ public void importBlocksMemProfiling() throws Exception {
metricsSystem);
BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
Original file line number Diff line number Diff line change
@@ -33,6 +33,8 @@
import tech.pegasys.teku.benchmarks.gen.KeyFileGenerator;
import tech.pegasys.teku.bls.BLSKeyPair;
import tech.pegasys.teku.bls.BLSSignatureVerifier;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.DelayedExecutorAsyncRunner;
import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
@@ -79,6 +81,7 @@ public abstract class TransitionBenchmark {
public void init() throws Exception {
spec = TestSpecFactory.createMainnetAltair();
AbstractBlockProcessor.depositSignatureVerifier = BLSSignatureVerifier.NO_OP;
AsyncRunner asyncRunner = DelayedExecutorAsyncRunner.create();

String blocksFile =
"/blocks/blocks_epoch_"
@@ -109,6 +112,7 @@ public void init() throws Exception {

blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
import javax.annotation.CheckReturnValue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.logging.EventLogger;
import tech.pegasys.teku.infrastructure.ssz.SszList;
@@ -65,13 +66,17 @@ public class BlockImporter {
private final AtomicReference<CheckpointState> latestFinalizedCheckpointState =
new AtomicReference<>(null);

private final AsyncRunner asyncRunner;

public BlockImporter(
final AsyncRunner asyncRunner,
final Spec spec,
final ReceivedBlockEventsChannel receivedBlockEventsChannelPublisher,
final RecentChainData recentChainData,
final ForkChoice forkChoice,
final WeakSubjectivityValidator weakSubjectivityValidator,
final ExecutionLayerChannel executionLayer) {
this.asyncRunner = asyncRunner;
this.spec = spec;
this.receivedBlockEventsChannelPublisher = receivedBlockEventsChannelPublisher;
this.recentChainData = recentChainData;
@@ -106,8 +111,13 @@ public SafeFuture<BlockImportResult> importBlock(
return validateWeakSubjectivityPeriod()
.thenCompose(
__ ->
forkChoice.onBlock(
block, blockImportPerformance, blockBroadcastValidator, executionLayer))
asyncRunner.runAsync(
() ->
forkChoice.onBlock(
block,
blockImportPerformance,
blockBroadcastValidator,
executionLayer)))
.thenApply(
result -> {
if (!result.isSuccessful()) {
@@ -141,7 +151,7 @@ public SafeFuture<BlockImportResult> importBlock(
});
}

private SafeFuture<?> validateWeakSubjectivityPeriod() {
private SafeFuture<Void> validateWeakSubjectivityPeriod() {
return getLatestCheckpointState()
.thenCombine(
SafeFuture.of(() -> recentChainData.getCurrentSlot().orElseThrow()),
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -38,6 +39,8 @@
import tech.pegasys.teku.bls.BLSSignatureVerifier;
import tech.pegasys.teku.bls.BLSTestUtil;
import tech.pegasys.teku.ethereum.execution.types.Eth1Address;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.ExceptionThrowingFutureSupplier;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
@@ -79,6 +82,7 @@
import tech.pegasys.teku.weaksubjectivity.config.WeakSubjectivityConfig;

public class BlockImporterTest {
private final AsyncRunner asyncRunner = mock(AsyncRunner.class);
private final Spec spec = TestSpecFactory.createMinimalPhase0();
private final SpecConfig genesisConfig = spec.getGenesisSpecConfig();
private final AttestationSchema<?> attestationSchema =
@@ -113,6 +117,7 @@ public class BlockImporterTest {

private final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
@@ -133,6 +138,15 @@ public static void dispose() {

@BeforeEach
public void setup() {
// prepare a synchronous async runner
doAnswer(
invocation -> {
final ExceptionThrowingFutureSupplier<?> task = invocation.getArgument(0);
return SafeFuture.completedFuture(SafeFuture.of(task.get()).join());
})
.when(asyncRunner)
.runAsync((ExceptionThrowingFutureSupplier<?>) any());

otherChain.initializeStorage();
localChain.initializeStorage();
when(weakSubjectivityValidator.isBlockValid(any(), any())).thenReturn(true);
@@ -403,6 +417,7 @@ public void importBlock_weakSubjectivityFailure_wrongAncestor() throws Exception
WeakSubjectivityValidator.lenient(wsConfig);
final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
@@ -433,6 +448,7 @@ public void importBlock_weakSubjectivityChecksPass() throws Exception {
WeakSubjectivityValidator.lenient(wsConfig);
final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
@@ -463,6 +479,7 @@ public void importBlock_runWSPChecks() throws Exception {
storageSystem.getMetricsSystem());
final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
storageSystem.recentChainData(),
@@ -508,6 +525,7 @@ public void importBlock_nonFinalizingChain_runWSPChecks() throws Exception {
storageSystem.getMetricsSystem());
final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
storageSystem.recentChainData(),
@@ -561,6 +579,7 @@ public void importBlock_nonFinalizingChain_skipWSPChecks() throws Exception {
storageSystem.getMetricsSystem());
final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
storageSystem.recentChainData(),
@@ -606,6 +625,7 @@ public void getLatestCheckpointState_initialCall() {
storageSystem.getMetricsSystem());
final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
storageSystem.recentChainData(),
@@ -639,6 +659,7 @@ public void getLatestCheckpointState_shouldPullUpdatedFinalizedCheckpoint() {
storageSystem.getMetricsSystem());
final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
storageSystem.recentChainData(),
@@ -694,6 +715,7 @@ public void importBlock_validBlsToExecutionChanges() throws Exception {

final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
storageSystem.recentChainData(),
Original file line number Diff line number Diff line change
@@ -30,7 +30,6 @@
import static org.mockito.Mockito.when;
import static tech.pegasys.teku.infrastructure.async.FutureUtil.ignoreFuture;
import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.assertThatSafeFuture;
import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.safeJoin;
import static tech.pegasys.teku.spec.config.SpecConfig.GENESIS_SLOT;
import static tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel.GOSSIP;
import static tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult.FailureReason.FAILED_DATA_AVAILABILITY_CHECK_INVALID;
@@ -53,13 +52,18 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.tuweni.bytes.Bytes32;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.bls.BLSSignatureVerifier;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.ExceptionThrowingFutureSupplier;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.SafeFutureAssert;
import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread;
@@ -109,6 +113,7 @@

@SuppressWarnings("FutureReturnValueIgnored")
public class BlockManagerTest {
private final AsyncRunner asyncRunner = mock(AsyncRunner.class);
private final StubTimeProvider timeProvider = StubTimeProvider.withTimeInSeconds(0);
private final EventLogger eventLogger = mock(EventLogger.class);
private Spec spec;
@@ -157,6 +162,15 @@ public static void resetSession() {

@BeforeEach
public void setup() {
// prepare an async runner
doAnswer(
invocation -> {
final ExceptionThrowingFutureSupplier<?> task = invocation.getArgument(0);
return SafeFuture.of(task.get());
})
.when(asyncRunner)
.runAsync((ExceptionThrowingFutureSupplier<?>) any());

setupWithSpec(TestSpecFactory.createMinimalDeneb());
}

@@ -184,6 +198,7 @@ private void setupWithSpec(final Spec spec) {
this.executionLayer = spy(new ExecutionLayerChannelStub(spec, false, Optional.empty()));
this.blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
localRecentChainData,
@@ -1196,9 +1211,13 @@ private SafeFutureAssert<BlockImportResult> assertThatBlockImport(final SignedBe
}

private void safeJoinBlockImport(final SignedBeaconBlock block) {
safeJoin(
blockManager
.importBlock(block)
.thenCompose(BlockImportAndBroadcastValidationResults::blockImportResult));
try {
blockManager
.importBlock(block)
.thenCompose(BlockImportAndBroadcastValidationResults::blockImportResult)
.get(5, TimeUnit.SECONDS);
} catch (final InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1235,6 +1235,7 @@ public void initBlockImporter() {
LOG.debug("BeaconChainController.initBlockImporter()");
blockImporter =
new BlockImporter(
beaconAsyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,

0 comments on commit b2289eb

Please sign in to comment.