From f8413844f9c3fc8156d123e1bba1e88ccfb182b5 Mon Sep 17 00:00:00 2001 From: smatthewenglish Date: Sun, 7 Jul 2024 20:12:27 -0400 Subject: [PATCH] remove intermediary types, update effected tests, add ipc start --- Cargo.toml | 2 +- crates/node/builder/src/rpc.rs | 29 ++- crates/rpc/rpc-builder/src/lib.rs | 272 ++++----------------- crates/rpc/rpc-builder/tests/it/startup.rs | 56 ++--- crates/rpc/rpc-builder/tests/it/utils.rs | 40 ++- examples/rpc-db/src/main.rs | 4 +- 6 files changed, 106 insertions(+), 297 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 84b4bfb81afb..1a0a112e23ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -535,4 +535,4 @@ proptest-derive = "0.5" serial_test = "3" similar-asserts = "1.5.0" test-fuzz = "5" -iai-callgrind = "0.11" +iai-callgrind = "0.11.1" diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index 7f6cb5e898cc..84761f24ac4f 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -288,19 +288,19 @@ where extend_rpc_modules.extend_rpc_modules(ctx)?; - let server_config = config.rpc.rpc_server_config(); - let launch_rpc = modules.clone().start_server(server_config).map_ok(|handle| { - if let Some(path) = handle.ipc_endpoint() { - info!(target: "reth::cli", %path, "RPC IPC server started"); - } - if let Some(addr) = handle.http_local_addr() { - info!(target: "reth::cli", url=%addr, "RPC HTTP server started"); - } - if let Some(addr) = handle.ws_local_addr() { - info!(target: "reth::cli", url=%addr, "RPC WS server started"); - } - handle - }); + let mut server_config = config.rpc.rpc_server_config(); + let cloned_modules = modules.clone(); + let launch_rpc = server_config.start(&cloned_modules).await?; + + if let Some(path) = launch_rpc.ipc_endpoint() { + info!(target: "reth::cli", %path, "RPC IPC server started"); + } + if let Some(addr) = launch_rpc.http_local_addr() { + info!(target: "reth::cli", url=%addr, "RPC HTTP server started"); + } + if let Some(addr) = launch_rpc.ws_local_addr() { + info!(target: "reth::cli", url=%addr, "RPC WS server started"); + } let launch_auth = auth_module.clone().start_server(auth_config).map_ok(|handle| { let addr = handle.local_addr(); @@ -313,8 +313,7 @@ where }); // launch servers concurrently - let (rpc, auth) = futures::future::try_join(launch_rpc, launch_auth).await?; - let handles = RethRpcServerHandles { rpc, auth }; + let handles = RethRpcServerHandles { rpc: launch_rpc, auth: launch_auth.await? }; let ctx = RpcContext { node, diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index ee57e79e9aa7..458423a24633 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -8,9 +8,8 @@ //! transaction pool. [`RpcModuleBuilder::build`] returns a [`TransportRpcModules`] which contains //! the transport specific config (what APIs are available via this transport). //! -//! The [`RpcServerConfig`] is used to configure the [`RpcServer`] type which contains all transport -//! implementations (http server, ws server, ipc server). [`RpcServer::start`] requires the -//! [`TransportRpcModules`] so it can start the servers with the configured modules. +//! The [`RpcServerConfig`] is used to assemble and start the http server, ws server, ipc servers, +//! it requires the [`TransportRpcModules`] so it can start the servers with the configured modules. //! //! # Examples //! @@ -56,11 +55,8 @@ //! evm_config, //! ) //! .build(transports, EthApiBuild::build); -//! let handle = RpcServerConfig::default() -//! .with_http(ServerBuilder::default()) -//! .start(transport_modules) -//! .await -//! .unwrap(); +//! let mut config = RpcServerConfig::default().with_http(ServerBuilder::default()); +//! config.start(&transport_modules).await; //! } //! ``` //! @@ -119,11 +115,10 @@ //! //! // start the servers //! let auth_config = AuthServerConfig::builder(JwtSecret::random()).build(); -//! let config = RpcServerConfig::default(); +//! let mut config = RpcServerConfig::default(); //! //! let (_rpc_handle, _auth_handle) = -//! try_join!(modules.start_server(config), auth_module.start_server(auth_config),) -//! .unwrap(); +//! try_join!(config.start(&modules), auth_module.start_server(auth_config),).unwrap(); //! } //! ``` @@ -137,7 +132,6 @@ use std::{ collections::HashMap, - fmt, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, @@ -147,12 +141,11 @@ use error::{ConflictingModules, RpcError, ServerKind}; use http::{header::AUTHORIZATION, HeaderMap}; use jsonrpsee::{ core::RegisterMethodError, - server::{AlreadyStoppedError, IdProvider, RpcServiceBuilder, Server, ServerHandle}, + server::{AlreadyStoppedError, IdProvider, RpcServiceBuilder, ServerHandle}, Methods, RpcModule, }; use reth_engine_primitives::EngineTypes; use reth_evm::ConfigureEvm; -use reth_ipc::server::IpcServer; use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers}; use reth_provider::{ AccountReader, BlockReader, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, @@ -175,7 +168,6 @@ use reth_tasks::{pool::BlockingTaskGuard, TaskSpawner, TokioTaskExecutor}; use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool}; use serde::{Deserialize, Serialize}; use tower_http::cors::CorsLayer; -use tracing::{instrument, trace}; use crate::{ auth::AuthRpcModule, cors::CorsDomainError, error::WsHttpSamePortError, @@ -237,10 +229,12 @@ where EthApi: FullEthApiServer, { let module_config = module_config.into(); - let server_config = server_config.into(); - RpcModuleBuilder::new(provider, pool, network, executor, events, evm_config) - .build(module_config, eth) - .start_server(server_config) + server_config + .into() + .start( + &RpcModuleBuilder::new(provider, pool, network, executor, events, evm_config) + .build(module_config, eth), + ) .await } @@ -501,8 +495,6 @@ where /// Configures all [`RpcModule`]s specific to the given [`TransportRpcModuleConfig`] which can /// be used to start the transport server(s). - /// - /// See also [`RpcServer::start`] pub fn build( self, module_config: TransportRpcModuleConfig, @@ -1283,11 +1275,6 @@ impl RpcServerConfig { self.ipc_endpoint.clone() } - /// Convenience function to do [`RpcServerConfig::build`] and [`RpcServer::start`] in one step - pub async fn start(self, modules: TransportRpcModules) -> Result { - self.build(&modules).await?.start(modules).await - } - /// Creates the [`CorsLayer`] if any fn maybe_cors_layer(cors: Option) -> Result, CorsDomainError> { cors.as_deref().map(cors::create_cors_layer).transpose() @@ -1298,13 +1285,17 @@ impl RpcServerConfig { self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))) } - /// Builds the ws and http server(s). + /// Builds and starts the ws and http server(s). /// /// If both are on the same port, they are combined into one server. - async fn build_ws_http( + pub async fn start( &mut self, modules: &TransportRpcModules, - ) -> Result { + ) -> Result { + let mut http_handle = None; + let mut ws_handle = None; + let mut ipc_handle = None; + let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new( Ipv4Addr::LOCALHOST, constants::DEFAULT_HTTP_RPC_PORT, @@ -1315,6 +1306,17 @@ impl RpcServerConfig { constants::DEFAULT_WS_RPC_PORT, ))); + let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default(); + let ipc_path = + self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into()); + + if let Some(builder) = self.ipc_server_config.take() { + let ipc = builder + .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics)) + .build(ipc_path); + ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?); + } + // If both are configured on the same port, we combine them into one server. if self.http_addr == self.ws_addr && self.http_server_config.is_some() && @@ -1327,7 +1329,7 @@ impl RpcServerConfig { http_cors_domains: Some(http_cors.clone()), ws_cors_domains: Some(ws_cors.clone()), } - .into()) + .into()); } Some(ws_cors) } @@ -1363,12 +1365,20 @@ impl RpcServerConfig { let addr = server .local_addr() .map_err(|err| RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr)))?; - return Ok(WsHttpServer { + if let Some(module) = modules.http.as_ref().or(modules.ws.as_ref()) { + let handle = server.start(module.clone()); + http_handle = Some(handle.clone()); + ws_handle = Some(handle); + } + return Ok(RpcServerHandle { http_local_addr: Some(addr), ws_local_addr: Some(addr), - server: WsHttpServers::SamePort(server), + http: http_handle, + ws: ws_handle, + ipc_endpoint: self.ipc_endpoint.clone(), + ipc: ipc_handle, jwt_secret: self.jwt_secret, - }) + }); } let mut http_local_addr = None; @@ -1421,37 +1431,20 @@ impl RpcServerConfig { http_local_addr = Some(local_addr); http_server = Some(server); } - - Ok(WsHttpServer { + http_handle = http_server + .map(|http_server| http_server.start(modules.http.clone().expect("http server error"))); + ws_handle = ws_server + .map(|ws_server| ws_server.start(modules.ws.clone().expect("ws server error"))); + Ok(RpcServerHandle { http_local_addr, ws_local_addr, - server: WsHttpServers::DifferentPort { http: http_server, ws: ws_server }, + http: http_handle, + ws: ws_handle, + ipc_endpoint: self.ipc_endpoint.clone(), + ipc: ipc_handle, jwt_secret: self.jwt_secret, }) } - - /// Finalize the configuration of the server(s). - /// - /// This consumes the builder and returns a server. - /// - /// Note: The server is not started and does nothing unless polled, See also - /// [`RpcServer::start`] - pub async fn build(mut self, modules: &TransportRpcModules) -> Result { - let mut server = RpcServer::empty(); - server.ws_http = self.build_ws_http(modules).await?; - - if let Some(builder) = self.ipc_server_config { - let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default(); - let ipc_path = - self.ipc_endpoint.unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into()); - let ipc = builder - .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics)) - .build(ipc_path); - server.ipc = Some(ipc); - } - - Ok(server) - } } /// Holds modules to be installed per transport type @@ -1658,167 +1651,6 @@ impl TransportRpcModules { self.merge_ipc(other)?; Ok(()) } - - /// Convenience function for starting a server - pub async fn start_server(self, builder: RpcServerConfig) -> Result { - builder.start(self).await - } -} - -/// Container type for ws and http servers in all possible combinations. -#[derive(Default)] -struct WsHttpServer { - /// The address of the http server - http_local_addr: Option, - /// The address of the ws server - ws_local_addr: Option, - /// Configured ws,http servers - server: WsHttpServers, - /// The jwt secret. - jwt_secret: Option, -} - -// Define the type alias with detailed type complexity -type WsHttpServerKind = Server< - Stack< - tower::util::Either, Identity>, - Stack, Identity>, - >, - Stack, ->; - -/// Enum for holding the http and ws servers in all possible combinations. -enum WsHttpServers { - /// Both servers are on the same port - SamePort(WsHttpServerKind), - /// Servers are on different ports - DifferentPort { http: Option, ws: Option }, -} - -// === impl WsHttpServers === - -impl WsHttpServers { - /// Starts the servers and returns the handles (http, ws) - fn start( - self, - http_module: Option>, - ws_module: Option>, - config: &TransportRpcModuleConfig, - ) -> Result<(Option, Option), RpcError> { - let mut http_handle = None; - let mut ws_handle = None; - match self { - Self::SamePort(server) => { - // Make sure http and ws modules are identical, since we currently can't run - // different modules on same server - config.ensure_ws_http_identical()?; - - if let Some(module) = http_module.or(ws_module) { - let handle = server.start(module); - http_handle = Some(handle.clone()); - ws_handle = Some(handle); - } - } - Self::DifferentPort { http, ws } => { - if let Some((server, module)) = - http.and_then(|server| http_module.map(|module| (server, module))) - { - http_handle = Some(server.start(module)); - } - if let Some((server, module)) = - ws.and_then(|server| ws_module.map(|module| (server, module))) - { - ws_handle = Some(server.start(module)); - } - } - } - - Ok((http_handle, ws_handle)) - } -} - -impl Default for WsHttpServers { - fn default() -> Self { - Self::DifferentPort { http: None, ws: None } - } -} - -/// Container type for each transport ie. http, ws, and ipc server -pub struct RpcServer { - /// Configured ws,http servers - ws_http: WsHttpServer, - /// ipc server - ipc: Option>>, -} - -// === impl RpcServer === - -impl RpcServer { - fn empty() -> Self { - Self { ws_http: Default::default(), ipc: None } - } - - /// Returns the [`SocketAddr`] of the http server if started. - pub const fn http_local_addr(&self) -> Option { - self.ws_http.http_local_addr - } - /// Return the `JwtSecret` of the server - pub const fn jwt(&self) -> Option { - self.ws_http.jwt_secret - } - - /// Returns the [`SocketAddr`] of the ws server if started. - pub const fn ws_local_addr(&self) -> Option { - self.ws_http.ws_local_addr - } - - /// Returns the endpoint of the ipc server if started. - pub fn ipc_endpoint(&self) -> Option { - self.ipc.as_ref().map(|ipc| ipc.endpoint()) - } - - /// Starts the configured server by spawning the servers on the tokio runtime. - /// - /// This returns an [RpcServerHandle] that's connected to the server task(s) until the server is - /// stopped or the [RpcServerHandle] is dropped. - #[instrument(name = "start", skip_all, fields(http = ?self.http_local_addr(), ws = ?self.ws_local_addr(), ipc = ?self.ipc_endpoint()), target = "rpc", level = "TRACE")] - pub async fn start(self, modules: TransportRpcModules) -> Result { - trace!(target: "rpc", "staring RPC server"); - let Self { ws_http, ipc: ipc_server } = self; - let TransportRpcModules { config, http, ws, ipc } = modules; - let mut handle = RpcServerHandle { - http_local_addr: ws_http.http_local_addr, - ws_local_addr: ws_http.ws_local_addr, - http: None, - ws: None, - ipc_endpoint: None, - ipc: None, - jwt_secret: None, - }; - - let (http, ws) = ws_http.server.start(http, ws, &config)?; - handle.http = http; - handle.ws = ws; - - if let Some((server, module)) = - ipc_server.and_then(|server| ipc.map(|module| (server, module))) - { - handle.ipc_endpoint = Some(server.endpoint()); - handle.ipc = Some(server.start(module).await?); - } - - Ok(handle) - } -} - -impl fmt::Debug for RpcServer { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("RpcServer") - .field("http", &self.ws_http.http_local_addr.is_some()) - .field("ws", &self.ws_http.ws_local_addr.is_some()) - .field("ipc", &self.ipc.is_some()) - .finish() - } } /// A handle to the spawned servers. diff --git a/crates/rpc/rpc-builder/tests/it/startup.rs b/crates/rpc/rpc-builder/tests/it/startup.rs index 4c873f2b38c9..4f5a37aa01fb 100644 --- a/crates/rpc/rpc-builder/tests/it/startup.rs +++ b/crates/rpc/rpc-builder/tests/it/startup.rs @@ -28,9 +28,8 @@ async fn test_http_addr_in_use() { let builder = test_rpc_builder(); let server = builder .build(TransportRpcModuleConfig::set_http(vec![RethRpcModule::Admin]), EthApiBuild::build); - let result = server - .start_server(RpcServerConfig::http(Default::default()).with_http_address(addr)) - .await; + let mut config = RpcServerConfig::http(Default::default()).with_http_address(addr); + let result = config.start(&server).await; let err = result.unwrap_err(); assert!(is_addr_in_use_kind(&err, ServerKind::Http(addr)), "{err}"); } @@ -42,8 +41,8 @@ async fn test_ws_addr_in_use() { let builder = test_rpc_builder(); let server = builder .build(TransportRpcModuleConfig::set_ws(vec![RethRpcModule::Admin]), EthApiBuild::build); - let result = - server.start_server(RpcServerConfig::ws(Default::default()).with_ws_address(addr)).await; + let mut config = RpcServerConfig::ws(Default::default()).with_ws_address(addr); + let result = config.start(&server).await; let err = result.unwrap_err(); assert!(is_addr_in_use_kind(&err, ServerKind::WS(addr)), "{err}"); } @@ -65,14 +64,11 @@ async fn test_launch_same_port_different_modules() { EthApiBuild::build, ); let addr = test_address(); - let res = server - .start_server( - RpcServerConfig::ws(Default::default()) - .with_ws_address(addr) - .with_http(Default::default()) - .with_http_address(addr), - ) - .await; + let mut config = RpcServerConfig::ws(Default::default()) + .with_ws_address(addr) + .with_http(Default::default()) + .with_http_address(addr); + let res = config.start(&server).await; let err = res.unwrap_err(); assert!(matches!( err, @@ -89,16 +85,13 @@ async fn test_launch_same_port_same_cors() { EthApiBuild::build, ); let addr = test_address(); - let res = server - .start_server( - RpcServerConfig::ws(Default::default()) - .with_ws_address(addr) - .with_http(Default::default()) - .with_cors(Some("*".to_string())) - .with_http_cors(Some("*".to_string())) - .with_http_address(addr), - ) - .await; + let mut config = RpcServerConfig::ws(Default::default()) + .with_ws_address(addr) + .with_http(Default::default()) + .with_cors(Some("*".to_string())) + .with_http_cors(Some("*".to_string())) + .with_http_address(addr); + let res = config.start(&server).await; assert!(res.is_ok()); } @@ -111,16 +104,13 @@ async fn test_launch_same_port_different_cors() { EthApiBuild::build, ); let addr = test_address(); - let res = server - .start_server( - RpcServerConfig::ws(Default::default()) - .with_ws_address(addr) - .with_http(Default::default()) - .with_cors(Some("*".to_string())) - .with_http_cors(Some("example".to_string())) - .with_http_address(addr), - ) - .await; + let mut config = RpcServerConfig::ws(Default::default()) + .with_ws_address(addr) + .with_http(Default::default()) + .with_cors(Some("*".to_string())) + .with_http_cors(Some("example".to_string())) + .with_http_address(addr); + let res = config.start(&server).await; let err = res.unwrap_err(); assert!(matches!( err, diff --git a/crates/rpc/rpc-builder/tests/it/utils.rs b/crates/rpc/rpc-builder/tests/it/utils.rs index 85c9dbeac3f2..7a1644c8f136 100644 --- a/crates/rpc/rpc-builder/tests/it/utils.rs +++ b/crates/rpc/rpc-builder/tests/it/utils.rs @@ -53,20 +53,16 @@ pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle { pub async fn launch_http(modules: impl Into) -> RpcServerHandle { let builder = test_rpc_builder(); let server = builder.build(TransportRpcModuleConfig::set_http(modules), EthApiBuild::build); - server - .start_server(RpcServerConfig::http(Default::default()).with_http_address(test_address())) - .await - .unwrap() + let mut config = RpcServerConfig::http(Default::default()).with_http_address(test_address()); + config.start(&server).await.unwrap() } /// Launches a new server with ws only with the given modules pub async fn launch_ws(modules: impl Into) -> RpcServerHandle { let builder = test_rpc_builder(); let server = builder.build(TransportRpcModuleConfig::set_ws(modules), EthApiBuild::build); - server - .start_server(RpcServerConfig::ws(Default::default()).with_ws_address(test_address())) - .await - .unwrap() + let mut config = RpcServerConfig::ws(Default::default()).with_http_address(test_address()); + config.start(&server).await.unwrap() } /// Launches a new server with http and ws and with the given modules @@ -77,15 +73,11 @@ pub async fn launch_http_ws(modules: impl Into) -> RpcServer TransportRpcModuleConfig::set_ws(modules.clone()).with_http(modules), EthApiBuild::build, ); - server - .start_server( - RpcServerConfig::ws(Default::default()) - .with_ws_address(test_address()) - .with_http(Default::default()) - .with_http_address(test_address()), - ) - .await - .unwrap() + let mut config = RpcServerConfig::ws(Default::default()) + .with_ws_address(test_address()) + .with_http(Default::default()) + .with_http_address(test_address()); + config.start(&server).await.unwrap() } /// Launches a new server with http and ws and with the given modules on the same port. @@ -97,15 +89,11 @@ pub async fn launch_http_ws_same_port(modules: impl Into) -> EthApiBuild::build, ); let addr = test_address(); - server - .start_server( - RpcServerConfig::ws(Default::default()) - .with_ws_address(addr) - .with_http(Default::default()) - .with_http_address(addr), - ) - .await - .unwrap() + let mut config = RpcServerConfig::ws(Default::default()) + .with_ws_address(addr) + .with_http(Default::default()) + .with_http_address(addr); + config.start(&server).await.unwrap() } /// Returns an [`RpcModuleBuilder`] with testing components. diff --git a/examples/rpc-db/src/main.rs b/examples/rpc-db/src/main.rs index 85ad28d6a051..809129c81c2c 100644 --- a/examples/rpc-db/src/main.rs +++ b/examples/rpc-db/src/main.rs @@ -77,9 +77,9 @@ async fn main() -> eyre::Result<()> { server.merge_configured(custom_rpc.into_rpc())?; // Start the server & keep it alive - let server_args = + let mut server_args = RpcServerConfig::http(Default::default()).with_http_address("0.0.0.0:8545".parse()?); - let _handle = server_args.start(server).await?; + let _handle = server_args.start(&server).await?; futures::future::pending::<()>().await; Ok(())