diff --git a/Cargo.lock b/Cargo.lock index e3c6352e7..fc657ee15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4262,7 +4262,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.84" +version = "0.4.85" dependencies = [ "ahash 0.8.11", "anyhow", diff --git a/server/Cargo.toml b/server/Cargo.toml index 44dbde794..ce64e501e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.84" +version = "0.4.85" edition = "2021" build = "src/build.rs" diff --git a/server/src/streaming/batching/batch_accumulator.rs b/server/src/streaming/batching/batch_accumulator.rs index 65f6f8471..eb9b6f25c 100644 --- a/server/src/streaming/batching/batch_accumulator.rs +++ b/server/src/streaming/batching/batch_accumulator.rs @@ -71,35 +71,35 @@ impl BatchAccumulator { pub fn materialize_batch_and_maybe_update_state(&mut self) -> (bool, RetainedMessageBatch) { let batch_base_offset = self.base_offset; let batch_last_offset_delta = (self.current_offset - self.base_offset) as u32; - let batch_max_timestamp = self.messages.last().unwrap().timestamp; let split_point = std::cmp::min(self.capacity as usize, self.messages.len()); - let (batch, remainder) = self.messages.as_slice().split_at(split_point); + let mut bytes = BytesMut::with_capacity(self.current_size.as_bytes_u64() as usize); - for message in batch { + let last_batch_timestamp = self + .messages + .get(split_point - 1) + .map_or(0, |msg| msg.timestamp); + for message in self.messages.drain(..split_point) { message.extend(&mut bytes); } - let mut remaining_messages = Vec::with_capacity(remainder.len()); - let has_remainder = !remainder.is_empty(); + let has_remainder = !self.messages.is_empty(); if has_remainder { - self.base_offset = remainder.first().unwrap().offset; - self.current_size = remainder + self.base_offset = self.messages.first().unwrap().offset; + self.current_size = self + .messages .iter() .map(|msg| msg.get_size_bytes()) .sum::(); - self.current_offset = remainder.last().unwrap().offset; - self.current_timestamp = remainder.last().unwrap().timestamp; - for message in remainder { - remaining_messages.push(message.clone()); - } - self.messages = remaining_messages; + self.current_offset = self.messages.last().unwrap().offset; + self.current_timestamp = self.messages.last().unwrap().timestamp; } + let batch_payload = bytes.freeze(); let batch_payload_len = IggyByteSize::from(batch_payload.len() as u64); let batch = RetainedMessageBatch::new( batch_base_offset, batch_last_offset_delta, - batch_max_timestamp, + last_batch_timestamp, batch_payload_len, batch_payload, );