Skip to content

Commit

Permalink
remove intermediary types
Browse files Browse the repository at this point in the history
  • Loading branch information
smatthewenglish committed Jun 30, 2024
1 parent fffeefb commit 0a0f206
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 239 deletions.
30 changes: 15 additions & 15 deletions crates/node/builder/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,19 +285,20 @@ where

extend_rpc_modules.extend_rpc_modules(ctx)?;

let server_config = config.rpc.rpc_server_config();
let launch_rpc = modules.clone().start_server(server_config).map_ok(|handle| {
if let Some(path) = handle.ipc_endpoint() {
info!(target: "reth::cli", %path, "RPC IPC server started");
}
if let Some(addr) = handle.http_local_addr() {
info!(target: "reth::cli", url=%addr, "RPC HTTP server started");
}
if let Some(addr) = handle.ws_local_addr() {
info!(target: "reth::cli", url=%addr, "RPC WS server started");
}
handle
});
let mut server_config = config.rpc.rpc_server_config();

let cloned_modules = modules.clone();
let launch_rpc = server_config.build_ws_http(&cloned_modules).await?;

if let Some(path) = launch_rpc.ipc_endpoint() {
info!(target: "reth::cli", %path, "RPC IPC server started");
}
if let Some(addr) = launch_rpc.http_local_addr() {
info!(target: "reth::cli", url=%addr, "RPC HTTP server started");
}
if let Some(addr) = launch_rpc.ws_local_addr() {
info!(target: "reth::cli", url=%addr, "RPC WS server started");
}

let launch_auth = auth_module.clone().start_server(auth_config).map_ok(|handle| {
let addr = handle.local_addr();
Expand All @@ -310,8 +311,7 @@ where
});

// launch servers concurrently
let (rpc, auth) = futures::future::try_join(launch_rpc, launch_auth).await?;
let handles = RethRpcServerHandles { rpc, auth };
let handles = RethRpcServerHandles { rpc: launch_rpc, auth: launch_auth.await? };

let ctx = RpcContext {
node,
Expand Down
203 changes: 40 additions & 163 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ use error::{ConflictingModules, RpcError, ServerKind};
use http::{header::AUTHORIZATION, HeaderMap};
use jsonrpsee::{
core::RegisterMethodError,
server::{AlreadyStoppedError, IdProvider, RpcServiceBuilder, Server, ServerHandle},
server::{AlreadyStoppedError, IdProvider, RpcServiceBuilder, ServerHandle},
Methods, RpcModule,
};
use reth_engine_primitives::EngineTypes;
Expand All @@ -190,7 +190,6 @@ use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
fmt,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
Expand Down Expand Up @@ -254,11 +253,13 @@ where
EvmConfig: ConfigureEvm,
{
let module_config = module_config.into();
let server_config = server_config.into();
RpcModuleBuilder::new(provider, pool, network, executor, events, evm_config)
.build(module_config)
.start_server(server_config)
.await
let mut server_config = server_config.into();

let value = RpcModuleBuilder::new(provider, pool, network, executor, events, evm_config)
.build(module_config);

let output: RpcServerHandle = server_config.build_ws_http(&value).await?;
Ok(output)
}

/// A builder type to configure the RPC module: See [`RpcModule`]
Expand Down Expand Up @@ -1271,11 +1272,6 @@ impl RpcServerConfig {
self.ipc_endpoint.clone()
}

/// Convenience function to do [`RpcServerConfig::build`] and [`RpcServer::start`] in one step
pub async fn start(self, modules: TransportRpcModules) -> Result<RpcServerHandle, RpcError> {
self.build(&modules).await?.start(modules).await
}

/// Creates the [`CorsLayer`] if any
fn maybe_cors_layer(cors: Option<String>) -> Result<Option<CorsLayer>, CorsDomainError> {
cors.as_deref().map(cors::create_cors_layer).transpose()
Expand All @@ -1289,10 +1285,13 @@ impl RpcServerConfig {
/// Builds the ws and http server(s).
///
/// If both are on the same port, they are combined into one server.
async fn build_ws_http(
pub async fn build_ws_http(
&mut self,
modules: &TransportRpcModules,
) -> Result<WsHttpServer, RpcError> {
) -> Result<RpcServerHandle, RpcError> {
let mut http_handle = None;
let mut ws_handle = None;

let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::LOCALHOST,
constants::DEFAULT_HTTP_RPC_PORT,
Expand Down Expand Up @@ -1351,10 +1350,19 @@ impl RpcServerConfig {
let addr = server
.local_addr()
.map_err(|err| RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr)))?;
return Ok(WsHttpServer {

if let Some(module) = modules.http.as_ref().or(modules.ws.as_ref()) {
let handle = server.start(module.clone());
http_handle = Some(handle.clone());
ws_handle = Some(handle);
}
return Ok(RpcServerHandle {
http_local_addr: Some(addr),
ws_local_addr: Some(addr),
server: WsHttpServers::SamePort(server),
http: http_handle,
ws: ws_handle,
ipc_endpoint: None,
ipc: None,
jwt_secret: self.jwt_secret,
})
}
Expand Down Expand Up @@ -1410,36 +1418,20 @@ impl RpcServerConfig {
http_server = Some(server);
}

Ok(WsHttpServer {
http_handle =
Some(http_server.expect("REASON").start(modules.http.clone().expect("REASON")));
ws_handle = Some(ws_server.expect("REASON").start(modules.ws.clone().expect("REASON")));

Ok(RpcServerHandle {
http_local_addr,
ws_local_addr,
server: WsHttpServers::DifferentPort { http: http_server, ws: ws_server },
http: http_handle,
ws: ws_handle,
ipc_endpoint: None,
ipc: None,
jwt_secret: self.jwt_secret,
})
}

/// Finalize the configuration of the server(s).
///
/// This consumes the builder and returns a server.
///
/// Note: The server is not started and does nothing unless polled, See also
/// [`RpcServer::start`]
pub async fn build(mut self, modules: &TransportRpcModules) -> Result<RpcServer, RpcError> {
let mut server = RpcServer::empty();
server.ws_http = self.build_ws_http(modules).await?;

if let Some(builder) = self.ipc_server_config {
let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default();
let ipc_path =
self.ipc_endpoint.unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into());
let ipc = builder
.set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics))
.build(ipc_path);
server.ipc = Some(ipc);
}

Ok(server)
}
}

/// Holds modules to be installed per transport type
Expand Down Expand Up @@ -1646,120 +1638,18 @@ impl TransportRpcModules {
self.merge_ipc(other)?;
Ok(())
}

/// Convenience function for starting a server
pub async fn start_server(self, builder: RpcServerConfig) -> Result<RpcServerHandle, RpcError> {
builder.start(self).await
}
}

/// Container type for ws and http servers in all possible combinations.
#[derive(Default)]
struct WsHttpServer {
/// The address of the http server
http_local_addr: Option<SocketAddr>,
/// The address of the ws server
ws_local_addr: Option<SocketAddr>,
/// Configured ws,http servers
server: WsHttpServers,
/// The jwt secret.
jwt_secret: Option<JwtSecret>,
}

// Define the type alias with detailed type complexity
type WsHttpServerKind = Server<
Stack<
tower::util::Either<AuthLayer<JwtAuthValidator>, Identity>,
Stack<tower::util::Either<CorsLayer, Identity>, Identity>,
>,
Stack<RpcRequestMetrics, Identity>,
>;

/// Enum for holding the http and ws servers in all possible combinations.
enum WsHttpServers {
/// Both servers are on the same port
SamePort(WsHttpServerKind),
/// Servers are on different ports
DifferentPort { http: Option<WsHttpServerKind>, ws: Option<WsHttpServerKind> },
}

// === impl WsHttpServers ===

impl WsHttpServers {
/// Starts the servers and returns the handles (http, ws)
async fn start(
self,
http_module: Option<RpcModule<()>>,
ws_module: Option<RpcModule<()>>,
config: &TransportRpcModuleConfig,
) -> Result<(Option<ServerHandle>, Option<ServerHandle>), RpcError> {
let mut http_handle = None;
let mut ws_handle = None;
match self {
Self::SamePort(server) => {
// Make sure http and ws modules are identical, since we currently can't run
// different modules on same server
config.ensure_ws_http_identical()?;

if let Some(module) = http_module.or(ws_module) {
let handle = server.start(module);
http_handle = Some(handle.clone());
ws_handle = Some(handle);
}
}
Self::DifferentPort { http, ws } => {
if let Some((server, module)) =
http.and_then(|server| http_module.map(|module| (server, module)))
{
http_handle = Some(server.start(module));
}
if let Some((server, module)) =
ws.and_then(|server| ws_module.map(|module| (server, module)))
{
ws_handle = Some(server.start(module));
}
}
}

Ok((http_handle, ws_handle))
}
}

impl Default for WsHttpServers {
fn default() -> Self {
Self::DifferentPort { http: None, ws: None }
}
}

/// Container type for each transport ie. http, ws, and ipc server
/// Container type for ipc server
#[allow(missing_debug_implementations)]
pub struct RpcServer {
/// Configured ws,http servers
ws_http: WsHttpServer,
/// ipc server
ipc: Option<IpcServer<Identity, Stack<RpcRequestMetrics, Identity>>>,
}

// === impl RpcServer ===

impl RpcServer {
fn empty() -> Self {
Self { ws_http: Default::default(), ipc: None }
}

/// Returns the [`SocketAddr`] of the http server if started.
pub const fn http_local_addr(&self) -> Option<SocketAddr> {
self.ws_http.http_local_addr
}
/// Return the `JwtSecret` of the server
pub const fn jwt(&self) -> Option<JwtSecret> {
self.ws_http.jwt_secret
}

/// Returns the [`SocketAddr`] of the ws server if started.
pub const fn ws_local_addr(&self) -> Option<SocketAddr> {
self.ws_http.ws_local_addr
}

/// Returns the endpoint of the ipc server if started.
pub fn ipc_endpoint(&self) -> Option<String> {
self.ipc.as_ref().map(|ipc| ipc.endpoint())
Expand All @@ -1769,25 +1659,22 @@ impl RpcServer {
///
/// This returns an [RpcServerHandle] that's connected to the server task(s) until the server is
/// stopped or the [RpcServerHandle] is dropped.
#[instrument(name = "start", skip_all, fields(http = ?self.http_local_addr(), ws = ?self.ws_local_addr(), ipc = ?self.ipc_endpoint()), target = "rpc", level = "TRACE")]
#[instrument(name = "start", skip_all, fields(ipc = ?self.ipc_endpoint()), target = "rpc", level = "TRACE")]
#[allow(dead_code, unused_variables)]
pub async fn start(self, modules: TransportRpcModules) -> Result<RpcServerHandle, RpcError> {
trace!(target: "rpc", "staring RPC server");
let Self { ws_http, ipc: ipc_server } = self;
let Self { ipc: ipc_server } = self;
let TransportRpcModules { config, http, ws, ipc } = modules;
let mut handle = RpcServerHandle {
http_local_addr: ws_http.http_local_addr,
ws_local_addr: ws_http.ws_local_addr,
http_local_addr: None,
ws_local_addr: None,
http: None,
ws: None,
ipc_endpoint: None,
ipc: None,
jwt_secret: None,
};

let (http, ws) = ws_http.server.start(http, ws, &config).await?;
handle.http = http;
handle.ws = ws;

if let Some((server, module)) =
ipc_server.and_then(|server| ipc.map(|module| (server, module)))
{
Expand All @@ -1799,16 +1686,6 @@ impl RpcServer {
}
}

impl fmt::Debug for RpcServer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RpcServer")
.field("http", &self.ws_http.http_local_addr.is_some())
.field("ws", &self.ws_http.ws_local_addr.is_some())
.field("ipc", &self.ipc.is_some())
.finish()
}
}

/// A handle to the spawned servers.
///
/// When this type is dropped or [`RpcServerHandle::stop`] has been called the server will be
Expand Down
Loading

0 comments on commit 0a0f206

Please sign in to comment.