Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MEV Boost\Builder] Send Validator Registrations in Batches #5829

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public static OkHttpClient create(
final Logger logger,
final Optional<JwtConfig> jwtConfig,
final TimeProvider timeProvider) {
final OkHttpClient.Builder builder = new OkHttpClient.Builder().callTimeout(timeout);
final OkHttpClient.Builder builder =
new OkHttpClient.Builder().callTimeout(timeout).readTimeout(timeout);
if (logger.isTraceEnabled()) {
final HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor(logger::trace);
loggingInterceptor.setLevel(Level.BODY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ public void onFailure(@NotNull final Call call, @NotNull final IOException ex) {
@Override
public void onResponse(
@NotNull final Call call, @NotNull final okhttp3.Response response) {
HttpUrl requestUrl = response.request().url();
LOG.trace("{} {} {}", response.request().method(), requestUrl, response.code());
LOG.trace("{} {} {}", request.method(), request.url(), response.code());
if (!response.isSuccessful()) {
handleFailure(response, futureResponse);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ public class ValidatorProposerOptions {
private UInt64 registrationDefaultGasLimit =
ValidatorConfig.DEFAULT_VALIDATOR_REGISTRATION_GAS_LIMIT;

@Option(
names = {"--Xvalidators-registration-sending-batch-size"},
paramLabel = "<INTEGER>",
showDefaultValue = Visibility.ALWAYS,
description =
"Change the default batch size for sending validator registrations to the Beacon Node.",
arity = "1",
hidden = true)
private int registrationSendingBatchSize =
ValidatorConfig.DEFAULT_VALIDATOR_REGISTRATION_SENDING_BATCH_SIZE;

@Option(
names = {"--Xvalidators-proposer-blinded-blocks-enabled"},
paramLabel = "<BOOLEAN>",
Expand All @@ -90,6 +101,7 @@ public void configure(TekuConfiguration.Builder builder) {
.refreshProposerConfigFromSource(proposerConfigRefreshEnabled)
.validatorsRegistrationDefaultEnabled(validatorsRegistrationDefaultEnabled)
.blindedBeaconBlocksEnabled(blindedBlocksEnabled)
.validatorsRegistrationDefaultGasLimit(registrationDefaultGasLimit));
.validatorsRegistrationDefaultGasLimit(registrationDefaultGasLimit)
.validatorsRegistrationSendingBatchSize(registrationSendingBatchSize));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class ValidatorConfig {
public static final boolean DEFAULT_VALIDATOR_PROPOSER_CONFIG_REFRESH_ENABLED = false;
public static final boolean DEFAULT_VALIDATOR_REGISTRATION_DEFAULT_ENABLED = false;
public static final boolean DEFAULT_VALIDATOR_BLINDED_BLOCKS_ENABLED = false;
public static final int DEFAULT_VALIDATOR_REGISTRATION_SENDING_BATCH_SIZE = 100;
public static final UInt64 DEFAULT_VALIDATOR_REGISTRATION_GAS_LIMIT = UInt64.valueOf(30_000_000);

private final List<String> validatorKeys;
Expand All @@ -71,6 +72,7 @@ public class ValidatorConfig {
private final boolean validatorsRegistrationDefaultEnabled;
private final boolean validatorClientUseSszBlocksEnabled;
private final UInt64 validatorsRegistrationDefaultGasLimit;
private final int validatorsRegistrationSendingBatchSize;
private final int executorMaxQueueSize;

private ValidatorConfig(
Expand All @@ -96,6 +98,7 @@ private ValidatorConfig(
final boolean blindedBeaconBlocksEnabled,
final boolean validatorClientUseSszBlocksEnabled,
final UInt64 validatorsRegistrationDefaultGasLimit,
final int validatorsRegistrationSendingBatchSize,
final int executorMaxQueueSize) {
this.validatorKeys = validatorKeys;
this.validatorExternalSignerPublicKeySources = validatorExternalSignerPublicKeySources;
Expand All @@ -122,6 +125,7 @@ private ValidatorConfig(
this.validatorsRegistrationDefaultEnabled = validatorsRegistrationDefaultEnabled;
this.validatorClientUseSszBlocksEnabled = validatorClientUseSszBlocksEnabled;
this.validatorsRegistrationDefaultGasLimit = validatorsRegistrationDefaultGasLimit;
this.validatorsRegistrationSendingBatchSize = validatorsRegistrationSendingBatchSize;
this.executorMaxQueueSize = executorMaxQueueSize;
}

Expand Down Expand Up @@ -196,6 +200,10 @@ public UInt64 getValidatorsRegistrationDefaultGasLimit() {
return validatorsRegistrationDefaultGasLimit;
}

public int getValidatorsRegistrationSendingBatchSize() {
return validatorsRegistrationSendingBatchSize;
}

public boolean getRefreshProposerConfigFromSource() {
return refreshProposerConfigFromSource;
}
Expand Down Expand Up @@ -254,6 +262,8 @@ public static final class Builder {
private boolean blindedBlocksEnabled = DEFAULT_VALIDATOR_BLINDED_BLOCKS_ENABLED;
private boolean validatorClientSszBlocksEnabled = DEFAULT_VALIDATOR_CLIENT_SSZ_BLOCKS_ENABLED;
private UInt64 validatorsRegistrationDefaultGasLimit = DEFAULT_VALIDATOR_REGISTRATION_GAS_LIMIT;
private int validatorsRegistrationSendingBatchSize =
DEFAULT_VALIDATOR_REGISTRATION_SENDING_BATCH_SIZE;
private int executorMaxQueueSize = DEFAULT_EXECUTOR_MAX_QUEUE_SIZE;

private Builder() {}
Expand Down Expand Up @@ -398,6 +408,12 @@ public Builder validatorsRegistrationDefaultGasLimit(
return this;
}

public Builder validatorsRegistrationSendingBatchSize(
final int validatorsRegistrationSendingBatchSize) {
this.validatorsRegistrationSendingBatchSize = validatorsRegistrationSendingBatchSize;
return this;
}

public Builder executorMaxQueueSize(final int executorMaxQueueSize) {
this.executorMaxQueueSize = executorMaxQueueSize;
return this;
Expand Down Expand Up @@ -433,6 +449,7 @@ public ValidatorConfig build() {
blindedBlocksEnabled,
validatorClientSszBlocksEnabled,
validatorsRegistrationDefaultGasLimit,
validatorsRegistrationSendingBatchSize,
executorMaxQueueSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.validator.api.ValidatorApiChannel;
import tech.pegasys.teku.validator.api.ValidatorConfig;
import tech.pegasys.teku.validator.api.ValidatorTimingChannel;
import tech.pegasys.teku.validator.beaconnode.BeaconNodeApi;
import tech.pegasys.teku.validator.beaconnode.GenesisDataProvider;
Expand Down Expand Up @@ -104,16 +105,15 @@ private ValidatorClientService(
public static ValidatorClientService create(
final ServiceConfig services, final ValidatorClientConfiguration config) {
final EventChannels eventChannels = services.getEventChannels();
final ValidatorConfig validatorConfig = config.getValidatorConfig();

final AsyncRunner asyncRunner =
services.createAsyncRunnerWithMaxQueueSize(
"validator", config.getValidatorConfig().getExecutorMaxQueueSize());
final boolean generateEarlyAttestations =
config.getValidatorConfig().generateEarlyAttestations();
final boolean preferSszBlockEncoding =
config.getValidatorConfig().isValidatorClientUseSszBlocksEnabled();
"validator", validatorConfig.getExecutorMaxQueueSize());
final boolean generateEarlyAttestations = validatorConfig.generateEarlyAttestations();
final boolean preferSszBlockEncoding = validatorConfig.isValidatorClientUseSszBlocksEnabled();
final BeaconNodeApi beaconNodeApi =
config
.getValidatorConfig()
validatorConfig
.getBeaconNodeApiEndpoint()
.map(
endpoint ->
Expand All @@ -135,7 +135,6 @@ public static ValidatorClientService create(
final ForkProvider forkProvider = new ForkProvider(config.getSpec(), genesisDataProvider);

final ValidatorLoader validatorLoader = createValidatorLoader(config, asyncRunner, services);

final ValidatorRestApiConfig validatorApiConfig = config.getValidatorRestApiConfig();
Optional<RestApi> validatorRestApi = Optional.empty();
Optional<ProposerConfigProvider> proposerConfigProvider = Optional.empty();
Expand All @@ -146,18 +145,18 @@ public static ValidatorClientService create(
Optional.of(
ProposerConfigProvider.create(
asyncRunner,
config.getValidatorConfig().getRefreshProposerConfigFromSource(),
validatorConfig.getRefreshProposerConfigFromSource(),
new ProposerConfigLoader(new JsonProvider().getObjectMapper()),
services.getTimeProvider(),
config.getValidatorConfig().getProposerConfigSource()));
validatorConfig.getProposerConfigSource()));

beaconProposerPreparer =
Optional.of(
new BeaconProposerPreparer(
validatorApiChannel,
Optional.empty(),
proposerConfigProvider.get(),
config.getValidatorConfig().getProposerDefaultFeeRecipient(),
validatorConfig.getProposerDefaultFeeRecipient(),
config.getSpec(),
Optional.of(
ValidatorClientService.getKeyManagerPath(services.getDataDirLayout())
Expand All @@ -170,9 +169,11 @@ public static ValidatorClientService create(
services.getTimeProvider(),
validatorLoader.getOwnedValidators(),
proposerConfigProvider.get(),
config.getValidatorConfig(),
validatorConfig,
beaconProposerPreparer.get(),
validatorApiChannel));
new ValidatorRegistrationBatchSender(
validatorConfig.getValidatorsRegistrationSendingBatchSize(),
validatorApiChannel)));
}
if (validatorApiConfig.isRestApiEnabled()) {
validatorRestApi =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright ConsenSys Software Inc., 2022
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.validator.client;

import com.google.common.collect.Lists;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.ssz.SszList;
import tech.pegasys.teku.infrastructure.ssz.impl.SszUtils;
import tech.pegasys.teku.spec.datastructures.execution.SignedValidatorRegistration;
import tech.pegasys.teku.spec.schemas.ApiSchemas;
import tech.pegasys.teku.validator.api.ValidatorApiChannel;

public class ValidatorRegistrationBatchSender {
private static final Logger LOG = LogManager.getLogger();

private final int batchSize;
private final ValidatorApiChannel validatorApiChannel;

public ValidatorRegistrationBatchSender(
final int batchSize, final ValidatorApiChannel validatorApiChannel) {
this.batchSize = batchSize;
this.validatorApiChannel = validatorApiChannel;
}

public SafeFuture<Void> sendInBatches(
final List<SignedValidatorRegistration> validatorRegistrations) {
if (validatorRegistrations.isEmpty()) {
LOG.debug("No validator(s) registrations required to be sent to the Beacon Node.");
return SafeFuture.completedFuture(null);
}

final List<List<SignedValidatorRegistration>> batchedRegistrations =
Lists.partition(validatorRegistrations, batchSize);

LOG.debug(
"Going to send {} validator(s) registrations to the Beacon Node in {} batch(es)",
validatorRegistrations.size(),
batchedRegistrations.size());

final Iterator<List<SignedValidatorRegistration>> batchedRegistrationsIterator =
batchedRegistrations.iterator();

final AtomicInteger batchCounter = new AtomicInteger(0);
final AtomicInteger successfullySentRegistrations = new AtomicInteger(0);

return SafeFuture.asyncDoWhile(
() -> {
if (!batchedRegistrationsIterator.hasNext()) {
return SafeFuture.completedFuture(false);
}
final List<SignedValidatorRegistration> batch = batchedRegistrationsIterator.next();
final int currentBatch = batchCounter.incrementAndGet();
LOG.debug("Starting to send batch {}/{}", currentBatch, batchedRegistrations.size());
return sendBatch(batch)
.thenApply(
__ -> {
successfullySentRegistrations.updateAndGet(count -> count + batch.size());
LOG.debug(
"Batch {}/{}: {} validator(s) registrations were sent to the Beacon Node.",
currentBatch,
batchedRegistrations.size(),
batch.size());
return true;
});
})
.whenComplete(
(__, throwable) ->
LOG.info(
"{} out of {} validator(s) registrations were successfully sent to the Beacon Node.",
successfullySentRegistrations.get(),
validatorRegistrations.size()));
}

private SafeFuture<Void> sendBatch(
final List<SignedValidatorRegistration> validatorRegistrations) {
final SszList<SignedValidatorRegistration> sszValidatorRegistrations =
SszUtils.toSszList(
ApiSchemas.SIGNED_VALIDATOR_REGISTRATIONS_SCHEMA, validatorRegistrations);
return validatorApiChannel.registerValidators(sszValidatorRegistrations);
}
}
Loading