Skip to content

Commit

Permalink
refactor packerizer unpacketizer mod
Browse files Browse the repository at this point in the history
  • Loading branch information
harlanc committed Nov 11, 2023
1 parent f38d38a commit 9fce66d
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 59 deletions.
10 changes: 10 additions & 0 deletions protocol/rtmp/src/chunk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,21 @@ impl ChunkBasicHeader {
//5.3.1.2
#[derive(Eq, PartialEq, Debug, Clone)]
pub struct ChunkMessageHeader {
//save the absolute timestamp of chunk type 0.
//or save the computed absolute timestamp of chunk type 1,2,3.
pub timestamp: u32,
pub msg_length: u32,
pub msg_type_id: u8,
pub msg_streamd_id: u32,
// Save the timestamp delta of chunk type 1,2.
// For chunk type 3, this field saves the timestamp
// delta inherited from the previous chunk type 1 or 2.
// NOTE: this value should be reset to 0 when the current chunk type is 0.
pub timestamp_delta: u32,
// This field will be set for type 0,1,2 .If the timestamp/timestamp delta >= 0xFFFFFF
// then set this value to true else set it to false.
// Note that when the chunk format is 3, this value will be inherited from
// the most recent chunk 0, 1, or 2 chunk.(5.3.1.3 Extended Timestamp).
pub is_extended_timestamp: bool,
}

Expand Down
51 changes: 39 additions & 12 deletions protocol/rtmp/src/chunk/packetizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub struct ChunkPacketizer {
max_chunk_size: usize,
//bytes: Cursor<Vec<u8>>,
writer: AsyncBytesWriter,
//save timestamp need to be write for chunk
timestamp: u32,
}

impl ChunkPacketizer {
Expand All @@ -32,33 +34,46 @@ impl ChunkPacketizer {
//chunk_info: ChunkInfo::new(),
writer: AsyncBytesWriter::new(io),
max_chunk_size: CHUNK_SIZE as usize,
timestamp: 0,
}
}
fn zip_chunk_header(&mut self, chunk_info: &mut ChunkInfo) -> Result<PackResult, PackError> {
chunk_info.basic_header.format = 0;

let pre_header = self
if let Some(pre_header) = self
.csid_2_chunk_header
.get_mut(&chunk_info.basic_header.chunk_stream_id);

if let Some(val) = pre_header {
.get_mut(&chunk_info.basic_header.chunk_stream_id)
{
let cur_msg_header = &mut chunk_info.message_header;
let pre_msg_header = &val.message_header;
let pre_msg_header = &mut pre_header.message_header;

if cur_msg_header.msg_streamd_id == pre_msg_header.msg_streamd_id {
chunk_info.basic_header.format = 1;
cur_msg_header.timestamp -= pre_msg_header.timestamp;
cur_msg_header.timestamp_delta =
cur_msg_header.timestamp - pre_msg_header.timestamp;

if cur_msg_header.msg_type_id == pre_msg_header.msg_type_id
&& cur_msg_header.msg_length == pre_msg_header.msg_length
{
chunk_info.basic_header.format = 2;
if chunk_info.message_header.timestamp == pre_msg_header.timestamp {
if cur_msg_header.timestamp_delta == pre_msg_header.timestamp_delta {
chunk_info.basic_header.format = 3;
}
}
}
} else {
assert_eq!(chunk_info.message_header.timestamp_delta, 0);
}

//update pre header
self.csid_2_chunk_header.insert(
chunk_info.basic_header.chunk_stream_id,
ChunkHeader {
basic_header: chunk_info.basic_header.clone(),
message_header: chunk_info.message_header.clone(),
},
);

Ok(PackResult::Success)
}

Expand All @@ -81,11 +96,18 @@ impl ChunkPacketizer {
basic_header: &ChunkBasicHeader,
message_header: &mut ChunkMessageHeader,
) -> Result<(), PackError> {
let timestamp = if message_header.timestamp >= 0xFFFFFF {
self.timestamp = if basic_header.format == 0 {
message_header.timestamp
} else {
message_header.timestamp_delta
};

let timestamp = if self.timestamp >= 0xFFFFFF {
message_header.is_extended_timestamp = true;
0xFFFFFF
} else {
message_header.timestamp
message_header.is_extended_timestamp = false;
self.timestamp
};

match basic_header.format {
Expand All @@ -101,6 +123,7 @@ impl ChunkPacketizer {
self.writer.write_u24::<BigEndian>(timestamp)?;
self.writer
.write_u24::<BigEndian>(message_header.msg_length)?;
self.writer.write_u8(message_header.msg_type_id)?;
}
2 => {
self.writer.write_u24::<BigEndian>(timestamp)?;
Expand All @@ -120,6 +143,11 @@ impl ChunkPacketizer {
pub async fn write_chunk(&mut self, chunk_info: &mut ChunkInfo) -> Result<(), PackError> {
self.zip_chunk_header(chunk_info)?;

log::trace!(
"write_chunk current timestamp: {}",
chunk_info.message_header.timestamp,
);

let mut whole_payload_size = chunk_info.payload.len();

self.write_basic_header(
Expand All @@ -129,11 +157,10 @@ impl ChunkPacketizer {
self.write_message_header(&chunk_info.basic_header, &mut chunk_info.message_header)?;

if chunk_info.message_header.is_extended_timestamp {
self.write_extened_timestamp(chunk_info.message_header.timestamp)?;
self.write_extened_timestamp(self.timestamp)?;
}

let mut cur_payload_size: usize;

while whole_payload_size > 0 {
cur_payload_size = if whole_payload_size > self.max_chunk_size {
self.max_chunk_size
Expand All @@ -149,7 +176,7 @@ impl ChunkPacketizer {
if whole_payload_size > 0 {
self.write_basic_header(3, chunk_info.basic_header.chunk_stream_id)?;
if chunk_info.message_header.is_extended_timestamp {
self.write_extened_timestamp(chunk_info.message_header.timestamp)?;
self.write_extened_timestamp(self.timestamp)?;
}
}
}
Expand Down
138 changes: 91 additions & 47 deletions protocol/rtmp/src/chunk/unpacketizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
super::{
define,
errors::{UnpackError, UnpackErrorValue},
ChunkBasicHeader, ChunkHeader, ChunkInfo, ChunkMessageHeader,
ChunkBasicHeader, ChunkInfo, ChunkMessageHeader,
},
crate::messages::define::msg_type_id,
byteorder::{BigEndian, LittleEndian},
Expand Down Expand Up @@ -67,8 +67,16 @@ pub struct ChunkUnpacketizer {

//https://doc.rust-lang.org/stable/rust-by-example/scope/lifetime/fn.html
//https://zhuanlan.zhihu.com/p/165976086
//We use this member to generate a complete message:
// - basic_header: the 2 fields will be updated from each chunk.
// - message_header: whose fields need to be updated for current chunk
// depends on the format id from basic header.
// Each field can inherit the value from the previous chunk.
// - payload: If the message's payload size is longger than the max chunk size,
// the whole payload will be splitted into several chunks.
//
pub current_chunk_info: ChunkInfo,
chunk_headers: HashMap<u32, ChunkHeader>,
chunk_message_headers: HashMap<u32, ChunkMessageHeader>,
chunk_read_state: ChunkReadState,
msg_header_read_state: MessageHeaderReadState,
max_chunk_size: usize,
Expand All @@ -87,7 +95,7 @@ impl ChunkUnpacketizer {
Self {
reader: BytesReader::new(BytesMut::new()),
current_chunk_info: ChunkInfo::default(),
chunk_headers: HashMap::new(),
chunk_message_headers: HashMap::new(),
chunk_read_state: ChunkReadState::ReadBasicHeader,
msg_header_read_state: MessageHeaderReadState::ReadTimeStamp,
max_chunk_size: define::INIT_CHUNK_SIZE as usize,
Expand Down Expand Up @@ -280,20 +288,35 @@ impl ChunkUnpacketizer {
}

//todo
//Only when the csid is changed, we restore the chunk message header
//One AV message may be splitted into serval chunks, the csid
//will be updated when one av message's chunks are completely
//sent/received??
if csid != self.current_chunk_info.basic_header.chunk_stream_id {
log::trace!(
"read_basic_header, chunk stream id update, new: {}, old:{}, byte: {}",
csid,
self.current_chunk_info.basic_header.chunk_stream_id,
byte
);
if let Some(header) = self.chunk_headers.get_mut(&csid) {
self.current_chunk_info.basic_header = header.basic_header.clone();
self.current_chunk_info.message_header = header.message_header.clone();
self.print_current_basic_header();
//If the chunk stream id is changed, then we should
//restore the cached chunk message header used for
//getting the correct message header fields.
match self.chunk_message_headers.get_mut(&csid) {
Some(header) => {
self.current_chunk_info.message_header = header.clone();
self.print_current_basic_header();
}
None => {
//The format id of the first chunk of a new chunk stream id must be zero.
assert_eq!(format_id, 0);
}
}
}

if format_id == 0 {
self.current_message_header().timestamp_delta = 0;
}
// each chunk will read and update the csid and format id
self.current_chunk_info.basic_header.chunk_stream_id = csid;
self.current_chunk_info.basic_header.format = format_id;
self.print_current_basic_header();
Expand Down Expand Up @@ -327,10 +350,14 @@ impl ChunkUnpacketizer {
self.reader.len(),
);

//fix bug: the is_extended_timestamp flag should be set in the read_message_header process
//each time and should not be saved which will lead to incorrectly reading an extra 4 bytes,
//so here at the start of the read_message_header process, reset this flag.
self.current_message_header().is_extended_timestamp = false;
//Reset is_extended_timestamp for type 0 ,1 ,2 , for type 3 ,this field will
//inherited from the most recent chunk 0, 1, or 2.
//(This field is present in Type 3 chunks when the most recent Type 0,
//1, or 2 chunk for the same chunk stream ID indicated the presence of
//an extended timestamp field. 5.3.1.3)
if self.current_chunk_info.basic_header.format != 3 {
self.current_message_header().is_extended_timestamp = false;
}

match self.current_chunk_info.basic_header.format {
/*****************************************************************/
Expand Down Expand Up @@ -474,36 +501,54 @@ impl ChunkUnpacketizer {
extended_timestamp = self.reader.read_u32::<BigEndian>()?;
}

match self.current_chunk_info.basic_header.format {
let cur_format_id = self.current_chunk_info.basic_header.format;

match cur_format_id {
0 => {
if self.current_message_header().is_extended_timestamp {
self.current_message_header().timestamp = extended_timestamp;
}
}
1 => {
if self.current_message_header().is_extended_timestamp {
self.current_message_header().timestamp += extended_timestamp - 0xFFFFFF;
} else {
self.current_message_header().timestamp +=
self.current_message_header().timestamp_delta;
}
}
2 => {
1 | 2 | 3 => {
//The extended timestamp field is present in Type 3 chunks when the most recent Type 0,
//1, or 2 chunk for the same chunk stream ID indicated the presence of
//an extended timestamp field.
if self.current_message_header().is_extended_timestamp {
self.current_message_header().timestamp =
self.current_message_header().timestamp - 0xFFFFFF + extended_timestamp;
} else {
self.current_message_header().timestamp +=
self.current_message_header().timestamp_delta;
self.current_message_header().timestamp_delta = extended_timestamp;
}
}
3 => {
//log::info!("format 3==============");
}
//todo: 3 should also be processed

_ => {}
}

if cur_format_id == 1
|| cur_format_id == 2
|| (cur_format_id == 3 && self.current_chunk_info.payload.len() == 0)
{
let timestamp = self.current_message_header().timestamp;
let timestamp_delta = self.current_message_header().timestamp_delta;

let (cur_abs_timestamp, is_overflow) = timestamp.overflowing_add(timestamp_delta);
if is_overflow {
log::warn!(
"the current timestamp is overflow, current timestamp: {}, timestamp delta: {}",
timestamp,
timestamp_delta
);
}
self.current_message_header().timestamp = cur_abs_timestamp;
}

let timestamp = self.current_message_header().timestamp;
let timestamp_delta = self.current_message_header().timestamp_delta;

log::trace!(
"the current timestamp is overflow,format: {}, current timestamp: {}. timestamp delta: {}",
self.current_chunk_info.basic_header.format,
timestamp,
timestamp_delta
);

self.chunk_read_state = ChunkReadState::ReadMessagePayload;
self.print_current_message_header(ChunkReadState::ReadExtendedTimestamp);

Expand Down Expand Up @@ -549,26 +594,13 @@ impl ChunkUnpacketizer {

if self.current_chunk_info.payload.len() == whole_msg_length {
self.chunk_read_state = ChunkReadState::Finish;
//get the complete chunk and clear the current chunk payload
let chunk_info = self.current_chunk_info.clone();
self.current_chunk_info.payload.clear();

let csid = self.current_chunk_info.basic_header.chunk_stream_id;

//todo
if let Some(header) = self.chunk_headers.get_mut(&csid) {
header.basic_header = self.current_chunk_info.basic_header.clone();
header.message_header = self.current_chunk_info.message_header.clone();
} else {
let chunk_header = ChunkHeader {
basic_header: self.current_chunk_info.basic_header.clone(),
message_header: self.current_chunk_info.message_header.clone(),
};
self.chunk_headers.insert(csid, chunk_header);
}

// self.chunk_headers
// .entry(self.current_chunk_info.basic_header.chunk_stream_id)
// .or_insert(chunk_header);
self.chunk_message_headers
.insert(csid, self.current_chunk_info.message_header.clone());

return Ok(UnpackResult::ChunkInfo(chunk_info));
}
Expand Down Expand Up @@ -617,6 +649,18 @@ mod tests {
)
}

#[test]
fn test_overflow_add() {
let aa: u32 = u32::MAX;
println!("{}", aa);

let (a, b) = aa.overflowing_add(5);

let b = aa.wrapping_add(5);

println!("{}", b);
}

// #[test]
// fn test_window_acknowlage_size_set_peer_bandwidth() {
// let mut unpacker = ChunkUnpacketizer::new();
Expand Down

0 comments on commit 9fce66d

Please sign in to comment.