diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 59bf367b4ca..2d50dc6c635 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -97,7 +97,7 @@ use warp::hyper::Body; use warp::sse::Event; use warp::Reply; use warp::{http::Response, Filter, Rejection}; -use warp_utils::{query::multi_key_query, uor::UnifyingOrFilter}; +use warp_utils::{query::multi_key_query, reject::convert_rejection, uor::UnifyingOrFilter}; const API_PREFIX: &str = "eth"; @@ -1802,7 +1802,7 @@ pub fn serve( ) .await .map(|()| warp::reply::json(&())); - task_spawner::convert_rejection(result).await + convert_rejection(result).await }, ); @@ -3817,12 +3817,12 @@ pub fn serve( .await; if initial_result.is_err() { - return task_spawner::convert_rejection(initial_result).await; + return convert_rejection(initial_result).await; } // Await a response from the builder without blocking a // `BeaconProcessor` worker. - task_spawner::convert_rejection(rx.await.unwrap_or_else(|_| { + convert_rejection(rx.await.unwrap_or_else(|_| { Ok(warp::reply::with_status( warp::reply::json(&"No response from channel"), eth2::StatusCode::INTERNAL_SERVER_ERROR, diff --git a/beacon_node/http_api/src/task_spawner.rs b/beacon_node/http_api/src/task_spawner.rs index cfee5e01ca0..a679b294f65 100644 --- a/beacon_node/http_api/src/task_spawner.rs +++ b/beacon_node/http_api/src/task_spawner.rs @@ -4,6 +4,7 @@ use std::future::Future; use tokio::sync::{mpsc::error::TrySendError, oneshot}; use types::EthSpec; use warp::reply::{Reply, Response}; +use warp_utils::reject::convert_rejection; /// Maps a request to a queue in the `BeaconProcessor`. #[derive(Clone, Copy)] @@ -35,24 +36,6 @@ pub struct TaskSpawner { beacon_processor_send: Option>, } -/// Convert a warp `Rejection` into a `Response`. -/// -/// This function should *always* be used to convert rejections into responses. This prevents warp -/// from trying to backtrack in strange ways. See: https://github.com/sigp/lighthouse/issues/3404 -pub async fn convert_rejection(res: Result) -> Response { - match res { - Ok(response) => response.into_response(), - Err(e) => match warp_utils::reject::handle_rejection(e).await { - Ok(reply) => reply.into_response(), - Err(_) => warp::reply::with_status( - warp::reply::json(&"unhandled error"), - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(), - }, - } -} - impl TaskSpawner { pub fn new(beacon_processor_send: Option>) -> Self { Self { diff --git a/common/eth2/src/lighthouse_vc/http_client.rs b/common/eth2/src/lighthouse_vc/http_client.rs index 83aeea4bfcc..67fe77a3157 100644 --- a/common/eth2/src/lighthouse_vc/http_client.rs +++ b/common/eth2/src/lighthouse_vc/http_client.rs @@ -1,13 +1,10 @@ -use super::{types::*, PK_LEN, SECRET_PREFIX}; +use super::types::*; use crate::Error; use account_utils::ZeroizeString; -use bytes::Bytes; -use libsecp256k1::{Message, PublicKey, Signature}; use reqwest::{ header::{HeaderMap, HeaderValue}, IntoUrl, }; -use ring::digest::{digest, SHA256}; use sensitive_url::SensitiveUrl; use serde::{de::DeserializeOwned, Serialize}; use std::fmt::{self, Display}; @@ -24,8 +21,7 @@ use types::graffiti::GraffitiString; pub struct ValidatorClientHttpClient { client: reqwest::Client, server: SensitiveUrl, - secret: Option, - server_pubkey: Option, + api_token: Option, authorization_header: AuthorizationHeader, } @@ -46,45 +42,13 @@ impl Display for AuthorizationHeader { } } -/// Parse an API token and return a secp256k1 public key. -/// -/// If the token does not start with the Lighthouse token prefix then `Ok(None)` will be returned. -/// An error will be returned if the token looks like a Lighthouse token but doesn't correspond to a -/// valid public key. -pub fn parse_pubkey(secret: &str) -> Result, Error> { - let secret = if !secret.starts_with(SECRET_PREFIX) { - return Ok(None); - } else { - &secret[SECRET_PREFIX.len()..] - }; - - serde_utils::hex::decode(secret) - .map_err(|e| Error::InvalidSecret(format!("invalid hex: {:?}", e))) - .and_then(|bytes| { - if bytes.len() != PK_LEN { - return Err(Error::InvalidSecret(format!( - "expected {} bytes not {}", - PK_LEN, - bytes.len() - ))); - } - - let mut arr = [0; PK_LEN]; - arr.copy_from_slice(&bytes); - PublicKey::parse_compressed(&arr) - .map_err(|e| Error::InvalidSecret(format!("invalid secp256k1 pubkey: {:?}", e))) - }) - .map(Some) -} - impl ValidatorClientHttpClient { /// Create a new client pre-initialised with an API token. pub fn new(server: SensitiveUrl, secret: String) -> Result { Ok(Self { client: reqwest::Client::new(), server, - server_pubkey: parse_pubkey(&secret)?, - secret: Some(secret.into()), + api_token: Some(secret.into()), authorization_header: AuthorizationHeader::Bearer, }) } @@ -96,8 +60,7 @@ impl ValidatorClientHttpClient { Ok(Self { client: reqwest::Client::new(), server, - secret: None, - server_pubkey: None, + api_token: None, authorization_header: AuthorizationHeader::Omit, }) } @@ -110,15 +73,14 @@ impl ValidatorClientHttpClient { Ok(Self { client, server, - server_pubkey: parse_pubkey(&secret)?, - secret: Some(secret.into()), + api_token: Some(secret.into()), authorization_header: AuthorizationHeader::Bearer, }) } /// Get a reference to this client's API token, if any. pub fn api_token(&self) -> Option<&ZeroizeString> { - self.secret.as_ref() + self.api_token.as_ref() } /// Read an API token from the specified `path`, stripping any trailing whitespace. @@ -128,19 +90,11 @@ impl ValidatorClientHttpClient { } /// Add an authentication token to use when making requests. - /// - /// If the token is Lighthouse-like, a pubkey derivation will be attempted. In the case - /// of failure the token will still be stored, and the client can continue to be used to - /// communicate with non-Lighthouse nodes. pub fn add_auth_token(&mut self, token: ZeroizeString) -> Result<(), Error> { - let pubkey_res = parse_pubkey(token.as_str()); - - self.secret = Some(token); + self.api_token = Some(token); self.authorization_header = AuthorizationHeader::Bearer; - pubkey_res.map(|opt_pubkey| { - self.server_pubkey = opt_pubkey; - }) + Ok(()) } /// Set to `false` to disable sending the `Authorization` header on requests. @@ -160,49 +114,17 @@ impl ValidatorClientHttpClient { self.authorization_header = AuthorizationHeader::Basic; } - async fn signed_body(&self, response: Response) -> Result { - let server_pubkey = self.server_pubkey.as_ref().ok_or(Error::NoServerPubkey)?; - let sig = response - .headers() - .get("Signature") - .ok_or(Error::MissingSignatureHeader)? - .to_str() - .map_err(|_| Error::InvalidSignatureHeader)? - .to_string(); - - let body = response.bytes().await.map_err(Error::from)?; - - let message = - Message::parse_slice(digest(&SHA256, &body).as_ref()).expect("sha256 is 32 bytes"); - - serde_utils::hex::decode(&sig) - .ok() - .and_then(|bytes| { - let sig = Signature::parse_der(&bytes).ok()?; - Some(libsecp256k1::verify(&message, &sig, server_pubkey)) - }) - .filter(|is_valid| *is_valid) - .ok_or(Error::InvalidSignatureHeader)?; - - Ok(body) - } - - async fn signed_json(&self, response: Response) -> Result { - let body = self.signed_body(response).await?; - serde_json::from_slice(&body).map_err(Error::InvalidJson) - } - fn headers(&self) -> Result { let mut headers = HeaderMap::new(); if self.authorization_header == AuthorizationHeader::Basic || self.authorization_header == AuthorizationHeader::Bearer { - let secret = self.secret.as_ref().ok_or(Error::NoToken)?; + let auth_header_token = self.api_token().ok_or(Error::NoToken)?; let header_value = HeaderValue::from_str(&format!( "{} {}", self.authorization_header, - secret.as_str() + auth_header_token.as_str() )) .map_err(|e| { Error::InvalidSecret(format!("secret is invalid as a header value: {}", e)) @@ -240,7 +162,8 @@ impl ValidatorClientHttpClient { async fn get(&self, url: U) -> Result { let response = self.get_response(url).await?; - self.signed_json(response).await + let body = response.bytes().await.map_err(Error::from)?; + serde_json::from_slice(&body).map_err(Error::InvalidJson) } async fn delete(&self, url: U) -> Result<(), Error> { @@ -263,7 +186,14 @@ impl ValidatorClientHttpClient { /// Perform a HTTP GET request, returning `None` on a 404 error. async fn get_opt(&self, url: U) -> Result, Error> { match self.get_response(url).await { - Ok(resp) => self.signed_json(resp).await.map(Option::Some), + Ok(resp) => { + let body = resp.bytes().await.map(Option::Some)?; + if let Some(body) = body { + serde_json::from_slice(&body).map_err(Error::InvalidJson) + } else { + Ok(None) + } + } Err(err) => { if err.status() == Some(StatusCode::NOT_FOUND) { Ok(None) @@ -297,7 +227,8 @@ impl ValidatorClientHttpClient { body: &T, ) -> Result { let response = self.post_with_raw_response(url, body).await?; - self.signed_json(response).await + let body = response.bytes().await.map_err(Error::from)?; + serde_json::from_slice(&body).map_err(Error::InvalidJson) } async fn post_with_unsigned_response( @@ -319,8 +250,7 @@ impl ValidatorClientHttpClient { .send() .await .map_err(Error::from)?; - let response = ok_or_error(response).await?; - self.signed_body(response).await?; + ok_or_error(response).await?; Ok(()) } diff --git a/common/eth2/src/lighthouse_vc/mod.rs b/common/eth2/src/lighthouse_vc/mod.rs index 81b4fca283a..038726c829a 100644 --- a/common/eth2/src/lighthouse_vc/mod.rs +++ b/common/eth2/src/lighthouse_vc/mod.rs @@ -1,10 +1,3 @@ pub mod http_client; pub mod std_types; pub mod types; - -/// The number of bytes in the secp256k1 public key used as the authorization token for the VC API. -pub const PK_LEN: usize = 33; - -/// The prefix for the secp256k1 public key when it is used as the authorization token for the VC -/// API. -pub const SECRET_PREFIX: &str = "api-token-"; diff --git a/common/warp_utils/src/reject.rs b/common/warp_utils/src/reject.rs index d33f32251b9..9b28c65212c 100644 --- a/common/warp_utils/src/reject.rs +++ b/common/warp_utils/src/reject.rs @@ -2,7 +2,7 @@ use eth2::types::{ErrorMessage, Failure, IndexedErrorMessage}; use std::convert::Infallible; use std::error::Error; use std::fmt; -use warp::{http::StatusCode, reject::Reject}; +use warp::{http::StatusCode, reject::Reject, reply::Response, Reply}; #[derive(Debug)] pub struct ServerSentEventError(pub String); @@ -255,3 +255,21 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result(res: Result) -> Response { + match res { + Ok(response) => response.into_response(), + Err(e) => match handle_rejection(e).await { + Ok(reply) => reply.into_response(), + Err(_) => warp::reply::with_status( + warp::reply::json(&"unhandled error"), + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response(), + }, + } +} diff --git a/common/warp_utils/src/task.rs b/common/warp_utils/src/task.rs index 001231f2c6b..e2fa4ebc368 100644 --- a/common/warp_utils/src/task.rs +++ b/common/warp_utils/src/task.rs @@ -1,3 +1,4 @@ +use crate::reject::convert_rejection; use serde::Serialize; use warp::reply::{Reply, Response}; @@ -24,14 +25,16 @@ where } /// A convenience wrapper around `blocking_task` for use with `warp` JSON responses. -pub async fn blocking_json_task(func: F) -> Result +pub async fn blocking_json_task(func: F) -> Response where F: FnOnce() -> Result + Send + 'static, T: Serialize + Send + 'static, { - blocking_response_task(|| { + let result = blocking_response_task(|| { let response = func()?; Ok(warp::reply::json(&response)) }) - .await + .await; + + convert_rejection(result).await } diff --git a/validator_client/src/http_api/api_secret.rs b/validator_client/src/http_api/api_secret.rs index e688792ddc1..32035caf473 100644 --- a/validator_client/src/http_api/api_secret.rs +++ b/validator_client/src/http_api/api_secret.rs @@ -1,85 +1,53 @@ -use eth2::lighthouse_vc::{PK_LEN, SECRET_PREFIX as PK_PREFIX}; use filesystem::create_with_600_perms; -use libsecp256k1::{Message, PublicKey, SecretKey}; -use rand::thread_rng; -use ring::digest::{digest, SHA256}; +use rand::distributions::Alphanumeric; +use rand::{thread_rng, Rng}; use std::fs; use std::path::{Path, PathBuf}; use warp::Filter; -/// The name of the file which stores the secret key. -/// -/// It is purposefully opaque to prevent users confusing it with the "secret" that they need to -/// share with API consumers (which is actually the public key). -pub const SK_FILENAME: &str = ".secp-sk"; - -/// Length of the raw secret key, in bytes. -pub const SK_LEN: usize = 32; - -/// The name of the file which stores the public key. -/// -/// For users, this public key is a "secret" that can be shared with API consumers to provide them -/// access to the API. We avoid calling it a "public" key to users, since they should not post this -/// value in a public forum. +/// The name of the file which stores the API token. pub const PK_FILENAME: &str = "api-token.txt"; -/// Contains a `secp256k1` keypair that is saved-to/loaded-from disk on instantiation. The keypair -/// is used for authorization/authentication for requests/responses on the HTTP API. +pub const PK_LEN: usize = 33; + +/// Contains a randomly generated string which is used for authorization of requests to the HTTP API. /// /// Provides convenience functions to ultimately provide: /// -/// - A signature across outgoing HTTP responses, applied to the `Signature` header. /// - Verification of proof-of-knowledge of the public key in `self` for incoming HTTP requests, /// via the `Authorization` header. /// /// The aforementioned scheme was first defined here: /// /// https://github.com/sigp/lighthouse/issues/1269#issuecomment-649879855 +/// +/// This scheme has since been tweaked to remove VC response signing and secp256k1 key generation. +/// https://github.com/sigp/lighthouse/issues/5423 pub struct ApiSecret { - pk: PublicKey, - sk: SecretKey, + pk: String, pk_path: PathBuf, } impl ApiSecret { - /// If both the secret and public keys are already on-disk, parse them and ensure they're both - /// from the same keypair. + /// If the public key is already on-disk, use it. /// - /// The provided `dir` is a directory containing two files, `SK_FILENAME` and `PK_FILENAME`. + /// The provided `dir` is a directory containing `PK_FILENAME`. /// - /// If either the secret or public key files are missing on disk, create a new keypair and + /// If the public key file is missing on disk, create a new key and /// write it to disk (over-writing any existing files). pub fn create_or_open>(dir: P) -> Result { - let sk_path = dir.as_ref().join(SK_FILENAME); let pk_path = dir.as_ref().join(PK_FILENAME); - if !(sk_path.exists() && pk_path.exists()) { - let sk = SecretKey::random(&mut thread_rng()); - let pk = PublicKey::from_secret_key(&sk); - - // Create and write the secret key to file with appropriate permissions - create_with_600_perms( - &sk_path, - serde_utils::hex::encode(sk.serialize()).as_bytes(), - ) - .map_err(|e| { - format!( - "Unable to create file with permissions for {:?}: {:?}", - sk_path, e - ) - })?; + if !pk_path.exists() { + let length = PK_LEN; + let pk: String = thread_rng() + .sample_iter(&Alphanumeric) + .take(length) + .map(char::from) + .collect(); // Create and write the public key to file with appropriate permissions - create_with_600_perms( - &pk_path, - format!( - "{}{}", - PK_PREFIX, - serde_utils::hex::encode(&pk.serialize_compressed()[..]) - ) - .as_bytes(), - ) - .map_err(|e| { + create_with_600_perms(&pk_path, pk.to_string().as_bytes()).map_err(|e| { format!( "Unable to create file with permissions for {:?}: {:?}", pk_path, e @@ -87,78 +55,18 @@ impl ApiSecret { })?; } - let sk = fs::read(&sk_path) - .map_err(|e| format!("cannot read {}: {}", SK_FILENAME, e)) - .and_then(|bytes| { - serde_utils::hex::decode(&String::from_utf8_lossy(&bytes)) - .map_err(|_| format!("{} should be 0x-prefixed hex", PK_FILENAME)) - }) - .and_then(|bytes| { - if bytes.len() == SK_LEN { - let mut array = [0; SK_LEN]; - array.copy_from_slice(&bytes); - SecretKey::parse(&array).map_err(|e| format!("invalid {}: {}", SK_FILENAME, e)) - } else { - Err(format!( - "{} expected {} bytes not {}", - SK_FILENAME, - SK_LEN, - bytes.len() - )) - } - })?; - let pk = fs::read(&pk_path) - .map_err(|e| format!("cannot read {}: {}", PK_FILENAME, e)) - .and_then(|bytes| { - let hex = - String::from_utf8(bytes).map_err(|_| format!("{} is not utf8", SK_FILENAME))?; - if let Some(stripped) = hex.strip_prefix(PK_PREFIX) { - serde_utils::hex::decode(stripped) - .map_err(|_| format!("{} should be 0x-prefixed hex", SK_FILENAME)) - } else { - Err(format!("unable to parse {}", SK_FILENAME)) - } - }) - .and_then(|bytes| { - if bytes.len() == PK_LEN { - let mut array = [0; PK_LEN]; - array.copy_from_slice(&bytes); - PublicKey::parse_compressed(&array) - .map_err(|e| format!("invalid {}: {}", PK_FILENAME, e)) - } else { - Err(format!( - "{} expected {} bytes not {}", - PK_FILENAME, - PK_LEN, - bytes.len() - )) - } - })?; + .map_err(|e| format!("cannot read {}: {}", PK_FILENAME, e))? + .iter() + .map(|&c| char::from(c)) + .collect(); - // Ensure that the keys loaded from disk are indeed a pair. - if PublicKey::from_secret_key(&sk) != pk { - fs::remove_file(&sk_path) - .map_err(|e| format!("unable to remove {}: {}", SK_FILENAME, e))?; - fs::remove_file(&pk_path) - .map_err(|e| format!("unable to remove {}: {}", PK_FILENAME, e))?; - return Err(format!( - "{:?} does not match {:?} and the files have been deleted. Please try again.", - sk_path, pk_path - )); - } - - Ok(Self { pk, sk, pk_path }) - } - - /// Returns the public key of `self` as a 0x-prefixed hex string. - fn pubkey_string(&self) -> String { - serde_utils::hex::encode(&self.pk.serialize_compressed()[..]) + Ok(Self { pk, pk_path }) } /// Returns the API token. pub fn api_token(&self) -> String { - format!("{}{}", PK_PREFIX, self.pubkey_string()) + self.pk.clone() } /// Returns the path for the API token file @@ -196,16 +104,4 @@ impl ApiSecret { .untuple_one() .boxed() } - - /// Returns a closure which produces a signature over some bytes using the secret key in - /// `self`. The signature is a 32-byte hash formatted as a 0x-prefixed string. - pub fn signer(&self) -> impl Fn(&[u8]) -> String + Clone { - let sk = self.sk; - move |input: &[u8]| -> String { - let message = - Message::parse_slice(digest(&SHA256, input).as_ref()).expect("sha256 is 32 bytes"); - let (signature, _) = libsecp256k1::sign(&message, &sk); - serde_utils::hex::encode(signature.serialize_der().as_ref()) - } - } } diff --git a/validator_client/src/http_api/mod.rs b/validator_client/src/http_api/mod.rs index a4480195e59..3d7cab8e5e0 100644 --- a/validator_client/src/http_api/mod.rs +++ b/validator_client/src/http_api/mod.rs @@ -45,15 +45,8 @@ use task_executor::TaskExecutor; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use types::{ChainSpec, ConfigAndPreset, EthSpec}; use validator_dir::Builder as ValidatorDirBuilder; -use warp::{ - http::{ - header::{HeaderValue, CONTENT_TYPE}, - response::Response, - StatusCode, - }, - sse::Event, - Filter, -}; +use warp::{sse::Event, Filter}; +use warp_utils::task::blocking_json_task; #[derive(Debug)] pub enum Error { @@ -176,9 +169,6 @@ pub fn serve( } }; - let signer = ctx.api_secret.signer(); - let signer = warp::any().map(move || signer.clone()); - let inner_validator_store = ctx.validator_store.clone(); let validator_store_filter = warp::any() .map(move || inner_validator_store.clone()) @@ -270,9 +260,8 @@ pub fn serve( let get_node_version = warp::path("lighthouse") .and(warp::path("version")) .and(warp::path::end()) - .and(signer.clone()) - .and_then(|signer| { - blocking_signed_json_task(signer, move || { + .then(|| { + blocking_json_task(move || { Ok(api_types::GenericResponse::from(api_types::VersionData { version: version_with_platform(), })) @@ -283,9 +272,8 @@ pub fn serve( let get_lighthouse_health = warp::path("lighthouse") .and(warp::path("health")) .and(warp::path::end()) - .and(signer.clone()) - .and_then(|signer| { - blocking_signed_json_task(signer, move || { + .then(|| { + blocking_json_task(move || { eth2::lighthouse::Health::observe() .map(api_types::GenericResponse::from) .map_err(warp_utils::reject::custom_bad_request) @@ -297,9 +285,8 @@ pub fn serve( .and(warp::path("spec")) .and(warp::path::end()) .and(spec_filter.clone()) - .and(signer.clone()) - .and_then(|spec: Arc<_>, signer| { - blocking_signed_json_task(signer, move || { + .then(|spec: Arc<_>| { + blocking_json_task(move || { let config = ConfigAndPreset::from_chain_spec::(&spec, None); Ok(api_types::GenericResponse::from(config)) }) @@ -310,9 +297,8 @@ pub fn serve( .and(warp::path("validators")) .and(warp::path::end()) .and(validator_store_filter.clone()) - .and(signer.clone()) - .and_then(|validator_store: Arc>, signer| { - blocking_signed_json_task(signer, move || { + .then(|validator_store: Arc>| { + blocking_json_task(move || { let validators = validator_store .initialized_validators() .read() @@ -335,10 +321,9 @@ pub fn serve( .and(warp::path::param::()) .and(warp::path::end()) .and(validator_store_filter.clone()) - .and(signer.clone()) - .and_then( - |validator_pubkey: PublicKey, validator_store: Arc>, signer| { - blocking_signed_json_task(signer, move || { + .then( + |validator_pubkey: PublicKey, validator_store: Arc>| { + blocking_json_task(move || { let validator = validator_store .initialized_validators() .read() @@ -370,9 +355,8 @@ pub fn serve( .and(system_info_filter) .and(app_start_filter) .and(validator_dir_filter.clone()) - .and(signer.clone()) - .and_then(|sysinfo, app_start: std::time::Instant, val_dir, signer| { - blocking_signed_json_task(signer, move || { + .then(|sysinfo, app_start: std::time::Instant, val_dir| { + blocking_json_task(move || { let app_uptime = app_start.elapsed().as_secs(); Ok(api_types::GenericResponse::from(observe_system_health_vc( sysinfo, val_dir, app_uptime, @@ -387,15 +371,13 @@ pub fn serve( .and(validator_store_filter.clone()) .and(graffiti_file_filter.clone()) .and(graffiti_flag_filter) - .and(signer.clone()) .and(log_filter.clone()) - .and_then( + .then( |validator_store: Arc>, graffiti_file: Option, graffiti_flag: Option, - signer, log| { - blocking_signed_json_task(signer, move || { + blocking_json_task(move || { let mut result = HashMap::new(); for (key, graffiti_definition) in validator_store .initialized_validators() @@ -425,17 +407,15 @@ pub fn serve( .and(secrets_dir_filter.clone()) .and(validator_store_filter.clone()) .and(spec_filter.clone()) - .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( move |body: Vec, validator_dir: PathBuf, secrets_dir: PathBuf, validator_store: Arc>, spec: Arc, - signer, task_executor: TaskExecutor| { - blocking_signed_json_task(signer, move || { + blocking_json_task(move || { let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir); if let Some(handle) = task_executor.handle() { let (validators, mnemonic) = @@ -472,17 +452,15 @@ pub fn serve( .and(secrets_dir_filter.clone()) .and(validator_store_filter.clone()) .and(spec_filter) - .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( move |body: api_types::CreateValidatorsMnemonicRequest, validator_dir: PathBuf, secrets_dir: PathBuf, validator_store: Arc>, spec: Arc, - signer, task_executor: TaskExecutor| { - blocking_signed_json_task(signer, move || { + blocking_json_task(move || { let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir); if let Some(handle) = task_executor.handle() { let mnemonic = @@ -521,16 +499,14 @@ pub fn serve( .and(validator_dir_filter.clone()) .and(secrets_dir_filter.clone()) .and(validator_store_filter.clone()) - .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( move |body: api_types::KeystoreValidatorsPostRequest, validator_dir: PathBuf, secrets_dir: PathBuf, validator_store: Arc>, - signer, task_executor: TaskExecutor| { - blocking_signed_json_task(signer, move || { + blocking_json_task(move || { // Check to ensure the password is correct. let keypair = body .keystore @@ -611,14 +587,12 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(validator_store_filter.clone()) - .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( |body: Vec, validator_store: Arc>, - signer, task_executor: TaskExecutor| { - blocking_signed_json_task(signer, move || { + blocking_json_task(move || { if let Some(handle) = task_executor.handle() { let web3signers: Vec = body .into_iter() @@ -666,16 +640,14 @@ pub fn serve( .and(warp::body::json()) .and(validator_store_filter.clone()) .and(graffiti_file_filter.clone()) - .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( |validator_pubkey: PublicKey, body: api_types::ValidatorPatchRequest, validator_store: Arc>, graffiti_file: Option, - signer, task_executor: TaskExecutor| { - blocking_signed_json_task(signer, move || { + blocking_json_task(move || { if body.graffiti.is_some() && graffiti_file.is_some() { return Err(warp_utils::reject::custom_bad_request( "Unable to update graffiti as the \"--graffiti-file\" flag is set" @@ -784,10 +756,9 @@ pub fn serve( // GET /lighthouse/auth let get_auth = warp::path("lighthouse").and(warp::path("auth").and(warp::path::end())); let get_auth = get_auth - .and(signer.clone()) .and(api_token_path_filter) - .and_then(|signer, token_path: PathBuf| { - blocking_signed_json_task(signer, move || { + .then(move |token_path: PathBuf| { + blocking_json_task(move || { Ok(AuthResponse { token_path: token_path.display().to_string(), }) @@ -799,23 +770,20 @@ pub fn serve( .and(warp::path("keystores")) .and(warp::path::end()) .and(warp::body::json()) - .and(signer.clone()) .and(validator_store_filter.clone()) .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then( - move |request, signer, validator_store, task_executor, log| { - blocking_signed_json_task(signer, move || { - if allow_keystore_export { - keystores::export(request, validator_store, task_executor, log) - } else { - Err(warp_utils::reject::custom_bad_request( - "keystore export is disabled".to_string(), - )) - } - }) - }, - ); + .then(move |request, validator_store, task_executor, log| { + blocking_json_task(move || { + if allow_keystore_export { + keystores::export(request, validator_store, task_executor, log) + } else { + Err(warp_utils::reject::custom_bad_request( + "keystore export is disabled".to_string(), + )) + } + }) + }); // Standard key-manager endpoints. let eth_v1 = warp::path("eth").and(warp::path("v1")); @@ -829,10 +797,9 @@ pub fn serve( .and(warp::path("feerecipient")) .and(warp::path::end()) .and(validator_store_filter.clone()) - .and(signer.clone()) - .and_then( - |validator_pubkey: PublicKey, validator_store: Arc>, signer| { - blocking_signed_json_task(signer, move || { + .then( + |validator_pubkey: PublicKey, validator_store: Arc>| { + blocking_json_task(move || { if validator_store .initialized_validators() .read() @@ -869,13 +836,11 @@ pub fn serve( .and(warp::body::json()) .and(warp::path::end()) .and(validator_store_filter.clone()) - .and(signer.clone()) - .and_then( + .then( |validator_pubkey: PublicKey, request: api_types::UpdateFeeRecipientRequest, - validator_store: Arc>, - signer| { - blocking_signed_json_task(signer, move || { + validator_store: Arc>| { + blocking_json_task(move || { if validator_store .initialized_validators() .read() @@ -909,10 +874,9 @@ pub fn serve( .and(warp::path("feerecipient")) .and(warp::path::end()) .and(validator_store_filter.clone()) - .and(signer.clone()) - .and_then( - |validator_pubkey: PublicKey, validator_store: Arc>, signer| { - blocking_signed_json_task(signer, move || { + .then( + |validator_pubkey: PublicKey, validator_store: Arc>| { + blocking_json_task(move || { if validator_store .initialized_validators() .read() @@ -946,10 +910,9 @@ pub fn serve( .and(warp::path("gas_limit")) .and(warp::path::end()) .and(validator_store_filter.clone()) - .and(signer.clone()) - .and_then( - |validator_pubkey: PublicKey, validator_store: Arc>, signer| { - blocking_signed_json_task(signer, move || { + .then( + |validator_pubkey: PublicKey, validator_store: Arc>| { + blocking_json_task(move || { if validator_store .initialized_validators() .read() @@ -978,13 +941,11 @@ pub fn serve( .and(warp::body::json()) .and(warp::path::end()) .and(validator_store_filter.clone()) - .and(signer.clone()) - .and_then( + .then( |validator_pubkey: PublicKey, request: api_types::UpdateGasLimitRequest, - validator_store: Arc>, - signer| { - blocking_signed_json_task(signer, move || { + validator_store: Arc>| { + blocking_json_task(move || { if validator_store .initialized_validators() .read() @@ -1018,10 +979,9 @@ pub fn serve( .and(warp::path("gas_limit")) .and(warp::path::end()) .and(validator_store_filter.clone()) - .and(signer.clone()) - .and_then( - |validator_pubkey: PublicKey, validator_store: Arc>, signer| { - blocking_signed_json_task(signer, move || { + .then( + |validator_pubkey: PublicKey, validator_store: Arc>| { + blocking_json_task(move || { if validator_store .initialized_validators() .read() @@ -1058,17 +1018,15 @@ pub fn serve( .and(validator_store_filter.clone()) .and(slot_clock_filter) .and(log_filter.clone()) - .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( |pubkey: PublicKey, query: api_types::VoluntaryExitQuery, validator_store: Arc>, slot_clock: T, log, - signer, task_executor: TaskExecutor| { - blocking_signed_json_task(signer, move || { + blocking_json_task(move || { if let Some(handle) = task_executor.handle() { let signed_voluntary_exit = handle.block_on(create_signed_voluntary_exit( @@ -1096,13 +1054,11 @@ pub fn serve( .and(warp::path::end()) .and(validator_store_filter.clone()) .and(graffiti_flag_filter) - .and(signer.clone()) - .and_then( + .then( |pubkey: PublicKey, validator_store: Arc>, - graffiti_flag: Option, - signer| { - blocking_signed_json_task(signer, move || { + graffiti_flag: Option| { + blocking_json_task(move || { let graffiti = get_graffiti(pubkey.clone(), validator_store, graffiti_flag)?; Ok(GenericResponse::from(GetGraffitiResponse { pubkey: pubkey.into(), @@ -1121,14 +1077,12 @@ pub fn serve( .and(warp::path::end()) .and(validator_store_filter.clone()) .and(graffiti_file_filter.clone()) - .and(signer.clone()) - .and_then( + .then( |pubkey: PublicKey, query: SetGraffitiRequest, validator_store: Arc>, - graffiti_file: Option, - signer| { - blocking_signed_json_task(signer, move || { + graffiti_file: Option| { + blocking_json_task(move || { if graffiti_file.is_some() { return Err(warp_utils::reject::invalid_auth( "Unable to update graffiti as the \"--graffiti-file\" flag is set" @@ -1149,13 +1103,11 @@ pub fn serve( .and(warp::path::end()) .and(validator_store_filter.clone()) .and(graffiti_file_filter.clone()) - .and(signer.clone()) - .and_then( + .then( |pubkey: PublicKey, validator_store: Arc>, - graffiti_file: Option, - signer| { - blocking_signed_json_task(signer, move || { + graffiti_file: Option| { + blocking_json_task(move || { if graffiti_file.is_some() { return Err(warp_utils::reject::invalid_auth( "Unable to delete graffiti as the \"--graffiti-file\" flag is set" @@ -1169,32 +1121,24 @@ pub fn serve( .map(|reply| warp::reply::with_status(reply, warp::http::StatusCode::NO_CONTENT)); // GET /eth/v1/keystores - let get_std_keystores = std_keystores - .and(signer.clone()) - .and(validator_store_filter.clone()) - .and_then(|signer, validator_store: Arc>| { - blocking_signed_json_task(signer, move || Ok(keystores::list(validator_store))) - }); + let get_std_keystores = std_keystores.and(validator_store_filter.clone()).then( + |validator_store: Arc>| { + blocking_json_task(move || Ok(keystores::list(validator_store))) + }, + ); // POST /eth/v1/keystores let post_std_keystores = std_keystores .and(warp::body::json()) - .and(signer.clone()) .and(validator_dir_filter) .and(secrets_dir_filter) .and(validator_store_filter.clone()) .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then( - move |request, - signer, - validator_dir, - secrets_dir, - validator_store, - task_executor, - log| { + .then( + move |request, validator_dir, secrets_dir, validator_store, task_executor, log| { let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir); - blocking_signed_json_task(signer, move || { + blocking_json_task(move || { keystores::import( request, validator_dir, @@ -1210,33 +1154,30 @@ pub fn serve( // DELETE /eth/v1/keystores let delete_std_keystores = std_keystores .and(warp::body::json()) - .and(signer.clone()) .and(validator_store_filter.clone()) .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, task_executor, log| { - blocking_signed_json_task(signer, move || { + .then(|request, validator_store, task_executor, log| { + blocking_json_task(move || { keystores::delete(request, validator_store, task_executor, log) }) }); // GET /eth/v1/remotekeys - let get_std_remotekeys = std_remotekeys - .and(signer.clone()) - .and(validator_store_filter.clone()) - .and_then(|signer, validator_store: Arc>| { - blocking_signed_json_task(signer, move || Ok(remotekeys::list(validator_store))) - }); + let get_std_remotekeys = std_remotekeys.and(validator_store_filter.clone()).then( + |validator_store: Arc>| { + blocking_json_task(move || Ok(remotekeys::list(validator_store))) + }, + ); // POST /eth/v1/remotekeys let post_std_remotekeys = std_remotekeys .and(warp::body::json()) - .and(signer.clone()) .and(validator_store_filter.clone()) .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, task_executor, log| { - blocking_signed_json_task(signer, move || { + .then(|request, validator_store, task_executor, log| { + blocking_json_task(move || { remotekeys::import(request, validator_store, task_executor, log) }) }); @@ -1244,12 +1185,11 @@ pub fn serve( // DELETE /eth/v1/remotekeys let delete_std_remotekeys = std_remotekeys .and(warp::body::json()) - .and(signer) .and(validator_store_filter) .and(task_executor_filter) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, task_executor, log| { - blocking_signed_json_task(signer, move || { + .then(|request, validator_store, task_executor, log| { + blocking_json_task(move || { remotekeys::delete(request, validator_store, task_executor, log) }) }); @@ -1369,42 +1309,3 @@ pub fn serve( Ok((listening_socket, server)) } - -/// Executes `func` in blocking tokio task (i.e., where long-running tasks are permitted). -/// JSON-encodes the return value of `func`, using the `signer` function to produce a signature of -/// those bytes. -pub async fn blocking_signed_json_task( - signer: S, - func: F, -) -> Result -where - S: Fn(&[u8]) -> String, - F: FnOnce() -> Result + Send + 'static, - T: Serialize + Send + 'static, -{ - warp_utils::task::blocking_task(func) - .await - .map(|func_output| { - let mut response = match serde_json::to_vec(&func_output) { - Ok(body) => { - let mut res = Response::new(body); - res.headers_mut() - .insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); - res - } - Err(_) => Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(vec![]) - .expect("can produce simple response from static values"), - }; - - let body: &Vec = response.body(); - let signature = signer(body); - let header_value = - HeaderValue::from_str(&signature).expect("hash can be encoded as header"); - - response.headers_mut().append("Signature", header_value); - - response - }) -}