Skip to content

Commit

Permalink
allow setting multiple boost brokers.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Jun 8, 2023
1 parent 25899eb commit 6fbb896
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 8 deletions.
4 changes: 2 additions & 2 deletions components/message_queue/src/kafka/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Kafka implementation's config
Expand All @@ -21,7 +21,7 @@ pub struct Config {
pub struct ClientConfig {
/// The endpoint of boost broker, must be set and will panic if found it
/// None.
pub boost_broker: Option<String>,
pub boost_brokers: Option<Vec<String>>,

/// Maximum message size in bytes.
///
Expand Down
7 changes: 3 additions & 4 deletions components/message_queue/src/kafka/kafka_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Kafka implementation's detail
Expand Down Expand Up @@ -137,12 +137,11 @@ impl KafkaImplInner {
async fn new(config: Config) -> Result<Self> {
info!("Kafka init, config:{:?}", config);

if config.client.boost_broker.is_none() {
if config.client.boost_brokers.is_none() {
panic!("The boost broker must be set");
}

let mut client_builder =
ClientBuilder::new(vec![config.client.boost_broker.clone().unwrap()]);
let mut client_builder = ClientBuilder::new(config.client.boost_brokers.clone().unwrap());
if let Some(max_message_size) = config.client.max_message_size {
client_builder = client_builder.max_message_size(max_message_size);
}
Expand Down
4 changes: 2 additions & 2 deletions components/message_queue/src/tests/cases.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Test cases for message queue
Expand All @@ -22,7 +22,7 @@ use crate::{
#[ignore = "It can just run with a Kafka cluster"]
async fn test_kafka() {
let mut config = Config::default();
config.client.boost_broker = Some("127.0.0.1:9011".to_string());
config.client.boost_brokers = Some(vec!["127.0.0.1:9011".to_string()]);
let kafka_impl = Arc::new(KafkaImpl::new(config).await.unwrap());

run_message_queue_test(kafka_impl).await;
Expand Down

0 comments on commit 6fbb896

Please sign in to comment.