Skip to content

Commit

Permalink
Add index rebuilding functionality
Browse files Browse the repository at this point in the history
This commit introduces the `IndexRebuilder` struct to rebuild index files
when they are missing or are corrupted.

If one wants to trigger index rebuilding, .index files should be
removed from local_data directory.
  • Loading branch information
hubcio committed Dec 28, 2024
1 parent 5e09ab9 commit b9dfd49
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 157 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.90"
version = "0.4.91"
edition = "2021"
build = "src/build.rs"

Expand Down
130 changes: 0 additions & 130 deletions server/src/compat/index_conversion/index_converter.rs

This file was deleted.

3 changes: 0 additions & 3 deletions server/src/compat/index_conversion/mod.rs

This file was deleted.

98 changes: 98 additions & 0 deletions server/src/compat/index_rebuilding/index_rebuilder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use crate::streaming::utils::file;
use crate::{
server_error::CompatError, streaming::batching::message_batch::RETAINED_BATCH_OVERHEAD,
};
use std::io::SeekFrom;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter};

// Same struct as RetainedMessageBatch, but without payload
pub struct BatchHeader {
base_offset: u64,
last_offset_delta: u32,
max_timestamp: u64,
length: u32,
}

pub struct IndexRebuilder {
pub log_path: String,
pub index_path: String,
pub start_offset: u64,
}

impl IndexRebuilder {
pub fn new(log_path: String, index_path: String, start_offset: u64) -> Self {
Self {
log_path,
index_path,
start_offset,
}
}

async fn read_batch_header(
reader: &mut BufReader<tokio::fs::File>,
) -> Result<BatchHeader, std::io::Error> {
let base_offset = reader.read_u64_le().await?;
let length = reader.read_u32_le().await?;
let last_offset_delta = reader.read_u32_le().await?;
let max_timestamp = reader.read_u64_le().await?;

Ok(BatchHeader {
base_offset,
length,
last_offset_delta,
max_timestamp,
})
}

async fn write_index_entry(
writer: &mut BufWriter<tokio::fs::File>,
header: &BatchHeader,
position: u32,
start_offset: u64,
) -> Result<(), CompatError> {
// Write offset (4 bytes) - base_offset + last_offset_delta - start_offset
let offset = (header.base_offset + header.last_offset_delta as u64 - start_offset) as u32;
writer
.write_u32_le(offset)
.await?;

// Write position (4 bytes)
writer.write_u32_le(position).await?;

// Write timestamp (8 bytes)
writer.write_u64_le(header.max_timestamp).await?;

Ok(())
}

pub async fn rebuild(&self) -> Result<(), CompatError> {
let mut reader = BufReader::new(file::open(&self.log_path).await?);
let mut writer = BufWriter::new(file::overwrite(&self.index_path).await?);
let mut position = 0;
let mut next_position;

loop {
match Self::read_batch_header(&mut reader).await {
Ok(header) => {
// Calculate next position before writing current entry
next_position = position + RETAINED_BATCH_OVERHEAD as u32 + header.length;

// Write index entry using current position
Self::write_index_entry(&mut writer, &header, position, self.start_offset)
.await?;

// Skip batch messages
reader.seek(SeekFrom::Current(header.length as i64)).await?;

// Update position for next iteration
position = next_position;
}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(e.into()),
}
}

writer.flush().await?;
Ok(())
}
}
1 change: 1 addition & 0 deletions server/src/compat/index_rebuilding/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod index_rebuilder;
2 changes: 1 addition & 1 deletion server/src/compat/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pub mod index_conversion;
pub mod index_rebuilding;
55 changes: 36 additions & 19 deletions server/src/streaming/partitions/storage.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::compat::index_conversion::index_converter::IndexConverter;
use crate::compat::index_rebuilding::index_rebuilder::IndexRebuilder;
use crate::state::system::PartitionState;
use crate::streaming::batching::batch_accumulator::BatchAccumulator;
use crate::streaming::partitions::partition::{ConsumerOffset, Partition};
Expand All @@ -17,7 +17,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::fs;
use tokio::fs::create_dir;
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncReadExt, BufReader};
use tracing::{error, info, trace, warn};

#[derive(Debug)]
Expand Down Expand Up @@ -98,24 +98,41 @@ impl PartitionStorage for FilePartitionStorage {
);

let index_path = segment.index_path.to_owned();
let log_path = segment.log_path.to_owned();
let time_index_path = index_path.replace(INDEX_EXTENSION, "timeindex");

let index_converter = IndexConverter::new(index_path, time_index_path);
if let Ok(true) = index_converter.needs_migration().await {
match index_converter.migrate().await {
Ok(_) => {
info!(
"Migrated indexes for partition with ID: {} for stream with ID: {} and topic with ID: {}.",
partition.partition_id, partition.stream_id, partition.topic_id
);
}
Err(err) => {
error!(
"Failed to migrate indexes for partition with ID: {} for stream with ID: {} and topic with ID: {}. Error: {}",
partition.partition_id, partition.stream_id, partition.topic_id, err
);
}
}
let index_cache_enabled = partition.config.segment.cache_indexes;

let index_path_exists = tokio::fs::try_exists(&index_path).await.unwrap();
let time_index_path_exists = tokio::fs::try_exists(&time_index_path).await.unwrap();

// Rebuild indexes in 2 cases:
// 1. Index cache is enabled and index at path does not exists.
// 2. Index cache is enabled and time index at path exists.
if index_cache_enabled && (!index_path_exists || time_index_path_exists) {
warn!(
"Index at path {} does not exist, rebuilding it based on {}...",
index_path, log_path
);
let now = tokio::time::Instant::now();
let index_rebuilder = IndexRebuilder::new(log_path.clone(), index_path.clone(), start_offset);
index_rebuilder.rebuild().await.unwrap_or_else(|e| {
panic!(
"Failed to rebuild index for partition with ID: {} for
stream with ID: {} and topic with ID: {}. Error: {e}",
partition.partition_id, partition.stream_id, partition.topic_id,
)
});
info!(
"Rebuilding index for path {} finished, it took {} ms",
index_path,
now.elapsed().as_millis()
);
}

// Remove legacy time index if it exists.
if time_index_path_exists {
tokio::fs::remove_file(&time_index_path).await.unwrap();
}

segment
Expand Down Expand Up @@ -330,7 +347,7 @@ impl PartitionStorage for FilePartitionStorage {
.overwrite(&offset.path, &offset.offset.to_le_bytes())
.await
.with_error(|_| format!(
"{COMPONENT} - failed to ovewrite consumer offset with value: {}, kind: {}, consumer ID: {}, path: {}",
"{COMPONENT} - failed to overwrite consumer offset with value: {}, kind: {}, consumer ID: {}, path: {}",
offset.offset, offset.kind, offset.consumer_id, offset.path,
))?;
trace!(
Expand Down
2 changes: 1 addition & 1 deletion server/src/streaming/segments/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl Segment {
.load_message_batches(self, index_range)
.await
.with_error(|_| format!(
"STREAMING_SEGMENT - failed to load message batches, stream ID: {}, topic ID: {}, partition ID: {}, startf offset: {}, end offset: {}",
"STREAMING_SEGMENT - failed to load message batches, stream ID: {}, topic ID: {}, partition ID: {}, start offset: {}, end offset: {}",
self.stream_id, self.topic_id, self.partition_id, start_offset, end_offset,
))?
.iter()
Expand Down
2 changes: 1 addition & 1 deletion server/src/streaming/topics/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Topic {
let mut partition = partition.write().await;
let partition_id = partition.partition_id;
for segment in partition.get_segments_mut() {
saved_messages_number += segment.persist_messages().await.with_error(|_| format!("{COMPONENT} - failed to persist messages in segment, partiton ID: {partition_id}"))?;
saved_messages_number += segment.persist_messages().await.with_error(|_| format!("{COMPONENT} - failed to persist messages in segment, partition ID: {partition_id}"))?;
}
}

Expand Down

0 comments on commit b9dfd49

Please sign in to comment.