diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index 7f6cb5e898cc..03ae899cba8b 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -289,7 +289,8 @@ where extend_rpc_modules.extend_rpc_modules(ctx)?; let server_config = config.rpc.rpc_server_config(); - let launch_rpc = modules.clone().start_server(server_config).map_ok(|handle| { + let cloned_modules = modules.clone(); + let launch_rpc = server_config.start(&cloned_modules).map_ok(|handle| { if let Some(path) = handle.ipc_endpoint() { info!(target: "reth::cli", %path, "RPC IPC server started"); } diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index ee57e79e9aa7..8a6dce5ae6d7 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -8,9 +8,8 @@ //! transaction pool. [`RpcModuleBuilder::build`] returns a [`TransportRpcModules`] which contains //! the transport specific config (what APIs are available via this transport). //! -//! The [`RpcServerConfig`] is used to configure the [`RpcServer`] type which contains all transport -//! implementations (http server, ws server, ipc server). [`RpcServer::start`] requires the -//! [`TransportRpcModules`] so it can start the servers with the configured modules. +//! The [`RpcServerConfig`] is used to assemble and start the http server, ws server, ipc servers, +//! it requires the [`TransportRpcModules`] so it can start the servers with the configured modules. //! //! # Examples //! @@ -58,9 +57,8 @@ //! .build(transports, EthApiBuild::build); //! let handle = RpcServerConfig::default() //! .with_http(ServerBuilder::default()) -//! .start(transport_modules) -//! .await -//! .unwrap(); +//! .start(&transport_modules) +//! .await; //! } //! ``` //! @@ -122,8 +120,7 @@ //! let config = RpcServerConfig::default(); //! //! let (_rpc_handle, _auth_handle) = -//! try_join!(modules.start_server(config), auth_module.start_server(auth_config),) -//! .unwrap(); +//! try_join!(config.start(&modules), auth_module.start_server(auth_config),).unwrap(); //! } //! ``` @@ -137,7 +134,6 @@ use std::{ collections::HashMap, - fmt, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, @@ -147,12 +143,11 @@ use error::{ConflictingModules, RpcError, ServerKind}; use http::{header::AUTHORIZATION, HeaderMap}; use jsonrpsee::{ core::RegisterMethodError, - server::{AlreadyStoppedError, IdProvider, RpcServiceBuilder, Server, ServerHandle}, + server::{AlreadyStoppedError, IdProvider, RpcServiceBuilder, ServerHandle}, Methods, RpcModule, }; use reth_engine_primitives::EngineTypes; use reth_evm::ConfigureEvm; -use reth_ipc::server::IpcServer; use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers}; use reth_provider::{ AccountReader, BlockReader, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, @@ -175,7 +170,6 @@ use reth_tasks::{pool::BlockingTaskGuard, TaskSpawner, TokioTaskExecutor}; use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool}; use serde::{Deserialize, Serialize}; use tower_http::cors::CorsLayer; -use tracing::{instrument, trace}; use crate::{ auth::AuthRpcModule, cors::CorsDomainError, error::WsHttpSamePortError, @@ -237,10 +231,12 @@ where EthApi: FullEthApiServer, { let module_config = module_config.into(); - let server_config = server_config.into(); - RpcModuleBuilder::new(provider, pool, network, executor, events, evm_config) - .build(module_config, eth) - .start_server(server_config) + server_config + .into() + .start( + &RpcModuleBuilder::new(provider, pool, network, executor, events, evm_config) + .build(module_config, eth), + ) .await } @@ -501,8 +497,6 @@ where /// Configures all [`RpcModule`]s specific to the given [`TransportRpcModuleConfig`] which can /// be used to start the transport server(s). - /// - /// See also [`RpcServer::start`] pub fn build( self, module_config: TransportRpcModuleConfig, @@ -1283,28 +1277,26 @@ impl RpcServerConfig { self.ipc_endpoint.clone() } - /// Convenience function to do [`RpcServerConfig::build`] and [`RpcServer::start`] in one step - pub async fn start(self, modules: TransportRpcModules) -> Result { - self.build(&modules).await?.start(modules).await - } - /// Creates the [`CorsLayer`] if any fn maybe_cors_layer(cors: Option) -> Result, CorsDomainError> { cors.as_deref().map(cors::create_cors_layer).transpose() } /// Creates the [`AuthLayer`] if any - fn maybe_jwt_layer(&self) -> Option> { - self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))) + fn maybe_jwt_layer(jwt_secret: Option) -> Option> { + jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))) } - /// Builds the ws and http server(s). + /// Builds and starts the configured server(s): http, ws, ipc. /// - /// If both are on the same port, they are combined into one server. - async fn build_ws_http( - &mut self, - modules: &TransportRpcModules, - ) -> Result { + /// If both http and ws are on the same port, they are combined into one server. + /// + /// Returns the [`RpcServerHandle`] with the handle to the started servers. + pub async fn start(self, modules: &TransportRpcModules) -> Result { + let mut http_handle = None; + let mut ws_handle = None; + let mut ipc_handle = None; + let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new( Ipv4Addr::LOCALHOST, constants::DEFAULT_HTTP_RPC_PORT, @@ -1315,6 +1307,17 @@ impl RpcServerConfig { constants::DEFAULT_WS_RPC_PORT, ))); + let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default(); + let ipc_path = + self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into()); + + if let Some(builder) = self.ipc_server_config { + let ipc = builder + .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics)) + .build(ipc_path); + ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?); + } + // If both are configured on the same port, we combine them into one server. if self.http_addr == self.ws_addr && self.http_server_config.is_some() && @@ -1327,7 +1330,7 @@ impl RpcServerConfig { http_cors_domains: Some(http_cors.clone()), ws_cors_domains: Some(ws_cors.clone()), } - .into()) + .into()); } Some(ws_cors) } @@ -1336,53 +1339,62 @@ impl RpcServerConfig { .cloned(); // we merge this into one server using the http setup - self.ws_server_config.take(); - modules.config.ensure_ws_http_identical()?; - let builder = self.http_server_config.take().expect("http_server_config is Some"); - let server = builder - .set_http_middleware( - tower::ServiceBuilder::new() - .option_layer(Self::maybe_cors_layer(cors)?) - .option_layer(self.maybe_jwt_layer()), - ) - .set_rpc_middleware( - RpcServiceBuilder::new().layer( - modules - .http - .as_ref() - .or(modules.ws.as_ref()) - .map(RpcRequestMetrics::same_port) - .unwrap_or_default(), - ), - ) - .build(http_socket_addr) - .await - .map_err(|err| RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr)))?; - let addr = server - .local_addr() - .map_err(|err| RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr)))?; - return Ok(WsHttpServer { - http_local_addr: Some(addr), - ws_local_addr: Some(addr), - server: WsHttpServers::SamePort(server), - jwt_secret: self.jwt_secret, - }) + if let Some(builder) = self.http_server_config { + let server = builder + .set_http_middleware( + tower::ServiceBuilder::new() + .option_layer(Self::maybe_cors_layer(cors)?) + .option_layer(Self::maybe_jwt_layer(self.jwt_secret)), + ) + .set_rpc_middleware( + RpcServiceBuilder::new().layer( + modules + .http + .as_ref() + .or(modules.ws.as_ref()) + .map(RpcRequestMetrics::same_port) + .unwrap_or_default(), + ), + ) + .build(http_socket_addr) + .await + .map_err(|err| { + RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr)) + })?; + let addr = server.local_addr().map_err(|err| { + RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr)) + })?; + if let Some(module) = modules.http.as_ref().or(modules.ws.as_ref()) { + let handle = server.start(module.clone()); + http_handle = Some(handle.clone()); + ws_handle = Some(handle); + } + return Ok(RpcServerHandle { + http_local_addr: Some(addr), + ws_local_addr: Some(addr), + http: http_handle, + ws: ws_handle, + ipc_endpoint: self.ipc_endpoint.clone(), + ipc: ipc_handle, + jwt_secret: self.jwt_secret, + }); + } } + let mut ws_local_addr = None; + let mut ws_server = None; let mut http_local_addr = None; let mut http_server = None; - let mut ws_local_addr = None; - let mut ws_server = None; - if let Some(builder) = self.ws_server_config.take() { + if let Some(builder) = self.ws_server_config { let server = builder .ws_only() .set_http_middleware( tower::ServiceBuilder::new() .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?) - .option_layer(self.maybe_jwt_layer()), + .option_layer(Self::maybe_jwt_layer(self.jwt_secret)), ) .set_rpc_middleware( RpcServiceBuilder::new() @@ -1391,6 +1403,7 @@ impl RpcServerConfig { .build(ws_socket_addr) .await .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; + let addr = server .local_addr() .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; @@ -1399,13 +1412,13 @@ impl RpcServerConfig { ws_server = Some(server); } - if let Some(builder) = self.http_server_config.take() { + if let Some(builder) = self.http_server_config { let server = builder .http_only() .set_http_middleware( tower::ServiceBuilder::new() .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?) - .option_layer(self.maybe_jwt_layer()), + .option_layer(Self::maybe_jwt_layer(self.jwt_secret)), ) .set_rpc_middleware( RpcServiceBuilder::new().layer( @@ -1422,36 +1435,20 @@ impl RpcServerConfig { http_server = Some(server); } - Ok(WsHttpServer { + http_handle = http_server + .map(|http_server| http_server.start(modules.http.clone().expect("http server error"))); + ws_handle = ws_server + .map(|ws_server| ws_server.start(modules.ws.clone().expect("ws server error"))); + Ok(RpcServerHandle { http_local_addr, ws_local_addr, - server: WsHttpServers::DifferentPort { http: http_server, ws: ws_server }, + http: http_handle, + ws: ws_handle, + ipc_endpoint: self.ipc_endpoint.clone(), + ipc: ipc_handle, jwt_secret: self.jwt_secret, }) } - - /// Finalize the configuration of the server(s). - /// - /// This consumes the builder and returns a server. - /// - /// Note: The server is not started and does nothing unless polled, See also - /// [`RpcServer::start`] - pub async fn build(mut self, modules: &TransportRpcModules) -> Result { - let mut server = RpcServer::empty(); - server.ws_http = self.build_ws_http(modules).await?; - - if let Some(builder) = self.ipc_server_config { - let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default(); - let ipc_path = - self.ipc_endpoint.unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into()); - let ipc = builder - .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics)) - .build(ipc_path); - server.ipc = Some(ipc); - } - - Ok(server) - } } /// Holds modules to be installed per transport type @@ -1658,167 +1655,6 @@ impl TransportRpcModules { self.merge_ipc(other)?; Ok(()) } - - /// Convenience function for starting a server - pub async fn start_server(self, builder: RpcServerConfig) -> Result { - builder.start(self).await - } -} - -/// Container type for ws and http servers in all possible combinations. -#[derive(Default)] -struct WsHttpServer { - /// The address of the http server - http_local_addr: Option, - /// The address of the ws server - ws_local_addr: Option, - /// Configured ws,http servers - server: WsHttpServers, - /// The jwt secret. - jwt_secret: Option, -} - -// Define the type alias with detailed type complexity -type WsHttpServerKind = Server< - Stack< - tower::util::Either, Identity>, - Stack, Identity>, - >, - Stack, ->; - -/// Enum for holding the http and ws servers in all possible combinations. -enum WsHttpServers { - /// Both servers are on the same port - SamePort(WsHttpServerKind), - /// Servers are on different ports - DifferentPort { http: Option, ws: Option }, -} - -// === impl WsHttpServers === - -impl WsHttpServers { - /// Starts the servers and returns the handles (http, ws) - fn start( - self, - http_module: Option>, - ws_module: Option>, - config: &TransportRpcModuleConfig, - ) -> Result<(Option, Option), RpcError> { - let mut http_handle = None; - let mut ws_handle = None; - match self { - Self::SamePort(server) => { - // Make sure http and ws modules are identical, since we currently can't run - // different modules on same server - config.ensure_ws_http_identical()?; - - if let Some(module) = http_module.or(ws_module) { - let handle = server.start(module); - http_handle = Some(handle.clone()); - ws_handle = Some(handle); - } - } - Self::DifferentPort { http, ws } => { - if let Some((server, module)) = - http.and_then(|server| http_module.map(|module| (server, module))) - { - http_handle = Some(server.start(module)); - } - if let Some((server, module)) = - ws.and_then(|server| ws_module.map(|module| (server, module))) - { - ws_handle = Some(server.start(module)); - } - } - } - - Ok((http_handle, ws_handle)) - } -} - -impl Default for WsHttpServers { - fn default() -> Self { - Self::DifferentPort { http: None, ws: None } - } -} - -/// Container type for each transport ie. http, ws, and ipc server -pub struct RpcServer { - /// Configured ws,http servers - ws_http: WsHttpServer, - /// ipc server - ipc: Option>>, -} - -// === impl RpcServer === - -impl RpcServer { - fn empty() -> Self { - Self { ws_http: Default::default(), ipc: None } - } - - /// Returns the [`SocketAddr`] of the http server if started. - pub const fn http_local_addr(&self) -> Option { - self.ws_http.http_local_addr - } - /// Return the `JwtSecret` of the server - pub const fn jwt(&self) -> Option { - self.ws_http.jwt_secret - } - - /// Returns the [`SocketAddr`] of the ws server if started. - pub const fn ws_local_addr(&self) -> Option { - self.ws_http.ws_local_addr - } - - /// Returns the endpoint of the ipc server if started. - pub fn ipc_endpoint(&self) -> Option { - self.ipc.as_ref().map(|ipc| ipc.endpoint()) - } - - /// Starts the configured server by spawning the servers on the tokio runtime. - /// - /// This returns an [RpcServerHandle] that's connected to the server task(s) until the server is - /// stopped or the [RpcServerHandle] is dropped. - #[instrument(name = "start", skip_all, fields(http = ?self.http_local_addr(), ws = ?self.ws_local_addr(), ipc = ?self.ipc_endpoint()), target = "rpc", level = "TRACE")] - pub async fn start(self, modules: TransportRpcModules) -> Result { - trace!(target: "rpc", "staring RPC server"); - let Self { ws_http, ipc: ipc_server } = self; - let TransportRpcModules { config, http, ws, ipc } = modules; - let mut handle = RpcServerHandle { - http_local_addr: ws_http.http_local_addr, - ws_local_addr: ws_http.ws_local_addr, - http: None, - ws: None, - ipc_endpoint: None, - ipc: None, - jwt_secret: None, - }; - - let (http, ws) = ws_http.server.start(http, ws, &config)?; - handle.http = http; - handle.ws = ws; - - if let Some((server, module)) = - ipc_server.and_then(|server| ipc.map(|module| (server, module))) - { - handle.ipc_endpoint = Some(server.endpoint()); - handle.ipc = Some(server.start(module).await?); - } - - Ok(handle) - } -} - -impl fmt::Debug for RpcServer { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("RpcServer") - .field("http", &self.ws_http.http_local_addr.is_some()) - .field("ws", &self.ws_http.ws_local_addr.is_some()) - .field("ipc", &self.ipc.is_some()) - .finish() - } } /// A handle to the spawned servers. diff --git a/crates/rpc/rpc-builder/tests/it/startup.rs b/crates/rpc/rpc-builder/tests/it/startup.rs index 4c873f2b38c9..5680d03a5307 100644 --- a/crates/rpc/rpc-builder/tests/it/startup.rs +++ b/crates/rpc/rpc-builder/tests/it/startup.rs @@ -28,9 +28,8 @@ async fn test_http_addr_in_use() { let builder = test_rpc_builder(); let server = builder .build(TransportRpcModuleConfig::set_http(vec![RethRpcModule::Admin]), EthApiBuild::build); - let result = server - .start_server(RpcServerConfig::http(Default::default()).with_http_address(addr)) - .await; + let result = + RpcServerConfig::http(Default::default()).with_http_address(addr).start(&server).await; let err = result.unwrap_err(); assert!(is_addr_in_use_kind(&err, ServerKind::Http(addr)), "{err}"); } @@ -42,8 +41,7 @@ async fn test_ws_addr_in_use() { let builder = test_rpc_builder(); let server = builder .build(TransportRpcModuleConfig::set_ws(vec![RethRpcModule::Admin]), EthApiBuild::build); - let result = - server.start_server(RpcServerConfig::ws(Default::default()).with_ws_address(addr)).await; + let result = RpcServerConfig::ws(Default::default()).with_ws_address(addr).start(&server).await; let err = result.unwrap_err(); assert!(is_addr_in_use_kind(&err, ServerKind::WS(addr)), "{err}"); } @@ -65,13 +63,11 @@ async fn test_launch_same_port_different_modules() { EthApiBuild::build, ); let addr = test_address(); - let res = server - .start_server( - RpcServerConfig::ws(Default::default()) - .with_ws_address(addr) - .with_http(Default::default()) - .with_http_address(addr), - ) + let res = RpcServerConfig::ws(Default::default()) + .with_ws_address(addr) + .with_http(Default::default()) + .with_http_address(addr) + .start(&server) .await; let err = res.unwrap_err(); assert!(matches!( @@ -89,15 +85,13 @@ async fn test_launch_same_port_same_cors() { EthApiBuild::build, ); let addr = test_address(); - let res = server - .start_server( - RpcServerConfig::ws(Default::default()) - .with_ws_address(addr) - .with_http(Default::default()) - .with_cors(Some("*".to_string())) - .with_http_cors(Some("*".to_string())) - .with_http_address(addr), - ) + let res = RpcServerConfig::ws(Default::default()) + .with_ws_address(addr) + .with_http(Default::default()) + .with_cors(Some("*".to_string())) + .with_http_cors(Some("*".to_string())) + .with_http_address(addr) + .start(&server) .await; assert!(res.is_ok()); } @@ -111,15 +105,13 @@ async fn test_launch_same_port_different_cors() { EthApiBuild::build, ); let addr = test_address(); - let res = server - .start_server( - RpcServerConfig::ws(Default::default()) - .with_ws_address(addr) - .with_http(Default::default()) - .with_cors(Some("*".to_string())) - .with_http_cors(Some("example".to_string())) - .with_http_address(addr), - ) + let res = RpcServerConfig::ws(Default::default()) + .with_ws_address(addr) + .with_http(Default::default()) + .with_cors(Some("*".to_string())) + .with_http_cors(Some("example".to_string())) + .with_http_address(addr) + .start(&server) .await; let err = res.unwrap_err(); assert!(matches!( diff --git a/crates/rpc/rpc-builder/tests/it/utils.rs b/crates/rpc/rpc-builder/tests/it/utils.rs index 85c9dbeac3f2..ea9954f23c10 100644 --- a/crates/rpc/rpc-builder/tests/it/utils.rs +++ b/crates/rpc/rpc-builder/tests/it/utils.rs @@ -53,8 +53,9 @@ pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle { pub async fn launch_http(modules: impl Into) -> RpcServerHandle { let builder = test_rpc_builder(); let server = builder.build(TransportRpcModuleConfig::set_http(modules), EthApiBuild::build); - server - .start_server(RpcServerConfig::http(Default::default()).with_http_address(test_address())) + RpcServerConfig::http(Default::default()) + .with_http_address(test_address()) + .start(&server) .await .unwrap() } @@ -63,8 +64,9 @@ pub async fn launch_http(modules: impl Into) -> RpcServerHan pub async fn launch_ws(modules: impl Into) -> RpcServerHandle { let builder = test_rpc_builder(); let server = builder.build(TransportRpcModuleConfig::set_ws(modules), EthApiBuild::build); - server - .start_server(RpcServerConfig::ws(Default::default()).with_ws_address(test_address())) + RpcServerConfig::ws(Default::default()) + .with_http_address(test_address()) + .start(&server) .await .unwrap() } @@ -77,13 +79,11 @@ pub async fn launch_http_ws(modules: impl Into) -> RpcServer TransportRpcModuleConfig::set_ws(modules.clone()).with_http(modules), EthApiBuild::build, ); - server - .start_server( - RpcServerConfig::ws(Default::default()) - .with_ws_address(test_address()) - .with_http(Default::default()) - .with_http_address(test_address()), - ) + RpcServerConfig::ws(Default::default()) + .with_ws_address(test_address()) + .with_http(Default::default()) + .with_http_address(test_address()) + .start(&server) .await .unwrap() } @@ -97,13 +97,11 @@ pub async fn launch_http_ws_same_port(modules: impl Into) -> EthApiBuild::build, ); let addr = test_address(); - server - .start_server( - RpcServerConfig::ws(Default::default()) - .with_ws_address(addr) - .with_http(Default::default()) - .with_http_address(addr), - ) + RpcServerConfig::ws(Default::default()) + .with_ws_address(addr) + .with_http(Default::default()) + .with_http_address(addr) + .start(&server) .await .unwrap() } diff --git a/examples/rpc-db/src/main.rs b/examples/rpc-db/src/main.rs index 85ad28d6a051..30c0479549fc 100644 --- a/examples/rpc-db/src/main.rs +++ b/examples/rpc-db/src/main.rs @@ -79,7 +79,7 @@ async fn main() -> eyre::Result<()> { // Start the server & keep it alive let server_args = RpcServerConfig::http(Default::default()).with_http_address("0.0.0.0:8545".parse()?); - let _handle = server_args.start(server).await?; + let _handle = server_args.start(&server).await?; futures::future::pending::<()>().await; Ok(())