Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Adjust beacon node timeouts for validator client HTTP requests #2352

Closed
wants to merge 13 commits into from
3 changes: 2 additions & 1 deletion account_manager/src/validator/exit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use clap::{App, Arg, ArgMatches};
use environment::Environment;
use eth2::{
types::{GenesisData, StateId, ValidatorData, ValidatorId, ValidatorStatus},
BeaconNodeHttpClient,
BeaconNodeHttpClient, Timeouts,
};
use eth2_keystore::Keystore;
use eth2_network_config::Eth2NetworkConfig;
Expand Down Expand Up @@ -81,6 +81,7 @@ pub fn cli_run<E: EthSpec>(matches: &ArgMatches, env: Environment<E>) -> Result<
let client = BeaconNodeHttpClient::new(
SensitiveUrl::parse(&server_url)
.map_err(|e| format!("Failed to parse beacon http server: {:?}", e))?,
Timeouts::set_all(Duration::from_secs(env.eth2_config.spec.seconds_per_slot)),
);

let testnet_config = env
Expand Down
5 changes: 4 additions & 1 deletion beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use discv5::enr::{CombinedKey, EnrBuilder};
use environment::null_logger;
use eth2::Error;
use eth2::StatusCode;
use eth2::{types::*, BeaconNodeHttpClient};
use eth2::{types::*, BeaconNodeHttpClient, Timeouts};
use eth2_libp2p::{
rpc::methods::MetaData,
types::{EnrBitfield, SyncState},
Expand Down Expand Up @@ -37,6 +37,7 @@ use types::{

type E = MainnetEthSpec;

const SECONDS_PER_SLOT: u64 = 12;
const SLOTS_PER_EPOCH: u64 = 32;
const VALIDATOR_COUNT: usize = SLOTS_PER_EPOCH as usize;
const CHAIN_LENGTH: u64 = SLOTS_PER_EPOCH * 5 - 1; // Make `next_block` an epoch transition
Expand Down Expand Up @@ -213,6 +214,7 @@ impl ApiTester {
listening_socket.port()
))
.unwrap(),
Timeouts::set_all(Duration::from_secs(SECONDS_PER_SLOT)),
);

Self {
Expand Down Expand Up @@ -325,6 +327,7 @@ impl ApiTester {
listening_socket.port()
))
.unwrap(),
Timeouts::set_all(Duration::from_secs(SECONDS_PER_SLOT)),
);

Self {
Expand Down
126 changes: 115 additions & 11 deletions common/eth2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use ssz::Decode;
use std::convert::TryFrom;
use std::fmt;
use std::iter::Iterator;
use std::time::Duration;

#[derive(Debug)]
pub enum Error {
Expand Down Expand Up @@ -77,12 +78,34 @@ impl fmt::Display for Error {
}
}

/// A struct to define a variety of different timeouts for different validator tasks to ensure
/// proper fallback behaviour.
#[derive(Clone)]
pub struct Timeouts {
pub attestation: Duration,
pub attester_duties: Duration,
pub proposal: Duration,
pub proposer_duties: Duration,
}

impl Timeouts {
pub fn set_all(timeout: Duration) -> Self {
Timeouts {
attestation: timeout,
attester_duties: timeout,
proposal: timeout,
proposer_duties: timeout,
}
}
}

/// A wrapper around `reqwest::Client` which provides convenience methods for interfacing with a
/// Lighthouse Beacon Node HTTP server (`http_api`).
#[derive(Clone)]
pub struct BeaconNodeHttpClient {
client: reqwest::Client,
server: SensitiveUrl,
timeouts: Timeouts,
}

impl fmt::Display for BeaconNodeHttpClient {
Expand All @@ -98,15 +121,24 @@ impl AsRef<str> for BeaconNodeHttpClient {
}

impl BeaconNodeHttpClient {
pub fn new(server: SensitiveUrl) -> Self {
pub fn new(server: SensitiveUrl, timeouts: Timeouts) -> Self {
Self {
client: reqwest::Client::new(),
server,
timeouts,
}
}

pub fn from_components(server: SensitiveUrl, client: reqwest::Client) -> Self {
Self { client, server }
pub fn from_components(
server: SensitiveUrl,
client: reqwest::Client,
timeouts: Timeouts,
) -> Self {
Self {
client,
server,
timeouts,
}
}

/// Return the path with the standard `/eth1/v1` prefix applied.
Expand All @@ -131,6 +163,26 @@ impl BeaconNodeHttpClient {
.map_err(Error::Reqwest)
}

/// Perform a HTTP GET request with a custom timeout.
async fn get_with_timeout<T: DeserializeOwned, U: IntoUrl>(
&self,
url: U,
timeout: Duration,
) -> Result<T, Error> {
let response = self
.client
.get(url)
.timeout(timeout)
.send()
.await
.map_err(Error::Reqwest)?;
ok_or_error(response)
.await?
.json()
.await
.map_err(Error::Reqwest)
}

/// Perform a HTTP GET request, returning `None` on a 404 error.
async fn get_opt<T: DeserializeOwned, U: IntoUrl>(&self, url: U) -> Result<Option<T>, Error> {
let response = self.client.get(url).send().await.map_err(Error::Reqwest)?;
Expand All @@ -146,6 +198,31 @@ impl BeaconNodeHttpClient {
}
}

/// Perform a HTTP GET request with a custom timeout, returning `None` on a 404 error.
async fn get_opt_with_timeout<T: DeserializeOwned, U: IntoUrl>(
&self,
url: U,
timeout: Duration,
) -> Result<Option<T>, Error> {
let response = self
.client
.get(url)
.timeout(timeout)
.send()
.await
.map_err(Error::Reqwest)?;
match ok_or_error(response).await {
Ok(resp) => resp.json().await.map(Option::Some).map_err(Error::Reqwest),
Err(err) => {
if err.status() == Some(StatusCode::NOT_FOUND) {
Ok(None)
} else {
Err(err)
}
}
}
}

/// Perform a HTTP GET request using an 'accept' header, returning `None` on a 404 error.
pub async fn get_bytes_opt_accept_header<U: IntoUrl>(
&self,
Expand Down Expand Up @@ -190,15 +267,36 @@ impl BeaconNodeHttpClient {
Ok(())
}

/// Perform a HTTP POST request, returning a JSON response.
async fn post_with_response<T: DeserializeOwned, U: IntoUrl, V: Serialize>(
/// Perform a HTTP POST request with a custom timeout.
async fn post_with_timeout<T: Serialize, U: IntoUrl>(
&self,
url: U,
body: &T,
timeout: Duration,
) -> Result<(), Error> {
let response = self
.client
.post(url)
.timeout(timeout)
.json(body)
.send()
.await
.map_err(Error::Reqwest)?;
ok_or_error(response).await?;
Ok(())
}

/// Perform a HTTP POST request with a custom timeout, returning a JSON response.
async fn post_with_timeout_and_response<T: DeserializeOwned, U: IntoUrl, V: Serialize>(
&self,
url: U,
body: &V,
timeout: Duration,
) -> Result<T, Error> {
let response = self
.client
.post(url)
.timeout(timeout)
.json(body)
.send()
.await
Expand Down Expand Up @@ -469,7 +567,8 @@ impl BeaconNodeHttpClient {
.push("beacon")
.push("blocks");

self.post(path, block).await?;
self.post_with_timeout(path, block, self.timeouts.proposal)
.await?;

Ok(())
}
Expand Down Expand Up @@ -567,6 +666,7 @@ impl BeaconNodeHttpClient {
let response = self
.client
.post(path)
.timeout(self.timeouts.attestation)
.json(attestations)
.send()
.await
Expand Down Expand Up @@ -928,7 +1028,8 @@ impl BeaconNodeHttpClient {
.push("proposer")
.push(&epoch.to_string());

self.get(path).await
self.get_with_timeout(path, self.timeouts.proposer_duties)
.await
}

/// `GET validator/blocks/{slot}`
Expand Down Expand Up @@ -974,10 +1075,10 @@ impl BeaconNodeHttpClient {
.append_pair("slot", &slot.to_string())
.append_pair("committee_index", &committee_index.to_string());

self.get(path).await
self.get_with_timeout(path, self.timeouts.attestation).await
}

/// `GET validator/attestation_attestation?slot,attestation_data_root`
/// `GET validator/aggregate_attestation?slot,attestation_data_root`
pub async fn get_validator_aggregate_attestation<T: EthSpec>(
&self,
slot: Slot,
Expand All @@ -997,7 +1098,8 @@ impl BeaconNodeHttpClient {
&format!("{:?}", attestation_data_root),
);

self.get_opt(path).await
self.get_opt_with_timeout(path, self.timeouts.attestation)
.await
}

/// `POST validator/duties/attester/{epoch}`
Expand All @@ -1015,7 +1117,8 @@ impl BeaconNodeHttpClient {
.push("attester")
.push(&epoch.to_string());

self.post_with_response(path, &indices).await
self.post_with_timeout_and_response(path, &indices, self.timeouts.attester_duties)
.await
}

/// `POST validator/aggregate_and_proofs`
Expand All @@ -1033,6 +1136,7 @@ impl BeaconNodeHttpClient {
let response = self
.client
.post(path)
.timeout(self.timeouts.attestation)
.json(aggregates)
.send()
.await
Expand Down
8 changes: 8 additions & 0 deletions lighthouse/tests/validator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,14 @@ fn init_slashing_protections_flag() {
.with_config(|config| assert!(config.init_slashing_protection));
}

#[test]
fn use_long_timeouts_flag() {
CommandLineTest::new()
.flag("use-long-timeouts", None)
.run()
.with_config(|config| assert!(config.use_long_timeouts));
}

// Tests for Graffiti flags.
#[test]
fn graffiti_flag() {
Expand Down
3 changes: 2 additions & 1 deletion testing/node_test_rig/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use beacon_node::ProductionBeaconNode;
use environment::RuntimeContext;
use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient};
use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient, Timeouts};
use sensitive_url::SensitiveUrl;
use std::path::PathBuf;
use std::time::Duration;
Expand Down Expand Up @@ -77,6 +77,7 @@ impl<E: EthSpec> LocalBeaconNode<E> {
Ok(BeaconNodeHttpClient::from_components(
beacon_node_url,
beacon_node_http_client,
Timeouts::set_all(HTTP_TIMEOUT),
))
}
}
Expand Down
16 changes: 16 additions & 0 deletions validator_client/src/attestation_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
let attestation_data = self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, committee_index)
.await
Expand Down Expand Up @@ -402,6 +406,10 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
match self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_POST],
);
beacon_node
.post_beacon_pool_attestations(attestations_slice)
.await
Expand Down Expand Up @@ -454,6 +462,10 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
let aggregated_attestation = self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_GET],
);
beacon_node
.get_validator_aggregate_attestation(
attestation_data_ref.slot,
Expand Down Expand Up @@ -506,6 +518,10 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
match self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_POST],
);
beacon_node
.post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice)
.await
Expand Down
9 changes: 9 additions & 0 deletions validator_client/src/block_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,17 +263,26 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
let signed_block = self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_GET],
);
let block = beacon_node
.get_validator_blocks(slot, randao_reveal_ref, graffiti.as_ref())
.await
.map_err(|e| format!("Error from beacon node when producing block: {:?}", e))?
.data;
drop(get_timer);

let signed_block = self_ref
.validator_store
.sign_block(validator_pubkey_ref, block, current_slot)
.ok_or("Unable to sign block")?;

let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_POST],
);
beacon_node
.post_beacon_blocks(&signed_block)
.await
Expand Down
Loading