diff --git a/README.md b/README.md index ce186ff5..66788dcc 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,7 @@ Xiu is a simple,high performance and secure live media server written in pure Ru - [x] Support querying stream information. - [x] Support notification of stream status. - [x] Support token authentications. +- [x] Support recording live streams into HLS files(m3u8+ts). ## Preparation @@ -163,6 +164,8 @@ You can use command line to configure the xiu server easily. You can specify to enabled = true # listening port port = 8080 + # need record the live stream or not + need_record = true ##### Log diff --git a/README_CN.md b/README_CN.md index 94377b96..517fb442 100644 --- a/README_CN.md +++ b/README_CN.md @@ -42,6 +42,7 @@ XIU是用纯Rust开发的一款简单和安全的流媒体服务器,目前支 - [x] 支持查询流信息 - [x] 支持流事件通知 - [x] 支持token鉴权 +- [x] 支持把直播流录制成HLS协议(m3u8+ts)文件. ## 准备工作 #### 安装 Rust and Cargo @@ -163,6 +164,8 @@ XIU是用纯Rust开发的一款简单和安全的流媒体服务器,目前支 enabled = true # listening port port = 8080 + # need record the live stream or not + need_record = true ##### Log diff --git a/application/http-server/Cargo.toml b/application/http-server/Cargo.toml index d30beb3b..48babb1d 100644 --- a/application/http-server/Cargo.toml +++ b/application/http-server/Cargo.toml @@ -15,7 +15,7 @@ serde_json = { version = "1", default-features = false, features = [ ] } axum = "0.6.10" log = "0.4.0" -env_logger = "0.9.3" +env_logger = "0.10.0" [dependencies.tokio] version = "1.4.0" diff --git a/application/pprtmp/Cargo.toml b/application/pprtmp/Cargo.toml index bdad116c..64009460 100644 --- a/application/pprtmp/Cargo.toml +++ b/application/pprtmp/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] anyhow = "^1.0" log = "0.4.0" -env_logger = "0.9.3" +env_logger = "0.10.0" clap = "4.1.4" rtmp = "0.4.0" diff --git a/application/xiu/Cargo.toml b/application/xiu/Cargo.toml index cc062c67..ce85a896 100644 --- a/application/xiu/Cargo.toml +++ b/application/xiu/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "xiu" description = "A powerful live server by Rust ." -version = "0.7.0" +version = "0.8.0" authors = ["HarlanC Result { let (data_sender, mut data_receiver) = mpsc::unbounded_channel(); let (size_sender, size_receiver) = oneshot::channel(); - let channel_event = define::StreamHubEvent::ApiStatistic { + let hub_event = define::StreamHubEvent::ApiStatistic { data_sender, size_sender, }; - if let Err(err) = self.channel_event_producer.send(channel_event) { + if let Err(err) = self.channel_event_producer.send(hub_event) { log::error!("send api event error: {}", err); } let mut data = Vec::new(); @@ -74,9 +74,9 @@ impl ApiService { let id_result = Uuid::from_str2(&id.id); if let Some(id) = id_result { - let channel_event = define::StreamHubEvent::ApiKickClient { id }; + let hub_event = define::StreamHubEvent::ApiKickClient { id }; - if let Err(err) = self.channel_event_producer.send(channel_event) { + if let Err(err) = self.channel_event_producer.send(hub_event) { log::error!("send api kick_off_client event error: {}", err); } } diff --git a/application/xiu/src/config/config_rtmp_hls.toml b/application/xiu/src/config/config_rtmp_hls.toml index 3ca1fd7c..04802d45 100644 --- a/application/xiu/src/config/config_rtmp_hls.toml +++ b/application/xiu/src/config/config_rtmp_hls.toml @@ -12,9 +12,11 @@ port = 1935 [hls] enabled = true port = 8080 +#defalut false +need_record = true ####################################### # LOG configurations # ####################################### [log] -level = "info" \ No newline at end of file +level = "info" diff --git a/application/xiu/src/config/mod.rs b/application/xiu/src/config/mod.rs index 8ad385a8..a7587830 100644 --- a/application/xiu/src/config/mod.rs +++ b/application/xiu/src/config/mod.rs @@ -56,6 +56,7 @@ impl Config { hls_config = Some(HlsConfig { enabled: true, port: hls_port, + need_record: false, }); } @@ -113,6 +114,8 @@ pub struct HttpFlvConfig { pub struct HlsConfig { pub enabled: bool, pub port: usize, + //record or not + pub need_record: bool, } pub enum LogLevel { @@ -164,7 +167,9 @@ fn test_toml_parse() { // Err(err) => print!("{}\n", err), // } - let str = fs::read_to_string("/Users/zexu/github/xiu_live_rust/application/xiu/src/config/config.toml"); + let str = fs::read_to_string( + "/Users/zexu/github/xiu_live_rust/application/xiu/src/config/config.toml", + ); match str { Ok(val) => { diff --git a/application/xiu/src/main.rs b/application/xiu/src/main.rs index 146916d7..a9e193a6 100644 --- a/application/xiu/src/main.rs +++ b/application/xiu/src/main.rs @@ -14,7 +14,7 @@ async fn main() -> Result<()> { let mut cmd = Command::new("XIU") .bin_name("xiu") - .version("0.7.0") + .version("0.8.0") .author("HarlanC ") .about("A secure and easy to use live media server, hope you love it!!!") .arg( diff --git a/application/xiu/src/service.rs b/application/xiu/src/service.rs index 7ab29f92..9c5e37fe 100644 --- a/application/xiu/src/service.rs +++ b/application/xiu/src/service.rs @@ -5,7 +5,7 @@ use { super::config::Config, //https://rustcc.cn/article?id=6dcbf032-0483-4980-8bfe-c64a7dfb33c7 anyhow::Result, - hls::rtmp_event_processor::RtmpEventProcessor, + hls::remuxer::HlsRemuxer, hls::server as hls_server, httpflv::server as httpflv_server, rtmp::{ @@ -49,6 +49,7 @@ impl Service { self.start_rtmp(&mut stream_hub).await?; self.start_rtsp(&mut stream_hub).await?; self.start_http_api_server(&mut stream_hub).await?; + self.start_rtmp_remuxer(&mut stream_hub).await?; tokio::spawn(async move { stream_hub.run().await; @@ -149,14 +150,33 @@ impl Service { log::error!("rtmp server error: {}\n", err); } }); - - self.start_rtmp_remuxer(stream_hub).await?; } Ok(()) } async fn start_rtmp_remuxer(&mut self, stream_hub: &mut StreamsHub) -> Result<()> { + //The remuxer now is used for rtsp2rtmp, so both rtsp/rtmp cfg need to be enabled. + let mut rtsp_enabled = false; + if let Some(rtsp_cfg_value) = &self.cfg.rtsp { + if rtsp_cfg_value.enabled { + rtsp_enabled = true; + } + } + if !rtsp_enabled { + return Ok(()); + } + + let mut rtmp_enabled: bool = false; + if let Some(rtmp_cfg_value) = &self.cfg.rtmp { + if rtmp_cfg_value.enabled { + rtmp_enabled = true; + } + } + if !rtmp_enabled { + return Ok(()); + } + let event_producer = stream_hub.get_hub_event_sender(); let broadcast_event_receiver = stream_hub.get_client_event_consumer(); let mut remuxer = RtmpRemuxer::new(broadcast_event_receiver, event_producer); @@ -224,11 +244,14 @@ impl Service { let event_producer = stream_hub.get_hub_event_sender(); let cient_event_consumer = stream_hub.get_client_event_consumer(); - let mut rtmp_event_processor = - RtmpEventProcessor::new(cient_event_consumer, event_producer); + let mut hls_remuxer = HlsRemuxer::new( + cient_event_consumer, + event_producer, + hls_cfg_value.need_record, + ); tokio::spawn(async move { - if let Err(err) = rtmp_event_processor.run().await { + if let Err(err) = hls_remuxer.run().await { log::error!("rtmp event processor error: {}\n", err); } }); diff --git a/confs/local/pprtmp.Cargo.toml b/confs/local/pprtmp.Cargo.toml index f3764269..64abe615 100644 --- a/confs/local/pprtmp.Cargo.toml +++ b/confs/local/pprtmp.Cargo.toml @@ -10,7 +10,7 @@ anyhow = "^1.0" rtmp = { path = "../../protocol/rtmp/" } streamhub = { path = "../../library/streamhub/" } log = "0.4.0" -env_logger = "0.9.3" +env_logger = "0.10.0" clap = "4.1.4" [dependencies.tokio] diff --git a/confs/online/hls.Cargo.toml b/confs/online/hls.Cargo.toml index eecb28da..c2d5f1bd 100644 --- a/confs/online/hls.Cargo.toml +++ b/confs/online/hls.Cargo.toml @@ -1,7 +1,7 @@ [package] name = "hls" description = "hls library." -version = "0.3.0" +version = "0.4.0" authors = ["HarlanC Self { + pub fn new(duration: i64, app_name: String, stream_name: String, need_record: bool) -> Self { let mut ts_muxer = TsMuxer::new(); let audio_pid = ts_muxer .add_stream(epsi_stream_type::PSI_STREAM_AAC, BytesMut::new()) @@ -42,8 +42,6 @@ impl Flv2HlsRemuxer { .add_stream(epsi_stream_type::PSI_STREAM_H264, BytesMut::new()) .unwrap(); - let m3u8_name = format!("{stream_name}.m3u8"); - Self { video_demuxer: FlvVideoTagDemuxer::new(), audio_demuxer: FlvAudioTagDemuxer::new(), @@ -62,7 +60,7 @@ impl Flv2HlsRemuxer { video_pid, audio_pid, - m3u8_handler: M3u8::new(duration, 6, m3u8_name, app_name, stream_name), + m3u8_handler: M3u8::new(duration, 6, app_name, stream_name, need_record), } } @@ -173,3 +171,38 @@ impl Flv2HlsRemuxer { self.m3u8_handler.clear() } } +#[cfg(test)] +mod tests { + // use std::{ + // env, + // fs::{self}, + // }; + + // #[test] + // fn test_new_path() { + // if let Ok(current_dir) = env::current_dir() { + // println!("Current directory: {:?}", current_dir); + // } else { + // eprintln!("Failed to get the current directory"); + // } + // let directory = "test"; + + // if !fs::metadata(directory).is_ok() { + // match fs::create_dir(directory) { + // Ok(_) => println!("目录已创建"), + // Err(err) => println!("创建目录时出错:{:?}", err), + // } + // } else { + // println!("目录已存在"); + // } + // } + // #[test] + // fn test_copy() { + // let path = "./aa.txt"; + // if let Err(err) = fs::copy(path, "./test/") { + // println!("copy err: {err}"); + // } else { + // println!("copy success"); + // } + // } +} diff --git a/protocol/hls/src/flv_data_receiver.rs b/protocol/hls/src/flv_data_receiver.rs index e10cb1da..24954765 100644 --- a/protocol/hls/src/flv_data_receiver.rs +++ b/protocol/hls/src/flv_data_receiver.rs @@ -22,7 +22,6 @@ use { pub struct FlvDataReceiver { app_name: String, stream_name: String, - event_producer: StreamHubEventSender, data_consumer: FrameDataReceiver, media_processor: Flv2HlsRemuxer, @@ -34,8 +33,8 @@ impl FlvDataReceiver { app_name: String, stream_name: String, event_producer: StreamHubEventSender, - duration: i64, + need_record: bool, ) -> Self { let (_, data_consumer) = mpsc::unbounded_channel(); let subscriber_id = Uuid::new(RandomDigitCount::Four); @@ -43,10 +42,9 @@ impl FlvDataReceiver { Self { app_name: app_name.clone(), stream_name: stream_name.clone(), - data_consumer, event_producer, - media_processor: Flv2HlsRemuxer::new(duration, app_name, stream_name), + media_processor: Flv2HlsRemuxer::new(duration, app_name, stream_name, need_record), subscriber_id, } } diff --git a/protocol/hls/src/lib.rs b/protocol/hls/src/lib.rs index 40fbd5f2..eb090e65 100644 --- a/protocol/hls/src/lib.rs +++ b/protocol/hls/src/lib.rs @@ -3,7 +3,7 @@ pub mod errors; pub mod flv2hls; pub mod flv_data_receiver; pub mod m3u8; -pub mod rtmp_event_processor; +pub mod remuxer; pub mod server; mod test_flv2hls; pub mod ts; diff --git a/protocol/hls/src/m3u8.rs b/protocol/hls/src/m3u8.rs index 399281d0..29521fc9 100644 --- a/protocol/hls/src/m3u8.rs +++ b/protocol/hls/src/m3u8.rs @@ -6,12 +6,12 @@ use { pub struct Segment { /*ts duration*/ - duration: i64, - discontinuity: bool, + pub duration: i64, + pub discontinuity: bool, /*ts name*/ - name: String, + pub name: String, path: String, - is_eof: bool, + pub is_eof: bool, } impl Segment { @@ -39,45 +39,59 @@ pub struct M3u8 { A duration of 10 seconds of media per file seems to strike a reasonable balance for most broadcast content. http://devimages.apple.com/iphone/samples/bipbop/bipbopall.m3u8*/ duration: i64, - - is_live: bool, /*How many files should be listed in the index file during a continuous, ongoing session? The normal recommendation is 3, but the optimum number may be larger.*/ live_ts_count: usize, segments: VecDeque, - is_header_generated: bool, - m3u8_header: String, m3u8_folder: String, - m3u8_name: String, + live_m3u8_name: String, ts_handler: Ts, + + need_record: bool, + vod_m3u8_content: String, + vod_m3u8_name: String, } impl M3u8 { pub fn new( duration: i64, live_ts_count: usize, - name: String, app_name: String, stream_name: String, + need_record: bool, ) -> Self { let m3u8_folder = format!("./{app_name}/{stream_name}"); fs::create_dir_all(m3u8_folder.clone()).unwrap(); - Self { + + let live_m3u8_name = format!("{stream_name}.m3u8"); + let vod_m3u8_name = if need_record { + format!("vod_{stream_name}.m3u8") + } else { + String::default() + }; + + let mut m3u8 = Self { version: 3, sequence_no: 0, duration, - is_live: true, live_ts_count, segments: VecDeque::new(), - is_header_generated: false, m3u8_folder, - m3u8_header: String::new(), - m3u8_name: name, + live_m3u8_name, ts_handler: Ts::new(app_name, stream_name), + // record, + need_record, + vod_m3u8_content: String::default(), + vod_m3u8_name, + }; + + if need_record { + m3u8.vod_m3u8_content = m3u8.generate_m3u8_header(true); } + m3u8 } pub fn add_segment( @@ -89,58 +103,65 @@ impl M3u8 { ) -> Result<(), MediaError> { let segment_count = self.segments.len(); - if self.is_live && segment_count >= self.live_ts_count { + if segment_count >= self.live_ts_count { let segment = self.segments.pop_front().unwrap(); - self.ts_handler.delete(segment.path); + if !self.need_record { + self.ts_handler.delete(segment.path); + } + self.sequence_no += 1; } - self.duration = std::cmp::max(duration, self.duration); - let (ts_name, ts_path) = self.ts_handler.write(ts_data)?; let segment = Segment::new(duration, discontinuity, ts_name, ts_path, is_eof); + + if self.need_record { + self.update_vod_m3u8(&segment); + } + self.segments.push_back(segment); Ok(()) } pub fn clear(&mut self) -> Result<(), MediaError> { - //clear ts - for segment in &self.segments { - self.ts_handler.delete(segment.path.clone()); + if self.need_record { + let vod_m3u8_path = format!("{}/{}", self.m3u8_folder, self.vod_m3u8_name); + let mut file_handler = File::create(vod_m3u8_path).unwrap(); + self.vod_m3u8_content += "#EXT-X-ENDLIST\n"; + file_handler.write_all(self.vod_m3u8_content.as_bytes())?; + } else { + for segment in &self.segments { + self.ts_handler.delete(segment.path.clone()); + } } - //clear m3u8 - let m3u8_path = format!("{}/{}", self.m3u8_folder, self.m3u8_name); - fs::remove_file(m3u8_path)?; + + //clear live m3u8 + let live_m3u8_path = format!("{}/{}", self.m3u8_folder, self.live_m3u8_name); + fs::remove_file(live_m3u8_path)?; Ok(()) } - pub fn generate_m3u8_header(&mut self) -> Result<(), MediaError> { - self.is_header_generated = true; - - let mut playlist_type: &str = ""; - let mut allow_cache: &str = ""; - if !self.is_live { - playlist_type = "#EXT-X-PLAYLIST-TYPE:VOD\n"; - allow_cache = "#EXT-X-ALLOW-CACHE:YES\n"; + pub fn generate_m3u8_header(&self, is_vod: bool) -> String { + let mut m3u8_header = "#EXTM3U\n".to_string(); + m3u8_header += format!("#EXT-X-VERSION:{}\n", self.version).as_str(); + m3u8_header += format!("#EXT-X-TARGETDURATION:{}\n", (self.duration + 999) / 1000).as_str(); + + if is_vod { + m3u8_header += "#EXT-X-MEDIA-SEQUENCE:0\n"; + m3u8_header += "#EXT-X-PLAYLIST-TYPE:VOD\n"; + m3u8_header += "#EXT-X-ALLOW-CACHE:YES\n"; + } else { + m3u8_header += format!("#EXT-X-MEDIA-SEQUENCE:{}\n", self.sequence_no).as_str(); } - self.m3u8_header = "#EXTM3U\n".to_string(); - self.m3u8_header += format!("#EXT-X-VERSION:{}\n", self.version).as_str(); - self.m3u8_header += - format!("#EXT-X-TARGETDURATION:{}\n", (self.duration + 999) / 1000).as_str(); - self.m3u8_header += format!("#EXT-X-MEDIA-SEQUENCE:{}\n", self.sequence_no).as_str(); - self.m3u8_header += playlist_type; - self.m3u8_header += allow_cache; - - Ok(()) + m3u8_header } pub fn refresh_playlist(&mut self) -> Result { - self.generate_m3u8_header()?; + let mut m3u8_content = self.generate_m3u8_header(false); - let mut m3u8_content = self.m3u8_header.clone(); for segment in &self.segments { if segment.discontinuity { m3u8_content += "#EXT-X-DISCONTINUITY\n"; @@ -158,11 +179,23 @@ impl M3u8 { } } - let m3u8_path = format!("{}/{}", self.m3u8_folder, self.m3u8_name); + let m3u8_path = format!("{}/{}", self.m3u8_folder, self.live_m3u8_name); let mut file_handler = File::create(m3u8_path).unwrap(); file_handler.write_all(m3u8_content.as_bytes())?; Ok(m3u8_content) } + + pub fn update_vod_m3u8(&mut self, segment: &Segment) { + if segment.discontinuity { + self.vod_m3u8_content += "#EXT-X-DISCONTINUITY\n"; + } + self.vod_m3u8_content += format!( + "#EXTINF:{:.3}\n{}\n", + segment.duration as f64 / 1000.0, + segment.name + ) + .as_str(); + } } diff --git a/protocol/hls/src/rtmp_event_processor.rs b/protocol/hls/src/remuxer.rs similarity index 79% rename from protocol/hls/src/rtmp_event_processor.rs rename to protocol/hls/src/remuxer.rs index b92df973..b6005d62 100644 --- a/protocol/hls/src/rtmp_event_processor.rs +++ b/protocol/hls/src/remuxer.rs @@ -1,21 +1,27 @@ use { super::{errors::HlsError, flv_data_receiver::FlvDataReceiver}, streamhub::{ - define::{StreamHubEventSender, BroadcastEvent, BroadcastEventReceiver}, + define::{BroadcastEvent, BroadcastEventReceiver, StreamHubEventSender}, stream::StreamIdentifier, }, }; -pub struct RtmpEventProcessor { +pub struct HlsRemuxer { client_event_consumer: BroadcastEventReceiver, event_producer: StreamHubEventSender, + need_record: bool, } -impl RtmpEventProcessor { - pub fn new(consumer: BroadcastEventReceiver, event_producer: StreamHubEventSender) -> Self { +impl HlsRemuxer { + pub fn new( + consumer: BroadcastEventReceiver, + event_producer: StreamHubEventSender, + need_record: bool, + ) -> Self { Self { client_event_consumer: consumer, event_producer, + need_record, } } @@ -34,6 +40,7 @@ impl RtmpEventProcessor { stream_name, self.event_producer.clone(), 5, + self.need_record, ); tokio::spawn(async move { diff --git a/protocol/hls/src/test_flv2hls.rs b/protocol/hls/src/test_flv2hls.rs index 4acd6140..0fbd68a2 100644 --- a/protocol/hls/src/test_flv2hls.rs +++ b/protocol/hls/src/test_flv2hls.rs @@ -64,7 +64,8 @@ mod tests { demuxer.read_flv_header()?; let start = Instant::now(); - let mut media_demuxer = Flv2HlsRemuxer::new(5, String::from("live"), String::from("test")); + let mut media_demuxer = + Flv2HlsRemuxer::new(5, String::from("live"), String::from("test"), false); loop { let data_ = demuxer.read_flv_tag(); diff --git a/protocol/hls/src/ts.rs b/protocol/hls/src/ts.rs index f402afb9..1a112103 100644 --- a/protocol/hls/src/ts.rs +++ b/protocol/hls/src/ts.rs @@ -6,22 +6,22 @@ use { pub struct Ts { ts_number: u32, - folder_name: String, + live_path: String, } impl Ts { pub fn new(app_name: String, stream_name: String) -> Self { - let folder_name = format!("./{app_name}/{stream_name}"); - fs::create_dir_all(folder_name.clone()).unwrap(); + let live_path = format!("./{app_name}/{stream_name}"); + fs::create_dir_all(live_path.clone()).unwrap(); Self { ts_number: 0, - folder_name, + live_path, } } pub fn write(&mut self, data: BytesMut) -> Result<(String, String), MediaError> { let ts_file_name = format!("{}.ts", self.ts_number); - let ts_file_path = format!("{}/{}", self.folder_name, ts_file_name); + let ts_file_path = format!("{}/{}", self.live_path, ts_file_name); self.ts_number += 1; let mut ts_file_handler = File::create(ts_file_path.clone())?;