diff --git a/src-tauri/src/api/admin.rs b/src-tauri/src/api/admin.rs index 07db4f40..6d97b554 100644 --- a/src-tauri/src/api/admin.rs +++ b/src-tauri/src/api/admin.rs @@ -1,7 +1,12 @@ -use super::error::{ Result }; -use crate::{ configuration::Cluster, kafka::admin::{ list_topics, TopicInfo } }; +use log::debug; + +use crate::lib::TopicInfo; + +use super::{ error::{ Result }, AppState }; #[tauri::command] -pub async fn list_topic(cluster: Cluster, topic: Option<&str>) -> Result> { - Ok(list_topics(&cluster, topic)?) +pub async fn list_topics(cluster_id: String, state: tauri::State<'_, AppState>) -> Result> { + debug!("Retrieve the list of topics"); + let cluster = state.get_by_cluster_id(&cluster_id).await; + Ok(cluster.admin_client.list_topics()?) } \ No newline at end of file diff --git a/src-tauri/src/api/error.rs b/src-tauri/src/api/error.rs index 1611d433..dbeb27ef 100644 --- a/src-tauri/src/api/error.rs +++ b/src-tauri/src/api/error.rs @@ -1,4 +1,4 @@ -use crate::kafka::error::Error as KafkaError; +use crate::{ kafka::error::Error as KafkaError, lib::Error }; use serde::{ Deserialize, Serialize }; pub type Result = std::result::Result; @@ -9,6 +9,19 @@ pub struct TauriError { pub message: String, } +impl From for TauriError { + fn from(err: Error) -> Self { + let (error_type, message) = match err { + Error::AvroParse { message } => ("Avro parser error", message), + Error::IOError { message } => ("IO error", message), + Error::JSONSerdeError { message } => ("JSON Serde error", message), + Error::ConsumerError { message } => ("Kafka Consumer error", message), + Error::KafkaError { message } => ("Kafka error", message), + }; + TauriError { error_type: error_type.into(), message } + } +} + impl From for TauriError { fn from(error: KafkaError) -> Self { match error { diff --git a/src-tauri/src/api/mod.rs b/src-tauri/src/api/mod.rs index 9901ab3f..e1067b54 100644 --- a/src-tauri/src/api/mod.rs +++ b/src-tauri/src/api/mod.rs @@ -4,5 +4,7 @@ pub mod consumer; mod error; mod notification; pub mod schema_registry; +mod state; -pub use notification::notify_error; \ No newline at end of file +pub use notification::notify_error; +pub use state::AppState; \ No newline at end of file diff --git a/src-tauri/src/api/state.rs b/src-tauri/src/api/state.rs new file mode 100644 index 00000000..b1c74a31 --- /dev/null +++ b/src-tauri/src/api/state.rs @@ -0,0 +1,33 @@ +use std::{ collections::HashMap, sync::Arc }; + +use futures::lock::Mutex; +use log::debug; + +use crate::lib::{ Cluster, ConfigStore }; + +type ClusterId = String; + +#[derive(Default)] +pub struct AppState { + clusters: Arc>>, +} + +impl AppState { + pub async fn get_by_cluster_id(&self, cluster_id: &str) -> Cluster { + let clusters = self.clusters.clone(); + let mut map = clusters.lock().await; + if map.get(cluster_id).is_none() { + debug!("Initialise cluster {}", cluster_id); + let configurations = ConfigStore::new().get_configuration().expect("Unable to get the configuration"); + let cluster_config = configurations.clusters + .iter() + .find(|c| c.id == cluster_id) + .expect("Unable to find the cluster config"); + let cluster = Cluster::new(cluster_config.to_owned()); + map.insert(cluster_id.into(), cluster); + } + map.get(cluster_id) + .expect("Something went wrong retrieving a cluster that must be in the clusters vector") + .clone() + } +} \ No newline at end of file diff --git a/src-tauri/src/configuration/config_store.rs b/src-tauri/src/configuration/config_store.rs index 382e0fe5..0a97ac86 100644 --- a/src-tauri/src/configuration/config_store.rs +++ b/src-tauri/src/configuration/config_store.rs @@ -4,6 +4,7 @@ use dirs::home_dir; use std::path::PathBuf; use std::{ fs, path::Path }; +#[derive(Default)] pub struct ConfigStore {} impl ConfigStore { diff --git a/src-tauri/src/lib/admin/mod.rs b/src-tauri/src/lib/admin/mod.rs index 8a804c57..8c0e7223 100644 --- a/src-tauri/src/lib/admin/mod.rs +++ b/src-tauri/src/lib/admin/mod.rs @@ -1,4 +1,5 @@ mod client; mod types; -pub use client::{ Admin, KafkaAdmin }; \ No newline at end of file +pub use client::{ Admin, KafkaAdmin }; +pub use types::*; \ No newline at end of file diff --git a/src-tauri/src/lib/cluster.rs b/src-tauri/src/lib/cluster.rs index cfbeb246..c8511d96 100644 --- a/src-tauri/src/lib/cluster.rs +++ b/src-tauri/src/lib/cluster.rs @@ -5,7 +5,7 @@ use futures::lock::Mutex; use crate::{ lib::{ schema_registry::{ SchemaRegistryClient, CachedSchemaRegistry }, - consumer::{ Consumer, KafkaConsumer }, + consumer::{ Consumer }, admin::{ Admin, KafkaAdmin }, parser::{ Parser, RecordParser }, configuration::{ ClusterConfig, SchemaRegistryConfig }, @@ -13,33 +13,27 @@ use crate::{ }; type TopicName = String; +#[derive(Clone)] pub struct Cluster { - config: ClusterConfig, - schema_registry_client: Option>, - consumers: Arc>>>, - admin_client: Box, - parser: Box, + pub config: ClusterConfig, + pub schema_registry_client: Option>, + pub consumers: Arc>>>, + pub admin_client: Arc, + pub parser: Arc, } impl Cluster { - fn new(config: ClusterConfig) -> Cluster { - let cluster_config = config.clone(); + pub fn new(config: ClusterConfig) -> Cluster { //todo: share schema registry client - // build the admin client - let admin_client: Box = Box::new(KafkaAdmin::new(&cluster_config)); - // build the parser - let parser: Box = Box::new( - RecordParser::new(build_schema_registry_client(config.schema_registry.clone())) - ); Cluster { - config: cluster_config, + config: config.clone(), schema_registry_client: match build_schema_registry_client(config.schema_registry.clone()) { - Some(client) => Some(Box::new(client)), + Some(client) => Some(Arc::new(client)), None => None, }, consumers: Arc::new(Mutex::new(HashMap::new())), - admin_client, - parser, + admin_client: Arc::new(KafkaAdmin::new(&config)), + parser: Arc::new(RecordParser::new(build_schema_registry_client(config.schema_registry.clone()))), } } } diff --git a/src-tauri/src/lib/configuration/config_store.rs b/src-tauri/src/lib/configuration/config_store.rs index c8d3db06..c6c1b90a 100644 --- a/src-tauri/src/lib/configuration/config_store.rs +++ b/src-tauri/src/lib/configuration/config_store.rs @@ -4,10 +4,15 @@ use dirs::home_dir; use std::path::PathBuf; use std::{ fs, path::Path }; +#[derive(Default)] pub struct ConfigStore {} impl ConfigStore { - pub fn get_configuration() -> Result { + //todo: cache? + pub fn new() -> ConfigStore { + ConfigStore {} + } + pub fn get_configuration(&self) -> Result { let config_path = config_path(); let raw_config = (match Path::exists(&config_path) { // read file content @@ -21,7 +26,7 @@ impl ConfigStore { } } - pub fn write_configuration(configuration: &InsulatorConfig) -> Result<()> { + pub fn write_configuration(&self, configuration: &InsulatorConfig) -> Result<()> { let config_path = config_path(); let raw_config = serde_json::to_string_pretty(&configuration)?; fs::write(config_path, raw_config)?; diff --git a/src-tauri/src/lib/configuration/mod.rs b/src-tauri/src/lib/configuration/mod.rs index 40acc62a..5c6fe471 100644 --- a/src-tauri/src/lib/configuration/mod.rs +++ b/src-tauri/src/lib/configuration/mod.rs @@ -1,4 +1,5 @@ mod types; mod config_store; -pub use types::*; \ No newline at end of file +pub use types::*; +pub use config_store::ConfigStore; \ No newline at end of file diff --git a/src-tauri/src/lib/mod.rs b/src-tauri/src/lib/mod.rs index b3cc7fa0..5c869da5 100644 --- a/src-tauri/src/lib/mod.rs +++ b/src-tauri/src/lib/mod.rs @@ -5,4 +5,9 @@ mod admin; mod parser; mod configuration; mod cluster; -mod error; \ No newline at end of file +mod error; + +pub use cluster::Cluster; +pub use admin::{ TopicInfo, PartitionInfo }; +pub use configuration::ConfigStore; +pub use error::Error; \ No newline at end of file diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index 1ad74dc3..dc6f4d5a 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -6,9 +6,9 @@ mod kafka; mod schema_registry; mod lib; -use api::consumer::AppConsumers; +use api::{ consumer::AppConsumers, AppState }; use crate::api::{ - admin::list_topic, + admin::list_topics, configuration::{ get_configuration, write_configuration }, consumer::{ get_consumer_state, get_record, start_consumer, stop_consumer }, schema_registry::{ get_schema, list_subjects }, @@ -18,12 +18,13 @@ fn main() { env_logger::init(); tauri::Builder ::default() + .manage(AppState::default()) .manage(AppConsumers::default()) .invoke_handler( tauri::generate_handler![ get_configuration, write_configuration, - list_topic, + list_topics, list_subjects, get_schema, start_consumer, diff --git a/src/tauri.ts b/src/tauri.ts index 87f3e5b5..7ffb169c 100644 --- a/src/tauri.ts +++ b/src/tauri.ts @@ -51,9 +51,10 @@ export const getSchemaVersions = (subjectName: string, config: SchemaRegistry): /** Kafka API **/ export const getTopicNamesList = (cluster: Cluster): Promise => - invoke("list_topic", { cluster }) + invoke("list_topics", { clusterId: cluster.id }) .then((topics) => topics.map((t) => t.name)) .catch((err: TauriError) => { + console.error(err); addNotification({ type: "error", title: `Unable to retrieve the list of topics for "${cluster?.name}"`,