Skip to content

Commit

Permalink
Improve loading topic state for expiry and max size
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz committed Sep 3, 2024
1 parent d3029dc commit cd2d001
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 43 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.22"
version = "0.4.30"
edition = "2021"
build = "src/build.rs"

Expand Down
4 changes: 1 addition & 3 deletions server/server.http
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
@root_password = iggy
@user1_username = user1
@user1_password = secret
@access_token = secret
@access_token = eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJqdGkiOiIwMTkxYjZmNS0yY2RiLTdkNTMtODdkYi00YmE1YWVjZjdhYmYiLCJpc3MiOiJpZ2d5LnJzIiwiYXVkIjoiaWdneS5ycyIsInN1YiI6MSwiaWF0IjoxNzI1MzUxNDA2LCJleHAiOjE3MjUzNTUwMDYsIm5iZiI6MTcyNTM1MTQwNn0.r8RntPmJoRNUYohP1MRf3y1rrWRBG87lhK8VvxB_cAc
@root_id = 1
@user1_id = 2
@pat_name = dev_token
Expand Down Expand Up @@ -188,7 +188,6 @@ Authorization: Bearer {{access_token}}
Content-Type: application/json

{
"stream_id": {{stream_id}},
"name": "stream1"
}

Expand Down Expand Up @@ -223,7 +222,6 @@ Authorization: Bearer {{access_token}}
Content-Type: application/json

{
"topic_id": {{topic_id}},
"name": "topic1",
"compression_algorithm": "none",
"partitions_count": 3,
Expand Down
30 changes: 3 additions & 27 deletions server/src/streaming/streams/topics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::configs::system::SystemConfig;
use crate::streaming::streams::stream::Stream;
use crate::streaming::topics::topic::Topic;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
Expand Down Expand Up @@ -27,7 +26,7 @@ impl Stream {
max_topic_size: MaxTopicSize,
replication_factor: u8,
) -> Result<u32, IggyError> {
let max_topic_size = get_max_topic_size(max_topic_size, &self.config)?;
let max_topic_size = Topic::get_max_topic_size(max_topic_size, &self.config)?;
let name = text::to_lowercase_non_whitespace(name);
if self.topics_ids.contains_key(&name) {
return Err(IggyError::TopicNameAlreadyExists(name, self.stream_id));
Expand Down Expand Up @@ -85,7 +84,8 @@ impl Stream {
max_topic_size: MaxTopicSize,
replication_factor: u8,
) -> Result<(), IggyError> {
let max_topic_size = get_max_topic_size(max_topic_size, &self.config)?;
let message_expiry = Topic::get_message_expiry(message_expiry, &self.config);
let max_topic_size = Topic::get_max_topic_size(max_topic_size, &self.config)?;
let topic_id;
{
let topic = self.get_topic(id)?;
Expand Down Expand Up @@ -115,11 +115,6 @@ impl Stream {
self.topics_ids.insert(updated_name.clone(), topic_id);
let topic = self.get_topic_mut(id)?;

let message_expiry = match message_expiry {
IggyExpiry::ServerDefault => topic.config.segment.message_expiry,
_ => message_expiry,
};

topic.name = updated_name;
topic.message_expiry = message_expiry;
topic.compression_algorithm = compression_algorithm;
Expand Down Expand Up @@ -229,25 +224,6 @@ impl Stream {
}
}

fn get_max_topic_size(
max_topic_size: MaxTopicSize,
config: &SystemConfig,
) -> Result<MaxTopicSize, IggyError> {
match max_topic_size {
MaxTopicSize::ServerDefault => Ok(config.topic.max_size),
_ => {
if max_topic_size.as_bytes_u64() >= config.segment.size.as_bytes_u64() {
Ok(max_topic_size)
} else {
Err(IggyError::InvalidTopicSize(
max_topic_size,
config.segment.size,
))
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
8 changes: 5 additions & 3 deletions server/src/streaming/topics/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ impl TopicStorage for FileTopicStorage {
return Err(IggyError::TopicIdNotFound(topic.topic_id, topic.stream_id));
}

let message_expiry = Topic::get_message_expiry(state.message_expiry, &topic.config);
let max_topic_size = Topic::get_max_topic_size(state.max_topic_size, &topic.config)?;
topic.created_at = state.created_at;
topic.message_expiry = state.message_expiry;
topic.message_expiry = message_expiry;
topic.max_topic_size = max_topic_size;
topic.compression_algorithm = state.compression_algorithm;
topic.max_topic_size = state.max_topic_size;
topic.replication_factor = state.replication_factor.unwrap_or(1);

let dir_entries = fs::read_dir(&topic.partitions_path).await
Expand Down Expand Up @@ -88,7 +90,7 @@ impl TopicStorage for FileTopicStorage {
false,
topic.config.clone(),
topic.storage.clone(),
topic.message_expiry,
message_expiry,
topic.messages_count_of_parent_stream.clone(),
topic.messages_count.clone(),
topic.size_of_parent_stream.clone(),
Expand Down
36 changes: 28 additions & 8 deletions server/src/streaming/topics/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,9 @@ impl Topic {
consumer_groups_ids: HashMap::new(),
current_consumer_group_id: AtomicU32::new(1),
current_partition_id: AtomicU32::new(1),
message_expiry: match message_expiry {
IggyExpiry::ServerDefault => config.segment.message_expiry,
_ => message_expiry,
},
message_expiry: Topic::get_message_expiry(message_expiry, &config),
max_topic_size: Topic::get_max_topic_size(max_topic_size, &config)?,
compression_algorithm,
max_topic_size: match max_topic_size {
MaxTopicSize::ServerDefault => config.topic.max_size,
_ => max_topic_size,
},
replication_factor,
config,
created_at: IggyTimestamp::now(),
Expand Down Expand Up @@ -214,6 +208,32 @@ impl Topic {
}
}
}

pub fn get_max_topic_size(
max_topic_size: MaxTopicSize,
config: &SystemConfig,
) -> Result<MaxTopicSize, IggyError> {
match max_topic_size {
MaxTopicSize::ServerDefault => Ok(config.topic.max_size),
_ => {
if max_topic_size.as_bytes_u64() >= config.segment.size.as_bytes_u64() {
Ok(max_topic_size)
} else {
Err(IggyError::InvalidTopicSize(
max_topic_size,
config.segment.size,
))
}
}
}
}

pub fn get_message_expiry(message_expiry: IggyExpiry, config: &SystemConfig) -> IggyExpiry {
match message_expiry {
IggyExpiry::ServerDefault => config.segment.message_expiry,
_ => message_expiry,
}
}
}

impl fmt::Display for Topic {
Expand Down

0 comments on commit cd2d001

Please sign in to comment.