Skip to content

Commit

Permalink
Improve materialize_batch_and_maybe_update_state (#1381)
Browse files Browse the repository at this point in the history
  • Loading branch information
haze518 authored Dec 8, 2024
1 parent 577f84a commit 20a6d14
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.84"
version = "0.4.85"
edition = "2021"
build = "src/build.rs"

Expand Down
28 changes: 14 additions & 14 deletions server/src/streaming/batching/batch_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,35 +71,35 @@ impl BatchAccumulator {
pub fn materialize_batch_and_maybe_update_state(&mut self) -> (bool, RetainedMessageBatch) {
let batch_base_offset = self.base_offset;
let batch_last_offset_delta = (self.current_offset - self.base_offset) as u32;
let batch_max_timestamp = self.messages.last().unwrap().timestamp;
let split_point = std::cmp::min(self.capacity as usize, self.messages.len());
let (batch, remainder) = self.messages.as_slice().split_at(split_point);

let mut bytes = BytesMut::with_capacity(self.current_size.as_bytes_u64() as usize);
for message in batch {
let last_batch_timestamp = self
.messages
.get(split_point - 1)
.map_or(0, |msg| msg.timestamp);
for message in self.messages.drain(..split_point) {
message.extend(&mut bytes);
}

let mut remaining_messages = Vec::with_capacity(remainder.len());
let has_remainder = !remainder.is_empty();
let has_remainder = !self.messages.is_empty();
if has_remainder {
self.base_offset = remainder.first().unwrap().offset;
self.current_size = remainder
self.base_offset = self.messages.first().unwrap().offset;
self.current_size = self
.messages
.iter()
.map(|msg| msg.get_size_bytes())
.sum::<IggyByteSize>();
self.current_offset = remainder.last().unwrap().offset;
self.current_timestamp = remainder.last().unwrap().timestamp;
for message in remainder {
remaining_messages.push(message.clone());
}
self.messages = remaining_messages;
self.current_offset = self.messages.last().unwrap().offset;
self.current_timestamp = self.messages.last().unwrap().timestamp;
}

let batch_payload = bytes.freeze();
let batch_payload_len = IggyByteSize::from(batch_payload.len() as u64);
let batch = RetainedMessageBatch::new(
batch_base_offset,
batch_last_offset_delta,
batch_max_timestamp,
last_batch_timestamp,
batch_payload_len,
batch_payload,
);
Expand Down

0 comments on commit 20a6d14

Please sign in to comment.