Skip to content

Commit

Permalink
[feat] support HLS record
Browse files Browse the repository at this point in the history
  • Loading branch information
harlanc committed Aug 4, 2023
1 parent 4f118a9 commit b754b69
Show file tree
Hide file tree
Showing 25 changed files with 201 additions and 90 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Xiu is a simple,high performance and secure live media server written in pure Ru
- [x] Support querying stream information.
- [x] Support notification of stream status.
- [x] Support token authentications.
- [x] Support recording live streams into HLS files(m3u8+ts).


## Preparation
Expand Down Expand Up @@ -163,6 +164,8 @@ You can use command line to configure the xiu server easily. You can specify to
enabled = true
# listening port
port = 8080
# need record the live stream or not
need_record = true

##### Log

Expand Down
3 changes: 3 additions & 0 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ XIU是用纯Rust开发的一款简单和安全的流媒体服务器,目前支
- [x] 支持查询流信息
- [x] 支持流事件通知
- [x] 支持token鉴权
- [x] 支持把直播流录制成HLS协议(m3u8+ts)文件.

## 准备工作
#### 安装 Rust and Cargo
Expand Down Expand Up @@ -163,6 +164,8 @@ XIU是用纯Rust开发的一款简单和安全的流媒体服务器,目前支
enabled = true
# listening port
port = 8080
# need record the live stream or not
need_record = true

##### Log

Expand Down
2 changes: 1 addition & 1 deletion application/http-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ serde_json = { version = "1", default-features = false, features = [
] }
axum = "0.6.10"
log = "0.4.0"
env_logger = "0.9.3"
env_logger = "0.10.0"

[dependencies.tokio]
version = "1.4.0"
Expand Down
2 changes: 1 addition & 1 deletion application/pprtmp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
[dependencies]
anyhow = "^1.0"
log = "0.4.0"
env_logger = "0.9.3"
env_logger = "0.10.0"
clap = "4.1.4"

rtmp = "0.4.0"
Expand Down
4 changes: 2 additions & 2 deletions application/xiu/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "xiu"
description = "A powerful live server by Rust ."
version = "0.7.0"
version = "0.8.0"
authors = ["HarlanC <[email protected]"]
repository = "https://github.com/harlanc/xiu"
license = "MIT"
Expand Down Expand Up @@ -31,7 +31,7 @@ streamhub = "0.1.0"
rtmp = "0.4.0"
xrtsp = "0.1.0"
httpflv = "0.3.0"
hls = "0.3.0"
hls = "0.4.0"

[features]
default = ["std"]
Expand Down
2 changes: 2 additions & 0 deletions application/xiu/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,5 +322,7 @@ Open issues if you have any problems. Star and pull requests are welcomed. Your
- Fix RTMP examples in README.
## v0.7.0
- Support RTSP.
## v0.8.0
- Support HLS record.


8 changes: 4 additions & 4 deletions application/xiu/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ impl ApiService {
async fn get_stream_status(&self) -> Result<String> {
let (data_sender, mut data_receiver) = mpsc::unbounded_channel();
let (size_sender, size_receiver) = oneshot::channel();
let channel_event = define::StreamHubEvent::ApiStatistic {
let hub_event = define::StreamHubEvent::ApiStatistic {
data_sender,
size_sender,
};
if let Err(err) = self.channel_event_producer.send(channel_event) {
if let Err(err) = self.channel_event_producer.send(hub_event) {
log::error!("send api event error: {}", err);
}
let mut data = Vec::new();
Expand Down Expand Up @@ -74,9 +74,9 @@ impl ApiService {
let id_result = Uuid::from_str2(&id.id);

if let Some(id) = id_result {
let channel_event = define::StreamHubEvent::ApiKickClient { id };
let hub_event = define::StreamHubEvent::ApiKickClient { id };

if let Err(err) = self.channel_event_producer.send(channel_event) {
if let Err(err) = self.channel_event_producer.send(hub_event) {
log::error!("send api kick_off_client event error: {}", err);
}
}
Expand Down
4 changes: 3 additions & 1 deletion application/xiu/src/config/config_rtmp_hls.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ port = 1935
[hls]
enabled = true
port = 8080
#defalut false
need_record = true

#######################################
# LOG configurations #
#######################################
[log]
level = "info"
level = "info"
7 changes: 6 additions & 1 deletion application/xiu/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl Config {
hls_config = Some(HlsConfig {
enabled: true,
port: hls_port,
need_record: false,
});
}

Expand Down Expand Up @@ -113,6 +114,8 @@ pub struct HttpFlvConfig {
pub struct HlsConfig {
pub enabled: bool,
pub port: usize,
//record or not
pub need_record: bool,
}

pub enum LogLevel {
Expand Down Expand Up @@ -164,7 +167,9 @@ fn test_toml_parse() {
// Err(err) => print!("{}\n", err),
// }

let str = fs::read_to_string("/Users/zexu/github/xiu_live_rust/application/xiu/src/config/config.toml");
let str = fs::read_to_string(
"/Users/zexu/github/xiu_live_rust/application/xiu/src/config/config.toml",
);

match str {
Ok(val) => {
Expand Down
2 changes: 1 addition & 1 deletion application/xiu/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn main() -> Result<()> {

let mut cmd = Command::new("XIU")
.bin_name("xiu")
.version("0.7.0")
.version("0.8.0")
.author("HarlanC <[email protected]>")
.about("A secure and easy to use live media server, hope you love it!!!")
.arg(
Expand Down
35 changes: 29 additions & 6 deletions application/xiu/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
super::config::Config,
//https://rustcc.cn/article?id=6dcbf032-0483-4980-8bfe-c64a7dfb33c7
anyhow::Result,
hls::rtmp_event_processor::RtmpEventProcessor,
hls::remuxer::HlsRemuxer,
hls::server as hls_server,
httpflv::server as httpflv_server,
rtmp::{
Expand Down Expand Up @@ -49,6 +49,7 @@ impl Service {
self.start_rtmp(&mut stream_hub).await?;
self.start_rtsp(&mut stream_hub).await?;
self.start_http_api_server(&mut stream_hub).await?;
self.start_rtmp_remuxer(&mut stream_hub).await?;

tokio::spawn(async move {
stream_hub.run().await;
Expand Down Expand Up @@ -149,14 +150,33 @@ impl Service {
log::error!("rtmp server error: {}\n", err);
}
});

self.start_rtmp_remuxer(stream_hub).await?;
}

Ok(())
}

async fn start_rtmp_remuxer(&mut self, stream_hub: &mut StreamsHub) -> Result<()> {
//The remuxer now is used for rtsp2rtmp, so both rtsp/rtmp cfg need to be enabled.
let mut rtsp_enabled = false;
if let Some(rtsp_cfg_value) = &self.cfg.rtsp {
if rtsp_cfg_value.enabled {
rtsp_enabled = true;
}
}
if !rtsp_enabled {
return Ok(());
}

let mut rtmp_enabled: bool = false;
if let Some(rtmp_cfg_value) = &self.cfg.rtmp {
if rtmp_cfg_value.enabled {
rtmp_enabled = true;
}
}
if !rtmp_enabled {
return Ok(());
}

let event_producer = stream_hub.get_hub_event_sender();
let broadcast_event_receiver = stream_hub.get_client_event_consumer();
let mut remuxer = RtmpRemuxer::new(broadcast_event_receiver, event_producer);
Expand Down Expand Up @@ -224,11 +244,14 @@ impl Service {

let event_producer = stream_hub.get_hub_event_sender();
let cient_event_consumer = stream_hub.get_client_event_consumer();
let mut rtmp_event_processor =
RtmpEventProcessor::new(cient_event_consumer, event_producer);
let mut hls_remuxer = HlsRemuxer::new(
cient_event_consumer,
event_producer,
hls_cfg_value.need_record,
);

tokio::spawn(async move {
if let Err(err) = rtmp_event_processor.run().await {
if let Err(err) = hls_remuxer.run().await {
log::error!("rtmp event processor error: {}\n", err);
}
});
Expand Down
2 changes: 1 addition & 1 deletion confs/local/pprtmp.Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ anyhow = "^1.0"
rtmp = { path = "../../protocol/rtmp/" }
streamhub = { path = "../../library/streamhub/" }
log = "0.4.0"
env_logger = "0.9.3"
env_logger = "0.10.0"
clap = "4.1.4"

[dependencies.tokio]
Expand Down
2 changes: 1 addition & 1 deletion confs/online/hls.Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "hls"
description = "hls library."
version = "0.3.0"
version = "0.4.0"
authors = ["HarlanC <[email protected]"]
repository = "https://github.com/harlanc/xiu"
license = "MIT"
Expand Down
2 changes: 1 addition & 1 deletion confs/online/pprtmp.Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
[dependencies]
anyhow = "^1.0"
log = "0.4.0"
env_logger = "0.9.3"
env_logger = "0.10.0"
clap = "4.1.4"

rtmp = "0.4.0"
Expand Down
4 changes: 2 additions & 2 deletions confs/online/xiu.Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "xiu"
description = "A powerful live server by Rust ."
version = "0.7.0"
version = "0.8.0"
authors = ["HarlanC <[email protected]"]
repository = "https://github.com/harlanc/xiu"
license = "MIT"
Expand Down Expand Up @@ -31,7 +31,7 @@ streamhub = "0.1.0"
rtmp = "0.4.0"
xrtsp = "0.1.0"
httpflv = "0.3.0"
hls = "0.3.0"
hls = "0.4.0"

[features]
default = ["std"]
Expand Down
4 changes: 1 addition & 3 deletions library/logger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ repository = "https://github.com/harlanc/xiu"

[dependencies]
anyhow = "^1.0"
env_logger = "0.9.3"
env_logger = "0.10.0"
job_scheduler = "1.2.1"
chrono = "0.4"
failure = "0.1.1"
log = "0.4.0"


2 changes: 1 addition & 1 deletion protocol/hls/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "hls"
description = "hls library."
version = "0.3.0"
version = "0.4.0"
authors = ["HarlanC <[email protected]"]
repository = "https://github.com/harlanc/xiu"
license = "MIT"
Expand Down
3 changes: 3 additions & 0 deletions protocol/hls/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,8 @@ Change the listening port type.
## 0.3.0
- Do some refactoring.
- Reference bytesio v0.3.0.
## 0.4.0
- Support record.



41 changes: 37 additions & 4 deletions protocol/hls/src/flv2hls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct Flv2HlsRemuxer {
}

impl Flv2HlsRemuxer {
pub fn new(duration: i64, app_name: String, stream_name: String) -> Self {
pub fn new(duration: i64, app_name: String, stream_name: String, need_record: bool) -> Self {
let mut ts_muxer = TsMuxer::new();
let audio_pid = ts_muxer
.add_stream(epsi_stream_type::PSI_STREAM_AAC, BytesMut::new())
Expand All @@ -42,8 +42,6 @@ impl Flv2HlsRemuxer {
.add_stream(epsi_stream_type::PSI_STREAM_H264, BytesMut::new())
.unwrap();

let m3u8_name = format!("{stream_name}.m3u8");

Self {
video_demuxer: FlvVideoTagDemuxer::new(),
audio_demuxer: FlvAudioTagDemuxer::new(),
Expand All @@ -62,7 +60,7 @@ impl Flv2HlsRemuxer {
video_pid,
audio_pid,

m3u8_handler: M3u8::new(duration, 6, m3u8_name, app_name, stream_name),
m3u8_handler: M3u8::new(duration, 6, app_name, stream_name, need_record),
}
}

Expand Down Expand Up @@ -173,3 +171,38 @@ impl Flv2HlsRemuxer {
self.m3u8_handler.clear()
}
}
#[cfg(test)]
mod tests {
// use std::{
// env,
// fs::{self},
// };

// #[test]
// fn test_new_path() {
// if let Ok(current_dir) = env::current_dir() {
// println!("Current directory: {:?}", current_dir);
// } else {
// eprintln!("Failed to get the current directory");
// }
// let directory = "test";

// if !fs::metadata(directory).is_ok() {
// match fs::create_dir(directory) {
// Ok(_) => println!("目录已创建"),
// Err(err) => println!("创建目录时出错:{:?}", err),
// }
// } else {
// println!("目录已存在");
// }
// }
// #[test]
// fn test_copy() {
// let path = "./aa.txt";
// if let Err(err) = fs::copy(path, "./test/") {
// println!("copy err: {err}");
// } else {
// println!("copy success");
// }
// }
}
6 changes: 2 additions & 4 deletions protocol/hls/src/flv_data_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use {
pub struct FlvDataReceiver {
app_name: String,
stream_name: String,

event_producer: StreamHubEventSender,
data_consumer: FrameDataReceiver,
media_processor: Flv2HlsRemuxer,
Expand All @@ -34,19 +33,18 @@ impl FlvDataReceiver {
app_name: String,
stream_name: String,
event_producer: StreamHubEventSender,

duration: i64,
need_record: bool,
) -> Self {
let (_, data_consumer) = mpsc::unbounded_channel();
let subscriber_id = Uuid::new(RandomDigitCount::Four);

Self {
app_name: app_name.clone(),
stream_name: stream_name.clone(),

data_consumer,
event_producer,
media_processor: Flv2HlsRemuxer::new(duration, app_name, stream_name),
media_processor: Flv2HlsRemuxer::new(duration, app_name, stream_name, need_record),
subscriber_id,
}
}
Expand Down
Loading

0 comments on commit b754b69

Please sign in to comment.