Skip to content

Commit

Permalink
Improve loading topic state for expiry and max size (#1212)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Sep 3, 2024
1 parent d3029dc commit 1296365
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.22"
version = "0.4.30"
edition = "2021"
build = "src/build.rs"

Expand Down
4 changes: 3 additions & 1 deletion server/src/binary/handlers/topics/create_topic_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use iggy::topics::create_topic::CreateTopic;
use tracing::debug;

pub async fn handle(
command: CreateTopic,
mut command: CreateTopic,
sender: &mut dyn Sender,
session: &Session,
system: &SharedSystem,
Expand All @@ -31,6 +31,8 @@ pub async fn handle(
command.replication_factor,
)
.await?;
command.message_expiry = topic.message_expiry;
command.max_topic_size = topic.max_topic_size;
response = mapper::map_topic(topic).await;
}

Expand Down
6 changes: 4 additions & 2 deletions server/src/binary/handlers/topics/update_topic_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ use iggy::topics::update_topic::UpdateTopic;
use tracing::debug;

pub async fn handle(
command: UpdateTopic,
mut command: UpdateTopic,
sender: &mut dyn Sender,
session: &Session,
system: &SharedSystem,
) -> Result<(), IggyError> {
debug!("session: {session}, command: {command}");
{
let mut system = system.write().await;
system
let topic = system
.update_topic(
session,
&command.stream_id,
Expand All @@ -28,6 +28,8 @@ pub async fn handle(
command.replication_factor,
)
.await?;
command.message_expiry = topic.message_expiry;
command.max_topic_size = topic.max_topic_size;
}

let system = system.read().await;
Expand Down
6 changes: 5 additions & 1 deletion server/src/http/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ async fn create_topic(
command.replication_factor,
)
.await?;
command.message_expiry = topic.message_expiry;
command.max_topic_size = topic.max_topic_size;
response = Json(mapper::map_topic(topic).await);
}

Expand All @@ -116,7 +118,7 @@ async fn update_topic(
command.validate()?;
{
let mut system = state.system.write().await;
system
let topic = system
.update_topic(
&Session::stateless(identity.user_id, identity.ip_address),
&command.stream_id,
Expand All @@ -128,6 +130,8 @@ async fn update_topic(
command.replication_factor,
)
.await?;
command.message_expiry = topic.message_expiry;
command.max_topic_size = topic.max_topic_size;
}

let system = state.system.read().await;
Expand Down
30 changes: 3 additions & 27 deletions server/src/streaming/streams/topics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::configs::system::SystemConfig;
use crate::streaming::streams::stream::Stream;
use crate::streaming::topics::topic::Topic;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
Expand Down Expand Up @@ -27,7 +26,7 @@ impl Stream {
max_topic_size: MaxTopicSize,
replication_factor: u8,
) -> Result<u32, IggyError> {
let max_topic_size = get_max_topic_size(max_topic_size, &self.config)?;
let max_topic_size = Topic::get_max_topic_size(max_topic_size, &self.config)?;
let name = text::to_lowercase_non_whitespace(name);
if self.topics_ids.contains_key(&name) {
return Err(IggyError::TopicNameAlreadyExists(name, self.stream_id));
Expand Down Expand Up @@ -85,7 +84,8 @@ impl Stream {
max_topic_size: MaxTopicSize,
replication_factor: u8,
) -> Result<(), IggyError> {
let max_topic_size = get_max_topic_size(max_topic_size, &self.config)?;
let message_expiry = Topic::get_message_expiry(message_expiry, &self.config);
let max_topic_size = Topic::get_max_topic_size(max_topic_size, &self.config)?;
let topic_id;
{
let topic = self.get_topic(id)?;
Expand Down Expand Up @@ -115,11 +115,6 @@ impl Stream {
self.topics_ids.insert(updated_name.clone(), topic_id);
let topic = self.get_topic_mut(id)?;

let message_expiry = match message_expiry {
IggyExpiry::ServerDefault => topic.config.segment.message_expiry,
_ => message_expiry,
};

topic.name = updated_name;
topic.message_expiry = message_expiry;
topic.compression_algorithm = compression_algorithm;
Expand Down Expand Up @@ -229,25 +224,6 @@ impl Stream {
}
}

fn get_max_topic_size(
max_topic_size: MaxTopicSize,
config: &SystemConfig,
) -> Result<MaxTopicSize, IggyError> {
match max_topic_size {
MaxTopicSize::ServerDefault => Ok(config.topic.max_size),
_ => {
if max_topic_size.as_bytes_u64() >= config.segment.size.as_bytes_u64() {
Ok(max_topic_size)
} else {
Err(IggyError::InvalidTopicSize(
max_topic_size,
config.segment.size,
))
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/systems/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl System {
compression_algorithm: CompressionAlgorithm,
max_topic_size: MaxTopicSize,
replication_factor: Option<u8>,
) -> Result<(), IggyError> {
) -> Result<&Topic, IggyError> {
self.ensure_authenticated(session)?;
{
let topic = self.find_topic(session, stream_id, topic_id)?;
Expand All @@ -116,7 +116,7 @@ impl System {
// TODO: if message_expiry is changed, we need to check if we need to purge messages based on the new expiry
// TODO: if max_size_bytes is changed, we need to check if we need to purge messages based on the new size
// TODO: if replication_factor is changed, we need to do `something`
Ok(())
self.get_stream(stream_id)?.get_topic(topic_id)
}

pub async fn delete_topic(
Expand Down
8 changes: 5 additions & 3 deletions server/src/streaming/topics/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ impl TopicStorage for FileTopicStorage {
return Err(IggyError::TopicIdNotFound(topic.topic_id, topic.stream_id));
}

let message_expiry = Topic::get_message_expiry(state.message_expiry, &topic.config);
let max_topic_size = Topic::get_max_topic_size(state.max_topic_size, &topic.config)?;
topic.created_at = state.created_at;
topic.message_expiry = state.message_expiry;
topic.message_expiry = message_expiry;
topic.max_topic_size = max_topic_size;
topic.compression_algorithm = state.compression_algorithm;
topic.max_topic_size = state.max_topic_size;
topic.replication_factor = state.replication_factor.unwrap_or(1);

let dir_entries = fs::read_dir(&topic.partitions_path).await
Expand Down Expand Up @@ -88,7 +90,7 @@ impl TopicStorage for FileTopicStorage {
false,
topic.config.clone(),
topic.storage.clone(),
topic.message_expiry,
message_expiry,
topic.messages_count_of_parent_stream.clone(),
topic.messages_count.clone(),
topic.size_of_parent_stream.clone(),
Expand Down
36 changes: 28 additions & 8 deletions server/src/streaming/topics/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,9 @@ impl Topic {
consumer_groups_ids: HashMap::new(),
current_consumer_group_id: AtomicU32::new(1),
current_partition_id: AtomicU32::new(1),
message_expiry: match message_expiry {
IggyExpiry::ServerDefault => config.segment.message_expiry,
_ => message_expiry,
},
message_expiry: Topic::get_message_expiry(message_expiry, &config),
max_topic_size: Topic::get_max_topic_size(max_topic_size, &config)?,
compression_algorithm,
max_topic_size: match max_topic_size {
MaxTopicSize::ServerDefault => config.topic.max_size,
_ => max_topic_size,
},
replication_factor,
config,
created_at: IggyTimestamp::now(),
Expand Down Expand Up @@ -214,6 +208,32 @@ impl Topic {
}
}
}

pub fn get_max_topic_size(
max_topic_size: MaxTopicSize,
config: &SystemConfig,
) -> Result<MaxTopicSize, IggyError> {
match max_topic_size {
MaxTopicSize::ServerDefault => Ok(config.topic.max_size),
_ => {
if max_topic_size.as_bytes_u64() >= config.segment.size.as_bytes_u64() {
Ok(max_topic_size)
} else {
Err(IggyError::InvalidTopicSize(
max_topic_size,
config.segment.size,
))
}
}
}
}

pub fn get_message_expiry(message_expiry: IggyExpiry, config: &SystemConfig) -> IggyExpiry {
match message_expiry {
IggyExpiry::ServerDefault => config.segment.message_expiry,
_ => message_expiry,
}
}
}

impl fmt::Display for Topic {
Expand Down

0 comments on commit 1296365

Please sign in to comment.