diff --git a/nautilus_core/Cargo.lock b/nautilus_core/Cargo.lock index 1bcbebde2b91..f568feed8b9a 100644 --- a/nautilus_core/Cargo.lock +++ b/nautilus_core/Cargo.lock @@ -407,6 +407,61 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d09dbe0e490df5da9d69b36dca48a76635288a82f92eca90024883a56202026d" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.1.0", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e87c8503f93e6d144ee5690907ba22db7ba79ab001a932ab99034f0fe836b3df" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -1383,6 +1438,15 @@ dependencies = [ "serde", ] +[[package]] +name = "encoding_rs" +version = "0.8.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" +dependencies = [ + "cfg-if", +] + [[package]] name = "env_logger" version = "0.8.4" @@ -1678,6 +1742,44 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "h2" +version = "0.3.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.11", + "indexmap 2.1.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d308f63daf4181410c242d34c11f928dcb3aa105852019e043c9d1f4e4368a" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.0.0", + "indexmap 2.1.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "1.8.2" @@ -1813,6 +1915,29 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.0.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.8.0" @@ -1841,8 +1966,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", + "h2 0.3.22", "http 0.2.11", - "http-body", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1854,6 +1980,25 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.0", + "http 1.0.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "tokio", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1861,12 +2006,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.28", "native-tls", "tokio", "tokio-native-tls", ] +[[package]] +name = "hyper-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdea9aac0dbe5a9240d68cfd9501e2db94222c6dc06843e06640b9e07f0fdc67" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "hyper 1.1.0", + "pin-project-lite", + "socket2 0.5.5", + "tokio", + "tracing", +] + [[package]] name = "iai" version = "0.1.1" @@ -1944,6 +2107,12 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "ipnet" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" + [[package]] name = "is-terminal" version = "0.4.10" @@ -2163,6 +2332,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -2188,6 +2363,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -2374,16 +2555,20 @@ name = "nautilus-network" version = "0.14.0" dependencies = [ "anyhow", + "axum", "criterion", "dashmap", "futures", "futures-util", - "hyper", + "http 1.0.0", + "http-body-util", + "hyper 1.1.0", "hyper-tls", "nautilus-core", "nonzero_ext", "pyo3", "pyo3-asyncio", + "reqwest", "rstest", "serde_json", "tokio", @@ -3288,6 +3473,44 @@ dependencies = [ "bytecheck", ] +[[package]] +name = "reqwest" +version = "0.11.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.3.22", + "http 0.2.11", + "http-body 0.4.6", + "hyper 0.14.28", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "system-configuration", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "ring" version = "0.16.20" @@ -3673,6 +3896,28 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4beec8bce849d58d06238cb50db2e1c417cfeafa4c63f692b15c82b7c80f8335" +dependencies = [ + "itoa", + "serde", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha1" version = "0.10.6" @@ -4149,6 +4394,33 @@ dependencies = [ "syn 2.0.43", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tabled" version = "0.12.2" @@ -4402,7 +4674,7 @@ dependencies = [ "env_logger 0.10.1", "futures-channel", "futures-util", - "hyper", + "hyper 0.14.28", "log", "native-tls", "rustls 0.21.10", @@ -4455,6 +4727,28 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -4770,6 +5064,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac36a15a220124ac510204aec1c3e5db8a22ab06fd6706d881dc6149f8ed9a12" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.89" @@ -5005,6 +5311,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "wyz" version = "0.5.1" diff --git a/nautilus_core/network/Cargo.toml b/nautilus_core/network/Cargo.toml index e4a9fbac6d4d..7d06aa7e1445 100644 --- a/nautilus_core/network/Cargo.toml +++ b/nautilus_core/network/Cargo.toml @@ -20,12 +20,16 @@ tracing = { workspace = true } tokio = { workspace = true } dashmap = "5.5.3" futures-util = "0.3.29" -hyper = { version = "0.14.28", features = ["client", "http1", "server"] } +hyper = "1.1.0" hyper-tls = "0.5.0" nonzero_ext = "0.3.0" tokio-tungstenite = { path = "./tokio-tungstenite", features = ["rustls-tls-native-roots"] } +reqwest = "0.11.23" +http-body-util = "0.1.0" +http = "1.0.0" [dev-dependencies] +axum = "0.7.3" criterion = { workspace = true } serde_json = { workspace = true } tracing-test = "0.2.4" diff --git a/nautilus_core/network/benches/test_client.rs b/nautilus_core/network/benches/test_client.rs index 1a2b06f9b1ce..2f32c96c2f69 100644 --- a/nautilus_core/network/benches/test_client.rs +++ b/nautilus_core/network/benches/test_client.rs @@ -15,8 +15,8 @@ use std::collections::HashMap; -use hyper::Method; use nautilus_network::http::InnerHttpClient; +use reqwest::Method; const CONCURRENCY: usize = 256; const TOTAL: usize = 1_000_000; diff --git a/nautilus_core/network/benches/test_server.rs b/nautilus_core/network/benches/test_server.rs index 1457cae1760d..369d2deeddfa 100644 --- a/nautilus_core/network/benches/test_server.rs +++ b/nautilus_core/network/benches/test_server.rs @@ -13,27 +13,18 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- -use std::{convert::Infallible, net::SocketAddr}; - -use hyper::{ - service::{make_service_fn, service_fn}, - Body, Request, Response, Server, -}; - -async fn handle(_req: Request) -> Result, Infallible> { - Ok(Response::new(Body::from("Hello World"))) -} +use axum::{routing::get, Router}; #[tokio::main] async fn main() { // Construct our SocketAddr to listen on... - let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); - - // And a MakeService to handle each connection... - let make_service = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) }); + let router = Router::new().route("/", get(|| async { "Hello World" })); - // Then bind and serve... - let server = Server::bind(&addr).serve(make_service); + // Create a listener and serve... + let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") + .await + .unwrap(); + let server = axum::serve(listener, router); // And run forever... if let Err(e) = server.await { diff --git a/nautilus_core/network/src/http.rs b/nautilus_core/network/src/http.rs index 24dacc9fc15a..f89c7a3d877c 100644 --- a/nautilus_core/network/src/http.rs +++ b/nautilus_core/network/src/http.rs @@ -20,9 +20,11 @@ use std::{ }; use futures_util::{stream, StreamExt}; -use hyper::{Body, Client, Method, Request, Response}; -use hyper_tls::HttpsConnector; use pyo3::{exceptions::PyException, prelude::*, types::PyBytes}; +use reqwest::{ + header::{HeaderMap, HeaderName}, + Method, Response, Url, +}; use crate::ratelimiter::{clock::MonotonicClock, quota::Quota, RateLimiter}; @@ -36,7 +38,7 @@ use crate::ratelimiter::{clock::MonotonicClock, quota::Quota, RateLimiter}; /// for the give `header_keys`. #[derive(Clone)] pub struct InnerHttpClient { - client: Client>, + client: reqwest::Client, header_keys: Vec, } @@ -48,25 +50,28 @@ impl InnerHttpClient { headers: HashMap, body: Option>, ) -> Result> { - let mut req_builder = Request::builder().method(method).uri(url); + let reqwest_url = Url::parse(url.as_str())?; - for (header_name, header_value) in &headers { - req_builder = req_builder.header(header_name, header_value); + let mut header_map = HeaderMap::new(); + for (header_key, header_value) in &headers { + let key = HeaderName::from_bytes(header_key.as_bytes())?; + let _ = header_map.insert(key, header_value.parse().unwrap()); } - let req = if let Some(body) = body { - req_builder.body(Body::from(body))? - } else { - req_builder.body(Body::empty())? + let request_builder = self.client.request(method, reqwest_url).headers(header_map); + + let request = match body { + Some(b) => request_builder.body(b).build()?, + None => request_builder.build()?, }; - let res = self.client.request(req).await?; + let res = self.client.execute(request).await?; self.to_response(res).await } pub async fn to_response( &self, - res: Response, + res: Response, ) -> Result> { let headers: HashMap = self .header_keys @@ -76,7 +81,7 @@ impl InnerHttpClient { .map(|(k, v)| (k.clone(), v.to_owned())) .collect(); let status = res.status().as_u16(); - let bytes = hyper::body::to_bytes(res.into_body()).await?; + let bytes = res.bytes().await?; Ok(HttpResponse { status, @@ -137,8 +142,7 @@ pub struct HttpResponse { impl Default for InnerHttpClient { fn default() -> Self { - let https = HttpsConnector::new(); - let client = Client::builder().build::<_, hyper::Body>(https); + let client = reqwest::Client::new(); Self { client, header_keys: Default::default(), @@ -188,8 +192,7 @@ impl HttpClient { keyed_quotas: Vec<(String, Quota)>, default_quota: Option, ) -> Self { - let https = HttpsConnector::new(); - let client = Client::builder().build::<_, hyper::Body>(https); + let client = reqwest::Client::new(); let rate_limiter = Arc::new(RateLimiter::new_with_quota(default_quota, keyed_quotas)); let client = InnerHttpClient { @@ -249,56 +252,16 @@ impl HttpClient { //////////////////////////////////////////////////////////////////////////////// #[cfg(test)] mod tests { - use std::{ - convert::Infallible, - net::{SocketAddr, TcpListener}, - }; + use std::net::{SocketAddr, TcpListener}; - use hyper::{ - service::{make_service_fn, service_fn}, - Body, Method, Request, Response, Server, StatusCode, + use axum::{ + routing::{delete, get, patch, post}, + serve, Router, }; - use tokio::sync::oneshot; + use http::status::StatusCode; use super::*; - async fn handle(req: Request) -> Result, Infallible> { - match (req.method(), req.uri().path()) { - (&Method::GET, "/get") => { - let response = Response::new(Body::from("hello-world!")); - Ok(response) - } - (&Method::POST, "/post") => { - let response = Response::builder() - .status(StatusCode::OK) - .body(Body::empty()) - .unwrap(); - Ok(response) - } - (&Method::PATCH, "/patch") => { - let response = Response::builder() - .status(StatusCode::OK) - .body(Body::empty()) - .unwrap(); - Ok(response) - } - (&Method::DELETE, "/delete") => { - let response = Response::builder() - .status(StatusCode::OK) - .body(Body::empty()) - .unwrap(); - Ok(response) - } - _ => { - let response = Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::empty()) - .unwrap(); - Ok(response) - } - } - } - fn get_unique_port() -> u16 { // Create a temporary TcpListener to get an available port let listener = @@ -311,37 +274,41 @@ mod tests { port } - fn start_test_server() -> (SocketAddr, oneshot::Sender<()>) { - let addr: SocketAddr = ([127, 0, 0, 1], get_unique_port()).into(); - let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) }); - - let (tx, rx) = oneshot::channel::<()>(); - - let server = Server::bind(&addr).serve(make_svc); + fn create_router() -> Router { + Router::new() + .route("/get", get(|| async { "hello-world!" })) + .route("/post", post(|| async { StatusCode::OK })) + .route("/patch", patch(|| async { StatusCode::OK })) + .route("/delete", delete(|| async { StatusCode::OK })) + } - let graceful = server.with_graceful_shutdown(async { - if let Err(e) = rx.await { - eprintln!("shutdown signal error: {e}"); - } - }); + async fn start_test_server() -> Result> { + let port = get_unique_port(); + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + let addr = listener.local_addr().unwrap(); - tokio::spawn(async { - if let Err(e) = graceful.await { - eprintln!("server error: {e}"); - } + tokio::spawn(async move { + serve(listener, create_router()).await.unwrap(); }); - (addr, tx) + Ok(addr) } #[tokio::test] async fn test_get() { - let (addr, _shutdown_tx) = start_test_server(); - let url = format!("http://{}:{}", addr.ip(), addr.port()); + let addr = start_test_server().await.unwrap(); + let url = format!("http://{}", addr); let client = InnerHttpClient::default(); let response = client - .send_request(Method::GET, format!("{url}/get"), HashMap::new(), None) + .send_request( + reqwest::Method::GET, + format!("{url}/get"), + HashMap::new(), + None, + ) .await .unwrap(); @@ -351,12 +318,17 @@ mod tests { #[tokio::test] async fn test_post() { - let (addr, _shutdown_tx) = start_test_server(); - let url = format!("http://{}:{}", addr.ip(), addr.port()); + let addr = start_test_server().await.unwrap(); + let url = format!("http://{}", addr); let client = InnerHttpClient::default(); let response = client - .send_request(Method::POST, format!("{url}/post"), HashMap::new(), None) + .send_request( + reqwest::Method::POST, + format!("{url}/post"), + HashMap::new(), + None, + ) .await .unwrap(); @@ -365,8 +337,8 @@ mod tests { #[tokio::test] async fn test_post_with_body() { - let (addr, _shutdown_tx) = start_test_server(); - let url = format!("http://{}:{}", addr.ip(), addr.port()); + let addr = start_test_server().await.unwrap(); + let url = format!("http://{}", addr); let client = InnerHttpClient::default(); @@ -380,12 +352,14 @@ mod tests { serde_json::Value::String("value2".to_string()), ); + println!("{:?}", body); + let body_string = serde_json::to_string(&body).unwrap(); let body_bytes = body_string.into_bytes(); let response = client .send_request( - Method::POST, + reqwest::Method::POST, format!("{url}/post"), HashMap::new(), Some(body_bytes), @@ -398,12 +372,17 @@ mod tests { #[tokio::test] async fn test_patch() { - let (addr, _shutdown_tx) = start_test_server(); - let url = format!("http://{}:{}", addr.ip(), addr.port()); + let addr = start_test_server().await.unwrap(); + let url = format!("http://{}", addr); let client = InnerHttpClient::default(); let response = client - .send_request(Method::PATCH, format!("{url}/patch"), HashMap::new(), None) + .send_request( + reqwest::Method::PATCH, + format!("{url}/patch"), + HashMap::new(), + None, + ) .await .unwrap(); @@ -412,13 +391,13 @@ mod tests { #[tokio::test] async fn test_delete() { - let (addr, _shutdown_tx) = start_test_server(); - let url = format!("http://{}:{}", addr.ip(), addr.port()); + let addr = start_test_server().await.unwrap(); + let url = format!("http://{}", addr); let client = InnerHttpClient::default(); let response = client .send_request( - Method::DELETE, + reqwest::Method::DELETE, format!("{url}/delete"), HashMap::new(), None,