From 8e50615ef96251afe4f8dfb9bf02051d96a87087 Mon Sep 17 00:00:00 2001 From: zuiyu1998 <1542844298@qq.com> Date: Thu, 25 Jan 2024 15:10:23 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=F0=9F=A6=84=20refactor(hls):=20Change=20hy?= =?UTF-8?q?per=20to=20axum?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 209 ++++++++++++++++++++++++++++++++----- protocol/hls/Cargo.toml | 2 +- protocol/hls/src/server.rs | 29 +++-- 3 files changed, 197 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 10ddc450..c1a98a17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -241,13 +241,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8582122b8edba2af43eaf6b80dbfd33f421b5a0eb3a3113d21bc096ac5b44faf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.3", "bitflags 1.3.2", "bytes", "futures-util", - "http", - "http-body", - "hyper", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.25", "itoa", "matchit", "memchr", @@ -267,6 +267,40 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1236b4b292f6c4d6dc34604bb5120d85c3fe1d1aa596bd5cc52ca054d13e7b9e" +dependencies = [ + "async-trait", + "axum-core 0.4.3", + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.1.0", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "axum-core" version = "0.3.3" @@ -276,12 +310,33 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 0.2.9", + "http-body 0.4.5", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", "mime", + "pin-project-lite", "rustversion", + "sync_wrapper", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -1203,7 +1258,26 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.9", + "indexmap 2.1.0", + "slab", + "tokio", + "tokio-util 0.7.4", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.0.0", "indexmap 2.1.0", "slab", "tokio", @@ -1281,10 +1355,10 @@ dependencies = [ name = "hls" version = "0.4.1" dependencies = [ + "axum 0.7.4", "byteorder", "bytes", "failure", - "hyper", "log", "rtmp 0.4.1", "streamhub 0.1.1", @@ -1324,6 +1398,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.5" @@ -1331,7 +1416,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", - "http", + "http 0.2.9", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.0.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -1345,7 +1453,7 @@ checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" name = "http-server" version = "0.1.0" dependencies = [ - "axum", + "axum 0.6.10", "env_logger", "log", "serde", @@ -1374,7 +1482,7 @@ dependencies = [ "bytes", "failure", "futures", - "hyper", + "hyper 0.14.25", "log", "rtmp 0.4.1", "streamhub 0.1.1", @@ -1398,20 +1506,39 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.24", + "http 0.2.9", + "http-body 0.4.5", "httparse", "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.4.7", "tokio", "tower-service", "tracing", "want", ] +[[package]] +name = "hyper" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.2", + "http 1.0.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "tokio", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1419,12 +1546,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.25", "native-tls", "tokio", "tokio-native-tls", ] +[[package]] +name = "hyper-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdea9aac0dbe5a9240d68cfd9501e2db94222c6dc06843e06640b9e07f0fdc67" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "hyper 1.1.0", + "pin-project-lite", + "socket2 0.5.5", + "tokio", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.53" @@ -2181,10 +2326,10 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", + "h2 0.3.24", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.25", "hyper-tls", "ipnet", "js-sys", @@ -2597,6 +2742,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" +dependencies = [ + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "spin" version = "0.5.2" @@ -2847,7 +3002,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.4.7", "tokio-macros", "windows-sys 0.45.0", ] @@ -2958,8 +3113,8 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http", - "http-body", + "http 0.2.9", + "http-body 0.4.5", "http-range-header", "pin-project-lite", "tower", @@ -3364,7 +3519,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f08dfd7a6e3987e255c4dbe710dde5d94d0f0574f8a21afa95d171376c143106" dependencies = [ "log", - "socket2", + "socket2 0.4.7", "thiserror", "tokio", "webrtc-util", @@ -3713,7 +3868,7 @@ name = "xiu" version = "0.9.0" dependencies = [ "anyhow", - "axum", + "axum 0.6.10", "clap", "env_logger_extend", "failure", @@ -3755,7 +3910,7 @@ dependencies = [ "chrono", "failure", "hex", - "http", + "http 0.2.9", "indexmap 1.9.3", "lazy_static", "log", @@ -3773,7 +3928,7 @@ dependencies = [ "bytes", "bytesio 0.3.0", "failure", - "http", + "http 0.2.9", "indexmap 1.9.3", "log", "streamhub 0.1.1", diff --git a/protocol/hls/Cargo.toml b/protocol/hls/Cargo.toml index 845cbc9e..f58fd342 100644 --- a/protocol/hls/Cargo.toml +++ b/protocol/hls/Cargo.toml @@ -11,11 +11,11 @@ keywords = ["hls", "video", "streaming"] edition = "2018" [dependencies] +axum = { version = "0.7.4" } byteorder = "1.4.2" bytes = "1.0.0" failure = "0.1.1" log = "0.4" -hyper = { version = "0.14", features = ["full"] } tokio-util = { version = "0.6.5", features = ["codec"] } streamhub = { path = "../../library/streamhub/" } diff --git a/protocol/hls/src/server.rs b/protocol/hls/src/server.rs index 204e5904..1aed35c2 100644 --- a/protocol/hls/src/server.rs +++ b/protocol/hls/src/server.rs @@ -1,9 +1,10 @@ use { - hyper::{ - service::{make_service_fn, service_fn}, - Body, Request, Response, Server, StatusCode, + axum::{ + body::Body, extract::Request, handler::HandlerWithoutStateExt, http::StatusCode, + response::Response, }, - tokio::fs::File, + std::net::SocketAddr, + tokio::{fs::File, net::TcpListener}, tokio_util::codec::{BytesCodec, FramedRead}, }; @@ -11,7 +12,7 @@ type GenericError = Box; type Result = std::result::Result; static NOTFOUND: &[u8] = b"Not Found"; -async fn handle_connection(req: Request) -> Result> { +async fn handle_connection(req: Request) -> Response { let path = req.uri().path(); let mut file_path: String = String::from(""); @@ -45,7 +46,6 @@ async fn handle_connection(req: Request) -> Result> { file_path = format!("./{app_name}/{stream_name}/{ts_name}.ts"); } } - simple_file_send(file_path.as_str()).await } @@ -57,28 +57,27 @@ fn not_found() -> Response { .unwrap() } -async fn simple_file_send(filename: &str) -> Result> { +async fn simple_file_send(filename: &str) -> Response { // Serve a file by asynchronously reading it by chunks using tokio-util crate. if let Ok(file) = File::open(filename).await { let stream = FramedRead::new(file, BytesCodec::new()); - let body = Body::wrap_stream(stream); - return Ok(Response::new(body)); + let body = Body::from_stream(stream); + return Response::new(body); } - Ok(not_found()) + not_found() } pub async fn run(port: usize) -> Result<()> { let listen_address = format!("0.0.0.0:{port}"); - let sock_addr = listen_address.parse().unwrap(); + let sock_addr: SocketAddr = listen_address.parse().unwrap(); - let new_service = - make_service_fn(move |_| async { Ok::<_, GenericError>(service_fn(handle_connection)) }); + let listener = TcpListener::bind(sock_addr).await?; - let server = Server::bind(&sock_addr).serve(new_service); log::info!("Hls server listening on http://{}", sock_addr); - server.await?; + + axum::serve(listener, handle_connection.into_service()).await?; Ok(()) } From 8fc6889200578ed29c9c0a88abfe0c964b3630d4 Mon Sep 17 00:00:00 2001 From: zuiyu1998 <1542844298@qq.com> Date: Thu, 25 Jan 2024 15:38:01 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=F0=9F=A6=84=20refactor(flv):=20Change=20hy?= =?UTF-8?q?per=20to=20axum?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 2 +- protocol/httpflv/Cargo.toml | 2 +- protocol/httpflv/src/lib.rs | 1 - protocol/httpflv/src/server.rs | 45 +++++++++++++++------------------- 4 files changed, 22 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c1a98a17..f9f6d133 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1478,11 +1478,11 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" name = "httpflv" version = "0.3.1" dependencies = [ + "axum 0.7.4", "byteorder", "bytes", "failure", "futures", - "hyper 0.14.25", "log", "rtmp 0.4.1", "streamhub 0.1.1", diff --git a/protocol/httpflv/Cargo.toml b/protocol/httpflv/Cargo.toml index d0b14b1a..ca4e1792 100644 --- a/protocol/httpflv/Cargo.toml +++ b/protocol/httpflv/Cargo.toml @@ -15,7 +15,7 @@ byteorder = "1.4.2" bytes = "1.0.0" failure = "0.1.1" log = "0.4" -hyper = { version = "0.14", features = ["full"] } +axum = { version = "0.7.4" } futures = "0.3" streamhub = { path = "../../library/streamhub/" } diff --git a/protocol/httpflv/src/lib.rs b/protocol/httpflv/src/lib.rs index f83b3f30..f50d1467 100644 --- a/protocol/httpflv/src/lib.rs +++ b/protocol/httpflv/src/lib.rs @@ -1,4 +1,3 @@ -extern crate hyper; extern crate rtmp; pub mod define; diff --git a/protocol/httpflv/src/server.rs b/protocol/httpflv/src/server.rs index 3cb66da7..bc758a7b 100644 --- a/protocol/httpflv/src/server.rs +++ b/protocol/httpflv/src/server.rs @@ -1,13 +1,16 @@ use { super::httpflv::HttpFlv, - futures::channel::mpsc::unbounded, - hyper::{ - server::conn::AddrStream, - service::{make_service_fn, service_fn}, - Body, Request, Response, Server, StatusCode, + axum::{ + body::Body, + extract::{ConnectInfo, Request, State}, + handler::Handler, + http::StatusCode, + response::Response, }, + futures::channel::mpsc::unbounded, std::net::SocketAddr, streamhub::define::StreamHubEventSender, + tokio::net::TcpListener, }; type GenericError = Box; @@ -15,10 +18,10 @@ type Result = std::result::Result; static NOTFOUND: &[u8] = b"Not Found"; async fn handle_connection( + State(event_producer): State, // event_producer: ChannelEventProducer + ConnectInfo(remote_addr): ConnectInfo, req: Request, - event_producer: StreamHubEventSender, // event_producer: ChannelEventProducer - remote_addr: SocketAddr, -) -> Result> { +) -> Response { let path = req.uri().path(); match path.find(".flv") { @@ -46,39 +49,31 @@ async fn handle_connection( } }); - let mut resp = Response::new(Body::wrap_stream(http_response_data_consumer)); + let mut resp = Response::new(Body::from_stream(http_response_data_consumer)); resp.headers_mut() .insert("Access-Control-Allow-Origin", "*".parse().unwrap()); - Ok(resp) + resp } - _ => Ok(Response::builder() + _ => Response::builder() .status(StatusCode::NOT_FOUND) .body(NOTFOUND.into()) - .unwrap()), + .unwrap(), } } pub async fn run(event_producer: StreamHubEventSender, port: usize) -> Result<()> { let listen_address = format!("0.0.0.0:{port}"); - let sock_addr = listen_address.parse().unwrap(); - - let new_service = make_service_fn(move |socket: &AddrStream| { - let remote_addr = socket.remote_addr(); - let flv_copy = event_producer.clone(); - async move { - Ok::<_, GenericError>(service_fn(move |req| { - handle_connection(req, flv_copy.clone(), remote_addr) - })) - } - }); + let sock_addr: SocketAddr = listen_address.parse().unwrap(); - let server = Server::bind(&sock_addr).serve(new_service); + let listener = TcpListener::bind(sock_addr).await?; log::info!("Httpflv server listening on http://{}", sock_addr); - server.await?; + let handle_connection = handle_connection.with_state(event_producer.clone()); + + axum::serve(listener, handle_connection.into_make_service()).await?; Ok(()) } From c56dd7008155220e3fd0f3a0b250e1285b4a1a57 Mon Sep 17 00:00:00 2001 From: zuiyu1998 <1542844298@qq.com> Date: Thu, 25 Jan 2024 15:41:23 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=F0=9F=A6=84=20refactor:=20Change=20hyper?= =?UTF-8?q?=20to=20axum?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- confs/local/hls.Cargo.toml | 2 +- confs/local/httpflv.Cargo.toml | 2 +- confs/online/hls.Cargo.toml | 2 +- confs/online/httpflv.Cargo.toml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/confs/local/hls.Cargo.toml b/confs/local/hls.Cargo.toml index 845cbc9e..0649d4b4 100644 --- a/confs/local/hls.Cargo.toml +++ b/confs/local/hls.Cargo.toml @@ -15,7 +15,7 @@ byteorder = "1.4.2" bytes = "1.0.0" failure = "0.1.1" log = "0.4" -hyper = { version = "0.14", features = ["full"] } +axum = { version = "0.7.4" } tokio-util = { version = "0.6.5", features = ["codec"] } streamhub = { path = "../../library/streamhub/" } diff --git a/confs/local/httpflv.Cargo.toml b/confs/local/httpflv.Cargo.toml index d0b14b1a..ca4e1792 100644 --- a/confs/local/httpflv.Cargo.toml +++ b/confs/local/httpflv.Cargo.toml @@ -15,7 +15,7 @@ byteorder = "1.4.2" bytes = "1.0.0" failure = "0.1.1" log = "0.4" -hyper = { version = "0.14", features = ["full"] } +axum = { version = "0.7.4" } futures = "0.3" streamhub = { path = "../../library/streamhub/" } diff --git a/confs/online/hls.Cargo.toml b/confs/online/hls.Cargo.toml index cf8bee34..ac41ba25 100644 --- a/confs/online/hls.Cargo.toml +++ b/confs/online/hls.Cargo.toml @@ -15,7 +15,7 @@ byteorder = "1.4.2" bytes = "1.0.0" failure = "0.1.1" log = "0.4" -hyper = { version = "0.14", features = ["full"] } +axum = { version = "0.7.4" } tokio-util = { version = "0.6.5", features = ["codec"] } streamhub = "0.1.2" diff --git a/confs/online/httpflv.Cargo.toml b/confs/online/httpflv.Cargo.toml index 7837dd2c..76f9077d 100644 --- a/confs/online/httpflv.Cargo.toml +++ b/confs/online/httpflv.Cargo.toml @@ -15,7 +15,7 @@ byteorder = "1.4.2" bytes = "1.0.0" failure = "0.1.1" log = "0.4" -hyper = { version = "0.14", features = ["full"] } +axum = { version = "0.7.4" } futures = "0.3" streamhub = "0.1.2" From 98820469b3e0e36b5213176fbef029bb0bcb5494 Mon Sep 17 00:00:00 2001 From: zuiyu1998 <1542844298@qq.com> Date: Mon, 29 Jan 2024 11:03:02 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=90=9E=20fix(flv):=20Change=20service?= =?UTF-8?q?=20type?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- protocol/httpflv/src/server.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/protocol/httpflv/src/server.rs b/protocol/httpflv/src/server.rs index bc758a7b..58a4aa95 100644 --- a/protocol/httpflv/src/server.rs +++ b/protocol/httpflv/src/server.rs @@ -73,7 +73,11 @@ pub async fn run(event_producer: StreamHubEventSender, port: usize) -> Result<() let handle_connection = handle_connection.with_state(event_producer.clone()); - axum::serve(listener, handle_connection.into_make_service()).await?; + axum::serve( + listener, + handle_connection.into_make_service_with_connect_info::(), + ) + .await?; Ok(()) }