Skip to content

Commit

Permalink
chore: clean up partition creation path (#3540)
Browse files Browse the repository at this point in the history
Remove unnecessary partition creation path, which was no op.
Clean up related debugging code
  • Loading branch information
sehz committed Sep 16, 2023
1 parent dfe9b3b commit 60515d6
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
19 changes: 8 additions & 11 deletions crates/fluvio-sc/src/controllers/topics/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ pub async fn generate_replica_map<C: MetadataItem>(
) -> TopicNextState<C> {
let spu_count = spus.count().await as ReplicationFactor;
if spu_count < param.replication_factor {
trace!(
"R-MAP needs {:?} online spus, found {:?}",
debug!(
param.replication_factor,
spu_count
spu_count, "R-MAP needs online spus,found"
);

let reason = format!("need {} more SPU", param.replication_factor - spu_count);
Expand Down Expand Up @@ -195,14 +194,8 @@ impl<C: MetadataItem> TopicNextState<C> {
validate_computed_topic_parameters(param)
}
TopicResolution::Pending | TopicResolution::InsufficientResources => {
let mut next_state = generate_replica_map(spu_store, param).await;
if next_state.resolution == TopicResolution::Provisioned {
debug!(
topic = %topic.key(),
"generated replica map for mirror topic"
);
next_state.partitions = topic.create_new_partitions(partition_store).await;
}
let next_state = generate_replica_map(spu_store, param).await;
debug!(?next_state, "generated replica map for computed topic");
next_state
}
_ => {
Expand All @@ -213,6 +206,7 @@ impl<C: MetadataItem> TopicNextState<C> {
);
let mut next_state = TopicNextState::same_next_state(topic);
if next_state.resolution == TopicResolution::Provisioned {
debug!("creating new partitions");
next_state.partitions = topic.create_new_partitions(partition_store).await;
}
next_state
Expand Down Expand Up @@ -309,12 +303,15 @@ async fn generate_partitions_without_rack<C: MetadataItem>(
let s_idx: u32 = start_index.unwrap_or_else(|| thread_rng().gen_range(0..spu_cnt));

let gap_max = spu_cnt - param.replication_factor + 1;
trace!(spu_cnt, ?spu_ids, s_idx, "init");
for p_idx in 0..param.partitions {
let mut replicas: Vec<i32> = vec![];
let gap_cnt = ((s_idx + p_idx) / spu_cnt) % gap_max;
trace!(p_idx, gap_cnt, "partition loop");
for r_idx in 0..param.replication_factor {
let gap = if r_idx != 0 { gap_cnt } else { 0 };
let spu_idx = ((s_idx + p_idx + r_idx + gap) % spu_cnt) as usize;
trace!(r_idx, spu_idx, "replica loop");
replicas.push(spu_ids[spu_idx]);
}
partition_map.insert(p_idx as PartitionId, replicas);
Expand Down
11 changes: 8 additions & 3 deletions crates/fluvio-sc/src/stores/topic/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use fluvio_stream_model::{
store::{MetadataStoreObject, LocalStore},
core::MetadataItem,
};
use tracing::debug;
use tracing::{debug, trace};
use async_trait::async_trait;

use crate::stores::partition::PartitionLocalStore;
Expand Down Expand Up @@ -36,15 +36,20 @@ where
partition_store: &PartitionLocalStore<C>,
) -> Vec<PartitionMetadata<C>> {
let mut partitions = vec![];
for (idx, replicas) in self.status.replica_map.iter() {
let replica_map = &self.status.replica_map;
trace!(?replica_map, "creating new partitions for topic");
for (idx, replicas) in replica_map.iter() {
let replica_key = ReplicaKey::new(self.key(), *idx);
debug!("Topic: {} creating partition: {}", self.key(), replica_key);

let partition_spec = PartitionSpec::from_replicas(replicas.clone(), &self.spec);
if !partition_store.contains_key(&replica_key).await {
debug!(?replica_key, ?partition_spec, "creating new partition");
partitions.push(
MetadataStoreObject::with_spec(replica_key, partition_spec)
.with_context(self.ctx.create_child()),
)
} else {
debug!(?replica_key, "partition already exists");
}
}
partitions
Expand Down

0 comments on commit 60515d6

Please sign in to comment.