From be56a09ca2123158975a2aea6c345d6efdd807b4 Mon Sep 17 00:00:00 2001 From: hailiang8 Date: Fri, 10 Nov 2023 19:05:19 +0800 Subject: [PATCH] refactor packerizer unpacketizer mod --- protocol/rtmp/src/chunk/mod.rs | 10 ++ protocol/rtmp/src/chunk/packetizer.rs | 46 +++++--- protocol/rtmp/src/chunk/unpacketizer.rs | 138 ++++++++++++++++-------- 3 files changed, 133 insertions(+), 61 deletions(-) diff --git a/protocol/rtmp/src/chunk/mod.rs b/protocol/rtmp/src/chunk/mod.rs index 90768e3f..81b828ca 100644 --- a/protocol/rtmp/src/chunk/mod.rs +++ b/protocol/rtmp/src/chunk/mod.rs @@ -26,11 +26,21 @@ impl ChunkBasicHeader { //5.3.1.2 #[derive(Eq, PartialEq, Debug, Clone)] pub struct ChunkMessageHeader { + //save the absolute timestamp of chunk type 0. + //or save the computed absolute timestamp of chunk type 1,2,3. pub timestamp: u32, pub msg_length: u32, pub msg_type_id: u8, pub msg_streamd_id: u32, + // Save the timestamp delta of chunk type 1,2. + // For chunk type 3, this field saves the timestamp + // delta inherited from the previous chunk type 1 or 2. + // NOTE: this value should be reset to 0 when the current chunk type is 0. pub timestamp_delta: u32, + // This field will be set for type 0,1,2 .If the timestamp/timestamp delta >= 0xFFFFFF + // then set this value to true else set it to false. + // Note that when the chunk format is 3, this value will be inherited from + // the most recent chunk 0, 1, or 2 chunk.(5.3.1.3 Extended Timestamp). pub is_extended_timestamp: bool, } diff --git a/protocol/rtmp/src/chunk/packetizer.rs b/protocol/rtmp/src/chunk/packetizer.rs index 2013e357..a4f9be0c 100644 --- a/protocol/rtmp/src/chunk/packetizer.rs +++ b/protocol/rtmp/src/chunk/packetizer.rs @@ -23,6 +23,8 @@ pub struct ChunkPacketizer { max_chunk_size: usize, //bytes: Cursor>, writer: AsyncBytesWriter, + //save timestamp need to be write for chunk + timestamp: u32, } impl ChunkPacketizer { @@ -32,15 +34,11 @@ impl ChunkPacketizer { //chunk_info: ChunkInfo::new(), writer: AsyncBytesWriter::new(io), max_chunk_size: CHUNK_SIZE as usize, + timestamp: 0, } } fn zip_chunk_header(&mut self, chunk_info: &mut ChunkInfo) -> Result { chunk_info.basic_header.format = 0; - //save the header data for update - let cur_chunk_header = ChunkHeader { - basic_header: chunk_info.basic_header.clone(), - message_header: chunk_info.message_header.clone(), - }; if let Some(pre_header) = self .csid_2_chunk_header @@ -51,21 +49,30 @@ impl ChunkPacketizer { if cur_msg_header.msg_streamd_id == pre_msg_header.msg_streamd_id { chunk_info.basic_header.format = 1; - cur_msg_header.timestamp -= pre_msg_header.timestamp; + cur_msg_header.timestamp_delta = + cur_msg_header.timestamp - pre_msg_header.timestamp; if cur_msg_header.msg_type_id == pre_msg_header.msg_type_id && cur_msg_header.msg_length == pre_msg_header.msg_length { chunk_info.basic_header.format = 2; - if cur_msg_header.timestamp == pre_msg_header.timestamp { + if cur_msg_header.timestamp_delta == pre_msg_header.timestamp_delta { chunk_info.basic_header.format = 3; } } } + } else { + assert_eq!(chunk_info.message_header.timestamp_delta, 0); } + //update pre header - self.csid_2_chunk_header - .insert(chunk_info.basic_header.chunk_stream_id, cur_chunk_header); + self.csid_2_chunk_header.insert( + chunk_info.basic_header.chunk_stream_id, + ChunkHeader { + basic_header: chunk_info.basic_header.clone(), + message_header: chunk_info.message_header.clone(), + }, + ); Ok(PackResult::Success) } @@ -89,11 +96,18 @@ impl ChunkPacketizer { basic_header: &ChunkBasicHeader, message_header: &mut ChunkMessageHeader, ) -> Result<(), PackError> { - let timestamp = if message_header.timestamp >= 0xFFFFFF { + self.timestamp = if basic_header.format == 0 { + message_header.timestamp + } else { + message_header.timestamp_delta + }; + + let timestamp = if self.timestamp >= 0xFFFFFF { message_header.is_extended_timestamp = true; 0xFFFFFF } else { - message_header.timestamp + message_header.is_extended_timestamp = false; + self.timestamp }; match basic_header.format { @@ -129,6 +143,11 @@ impl ChunkPacketizer { pub async fn write_chunk(&mut self, chunk_info: &mut ChunkInfo) -> Result<(), PackError> { self.zip_chunk_header(chunk_info)?; + log::trace!( + "write_chunk current timestamp: {}", + chunk_info.message_header.timestamp, + ); + let mut whole_payload_size = chunk_info.payload.len(); self.write_basic_header( @@ -138,11 +157,10 @@ impl ChunkPacketizer { self.write_message_header(&chunk_info.basic_header, &mut chunk_info.message_header)?; if chunk_info.message_header.is_extended_timestamp { - self.write_extened_timestamp(chunk_info.message_header.timestamp)?; + self.write_extened_timestamp(self.timestamp)?; } let mut cur_payload_size: usize; - while whole_payload_size > 0 { cur_payload_size = if whole_payload_size > self.max_chunk_size { self.max_chunk_size @@ -158,7 +176,7 @@ impl ChunkPacketizer { if whole_payload_size > 0 { self.write_basic_header(3, chunk_info.basic_header.chunk_stream_id)?; if chunk_info.message_header.is_extended_timestamp { - self.write_extened_timestamp(chunk_info.message_header.timestamp)?; + self.write_extened_timestamp(self.timestamp)?; } } } diff --git a/protocol/rtmp/src/chunk/unpacketizer.rs b/protocol/rtmp/src/chunk/unpacketizer.rs index 1dfa4789..c0403a4f 100644 --- a/protocol/rtmp/src/chunk/unpacketizer.rs +++ b/protocol/rtmp/src/chunk/unpacketizer.rs @@ -2,7 +2,7 @@ use { super::{ define, errors::{UnpackError, UnpackErrorValue}, - ChunkBasicHeader, ChunkHeader, ChunkInfo, ChunkMessageHeader, + ChunkBasicHeader, ChunkInfo, ChunkMessageHeader, }, crate::messages::define::msg_type_id, byteorder::{BigEndian, LittleEndian}, @@ -67,8 +67,16 @@ pub struct ChunkUnpacketizer { //https://doc.rust-lang.org/stable/rust-by-example/scope/lifetime/fn.html //https://zhuanlan.zhihu.com/p/165976086 + //We use this member to generate a complete message: + // - basic_header: the 2 fields will be updated from each chunk. + // - message_header: whose fields need to be updated for current chunk + // depends on the format id from basic header. + // Each field can inherit the value from the previous chunk. + // - payload: If the message's payload size is longger than the max chunk size, + // the whole payload will be splitted into several chunks. + // pub current_chunk_info: ChunkInfo, - chunk_headers: HashMap, + chunk_message_headers: HashMap, chunk_read_state: ChunkReadState, msg_header_read_state: MessageHeaderReadState, max_chunk_size: usize, @@ -87,7 +95,7 @@ impl ChunkUnpacketizer { Self { reader: BytesReader::new(BytesMut::new()), current_chunk_info: ChunkInfo::default(), - chunk_headers: HashMap::new(), + chunk_message_headers: HashMap::new(), chunk_read_state: ChunkReadState::ReadBasicHeader, msg_header_read_state: MessageHeaderReadState::ReadTimeStamp, max_chunk_size: define::INIT_CHUNK_SIZE as usize, @@ -280,6 +288,10 @@ impl ChunkUnpacketizer { } //todo + //Only when the csid is changed, we restore the chunk message header + //One AV message may be splitted into serval chunks, the csid + //will be updated when one av message's chunks are completely + //sent/received?? if csid != self.current_chunk_info.basic_header.chunk_stream_id { log::trace!( "read_basic_header, chunk stream id update, new: {}, old:{}, byte: {}", @@ -287,13 +299,24 @@ impl ChunkUnpacketizer { self.current_chunk_info.basic_header.chunk_stream_id, byte ); - if let Some(header) = self.chunk_headers.get_mut(&csid) { - self.current_chunk_info.basic_header = header.basic_header.clone(); - self.current_chunk_info.message_header = header.message_header.clone(); - self.print_current_basic_header(); + //If the chunk stream id is changed, then we should + //restore the cached chunk message header used for + //getting the correct message header fields. + match self.chunk_message_headers.get_mut(&csid) { + Some(header) => { + self.current_chunk_info.message_header = header.clone(); + self.print_current_basic_header(); + } + None => { + //The format id of the first chunk of a new chunk stream id must be zero. + assert_eq!(format_id, 0); + } } } - + if format_id == 0 { + self.current_message_header().timestamp_delta = 0; + } + // each chunk will read and update the csid and format id self.current_chunk_info.basic_header.chunk_stream_id = csid; self.current_chunk_info.basic_header.format = format_id; self.print_current_basic_header(); @@ -327,10 +350,14 @@ impl ChunkUnpacketizer { self.reader.len(), ); - //fix bug: the is_extended_timestamp flag should be set in the read_message_header process - //each time and should not be saved which will lead to incorrectly reading an extra 4 bytes, - //so here at the start of the read_message_header process, reset this flag. - self.current_message_header().is_extended_timestamp = false; + //Reset is_extended_timestamp for type 0 ,1 ,2 , for type 3 ,this field will + //inherited from the most recent chunk 0, 1, or 2. + //(This field is present in Type 3 chunks when the most recent Type 0, + //1, or 2 chunk for the same chunk stream ID indicated the presence of + //an extended timestamp field. 5.3.1.3) + if self.current_chunk_info.basic_header.format != 3 { + self.current_message_header().is_extended_timestamp = false; + } match self.current_chunk_info.basic_header.format { /*****************************************************************/ @@ -474,36 +501,54 @@ impl ChunkUnpacketizer { extended_timestamp = self.reader.read_u32::()?; } - match self.current_chunk_info.basic_header.format { + let cur_format_id = self.current_chunk_info.basic_header.format; + + match cur_format_id { 0 => { if self.current_message_header().is_extended_timestamp { self.current_message_header().timestamp = extended_timestamp; } } - 1 => { - if self.current_message_header().is_extended_timestamp { - self.current_message_header().timestamp += extended_timestamp - 0xFFFFFF; - } else { - self.current_message_header().timestamp += - self.current_message_header().timestamp_delta; - } - } - 2 => { + 1 | 2 | 3 => { + //The extended timestamp field is present in Type 3 chunks when the most recent Type 0, + //1, or 2 chunk for the same chunk stream ID indicated the presence of + //an extended timestamp field. if self.current_message_header().is_extended_timestamp { - self.current_message_header().timestamp = - self.current_message_header().timestamp - 0xFFFFFF + extended_timestamp; - } else { - self.current_message_header().timestamp += - self.current_message_header().timestamp_delta; + self.current_message_header().timestamp_delta = extended_timestamp; } } - 3 => { - //log::info!("format 3=============="); - } - //todo: 3 should also be processed + _ => {} } + if cur_format_id == 1 + || cur_format_id == 2 + || (cur_format_id == 3 && self.current_chunk_info.payload.len() == 0) + { + let timestamp = self.current_message_header().timestamp; + let timestamp_delta = self.current_message_header().timestamp_delta; + + let (cur_abs_timestamp, is_overflow) = timestamp.overflowing_add(timestamp_delta); + if is_overflow { + log::warn!( + "the current timestamp is overflow, current timestamp: {}, timestamp delta: {}", + timestamp, + timestamp_delta + ); + } + self.current_message_header().timestamp = cur_abs_timestamp; + } + + let timestamp = self.current_message_header().timestamp; + let timestamp_delta = self.current_message_header().timestamp_delta; + + log::trace!( + "the current timestamp is overflow,format: {}, current timestamp: {}. timestamp delta: {}", + self.current_chunk_info.basic_header.format, + timestamp, + timestamp_delta + ); + self.chunk_read_state = ChunkReadState::ReadMessagePayload; self.print_current_message_header(ChunkReadState::ReadExtendedTimestamp); @@ -549,26 +594,13 @@ impl ChunkUnpacketizer { if self.current_chunk_info.payload.len() == whole_msg_length { self.chunk_read_state = ChunkReadState::Finish; + //get the complete chunk and clear the current chunk payload let chunk_info = self.current_chunk_info.clone(); self.current_chunk_info.payload.clear(); let csid = self.current_chunk_info.basic_header.chunk_stream_id; - - //todo - if let Some(header) = self.chunk_headers.get_mut(&csid) { - header.basic_header = self.current_chunk_info.basic_header.clone(); - header.message_header = self.current_chunk_info.message_header.clone(); - } else { - let chunk_header = ChunkHeader { - basic_header: self.current_chunk_info.basic_header.clone(), - message_header: self.current_chunk_info.message_header.clone(), - }; - self.chunk_headers.insert(csid, chunk_header); - } - - // self.chunk_headers - // .entry(self.current_chunk_info.basic_header.chunk_stream_id) - // .or_insert(chunk_header); + self.chunk_message_headers + .insert(csid, self.current_chunk_info.message_header.clone()); return Ok(UnpackResult::ChunkInfo(chunk_info)); } @@ -617,6 +649,18 @@ mod tests { ) } + #[test] + fn test_overflow_add() { + let aa: u32 = u32::MAX; + println!("{}", aa); + + let (a, b) = aa.overflowing_add(5); + + let b = aa.wrapping_add(5); + + println!("{}", b); + } + // #[test] // fn test_window_acknowlage_size_set_peer_bandwidth() { // let mut unpacker = ChunkUnpacketizer::new();