Skip to content

Commit

Permalink
Recreate missing state if data is somehow lost (#1228)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Sep 7, 2024
1 parent 38e1a8d commit 72894f7
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions configs/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@
"enabled": false,
"max_entries": 1000,
"expiry": "1 m"
},
"recovery": {
"recreate_missing_state": true
}
}
}
8 changes: 7 additions & 1 deletion configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ validate_checksum = false
# The threshold of buffered messages before triggering a save to disk (integer).
# Specifies how many messages accumulate before persisting to storage.
# Adjusting this can balance between write performance and data durability.
messages_required_to_save = 5_000
messages_required_to_save = 5000

# Segment configuration
[system.segment]
Expand Down Expand Up @@ -429,3 +429,9 @@ enabled = false
max_entries = 1000
# Maximum age of ID entries in the deduplication cache in human-readable format.
expiry = "1 m"


# Recovery configuration in case of lost data
[system.recovery]
# Controls whether streams/topics/partitions should be recreated if the expected data for existing state is missing (boolean).
recreate_missing_state = true
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.31"
version = "0.4.40"
edition = "2021"
build = "src/build.rs"

Expand Down
13 changes: 11 additions & 2 deletions server/src/configs/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::configs::server::{
};
use crate::configs::system::{
BackupConfig, CacheConfig, CompatibilityConfig, CompressionConfig, EncryptionConfig,
LoggingConfig, MessageDeduplicationConfig, PartitionConfig, RuntimeConfig, SegmentConfig,
StateConfig, StreamConfig, SystemConfig, TopicConfig,
LoggingConfig, MessageDeduplicationConfig, PartitionConfig, RecoveryConfig, RuntimeConfig,
SegmentConfig, StateConfig, StreamConfig, SystemConfig, TopicConfig,
};
use crate::configs::tcp::{TcpConfig, TcpTlsConfig};
use std::sync::Arc;
Expand Down Expand Up @@ -282,6 +282,7 @@ impl Default for SystemConfig {
state: StateConfig::default(),
compression: CompressionConfig::default(),
message_deduplication: MessageDeduplicationConfig::default(),
recovery: RecoveryConfig::default(),
}
}
}
Expand Down Expand Up @@ -430,3 +431,11 @@ impl Default for MessageDeduplicationConfig {
}
}
}

impl Default for RecoveryConfig {
fn default() -> RecoveryConfig {
RecoveryConfig {
recreate_missing_state: SERVER_CONFIG.system.recovery.recreate_missing_state,
}
}
}
6 changes: 6 additions & 0 deletions server/src/configs/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct SystemConfig {
pub encryption: EncryptionConfig,
pub compression: CompressionConfig,
pub message_deduplication: MessageDeduplicationConfig,
pub recovery: RecoveryConfig,
}

#[derive(Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -109,6 +110,11 @@ pub struct MessageDeduplicationConfig {
pub expiry: IggyDuration,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct RecoveryConfig {
pub recreate_missing_state: bool,
}

#[serde_as]
#[derive(Debug, Deserialize, Serialize)]
pub struct SegmentConfig {
Expand Down
2 changes: 1 addition & 1 deletion server/src/streaming/partitions/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl Partition {
}

pub fn get_size_bytes(&self) -> u64 {
self.size_bytes.load(std::sync::atomic::Ordering::SeqCst)
self.size_bytes.load(Ordering::SeqCst)
}
}

Expand Down
29 changes: 28 additions & 1 deletion server/src/streaming/streams/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,34 @@ impl StreamStorage for FileStreamStorage {
);
} else {
error!("Topics with IDs: '{missing_ids:?}' for stream with ID: '{}' were not found on disk.", stream.stream_id);
return Err(IggyError::MissingTopics(stream.stream_id));
if !stream.config.recovery.recreate_missing_state {
warn!("Recreating missing state in recovery config is disabled, missing topics will not be created for stream with ID: '{}'.", stream.stream_id);
return Err(IggyError::MissingTopics(stream.stream_id));
}

info!(
"Recreating missing state in recovery config is enabled, missing topics will be created for stream with ID: '{}'.",
stream.stream_id
);
for topic_id in missing_ids {
let topic_state = state.topics.get(&topic_id).unwrap();
let topic = Topic::empty(
stream.stream_id,
topic_id,
&topic_state.name,
stream.size_bytes.clone(),
stream.messages_count.clone(),
stream.segments_count.clone(),
stream.config.clone(),
stream.storage.clone(),
);
topic.persist().await?;
unloaded_topics.push(topic);
info!(
"Created missing topic with ID: '{}', name: {}, for stream with ID: '{}'.",
topic_id, &topic_state.name, stream.stream_id
);
}
}

let loaded_topics = Arc::new(Mutex::new(Vec::new()));
Expand Down
26 changes: 23 additions & 3 deletions server/src/streaming/systems/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl System {
return Err(IggyError::CannotReadStreams);
}

let mut dir_entries = dir_entries.unwrap();
let mut dir_entries = dir_entries?;
while let Some(dir_entry) = dir_entries.next_entry().await.unwrap_or(None) {
let name = dir_entry.file_name().into_string().unwrap();
let stream_id = name.parse::<u32>();
Expand All @@ -38,7 +38,7 @@ impl System {
continue;
}

let stream_id = stream_id.unwrap();
let stream_id = stream_id?;
let stream_state = streams.iter().find(|s| s.id == stream_id);
if stream_state.is_none() {
error!("Stream with ID: '{stream_id}' was not found in state, but exists on disk and will be removed.");
Expand Down Expand Up @@ -77,7 +77,27 @@ impl System {
info!("All streams found on disk were found in state.");
} else {
error!("Streams with IDs: '{missing_ids:?}' were not found on disk.");
return Err(IggyError::MissingStreams);
if !self.config.recovery.recreate_missing_state {
warn!("Recreating missing state in recovery config is disabled, missing streams will not be created.");
return Err(IggyError::MissingStreams);
}

info!("Recreating missing state in recovery config is enabled, missing streams will be created.");
for stream_id in missing_ids {
let stream_state = streams.iter().find(|s| s.id == stream_id).unwrap();
let stream = Stream::create(
stream_id,
&stream_state.name,
self.config.clone(),
self.storage.clone(),
);
stream.persist().await?;
unloaded_streams.push(stream);
info!(
"Missing stream with ID: '{stream_id}', name: {} was recreated.",
stream_state.name
);
}
}

let mut streams_states = streams
Expand Down
44 changes: 39 additions & 5 deletions server/src/streaming/topics/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,45 @@ impl TopicStorage for FileTopicStorage {
"Partitions with IDs: '{missing_ids:?}' for topic with ID: '{topic_id}' for stream with ID: '{stream_id}' were not found on disk.",
topic_id = topic.topic_id, stream_id = topic.stream_id
);
return Err(IggyError::MissingPartitions(
topic.topic_id,
topic.stream_id,
));
if !topic.config.recovery.recreate_missing_state {
warn!(
"Recreating missing state in recovery config is disabled, missing partitions will not be created for topic with ID: '{}' for stream with ID: '{}'.", topic.topic_id, topic.stream_id);
return Err(IggyError::MissingPartitions(
topic.topic_id,
topic.stream_id,
));
}

info!(
"Recreating missing state in recovery config is enabled, missing partitions will be created for topic with ID: '{}' for stream with ID: '{}'.",
topic.topic_id, topic.stream_id
);

for partition_id in missing_ids {
let partition_state = state.partitions.get(&partition_id).unwrap();
let mut partition = Partition::create(
topic.stream_id,
topic.topic_id,
partition_id,
true,
topic.config.clone(),
topic.storage.clone(),
message_expiry,
topic.messages_count_of_parent_stream.clone(),
topic.messages_count.clone(),
topic.size_of_parent_stream.clone(),
topic.size_bytes.clone(),
topic.segments_count_of_parent_stream.clone(),
partition_state.created_at,
);
partition.persist().await?;
partition.segments.clear();
unloaded_partitions.push(partition);
info!(
"Created missing partition with ID: '{partition_id}', for topic with ID: '{}' for stream with ID: '{}'.",
topic.topic_id, topic.stream_id
);
}
}

let stream_id = topic.stream_id;
Expand Down Expand Up @@ -171,7 +206,6 @@ impl TopicStorage for FileTopicStorage {
}

topic.load_messages_from_disk_to_cache().await?;

info!("Loaded topic {topic}");

Ok(())
Expand Down

0 comments on commit 72894f7

Please sign in to comment.