From b9dfd49c47efe3ff5df43ca6495f06c7014bc1be Mon Sep 17 00:00:00 2001 From: Hubert Gruszecki Date: Sat, 28 Dec 2024 16:08:27 +0100 Subject: [PATCH] Add index rebuilding functionality This commit introduces the `IndexRebuilder` struct to rebuild index files when they are missing or are corrupted. If one wants to trigger index rebuilding, .index files should be removed from local_data directory. --- Cargo.lock | 2 +- server/Cargo.toml | 2 +- .../index_conversion/index_converter.rs | 130 ------------------ server/src/compat/index_conversion/mod.rs | 3 - .../index_rebuilding/index_rebuilder.rs | 98 +++++++++++++ server/src/compat/index_rebuilding/mod.rs | 1 + server/src/compat/mod.rs | 2 +- server/src/streaming/partitions/storage.rs | 55 +++++--- server/src/streaming/segments/messages.rs | 2 +- server/src/streaming/topics/persistence.rs | 2 +- 10 files changed, 140 insertions(+), 157 deletions(-) delete mode 100644 server/src/compat/index_conversion/index_converter.rs delete mode 100644 server/src/compat/index_conversion/mod.rs create mode 100644 server/src/compat/index_rebuilding/index_rebuilder.rs create mode 100644 server/src/compat/index_rebuilding/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 5e8a15355..22831c420 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4243,7 +4243,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.90" +version = "0.4.91" dependencies = [ "ahash 0.8.11", "anyhow", diff --git a/server/Cargo.toml b/server/Cargo.toml index 0944d4d7d..7030c0a55 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.90" +version = "0.4.91" edition = "2021" build = "src/build.rs" diff --git a/server/src/compat/index_conversion/index_converter.rs b/server/src/compat/index_conversion/index_converter.rs deleted file mode 100644 index 59186eeb4..000000000 --- a/server/src/compat/index_conversion/index_converter.rs +++ /dev/null @@ -1,130 +0,0 @@ -use std::path::Path; -use std::time::Duration; - -use crate::compat::index_conversion::COMPONENT; -use crate::streaming::utils::file; -use crate::{server_error::CompatError, streaming::segments::storage::INDEX_SIZE}; -use bytes::{BufMut, BytesMut}; -use error_set::ResultContext; -use tokio::fs::OpenOptions; -use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; -use tokio::time::{sleep, timeout}; -use tracing::{error, info, trace}; - -pub struct IndexConverter { - pub index_path: String, - pub time_index_path: String, -} - -impl IndexConverter { - pub fn new(index_path: String, time_index_path: String) -> Self { - Self { - index_path, - time_index_path, - } - } - - pub async fn migrate(&self) -> Result<(), CompatError> { - let indexes = self.convert_indexes().await?; - self.replace_with_converted(indexes).await?; - - Ok(()) - } - - pub async fn needs_migration(&self) -> Result { - Ok(file::exists(&self.time_index_path).await?) - } - - async fn convert_indexes(&self) -> Result, CompatError> { - let index_file = file::open(&self.index_path).await?; - let time_index_file = file::open(&self.time_index_path).await?; - - let mut index_reader = BufReader::new(index_file); - let mut time_index_reader = BufReader::new(time_index_file); - - let new_index = Vec::new(); - let mut new_index_writer = BufWriter::new(new_index); - - loop { - let relative_offset_result = index_reader.read_u32_le().await; - let position_result = index_reader.read_u32_le().await; - - let time_relative_offset_result = time_index_reader.read_u32_le().await; - let timestamp_result = time_index_reader.read_u64_le().await; - - if relative_offset_result.is_err() - || position_result.is_err() - || time_relative_offset_result.is_err() - || timestamp_result.is_err() - { - trace!("Reached EOF for index file: {}", &self.index_path); - break; - } - - let relative_offset = relative_offset_result?; - let position = position_result?; - let time_relative_offset = time_relative_offset_result?; - let timestamp = timestamp_result?; - - if relative_offset != time_relative_offset { - error!("{COMPONENT} - mismatched relative offsets in normal index: {relative_offset} vs time index: {time_relative_offset}"); - return Err(CompatError::IndexMigrationError); - } - - let mut new_index_entry = BytesMut::with_capacity(INDEX_SIZE as usize); // u32 + u32 + u64 - new_index_entry.put_u32_le(relative_offset); - new_index_entry.put_u32_le(position); - new_index_entry.put_u64_le(timestamp); - - new_index_writer.write_all(&new_index_entry).await?; - } - new_index_writer.flush().await?; - - Ok(new_index_writer.into_inner()) - } - - async fn replace_with_converted(&self, indexes: Vec) -> Result<(), CompatError> { - file::remove(&self.index_path).await?; - file::remove(&self.time_index_path).await?; - - let dir_path = Path::new(&self.index_path).parent().ok_or({ - error!("{COMPONENT} - failed to get parent directory of index file"); - CompatError::IndexMigrationError - })?; - - let dir = OpenOptions::new().read(true).open(dir_path).await?; - - dir.sync_all() - .await - .with_error(|error| { - format!("{COMPONENT} - failed to sync data for directory, error: {error}") - }) - .map_err(|_| CompatError::IndexMigrationError)?; - - let wait_duration = Duration::from_secs(2); - let sleep_duration = Duration::from_millis(10); - - let wait_future = async { - while file::exists(&self.time_index_path).await.unwrap_or(false) { - sleep(sleep_duration).await; - } - info!("File {} has been removed", &self.time_index_path); - }; - - if (timeout(wait_duration, wait_future).await).is_err() { - error!( - "Timeout waiting for file {} to be removed", - &self.time_index_path - ); - } - - let new_index_file = file::overwrite(&self.index_path).await?; - let mut new_index_writer = BufWriter::new(new_index_file); - new_index_writer.write_all(&indexes).await?; - new_index_writer.flush().await?; - - info!("Replaced old index with new index at {}", &self.index_path); - - Ok(()) - } -} diff --git a/server/src/compat/index_conversion/mod.rs b/server/src/compat/index_conversion/mod.rs deleted file mode 100644 index 232ff058a..000000000 --- a/server/src/compat/index_conversion/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod index_converter; - -pub const COMPONENT: &str = "INDEX_CONVERSION"; diff --git a/server/src/compat/index_rebuilding/index_rebuilder.rs b/server/src/compat/index_rebuilding/index_rebuilder.rs new file mode 100644 index 000000000..80261b3c9 --- /dev/null +++ b/server/src/compat/index_rebuilding/index_rebuilder.rs @@ -0,0 +1,98 @@ +use crate::streaming::utils::file; +use crate::{ + server_error::CompatError, streaming::batching::message_batch::RETAINED_BATCH_OVERHEAD, +}; +use std::io::SeekFrom; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter}; + +// Same struct as RetainedMessageBatch, but without payload +pub struct BatchHeader { + base_offset: u64, + last_offset_delta: u32, + max_timestamp: u64, + length: u32, +} + +pub struct IndexRebuilder { + pub log_path: String, + pub index_path: String, + pub start_offset: u64, +} + +impl IndexRebuilder { + pub fn new(log_path: String, index_path: String, start_offset: u64) -> Self { + Self { + log_path, + index_path, + start_offset, + } + } + + async fn read_batch_header( + reader: &mut BufReader, + ) -> Result { + let base_offset = reader.read_u64_le().await?; + let length = reader.read_u32_le().await?; + let last_offset_delta = reader.read_u32_le().await?; + let max_timestamp = reader.read_u64_le().await?; + + Ok(BatchHeader { + base_offset, + length, + last_offset_delta, + max_timestamp, + }) + } + + async fn write_index_entry( + writer: &mut BufWriter, + header: &BatchHeader, + position: u32, + start_offset: u64, + ) -> Result<(), CompatError> { + // Write offset (4 bytes) - base_offset + last_offset_delta - start_offset + let offset = (header.base_offset + header.last_offset_delta as u64 - start_offset) as u32; + writer + .write_u32_le(offset) + .await?; + + // Write position (4 bytes) + writer.write_u32_le(position).await?; + + // Write timestamp (8 bytes) + writer.write_u64_le(header.max_timestamp).await?; + + Ok(()) + } + + pub async fn rebuild(&self) -> Result<(), CompatError> { + let mut reader = BufReader::new(file::open(&self.log_path).await?); + let mut writer = BufWriter::new(file::overwrite(&self.index_path).await?); + let mut position = 0; + let mut next_position; + + loop { + match Self::read_batch_header(&mut reader).await { + Ok(header) => { + // Calculate next position before writing current entry + next_position = position + RETAINED_BATCH_OVERHEAD as u32 + header.length; + + // Write index entry using current position + Self::write_index_entry(&mut writer, &header, position, self.start_offset) + .await?; + + // Skip batch messages + reader.seek(SeekFrom::Current(header.length as i64)).await?; + + // Update position for next iteration + position = next_position; + } + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, + Err(e) => return Err(e.into()), + } + } + + writer.flush().await?; + Ok(()) + } +} diff --git a/server/src/compat/index_rebuilding/mod.rs b/server/src/compat/index_rebuilding/mod.rs new file mode 100644 index 000000000..231068480 --- /dev/null +++ b/server/src/compat/index_rebuilding/mod.rs @@ -0,0 +1 @@ +pub mod index_rebuilder; diff --git a/server/src/compat/mod.rs b/server/src/compat/mod.rs index c25a93c7f..b7f0bf2c1 100644 --- a/server/src/compat/mod.rs +++ b/server/src/compat/mod.rs @@ -1 +1 @@ -pub mod index_conversion; +pub mod index_rebuilding; diff --git a/server/src/streaming/partitions/storage.rs b/server/src/streaming/partitions/storage.rs index 939847131..c08166d74 100644 --- a/server/src/streaming/partitions/storage.rs +++ b/server/src/streaming/partitions/storage.rs @@ -1,4 +1,4 @@ -use crate::compat::index_conversion::index_converter::IndexConverter; +use crate::compat::index_rebuilding::index_rebuilder::IndexRebuilder; use crate::state::system::PartitionState; use crate::streaming::batching::batch_accumulator::BatchAccumulator; use crate::streaming::partitions::partition::{ConsumerOffset, Partition}; @@ -17,7 +17,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use tokio::fs; use tokio::fs::create_dir; -use tokio::io::AsyncReadExt; +use tokio::io::{AsyncReadExt, BufReader}; use tracing::{error, info, trace, warn}; #[derive(Debug)] @@ -98,24 +98,41 @@ impl PartitionStorage for FilePartitionStorage { ); let index_path = segment.index_path.to_owned(); + let log_path = segment.log_path.to_owned(); let time_index_path = index_path.replace(INDEX_EXTENSION, "timeindex"); - let index_converter = IndexConverter::new(index_path, time_index_path); - if let Ok(true) = index_converter.needs_migration().await { - match index_converter.migrate().await { - Ok(_) => { - info!( - "Migrated indexes for partition with ID: {} for stream with ID: {} and topic with ID: {}.", - partition.partition_id, partition.stream_id, partition.topic_id - ); - } - Err(err) => { - error!( - "Failed to migrate indexes for partition with ID: {} for stream with ID: {} and topic with ID: {}. Error: {}", - partition.partition_id, partition.stream_id, partition.topic_id, err - ); - } - } + let index_cache_enabled = partition.config.segment.cache_indexes; + + let index_path_exists = tokio::fs::try_exists(&index_path).await.unwrap(); + let time_index_path_exists = tokio::fs::try_exists(&time_index_path).await.unwrap(); + + // Rebuild indexes in 2 cases: + // 1. Index cache is enabled and index at path does not exists. + // 2. Index cache is enabled and time index at path exists. + if index_cache_enabled && (!index_path_exists || time_index_path_exists) { + warn!( + "Index at path {} does not exist, rebuilding it based on {}...", + index_path, log_path + ); + let now = tokio::time::Instant::now(); + let index_rebuilder = IndexRebuilder::new(log_path.clone(), index_path.clone(), start_offset); + index_rebuilder.rebuild().await.unwrap_or_else(|e| { + panic!( + "Failed to rebuild index for partition with ID: {} for + stream with ID: {} and topic with ID: {}. Error: {e}", + partition.partition_id, partition.stream_id, partition.topic_id, + ) + }); + info!( + "Rebuilding index for path {} finished, it took {} ms", + index_path, + now.elapsed().as_millis() + ); + } + + // Remove legacy time index if it exists. + if time_index_path_exists { + tokio::fs::remove_file(&time_index_path).await.unwrap(); } segment @@ -330,7 +347,7 @@ impl PartitionStorage for FilePartitionStorage { .overwrite(&offset.path, &offset.offset.to_le_bytes()) .await .with_error(|_| format!( - "{COMPONENT} - failed to ovewrite consumer offset with value: {}, kind: {}, consumer ID: {}, path: {}", + "{COMPONENT} - failed to overwrite consumer offset with value: {}, kind: {}, consumer ID: {}, path: {}", offset.offset, offset.kind, offset.consumer_id, offset.path, ))?; trace!( diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index e836c5942..fde8a7288 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -181,7 +181,7 @@ impl Segment { .load_message_batches(self, index_range) .await .with_error(|_| format!( - "STREAMING_SEGMENT - failed to load message batches, stream ID: {}, topic ID: {}, partition ID: {}, startf offset: {}, end offset: {}", + "STREAMING_SEGMENT - failed to load message batches, stream ID: {}, topic ID: {}, partition ID: {}, start offset: {}, end offset: {}", self.stream_id, self.topic_id, self.partition_id, start_offset, end_offset, ))? .iter() diff --git a/server/src/streaming/topics/persistence.rs b/server/src/streaming/topics/persistence.rs index 1606f8d5f..37f36167f 100644 --- a/server/src/streaming/topics/persistence.rs +++ b/server/src/streaming/topics/persistence.rs @@ -36,7 +36,7 @@ impl Topic { let mut partition = partition.write().await; let partition_id = partition.partition_id; for segment in partition.get_segments_mut() { - saved_messages_number += segment.persist_messages().await.with_error(|_| format!("{COMPONENT} - failed to persist messages in segment, partiton ID: {partition_id}"))?; + saved_messages_number += segment.persist_messages().await.with_error(|_| format!("{COMPONENT} - failed to persist messages in segment, partition ID: {partition_id}"))?; } }