Skip to content

Commit

Permalink
chore: add generic signature to topic controller to make it easier to…
Browse files Browse the repository at this point in the history
… test (#3445)

Remove tech debt.  TopicController default to only work in K8 metadata.  This changes add generic signature similar to other controllers which will make it easier to unit test
  • Loading branch information
sehz committed Aug 4, 2023
1 parent f91ece0 commit 7d0a0f6
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 93 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/fluvio-controlplane-metadata/src/topic/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use tracing::debug;
use async_trait::async_trait;

use crate::store::{MetadataStoreObject, LocalStore};
use crate::core::{MetadataItem};
use crate::core::MetadataItem;
use crate::partition::store::{PartitionLocalStore, PartitionMetadata};
use crate::partition::*;
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-sc/src/controllers/partitions/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use fluvio_future::task::spawn;
use fluvio_controlplane_metadata::core::MetadataItem;
use fluvio_controlplane_metadata::store::k8::K8MetaItem;

use crate::stores::{StoreContext};
use crate::stores::StoreContext;
use crate::stores::partition::PartitionSpec;
use crate::stores::spu::SpuSpec;

Expand Down
14 changes: 9 additions & 5 deletions crates/fluvio-sc/src/controllers/topics/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
//!
use std::fmt;

use crate::controllers::partitions::PartitionWSAction;
use fluvio_controlplane_metadata::{topic::TopicSpec, partition::PartitionSpec};
use fluvio_stream_model::{store::k8::K8MetaItem, core::MetadataItem};

use super::*;
use crate::stores::actions::WSAction;

#[derive(Debug, Default)]
pub struct TopicActions {
pub topics: Vec<TopicWSAction>,
pub partitions: Vec<PartitionWSAction>,
pub struct TopicActions<C = K8MetaItem>
where
C: MetadataItem + Send + Sync,
{
pub topics: Vec<WSAction<TopicSpec, C>>,
pub partitions: Vec<WSAction<PartitionSpec, C>>,
}

impl fmt::Display for TopicActions {
Expand Down
25 changes: 18 additions & 7 deletions crates/fluvio-sc/src/controllers/topics/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
//!
//! Reconcile Topics
use fluvio_stream_model::core::MetadataItem;
use fluvio_stream_model::store::ChangeListener;
use fluvio_stream_model::store::k8::K8MetaItem;
use tracing::debug;
use tracing::instrument;

Expand All @@ -11,18 +14,21 @@ use fluvio_future::task::spawn;
use crate::core::SharedContext;
use crate::stores::topic::TopicSpec;
use crate::stores::partition::PartitionSpec;
use crate::stores::{StoreContext, K8ChangeListener};
use crate::stores::StoreContext;

use super::reducer::TopicReducer;

#[derive(Debug)]
pub struct TopicController {
topics: StoreContext<TopicSpec>,
partitions: StoreContext<PartitionSpec>,
reducer: TopicReducer,
pub struct TopicController<C = K8MetaItem>
where
C: MetadataItem + Send + Sync,
{
topics: StoreContext<TopicSpec, C>,
partitions: StoreContext<PartitionSpec, C>,
reducer: TopicReducer<C>,
}

impl TopicController {
impl TopicController<K8MetaItem> {
/// streaming coordinator controller constructor
pub fn start(ctx: SharedContext) {
let topics = ctx.topics().clone();
Expand All @@ -40,7 +46,12 @@ impl TopicController {

spawn(controller.dispatch_loop());
}
}

impl<C> TopicController<C>
where
C: MetadataItem + Send + Sync,
{
#[instrument(name = "TopicController", skip(self))]
async fn dispatch_loop(mut self) {
use std::time::Duration;
Expand Down Expand Up @@ -70,7 +81,7 @@ impl TopicController {
}

#[instrument(skip(self, listener))]
async fn sync_topics(&mut self, listener: &mut K8ChangeListener<TopicSpec>) {
async fn sync_topics(&mut self, listener: &mut ChangeListener<TopicSpec, C>) {
if !listener.has_change() {
debug!("no change");
return;
Expand Down
9 changes: 0 additions & 9 deletions crates/fluvio-sc/src/controllers/topics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,3 @@ mod policy;
pub use self::actions::*;
pub use self::controller::*;
pub use self::policy::*;
pub use common::*;

mod common {

use ::fluvio_controlplane_metadata::topic::TopicSpec;
use crate::stores::actions::WSAction;

pub type TopicWSAction = WSAction<TopicSpec>;
}
106 changes: 73 additions & 33 deletions crates/fluvio-sc/src/controllers/topics/policy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fmt;
use std::collections::BTreeMap;

use fluvio_stream_model::core::MetadataItem;
use tracing::{debug, trace, instrument};
use rand::thread_rng;
use rand::Rng;
Expand All @@ -16,7 +17,10 @@ use crate::stores::spu::*;
/// Validate assigned topic spec parameters and update topic status
/// * error is passed to the topic reason.
///
pub fn validate_assigned_topic_parameters(partition_map: &PartitionMaps) -> TopicNextState {
pub fn validate_assigned_topic_parameters<C>(partition_map: &PartitionMaps) -> TopicNextState<C>
where
C: MetadataItem + Send + Sync,
{
if let Err(err) = partition_map.validate() {
TopicStatus::next_resolution_invalid_config(err.to_string()).into()
} else {
Expand All @@ -28,7 +32,10 @@ pub fn validate_assigned_topic_parameters(partition_map: &PartitionMaps) -> Topi
/// Validate computed topic spec parameters and update topic status
/// * error is passed to the topic reason.
///
pub fn validate_computed_topic_parameters(param: &TopicReplicaParam) -> TopicNextState {
pub fn validate_computed_topic_parameters<C>(param: &TopicReplicaParam) -> TopicNextState<C>
where
C: MetadataItem + Send + Sync,
{
if let Err(err) = ReplicaSpec::valid_partition(&param.partitions) {
TopicStatus::next_resolution_invalid_config(err.to_string()).into()
} else if let Err(err) = ReplicaSpec::valid_replication_factor(&param.replication_factor) {
Expand All @@ -44,10 +51,13 @@ pub fn validate_computed_topic_parameters(param: &TopicReplicaParam) -> TopicNex
/// * fatal error configuration errors and are not recoverable
///
#[instrument(level = "trace", skip(spus, param))]
pub async fn generate_replica_map(
spus: &SpuAdminStore,
pub async fn generate_replica_map<C>(
spus: &SpuLocalStore<C>,
param: &TopicReplicaParam,
) -> TopicNextState {
) -> TopicNextState<C>
where
C: MetadataItem + Send + Sync,
{
let spu_count = spus.count().await as ReplicationFactor;
if spu_count < param.replication_factor {
trace!(
Expand All @@ -74,10 +84,13 @@ pub async fn generate_replica_map(
/// update topic status to ok. otherwise, mark as waiting for live SPUs
///
#[instrument(skip(partition_maps, spu_store))]
pub async fn update_replica_map_for_assigned_topic(
pub async fn update_replica_map_for_assigned_topic<C>(
partition_maps: &PartitionMaps,
spu_store: &SpuAdminStore,
) -> TopicNextState {
spu_store: &SpuLocalStore<C>,
) -> TopicNextState<C>
where
C: MetadataItem + Send + Sync,
{
let partition_map_spus = partition_maps.unique_spus_in_partition_map();
let spus_id = spu_store.spu_ids().await;

Expand All @@ -99,20 +112,29 @@ pub async fn update_replica_map_for_assigned_topic(

/// values for next state
#[derive(Default, Debug)]
pub struct TopicNextState {
pub struct TopicNextState<C>
where
C: MetadataItem + Send + Sync,
{
pub resolution: TopicResolution,
pub reason: String,
pub replica_map: ReplicaMap,
pub partitions: Vec<PartitionAdminMd>,
pub partitions: Vec<PartitionMetadata<C>>,
}

impl fmt::Display for TopicNextState {
impl<C> fmt::Display for TopicNextState<C>
where
C: MetadataItem + Send + Sync,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:#?}", self.resolution)
}
}

impl From<(TopicResolution, String)> for TopicNextState {
impl<C> From<(TopicResolution, String)> for TopicNextState<C>
where
C: MetadataItem + Send + Sync,
{
fn from(val: (TopicResolution, String)) -> Self {
let (resolution, reason) = val;
Self {
Expand All @@ -123,7 +145,10 @@ impl From<(TopicResolution, String)> for TopicNextState {
}
}

impl From<((TopicResolution, String), ReplicaMap)> for TopicNextState {
impl<C> From<((TopicResolution, String), ReplicaMap)> for TopicNextState<C>
where
C: MetadataItem + Send + Sync,
{
fn from(val: ((TopicResolution, String), ReplicaMap)) -> Self {
let ((resolution, reason), replica_map) = val;
Self {
Expand All @@ -135,8 +160,11 @@ impl From<((TopicResolution, String), ReplicaMap)> for TopicNextState {
}
}

impl From<((TopicResolution, String), Vec<PartitionAdminMd>)> for TopicNextState {
fn from(val: ((TopicResolution, String), Vec<PartitionAdminMd>)) -> Self {
impl<C> From<((TopicResolution, String), Vec<PartitionMetadata<C>>)> for TopicNextState<C>
where
C: MetadataItem + Send + Sync,
{
fn from(val: ((TopicResolution, String), Vec<PartitionMetadata<C>>)) -> Self {
let ((resolution, reason), partitions) = val;
Self {
resolution,
Expand All @@ -147,9 +175,12 @@ impl From<((TopicResolution, String), Vec<PartitionAdminMd>)> for TopicNextState
}
}

impl TopicNextState {
impl<C> TopicNextState<C>
where
C: MetadataItem + Send + Sync,
{
/// apply this state to topic and return set of partitions
pub fn apply_as_next_state(self, topic: &mut TopicAdminMd) -> Vec<PartitionAdminMd> {
pub fn apply_as_next_state(self, topic: &mut TopicMetadata<C>) -> Vec<PartitionMetadata<C>> {
topic.status.resolution = self.resolution;
topic.status.reason = self.reason;
if !self.replica_map.is_empty() {
Expand All @@ -159,7 +190,7 @@ impl TopicNextState {
}

/// create same next state as given topic
pub fn same_next_state(topic: &TopicAdminMd) -> TopicNextState {
pub fn same_next_state(topic: &TopicMetadata<C>) -> TopicNextState<C> {
TopicNextState {
resolution: topic.status.resolution.clone(),
..Default::default()
Expand All @@ -168,10 +199,10 @@ impl TopicNextState {

/// given topic, compute next state
pub async fn compute_next_state(
topic: &TopicAdminMd,
spu_store: &SpuAdminStore,
partition_store: &PartitionAdminStore,
) -> TopicNextState {
topic: &TopicMetadata<C>,
spu_store: &SpuLocalStore<C>,
partition_store: &PartitionLocalStore<C>,
) -> TopicNextState<C> {
match topic.spec().replicas() {
// Computed Topic
ReplicaSpec::Computed(ref param) => match topic.status.resolution {
Expand Down Expand Up @@ -236,11 +267,14 @@ impl TopicNextState {
/// Generate replica map for a specific topic
///
#[instrument(level = "trace", skip(spus, param, from_index))]
pub async fn generate_replica_map_for_topic(
spus: &SpuAdminStore,
pub async fn generate_replica_map_for_topic<C>(
spus: &SpuLocalStore<C>,
param: &TopicReplicaParam,
from_index: Option<u32>,
) -> ReplicaMap {
) -> ReplicaMap
where
C: MetadataItem + Send + Sync,
{
let in_rack_count = spus.spus_in_rack_count().await;

// generate partition map (with our without rack assignment)
Expand All @@ -254,14 +288,17 @@ pub async fn generate_replica_map_for_topic(
///
/// Generate partitions on spus that have been assigned to racks
///
async fn generate_partitions_with_rack_assignment(
spus: &SpuAdminStore,
async fn generate_partitions_with_rack_assignment<C>(
spus: &SpuLocalStore<C>,
param: &TopicReplicaParam,
start_index: Option<u32>,
) -> ReplicaMap {
) -> ReplicaMap
where
C: MetadataItem + Send + Sync,
{
let mut partition_map: ReplicaMap = BTreeMap::new();
let rack_map = SpuAdminStore::live_spu_rack_map_sorted(spus).await;
let spu_list = SpuAdminStore::online_spus_in_rack(&rack_map);
let rack_map = SpuLocalStore::live_spu_rack_map_sorted(spus).await;
let spu_list = SpuLocalStore::<C>::online_spus_in_rack(&rack_map);
let spu_cnt = spus.online_spu_count().await;

let s_idx = start_index.unwrap_or_else(|| thread_rng().gen_range(0..spu_cnt));
Expand All @@ -281,11 +318,14 @@ async fn generate_partitions_with_rack_assignment(
///
/// Generate partitions without taking rack assignments into consideration
///
async fn generate_partitions_without_rack(
spus: &SpuAdminStore,
async fn generate_partitions_without_rack<C>(
spus: &SpuLocalStore<C>,
param: &TopicReplicaParam,
start_index: Option<u32>,
) -> ReplicaMap {
) -> ReplicaMap
where
C: MetadataItem + Send + Sync,
{
let mut partition_map: ReplicaMap = BTreeMap::new();
let spu_cnt = spus.spu_used_for_replica().await as u32;
let spu_ids = spus.spu_ids().await;
Expand Down
Loading

0 comments on commit 7d0a0f6

Please sign in to comment.