diff --git a/Cargo.lock b/Cargo.lock index da9ece2b4..21a10a1d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3838,7 +3838,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.31" +version = "0.4.40" dependencies = [ "ahash 0.8.11", "anyhow", diff --git a/configs/server.json b/configs/server.json index 82d00f815..f2615d377 100644 --- a/configs/server.json +++ b/configs/server.json @@ -176,6 +176,9 @@ "enabled": false, "max_entries": 1000, "expiry": "1 m" + }, + "recovery": { + "recreate_missing_state": true } } } diff --git a/configs/server.toml b/configs/server.toml index 4867cd419..c42bc732d 100644 --- a/configs/server.toml +++ b/configs/server.toml @@ -392,7 +392,7 @@ validate_checksum = false # The threshold of buffered messages before triggering a save to disk (integer). # Specifies how many messages accumulate before persisting to storage. # Adjusting this can balance between write performance and data durability. -messages_required_to_save = 5_000 +messages_required_to_save = 5000 # Segment configuration [system.segment] @@ -429,3 +429,9 @@ enabled = false max_entries = 1000 # Maximum age of ID entries in the deduplication cache in human-readable format. expiry = "1 m" + + +# Recovery configuration in case of lost data +[system.recovery] +# Controls whether streams/topics/partitions should be recreated if the expected data for existing state is missing (boolean). +recreate_missing_state = true \ No newline at end of file diff --git a/server/Cargo.toml b/server/Cargo.toml index f0c3a9e29..d78474941 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.31" +version = "0.4.40" edition = "2021" build = "src/build.rs" diff --git a/server/src/configs/defaults.rs b/server/src/configs/defaults.rs index b150bf465..e1d6348a7 100644 --- a/server/src/configs/defaults.rs +++ b/server/src/configs/defaults.rs @@ -9,8 +9,8 @@ use crate::configs::server::{ }; use crate::configs::system::{ BackupConfig, CacheConfig, CompatibilityConfig, CompressionConfig, EncryptionConfig, - LoggingConfig, MessageDeduplicationConfig, PartitionConfig, RuntimeConfig, SegmentConfig, - StateConfig, StreamConfig, SystemConfig, TopicConfig, + LoggingConfig, MessageDeduplicationConfig, PartitionConfig, RecoveryConfig, RuntimeConfig, + SegmentConfig, StateConfig, StreamConfig, SystemConfig, TopicConfig, }; use crate::configs::tcp::{TcpConfig, TcpTlsConfig}; use std::sync::Arc; @@ -282,6 +282,7 @@ impl Default for SystemConfig { state: StateConfig::default(), compression: CompressionConfig::default(), message_deduplication: MessageDeduplicationConfig::default(), + recovery: RecoveryConfig::default(), } } } @@ -430,3 +431,11 @@ impl Default for MessageDeduplicationConfig { } } } + +impl Default for RecoveryConfig { + fn default() -> RecoveryConfig { + RecoveryConfig { + recreate_missing_state: SERVER_CONFIG.system.recovery.recreate_missing_state, + } + } +} diff --git a/server/src/configs/system.rs b/server/src/configs/system.rs index 5660dc91d..7455db3c3 100644 --- a/server/src/configs/system.rs +++ b/server/src/configs/system.rs @@ -25,6 +25,7 @@ pub struct SystemConfig { pub encryption: EncryptionConfig, pub compression: CompressionConfig, pub message_deduplication: MessageDeduplicationConfig, + pub recovery: RecoveryConfig, } #[derive(Debug, Deserialize, Serialize)] @@ -109,6 +110,11 @@ pub struct MessageDeduplicationConfig { pub expiry: IggyDuration, } +#[derive(Debug, Deserialize, Serialize)] +pub struct RecoveryConfig { + pub recreate_missing_state: bool, +} + #[serde_as] #[derive(Debug, Deserialize, Serialize)] pub struct SegmentConfig { diff --git a/server/src/streaming/partitions/partition.rs b/server/src/streaming/partitions/partition.rs index 98ab72060..32626356c 100644 --- a/server/src/streaming/partitions/partition.rs +++ b/server/src/streaming/partitions/partition.rs @@ -168,7 +168,7 @@ impl Partition { } pub fn get_size_bytes(&self) -> u64 { - self.size_bytes.load(std::sync::atomic::Ordering::SeqCst) + self.size_bytes.load(Ordering::SeqCst) } } diff --git a/server/src/streaming/streams/storage.rs b/server/src/streaming/streams/storage.rs index 544389e74..4bf3b45c4 100644 --- a/server/src/streaming/streams/storage.rs +++ b/server/src/streaming/streams/storage.rs @@ -93,7 +93,34 @@ impl StreamStorage for FileStreamStorage { ); } else { error!("Topics with IDs: '{missing_ids:?}' for stream with ID: '{}' were not found on disk.", stream.stream_id); - return Err(IggyError::MissingTopics(stream.stream_id)); + if !stream.config.recovery.recreate_missing_state { + warn!("Recreating missing state in recovery config is disabled, missing topics will not be created for stream with ID: '{}'.", stream.stream_id); + return Err(IggyError::MissingTopics(stream.stream_id)); + } + + info!( + "Recreating missing state in recovery config is enabled, missing topics will be created for stream with ID: '{}'.", + stream.stream_id + ); + for topic_id in missing_ids { + let topic_state = state.topics.get(&topic_id).unwrap(); + let topic = Topic::empty( + stream.stream_id, + topic_id, + &topic_state.name, + stream.size_bytes.clone(), + stream.messages_count.clone(), + stream.segments_count.clone(), + stream.config.clone(), + stream.storage.clone(), + ); + topic.persist().await?; + unloaded_topics.push(topic); + info!( + "Created missing topic with ID: '{}', name: {}, for stream with ID: '{}'.", + topic_id, &topic_state.name, stream.stream_id + ); + } } let loaded_topics = Arc::new(Mutex::new(Vec::new())); diff --git a/server/src/streaming/systems/streams.rs b/server/src/streaming/systems/streams.rs index 443b80416..e37d92733 100644 --- a/server/src/streaming/systems/streams.rs +++ b/server/src/streaming/systems/streams.rs @@ -29,7 +29,7 @@ impl System { return Err(IggyError::CannotReadStreams); } - let mut dir_entries = dir_entries.unwrap(); + let mut dir_entries = dir_entries?; while let Some(dir_entry) = dir_entries.next_entry().await.unwrap_or(None) { let name = dir_entry.file_name().into_string().unwrap(); let stream_id = name.parse::(); @@ -38,7 +38,7 @@ impl System { continue; } - let stream_id = stream_id.unwrap(); + let stream_id = stream_id?; let stream_state = streams.iter().find(|s| s.id == stream_id); if stream_state.is_none() { error!("Stream with ID: '{stream_id}' was not found in state, but exists on disk and will be removed."); @@ -77,7 +77,27 @@ impl System { info!("All streams found on disk were found in state."); } else { error!("Streams with IDs: '{missing_ids:?}' were not found on disk."); - return Err(IggyError::MissingStreams); + if !self.config.recovery.recreate_missing_state { + warn!("Recreating missing state in recovery config is disabled, missing streams will not be created."); + return Err(IggyError::MissingStreams); + } + + info!("Recreating missing state in recovery config is enabled, missing streams will be created."); + for stream_id in missing_ids { + let stream_state = streams.iter().find(|s| s.id == stream_id).unwrap(); + let stream = Stream::create( + stream_id, + &stream_state.name, + self.config.clone(), + self.storage.clone(), + ); + stream.persist().await?; + unloaded_streams.push(stream); + info!( + "Missing stream with ID: '{stream_id}', name: {} was recreated.", + stream_state.name + ); + } } let mut streams_states = streams diff --git a/server/src/streaming/topics/storage.rs b/server/src/streaming/topics/storage.rs index 3d18fba98..6e006e2e3 100644 --- a/server/src/streaming/topics/storage.rs +++ b/server/src/streaming/topics/storage.rs @@ -120,10 +120,45 @@ impl TopicStorage for FileTopicStorage { "Partitions with IDs: '{missing_ids:?}' for topic with ID: '{topic_id}' for stream with ID: '{stream_id}' were not found on disk.", topic_id = topic.topic_id, stream_id = topic.stream_id ); - return Err(IggyError::MissingPartitions( - topic.topic_id, - topic.stream_id, - )); + if !topic.config.recovery.recreate_missing_state { + warn!( + "Recreating missing state in recovery config is disabled, missing partitions will not be created for topic with ID: '{}' for stream with ID: '{}'.", topic.topic_id, topic.stream_id); + return Err(IggyError::MissingPartitions( + topic.topic_id, + topic.stream_id, + )); + } + + info!( + "Recreating missing state in recovery config is enabled, missing partitions will be created for topic with ID: '{}' for stream with ID: '{}'.", + topic.topic_id, topic.stream_id + ); + + for partition_id in missing_ids { + let partition_state = state.partitions.get(&partition_id).unwrap(); + let mut partition = Partition::create( + topic.stream_id, + topic.topic_id, + partition_id, + true, + topic.config.clone(), + topic.storage.clone(), + message_expiry, + topic.messages_count_of_parent_stream.clone(), + topic.messages_count.clone(), + topic.size_of_parent_stream.clone(), + topic.size_bytes.clone(), + topic.segments_count_of_parent_stream.clone(), + partition_state.created_at, + ); + partition.persist().await?; + partition.segments.clear(); + unloaded_partitions.push(partition); + info!( + "Created missing partition with ID: '{partition_id}', for topic with ID: '{}' for stream with ID: '{}'.", + topic.topic_id, topic.stream_id + ); + } } let stream_id = topic.stream_id; @@ -171,7 +206,6 @@ impl TopicStorage for FileTopicStorage { } topic.load_messages_from_disk_to_cache().await?; - info!("Loaded topic {topic}"); Ok(())