Skip to content

Commit

Permalink
feat: allow setting multiple kafka boost brokers (apache#980)
Browse files Browse the repository at this point in the history
## Rationale
Kafka client allow setting multiple kafka boost brokers, expose this in
ceresdb.

## Detailed Changes
Allow setting multiple kafka boost brokers.

## Test Plan
Test manually.
  • Loading branch information
Rachelint authored and dust1 committed Aug 9, 2023
1 parent 6e872e8 commit 73c9c02
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 10 deletions.
4 changes: 2 additions & 2 deletions components/message_queue/src/kafka/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Kafka implementation's config
Expand All @@ -21,7 +21,7 @@ pub struct Config {
pub struct ClientConfig {
/// The endpoint of boost broker, must be set and will panic if found it
/// None.
pub boost_broker: Option<String>,
pub boost_brokers: Option<Vec<String>>,

/// Maximum message size in bytes.
///
Expand Down
6 changes: 3 additions & 3 deletions components/message_queue/src/kafka/kafka_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Kafka implementation's detail
Expand Down Expand Up @@ -137,12 +137,12 @@ impl KafkaImplInner {
async fn new(config: Config) -> Result<Self> {
info!("Kafka init, config:{:?}", config);

if config.client.boost_broker.is_none() {
if config.client.boost_brokers.is_none() {
panic!("The boost broker must be set");
}

let mut client_builder =
ClientBuilder::new(vec![config.client.boost_broker.clone().unwrap()]);
ClientBuilder::new(dbg!(config.client.boost_brokers.clone().unwrap()));
if let Some(max_message_size) = config.client.max_message_size {
client_builder = client_builder.max_message_size(max_message_size);
}
Expand Down
4 changes: 2 additions & 2 deletions components/message_queue/src/tests/cases.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Test cases for message queue
Expand All @@ -22,7 +22,7 @@ use crate::{
#[ignore = "It can just run with a Kafka cluster"]
async fn test_kafka() {
let mut config = Config::default();
config.client.boost_broker = Some("127.0.0.1:9011".to_string());
config.client.boost_brokers = Some(vec!["127.0.0.1:9011".to_string()]);
let kafka_impl = Arc::new(KafkaImpl::new(config).await.unwrap());

run_message_queue_test(kafka_impl).await;
Expand Down
4 changes: 2 additions & 2 deletions wal/src/message_queue_impl/region.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Region in wal on message queue
Expand Down Expand Up @@ -909,7 +909,7 @@ mod tests {
async fn test_region_kafka_impl() {
// Test region
let mut config = Config::default();
config.client.boost_broker = Some("127.0.0.1:9011".to_string());
config.client.boost_brokers = Some(vec!["127.0.0.1:9011".to_string()]);
let kafka_impl = KafkaImpl::new(config).await.unwrap();
let message_queue = Arc::new(kafka_impl);
test_region(message_queue).await;
Expand Down
2 changes: 1 addition & 1 deletion wal/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl WalBuilder for KafkaWalBuilder {

async fn build(&self, _data_path: &Path, runtime: Arc<Runtime>) -> Arc<Self::Wal> {
let mut config = KafkaConfig::default();
config.client.boost_broker = Some("127.0.0.1:9011".to_string());
config.client.boost_brokers = Some(vec!["127.0.0.1:9011".to_string()]);
let kafka_impl = KafkaImpl::new(config).await.unwrap();
let message_queue_impl = MessageQueueImpl::new(
self.namespace.clone(),
Expand Down

0 comments on commit 73c9c02

Please sign in to comment.