diff --git a/components/message_queue/src/kafka/config.rs b/components/message_queue/src/kafka/config.rs index 8382601f28..880b7f4f94 100644 --- a/components/message_queue/src/kafka/config.rs +++ b/components/message_queue/src/kafka/config.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Kafka implementation's config @@ -21,7 +21,7 @@ pub struct Config { pub struct ClientConfig { /// The endpoint of boost broker, must be set and will panic if found it /// None. - pub boost_broker: Option, + pub boost_brokers: Option>, /// Maximum message size in bytes. /// diff --git a/components/message_queue/src/kafka/kafka_impl.rs b/components/message_queue/src/kafka/kafka_impl.rs index 6665ec907a..cf341aa219 100644 --- a/components/message_queue/src/kafka/kafka_impl.rs +++ b/components/message_queue/src/kafka/kafka_impl.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Kafka implementation's detail @@ -137,12 +137,12 @@ impl KafkaImplInner { async fn new(config: Config) -> Result { info!("Kafka init, config:{:?}", config); - if config.client.boost_broker.is_none() { + if config.client.boost_brokers.is_none() { panic!("The boost broker must be set"); } let mut client_builder = - ClientBuilder::new(vec![config.client.boost_broker.clone().unwrap()]); + ClientBuilder::new(dbg!(config.client.boost_brokers.clone().unwrap())); if let Some(max_message_size) = config.client.max_message_size { client_builder = client_builder.max_message_size(max_message_size); } diff --git a/components/message_queue/src/tests/cases.rs b/components/message_queue/src/tests/cases.rs index 3da1182493..29ab6805a3 100644 --- a/components/message_queue/src/tests/cases.rs +++ b/components/message_queue/src/tests/cases.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Test cases for message queue @@ -22,7 +22,7 @@ use crate::{ #[ignore = "It can just run with a Kafka cluster"] async fn test_kafka() { let mut config = Config::default(); - config.client.boost_broker = Some("127.0.0.1:9011".to_string()); + config.client.boost_brokers = Some(vec!["127.0.0.1:9011".to_string()]); let kafka_impl = Arc::new(KafkaImpl::new(config).await.unwrap()); run_message_queue_test(kafka_impl).await; diff --git a/wal/src/message_queue_impl/region.rs b/wal/src/message_queue_impl/region.rs index 32d6948ab5..1d973f7719 100644 --- a/wal/src/message_queue_impl/region.rs +++ b/wal/src/message_queue_impl/region.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Region in wal on message queue @@ -909,7 +909,7 @@ mod tests { async fn test_region_kafka_impl() { // Test region let mut config = Config::default(); - config.client.boost_broker = Some("127.0.0.1:9011".to_string()); + config.client.boost_brokers = Some(vec!["127.0.0.1:9011".to_string()]); let kafka_impl = KafkaImpl::new(config).await.unwrap(); let message_queue = Arc::new(kafka_impl); test_region(message_queue).await; diff --git a/wal/src/tests/util.rs b/wal/src/tests/util.rs index 323c7b34d8..ef0efd26ac 100644 --- a/wal/src/tests/util.rs +++ b/wal/src/tests/util.rs @@ -143,7 +143,7 @@ impl WalBuilder for KafkaWalBuilder { async fn build(&self, _data_path: &Path, runtime: Arc) -> Arc { let mut config = KafkaConfig::default(); - config.client.boost_broker = Some("127.0.0.1:9011".to_string()); + config.client.boost_brokers = Some(vec!["127.0.0.1:9011".to_string()]); let kafka_impl = KafkaImpl::new(config).await.unwrap(); let message_queue_impl = MessageQueueImpl::new( self.namespace.clone(),