Skip to content

Commit

Permalink
feat: use new lib for the topic list
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewinci committed Oct 11, 2022
1 parent a117ae0 commit d6d348c
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 33 deletions.
13 changes: 9 additions & 4 deletions src-tauri/src/api/admin.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use super::error::{ Result };
use crate::{ configuration::Cluster, kafka::admin::{ list_topics, TopicInfo } };
use log::debug;

use crate::lib::TopicInfo;

use super::{ error::{ Result }, AppState };

#[tauri::command]
pub async fn list_topic(cluster: Cluster, topic: Option<&str>) -> Result<Vec<TopicInfo>> {
Ok(list_topics(&cluster, topic)?)
pub async fn list_topics(cluster_id: String, state: tauri::State<'_, AppState>) -> Result<Vec<TopicInfo>> {
debug!("Retrieve the list of topics");
let cluster = state.get_by_cluster_id(&cluster_id).await;
Ok(cluster.admin_client.list_topics()?)
}
15 changes: 14 additions & 1 deletion src-tauri/src/api/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::kafka::error::Error as KafkaError;
use crate::{ kafka::error::Error as KafkaError, lib::Error };
use serde::{ Deserialize, Serialize };
pub type Result<T> = std::result::Result<T, TauriError>;

Expand All @@ -9,6 +9,19 @@ pub struct TauriError {
pub message: String,
}

impl From<Error> for TauriError {
fn from(err: Error) -> Self {
let (error_type, message) = match err {
Error::AvroParse { message } => ("Avro parser error", message),
Error::IOError { message } => ("IO error", message),
Error::JSONSerdeError { message } => ("JSON Serde error", message),
Error::ConsumerError { message } => ("Kafka Consumer error", message),
Error::KafkaError { message } => ("Kafka error", message),
};
TauriError { error_type: error_type.into(), message }
}
}

impl From<KafkaError> for TauriError {
fn from(error: KafkaError) -> Self {
match error {
Expand Down
4 changes: 3 additions & 1 deletion src-tauri/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@ pub mod consumer;
mod error;
mod notification;
pub mod schema_registry;
mod state;

pub use notification::notify_error;
pub use notification::notify_error;
pub use state::AppState;
33 changes: 33 additions & 0 deletions src-tauri/src/api/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::{ collections::HashMap, sync::Arc };

use futures::lock::Mutex;
use log::debug;

use crate::lib::{ Cluster, ConfigStore };

type ClusterId = String;

#[derive(Default)]
pub struct AppState {
clusters: Arc<Mutex<HashMap<ClusterId, Cluster>>>,
}

impl AppState {
pub async fn get_by_cluster_id(&self, cluster_id: &str) -> Cluster {
let clusters = self.clusters.clone();
let mut map = clusters.lock().await;
if map.get(cluster_id).is_none() {
debug!("Initialise cluster {}", cluster_id);
let configurations = ConfigStore::new().get_configuration().expect("Unable to get the configuration");
let cluster_config = configurations.clusters
.iter()
.find(|c| c.id == cluster_id)
.expect("Unable to find the cluster config");
let cluster = Cluster::new(cluster_config.to_owned());
map.insert(cluster_id.into(), cluster);
}
map.get(cluster_id)
.expect("Something went wrong retrieving a cluster that must be in the clusters vector")
.clone()
}
}
1 change: 1 addition & 0 deletions src-tauri/src/configuration/config_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use dirs::home_dir;
use std::path::PathBuf;
use std::{ fs, path::Path };

#[derive(Default)]
pub struct ConfigStore {}

impl ConfigStore {
Expand Down
3 changes: 2 additions & 1 deletion src-tauri/src/lib/admin/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod client;
mod types;

pub use client::{ Admin, KafkaAdmin };
pub use client::{ Admin, KafkaAdmin };
pub use types::*;
30 changes: 12 additions & 18 deletions src-tauri/src/lib/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,35 @@ use futures::lock::Mutex;
use crate::{
lib::{
schema_registry::{ SchemaRegistryClient, CachedSchemaRegistry },
consumer::{ Consumer, KafkaConsumer },
consumer::{ Consumer },
admin::{ Admin, KafkaAdmin },
parser::{ Parser, RecordParser },
configuration::{ ClusterConfig, SchemaRegistryConfig },
},
};

type TopicName = String;
#[derive(Clone)]
pub struct Cluster {
config: ClusterConfig,
schema_registry_client: Option<Box<dyn SchemaRegistryClient>>,
consumers: Arc<Mutex<HashMap<TopicName, Box<dyn Consumer>>>>,
admin_client: Box<dyn Admin>,
parser: Box<dyn Parser>,
pub config: ClusterConfig,
pub schema_registry_client: Option<Arc<dyn SchemaRegistryClient + Send + Sync>>,
pub consumers: Arc<Mutex<HashMap<TopicName, Box<dyn Consumer + Send + Sync>>>>,
pub admin_client: Arc<dyn Admin + Send + Sync>,
pub parser: Arc<dyn Parser + Send + Sync>,
}

impl Cluster {
fn new(config: ClusterConfig) -> Cluster {
let cluster_config = config.clone();
pub fn new(config: ClusterConfig) -> Cluster {
//todo: share schema registry client
// build the admin client
let admin_client: Box<dyn Admin> = Box::new(KafkaAdmin::new(&cluster_config));
// build the parser
let parser: Box<dyn Parser> = Box::new(
RecordParser::new(build_schema_registry_client(config.schema_registry.clone()))
);
Cluster {
config: cluster_config,
config: config.clone(),
schema_registry_client: match build_schema_registry_client(config.schema_registry.clone()) {
Some(client) => Some(Box::new(client)),
Some(client) => Some(Arc::new(client)),
None => None,
},
consumers: Arc::new(Mutex::new(HashMap::new())),
admin_client,
parser,
admin_client: Arc::new(KafkaAdmin::new(&config)),
parser: Arc::new(RecordParser::new(build_schema_registry_client(config.schema_registry.clone()))),
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions src-tauri/src/lib/configuration/config_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ use dirs::home_dir;
use std::path::PathBuf;
use std::{ fs, path::Path };

#[derive(Default)]
pub struct ConfigStore {}

impl ConfigStore {
pub fn get_configuration() -> Result<InsulatorConfig> {
//todo: cache?
pub fn new() -> ConfigStore {
ConfigStore {}
}
pub fn get_configuration(&self) -> Result<InsulatorConfig> {
let config_path = config_path();
let raw_config = (match Path::exists(&config_path) {
// read file content
Expand All @@ -21,7 +26,7 @@ impl ConfigStore {
}
}

pub fn write_configuration(configuration: &InsulatorConfig) -> Result<()> {
pub fn write_configuration(&self, configuration: &InsulatorConfig) -> Result<()> {
let config_path = config_path();
let raw_config = serde_json::to_string_pretty(&configuration)?;
fs::write(config_path, raw_config)?;
Expand Down
3 changes: 2 additions & 1 deletion src-tauri/src/lib/configuration/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod types;
mod config_store;

pub use types::*;
pub use types::*;
pub use config_store::ConfigStore;
7 changes: 6 additions & 1 deletion src-tauri/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,9 @@ mod admin;
mod parser;
mod configuration;
mod cluster;
mod error;
mod error;

pub use cluster::Cluster;
pub use admin::{ TopicInfo, PartitionInfo };
pub use configuration::ConfigStore;
pub use error::Error;
7 changes: 4 additions & 3 deletions src-tauri/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ mod kafka;
mod schema_registry;
mod lib;

use api::consumer::AppConsumers;
use api::{ consumer::AppConsumers, AppState };
use crate::api::{
admin::list_topic,
admin::list_topics,
configuration::{ get_configuration, write_configuration },
consumer::{ get_consumer_state, get_record, start_consumer, stop_consumer },
schema_registry::{ get_schema, list_subjects },
Expand All @@ -18,12 +18,13 @@ fn main() {
env_logger::init();
tauri::Builder
::default()
.manage(AppState::default())
.manage(AppConsumers::default())
.invoke_handler(
tauri::generate_handler![
get_configuration,
write_configuration,
list_topic,
list_topics,
list_subjects,
get_schema,
start_consumer,
Expand Down
3 changes: 2 additions & 1 deletion src/tauri.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ export const getSchemaVersions = (subjectName: string, config: SchemaRegistry):
/** Kafka API **/

export const getTopicNamesList = (cluster: Cluster): Promise<string[]> =>
invoke<TopicInfo[]>("list_topic", { cluster })
invoke<TopicInfo[]>("list_topics", { clusterId: cluster.id })
.then((topics) => topics.map((t) => t.name))
.catch((err: TauriError) => {
console.error(err);
addNotification({
type: "error",
title: `Unable to retrieve the list of topics for "${cluster?.name}"`,
Expand Down

0 comments on commit d6d348c

Please sign in to comment.