Skip to content

Commit

Permalink
Replace rustls with boring-ssl
Browse files Browse the repository at this point in the history
This removes all re-attempts present in monero-serai's RPC and is an attempt to
narrow down the sporadic failures.

Inspired by hyperium/hyper#3427
  • Loading branch information
kayabaNerve committed Nov 29, 2023
1 parent d1122a6 commit 7dc7f8f
Show file tree
Hide file tree
Showing 16 changed files with 342 additions and 123 deletions.
268 changes: 239 additions & 29 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,5 @@ lazy_static = { git = "https://github.com/rust-lang-nursery/lazy-static.rs", rev
# subxt *can* pull these off crates.io yet there's no benefit to this
sp-core-hashing = { git = "https://github.com/serai-dex/substrate" }
sp-std = { git = "https://github.com/serai-dex/substrate" }

hyper-boring = { git = "https://github.com/cloudflare/boring", rev = "423c260d87b69a926594ded0dd693b5cf1220452" }
9 changes: 5 additions & 4 deletions coins/bitcoin/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use thiserror::Error;
use serde::{Deserialize, de::DeserializeOwned};
use serde_json::json;

use simple_request::{hyper, Request, Client};
use simple_request::{hyper, Full, Request, Client};

use bitcoin::{
hashes::{Hash, hex::FromHex},
Expand Down Expand Up @@ -62,7 +62,8 @@ impl Rpc {
/// provided to this library, if the RPC has an incompatible argument layout. That is not checked
/// at time of RPC creation.
pub async fn new(url: String) -> Result<Rpc, RpcError> {
let rpc = Rpc { client: Client::with_connection_pool(), url };
let rpc =
Rpc { client: Client::with_connection_pool().map_err(|_| RpcError::ConnectionError)?, url };

// Make an RPC request to verify the node is reachable and sane
let res: String = rpc.rpc_call("help", json!([])).await?;
Expand Down Expand Up @@ -110,11 +111,11 @@ impl Rpc {
let mut request = Request::from(
hyper::Request::post(&self.url)
.header("Content-Type", "application/json")
.body(
.body(Full::new(
serde_json::to_vec(&json!({ "jsonrpc": "2.0", "method": method, "params": params }))
.unwrap()
.into(),
)
))
.unwrap(),
);
request.with_basic_auth();
Expand Down
65 changes: 27 additions & 38 deletions coins/monero/src/rpc/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl HttpRpc {
&client
.request(
Request::post(url.clone())
.body(vec![].into())
.body("".into())
.map_err(|e| RpcError::ConnectionError(format!("couldn't make request: {e:?}")))?,
)
.await
Expand All @@ -102,7 +102,9 @@ impl HttpRpc {
connection: Arc::new(Mutex::new((challenge, client))),
}
} else {
Authentication::Unauthenticated(Client::with_connection_pool())
Authentication::Unauthenticated(
Client::with_connection_pool().map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
)
};

Ok(Rpc(HttpRpc { authentication, url }))
Expand All @@ -117,7 +119,7 @@ impl HttpRpc {
.map_err(|e| RpcError::ConnectionError(format!("couldn't make request: {e:?}")))
};

async fn body_from_response(response: Response<'_>) -> Result<Vec<u8>, RpcError> {
async fn body_from_response(response: Response) -> Result<Vec<u8>, RpcError> {
/*
let length = usize::try_from(
response
Expand Down Expand Up @@ -210,50 +212,37 @@ impl HttpRpc {
.1
.request(request)
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")));
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?;

let (error, is_stale) = match &response {
Err(e) => (Some(e.clone()), false),
Ok(response) => (
None,
if response.status() == StatusCode::UNAUTHORIZED {
if let Some(header) = response.headers().get("www-authenticate") {
header
.to_str()
.map_err(|_| {
RpcError::InvalidNode("www-authenticate header wasn't a string".to_string())
})?
.contains("stale")
} else {
false
}
} else {
false
},
),
};
let mut is_stale = false;
if response.status() == StatusCode::UNAUTHORIZED {
if let Some(header) = response.headers().get("www-authenticate") {
if header
.to_str()
.map_err(|_| {
RpcError::InvalidNode("www-authenticate header wasn't a string".to_string())
})?
.contains("stale")
{
is_stale = true;
}
}
}

// If the connection entered an error state, drop the cached challenge as challenges are
// per-connection
// We don't need to create a new connection as simple-request will for us
if error.is_some() || is_stale {
// If the authentication is stale, drop the cached challenge and retry
if is_stale {
connection_lock.0 = None;
// If we're not already on our second attempt, move to the next loop iteration
// (retrying all of this once)
if attempt == 0 {
continue;
}
if let Some(e) = error {
Err(e)?
} else {
debug_assert!(is_stale);
Err(RpcError::InvalidNode(
"node claimed fresh connection had stale authentication".to_string(),
))?
}
} else {
body_from_response(response.unwrap()).await?
Err(RpcError::InvalidNode(
"node claimed fresh connection had stale authentication".to_string(),
))?
}

body_from_response(response).await?
}
});
}
Expand Down
6 changes: 1 addition & 5 deletions coins/monero/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,7 @@ impl<R: RpcConnection> Rpc<R> {
.0
.post(
route,
if let Some(params) = params {
serde_json::to_string(&params).unwrap().into_bytes()
} else {
vec![]
},
if let Some(params) = params { serde_json::to_vec(&params).unwrap() } else { vec![] },
)
.await?;
let res_str = std_shims::str::from_utf8(&res)
Expand Down
11 changes: 7 additions & 4 deletions common/request/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@ all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
# Deprecated here means to enable deprecated warnings, not to restore deprecated APIs
hyper = { version = "0.14", default-features = false, features = ["http1", "tcp", "client", "runtime", "backports", "deprecated"] }
hyper = { version = "1", default-features = false, features = ["http1", "client"] }
hyper-util = { version = "0.1", default-features = false, features = ["http1", "client", "tokio"] }
http-body-util = { version = "0.1", default-features = false }

tokio = { version = "1", default-features = false }

hyper-rustls = { version = "0.24", default-features = false, features = ["http1", "native-tokio"], optional = true }
tower-service = { version = "0.3", default-features = false, optional = true }
hyper-boring = { version = "5", default-features = false, optional = true }

zeroize = { version = "1", optional = true }
base64ct = { version = "1", features = ["alloc"], optional = true }

[features]
tls = ["hyper-rustls"]
tls = ["hyper-boring"]
basic-auth = ["zeroize", "base64ct"]
default = ["tls"]
63 changes: 37 additions & 26 deletions common/request/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use std::sync::Arc;
use tokio::sync::Mutex;

#[cfg(feature = "tls")]
use hyper_rustls::{HttpsConnectorBuilder, HttpsConnector};
use hyper::{
Uri,
header::HeaderValue,
body::Body,
service::Service,
client::{HttpConnector, conn::http1::SendRequest},
use tower_service::Service as TowerService;
#[cfg(feature = "tls")]
use hyper_boring::HttpsConnector;
use hyper::{Uri, header::HeaderValue, body::Bytes, client::conn::http1::SendRequest};
use hyper_util::{
rt::tokio::TokioExecutor,
client::legacy::{Client as HyperClient, connect::HttpConnector},
};
pub use hyper;

Expand All @@ -29,17 +29,27 @@ pub enum Error {
InconsistentHost,
ConnectionError(Box<dyn Send + Sync + std::error::Error>),
Hyper(hyper::Error),
HyperUtil(hyper_util::client::legacy::Error),
}

#[cfg(not(feature = "tls"))]
type Connector = HttpConnector;
#[cfg(feature = "tls")]
type Connector = HttpsConnector<HttpConnector>;

#[derive(Clone, Debug)]
#[derive(Clone)]
enum Connection {
ConnectionPool(hyper::Client<Connector>),
Connection { connector: Connector, host: Uri, connection: Arc<Mutex<Option<SendRequest<Body>>>> },
ConnectionPool(HyperClient<Connector, Full<Bytes>>),
Connection {
connector: Connector,
host: Uri,
connection: Arc<Mutex<Option<SendRequest<Full<Bytes>>>>>,
},
}
impl core::fmt::Debug for Connection {
fn fmt(&self, fmt: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
fmt.debug_struct("Connection").finish_non_exhaustive()
}
}

#[derive(Clone, Debug)]
Expand All @@ -48,25 +58,26 @@ pub struct Client {
}

impl Client {
fn connector() -> Connector {
fn connector() -> Result<Connector, Error> {
#[cfg(feature = "tls")]
let res =
HttpsConnectorBuilder::new().with_native_roots().https_or_http().enable_http1().build();
let res = HttpsConnector::new().map_err(|e| Error::ConnectionError(format!("{e:?}").into()))?;
#[cfg(not(feature = "tls"))]
let res = HttpConnector::new();
res
Ok(res)
}

pub fn with_connection_pool() -> Client {
Client {
connection: Connection::ConnectionPool(hyper::Client::builder().build(Self::connector())),
}
pub fn with_connection_pool() -> Result<Client, Error> {
Ok(Client {
connection: Connection::ConnectionPool(
HyperClient::builder(TokioExecutor::new()).build(Self::connector()?),
),
})
}

pub fn without_connection_pool(host: String) -> Result<Client, Error> {
Ok(Client {
connection: Connection::Connection {
connector: Self::connector(),
connector: Self::connector()?,
host: {
let uri: Uri = host.parse().map_err(|_| Error::InvalidUri)?;
if uri.host().is_none() {
Expand All @@ -79,7 +90,7 @@ impl Client {
})
}

pub async fn request<R: Into<Request>>(&self, request: R) -> Result<Response<'_>, Error> {
pub async fn request<R: Into<Request>>(&self, request: R) -> Result<Response, Error> {
let request: Request = request.into();
let mut request = request.0;
if let Some(header_host) = request.headers().get(hyper::header::HOST) {
Expand Down Expand Up @@ -111,8 +122,10 @@ impl Client {
.insert(hyper::header::HOST, HeaderValue::from_str(&host).map_err(|_| Error::InvalidUri)?);
}

let response = match &self.connection {
Connection::ConnectionPool(client) => client.request(request).await.map_err(Error::Hyper)?,
Ok(Response(match &self.connection {
Connection::ConnectionPool(client) => {
client.request(request).await.map_err(Error::HyperUtil)?
}
Connection::Connection { connector, host, connection } => {
let mut connection_lock = connection.lock().await;

Expand All @@ -137,16 +150,14 @@ impl Client {
// Send the request
let res = connection.send_request(request).await;
if let Ok(res) = res {
return Ok(Response(res, self));
return Ok(Response(res));
}
err = res.err();
}
// Since this connection has been put into an error state, drop it
*connection_lock = None;
Err(Error::Hyper(err.unwrap()))?
}
};

Ok(Response(response, self))
}))
}
}
10 changes: 5 additions & 5 deletions common/request/src/request.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use hyper::body::Body;
#[cfg(feature = "basic-auth")]
use hyper::header::HeaderValue;
use hyper::{body::Bytes, header::HeaderValue};
pub use http_body_util::Full;

#[cfg(feature = "basic-auth")]
use crate::Error;

#[derive(Debug)]
pub struct Request(pub(crate) hyper::Request<Body>);
pub struct Request(pub(crate) hyper::Request<Full<Bytes>>);
impl Request {
#[cfg(feature = "basic-auth")]
fn username_password_from_uri(&self) -> Result<(String, String), Error> {
Expand Down Expand Up @@ -59,8 +59,8 @@ impl Request {
let _ = self.basic_auth_from_uri();
}
}
impl From<hyper::Request<Body>> for Request {
fn from(request: hyper::Request<Body>) -> Request {
impl From<hyper::Request<Full<Bytes>>> for Request {
fn from(request: hyper::Request<Full<Bytes>>) -> Request {
Request(request)
}
}
17 changes: 12 additions & 5 deletions common/request/src/response.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
use hyper::{
StatusCode,
header::{HeaderValue, HeaderMap},
body::{Buf, Body},
body::{Buf, Incoming},
};
use http_body_util::BodyExt;

use crate::{Client, Error};
use crate::Error;

// Borrows the client so its async task lives as long as this response exists.
#[derive(Debug)]
pub struct Response<'a>(pub(crate) hyper::Response<Body>, pub(crate) &'a Client);
impl<'a> Response<'a> {
pub struct Response(pub(crate) hyper::Response<Incoming>);
impl Response {
pub fn status(&self) -> StatusCode {
self.0.status()
}
pub fn headers(&self) -> &HeaderMap<HeaderValue> {
self.0.headers()
}
pub async fn body(self) -> Result<impl std::io::Read, Error> {
hyper::body::aggregate(self.0.into_body()).await.map(Buf::reader).map_err(Error::Hyper)
self
.0
.into_body()
.collect()
.await
.map_err(Error::Hyper)
.map(|collected| Buf::reader(collected.aggregate()))
}
}
2 changes: 1 addition & 1 deletion orchestration/Dockerfile.parts/Dockerfile.serai.build
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ RUN echo "/usr/lib/libmimalloc.so" >> /etc/ld.so.preload
RUN apt update && apt upgrade -y && apt autoremove -y && apt clean

# Add dev dependencies
RUN apt install -y pkg-config clang
RUN apt install -y pkg-config clang cmake git

# Dependencies for the Serai node
RUN apt install -y make protobuf-compiler
Expand Down
2 changes: 1 addition & 1 deletion orchestration/coordinator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ RUN echo "/usr/lib/libmimalloc.so" >> /etc/ld.so.preload
RUN apt update && apt upgrade -y && apt autoremove -y && apt clean

# Add dev dependencies
RUN apt install -y pkg-config clang
RUN apt install -y pkg-config clang cmake git

# Dependencies for the Serai node
RUN apt install -y make protobuf-compiler
Expand Down
2 changes: 1 addition & 1 deletion orchestration/message-queue/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ RUN echo "/usr/lib/libmimalloc.so" >> /etc/ld.so.preload
RUN apt update && apt upgrade -y && apt autoremove -y && apt clean

# Add dev dependencies
RUN apt install -y pkg-config clang
RUN apt install -y pkg-config clang cmake git

# Dependencies for the Serai node
RUN apt install -y make protobuf-compiler
Expand Down
Loading

0 comments on commit 7dc7f8f

Please sign in to comment.