From a7997c8d41555b24fa3f39be7a46071761b027d3 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Wed, 3 Jul 2024 12:06:42 +0800 Subject: [PATCH 1/3] feat: provide a simple way to create metaclient --- src/cmd/src/datanode.rs | 2 +- src/cmd/src/flownode.rs | 2 +- src/datanode/src/heartbeat.rs | 43 +++++++------------------- src/flow/src/heartbeat.rs | 40 ++++++------------------ src/frontend/src/instance.rs | 35 +++++---------------- src/meta-client/src/lib.rs | 57 +++++++++++++++++++++++++++++++++++ 6 files changed, 87 insertions(+), 92 deletions(-) diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index f395d6634fa6..bb0db5d7cab1 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -283,7 +283,7 @@ impl StartCommand { msg: "'meta_client_options'", })?; - let meta_client = datanode::heartbeat::new_metasrv_client(node_id, meta_config) + let meta_client = datanode::heartbeat::new_meta_client(node_id, meta_config) .await .context(StartDatanodeSnafu)?; diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 9c7cd6695d59..d601dac80a5f 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -226,7 +226,7 @@ impl StartCommand { })?; let meta_client = Arc::new( - flow::heartbeat::new_metasrv_client(cluster_id, node_id, meta_config) + flow::heartbeat::new_meta_client(cluster_id, node_id, meta_config) .await .context(StartFlownodeSnafu)?, ); diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 20eac6f8f5a8..8e8ae8a0cb8d 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat}; -use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::{ @@ -26,8 +25,8 @@ use common_meta::heartbeat::handler::{ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef}; use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; use common_telemetry::{debug, error, info, trace, warn}; -use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder}; -use meta_client::MetaClientOptions; +use meta_client::client::{HeartbeatSender, MetaClient}; +use meta_client::{MetaClientOptions, MetaClientType}; use servers::addrs; use snafu::ResultExt; use tokio::sync::{mpsc, Notify}; @@ -343,37 +342,17 @@ impl HeartbeatTask { } /// Create metasrv client instance and spawn heartbeat loop. -pub async fn new_metasrv_client( - node_id: u64, +pub async fn new_meta_client( + member_id: u64, meta_config: &MetaClientOptions, ) -> Result { let cluster_id = 0; // TODO(hl): read from config - let member_id = node_id; - - let config = ChannelConfig::new() - .timeout(meta_config.timeout) - .connect_timeout(meta_config.connect_timeout) - .tcp_nodelay(meta_config.tcp_nodelay); - let channel_manager = ChannelManager::with_config(config.clone()); - let heartbeat_channel_manager = ChannelManager::with_config( - config - .timeout(meta_config.timeout) - .connect_timeout(meta_config.connect_timeout), - ); - - let mut meta_client = MetaClientBuilder::datanode_default_options(cluster_id, member_id) - .channel_manager(channel_manager) - .heartbeat_channel_manager(heartbeat_channel_manager) - .build(); - meta_client - .start(&meta_config.metasrv_addrs) - .await - .context(MetaClientInitSnafu)?; - // required only when the heartbeat_client is enabled - meta_client - .ask_leader() - .await - .context(MetaClientInitSnafu)?; - Ok(meta_client) + meta_client::create_meta_client( + cluster_id, + MetaClientType::Datanode { member_id }, + meta_config, + ) + .await + .context(MetaClientInitSnafu) } diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs index 339f53520d0d..9f8a0566b806 100644 --- a/src/flow/src/heartbeat.rs +++ b/src/flow/src/heartbeat.rs @@ -29,7 +29,7 @@ use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; use common_telemetry::{debug, error, info, warn}; use greptime_proto::v1::meta::NodeInfo; use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient, MetaClientBuilder}; -use meta_client::MetaClientOptions; +use meta_client::{MetaClientOptions, MetaClientType}; use servers::addrs; use servers::heartbeat_options::HeartbeatOptions; use snafu::ResultExt; @@ -235,36 +235,16 @@ impl HeartbeatTask { } /// Create metasrv client instance and spawn heartbeat loop. -pub async fn new_metasrv_client( +pub async fn new_meta_client( cluster_id: u64, - node_id: u64, + member_id: u64, meta_config: &MetaClientOptions, ) -> Result { - let member_id = node_id; - let config = ChannelConfig::new() - .timeout(meta_config.timeout) - .connect_timeout(meta_config.connect_timeout) - .tcp_nodelay(meta_config.tcp_nodelay); - let channel_manager = ChannelManager::with_config(config.clone()); - let heartbeat_channel_manager = ChannelManager::with_config( - config - .timeout(meta_config.timeout) - .connect_timeout(meta_config.connect_timeout), - ); - - let mut meta_client = MetaClientBuilder::flownode_default_options(cluster_id, member_id) - .channel_manager(channel_manager) - .heartbeat_channel_manager(heartbeat_channel_manager) - .build(); - meta_client - .start(&meta_config.metasrv_addrs) - .await - .context(MetaClientInitSnafu)?; - - // required only when the heartbeat_client is enabled - meta_client - .ask_leader() - .await - .context(MetaClientInitSnafu)?; - Ok(meta_client) + meta_client::create_meta_client( + cluster_id, + MetaClientType::Flownode { member_id }, + meta_config, + ) + .await + .context(MetaClientInitSnafu) } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 91e52815fe3f..2e4bd5c3da18 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -34,7 +34,6 @@ use common_base::Plugins; use common_config::KvBackendConfig; use common_error::ext::{BoxedError, ErrorExt}; use common_frontend::handler::FrontendInvoker; -use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; use common_meta::state_store::KvStateStore; @@ -42,10 +41,10 @@ use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; use common_query::Output; -use common_telemetry::{debug, error, info, tracing}; +use common_telemetry::{debug, error, tracing}; use log_store::raft_engine::RaftEngineBackend; -use meta_client::client::{MetaClient, MetaClientBuilder}; -use meta_client::MetaClientOptions; +use meta_client::client::MetaClient; +use meta_client::{MetaClientOptions, MetaClientType}; use operator::delete::DeleterRef; use operator::insert::InserterRef; use operator::statement::StatementExecutor; @@ -133,31 +132,11 @@ impl Instance { pub async fn create_meta_client( meta_client_options: &MetaClientOptions, ) -> Result> { - info!( - "Creating Frontend instance in distributed mode with Meta server addr {:?}", - meta_client_options.metasrv_addrs - ); - - let channel_config = ChannelConfig::new() - .timeout(meta_client_options.timeout) - .connect_timeout(meta_client_options.connect_timeout) - .tcp_nodelay(meta_client_options.tcp_nodelay); - let ddl_channel_config = channel_config - .clone() - .timeout(meta_client_options.ddl_timeout); - let channel_manager = ChannelManager::with_config(channel_config); - let ddl_channel_manager = ChannelManager::with_config(ddl_channel_config); - - let cluster_id = 0; // It is currently a reserved field and has not been enabled. - let mut meta_client = MetaClientBuilder::frontend_default_options(cluster_id) - .channel_manager(channel_manager) - .ddl_channel_manager(ddl_channel_manager) - .build(); - meta_client - .start(&meta_client_options.metasrv_addrs) + let cluster_id = 0; // (TODO: jeremy): It is currently a reserved field and has not been enabled. + meta_client::create_meta_client(cluster_id, MetaClientType::Frontend, meta_client_options) .await - .context(error::StartMetaClientSnafu)?; - Ok(Arc::new(meta_client)) + .map(Arc::new) + .context(error::StartMetaClientSnafu) } pub async fn try_build_standalone_components( diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs index fb340c2f4adc..07cb0249763d 100644 --- a/src/meta-client/src/lib.rs +++ b/src/meta-client/src/lib.rs @@ -14,8 +14,12 @@ use std::time::Duration; +use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_telemetry::info; use serde::{Deserialize, Serialize}; +use crate::client::MetaClientBuilder; + pub mod client; pub mod error; @@ -56,5 +60,58 @@ impl Default for MetaClientOptions { } } +#[derive(Debug)] +pub enum MetaClientType { + Datanode { member_id: u64 }, + Flownode { member_id: u64 }, + Frontend, +} + +pub async fn create_meta_client( + cluster_id: u64, + client_type: MetaClientType, + meta_client_options: &MetaClientOptions, +) -> error::Result { + info!( + "Creating {:?} instance from cluster {} with Metasrv addrs {:?}", + client_type, cluster_id, meta_client_options.metasrv_addrs + ); + + let mut builder = match client_type { + MetaClientType::Datanode { member_id } => { + MetaClientBuilder::datanode_default_options(cluster_id, member_id) + } + MetaClientType::Flownode { member_id } => { + MetaClientBuilder::flownode_default_options(cluster_id, member_id) + } + MetaClientType::Frontend => MetaClientBuilder::frontend_default_options(cluster_id), + }; + + let base_config = ChannelConfig::new() + .timeout(meta_client_options.timeout) + .connect_timeout(meta_client_options.connect_timeout) + .tcp_nodelay(meta_client_options.tcp_nodelay); + let heartbeat_config = base_config.clone(); + + if let MetaClientType::Frontend = client_type { + let ddl_config = base_config.clone().timeout(meta_client_options.ddl_timeout); + builder = builder.ddl_channel_manager(ChannelManager::with_config(ddl_config)); + } + + builder = builder + .channel_manager(ChannelManager::with_config(base_config)) + .heartbeat_channel_manager(ChannelManager::with_config(heartbeat_config)); + + let mut meta_client = builder.build(); + + meta_client + .start(&meta_client_options.metasrv_addrs) + .await?; + + meta_client.ask_leader().await?; + + Ok(meta_client) +} + #[cfg(test)] mod mocks; From db94781537f92f83641aac9d0f9f4e0ee087f144 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Wed, 3 Jul 2024 14:54:50 +0800 Subject: [PATCH 2/3] chore: minor refactor using metaclient --- src/cmd/src/datanode.rs | 20 +++++++++++++------- src/cmd/src/error.rs | 8 ++++++++ src/cmd/src/flownode.rs | 20 +++++++++++--------- src/cmd/src/frontend.rs | 16 +++++++++++----- src/datanode/src/datanode.rs | 6 +++--- src/datanode/src/heartbeat.rs | 24 ++++-------------------- src/flow/src/heartbeat.rs | 15 --------------- src/frontend/src/instance.rs | 12 ------------ src/meta-client/src/lib.rs | 7 +++++-- tests-integration/src/cluster.rs | 3 ++- 10 files changed, 57 insertions(+), 74 deletions(-) diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index bb0db5d7cab1..874bcbb04890 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -25,13 +25,14 @@ use common_version::{short_version, version}; use common_wal::config::DatanodeWalConfig; use datanode::datanode::{Datanode, DatanodeBuilder}; use datanode::service::DatanodeServiceBuilder; -use meta_client::MetaClientOptions; +use meta_client::{MetaClientOptions, MetaClientType}; use servers::Mode; use snafu::{OptionExt, ResultExt}; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ - LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu, + LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, + StartDatanodeSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{log_versions, App}; @@ -275,7 +276,8 @@ impl StartCommand { .await .context(StartDatanodeSnafu)?; - let node_id = opts + let cluster_id = 0; // TODO(hl): read from config + let member_id = opts .node_id .context(MissingConfigSnafu { msg: "'node_id'" })?; @@ -283,12 +285,16 @@ impl StartCommand { msg: "'meta_client_options'", })?; - let meta_client = datanode::heartbeat::new_meta_client(node_id, meta_config) - .await - .context(StartDatanodeSnafu)?; + let meta_client = meta_client::create_meta_client( + cluster_id, + MetaClientType::Datanode { member_id }, + meta_config, + ) + .await + .context(MetaClientInitSnafu)?; let meta_backend = Arc::new(MetaKvBackend { - client: Arc::new(meta_client.clone()), + client: meta_client.clone(), }); let mut datanode = DatanodeBuilder::new(opts.clone(), plugins) diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index d11e2421435c..1bc3d0b8eb81 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -339,6 +339,13 @@ pub enum Error { location: Location, source: cache::error::Error, }, + + #[snafu(display("Failed to initialize meta client"))] + MetaClientInit { + #[snafu(implicit)] + location: Location, + source: meta_client::error::Error, + }, } pub type Result = std::result::Result; @@ -397,6 +404,7 @@ impl ErrorExt for Error { Self::StartFlownode { source, .. } | Self::ShutdownFlownode { source, .. } => { source.status_code() } + Error::MetaClientInit { source, .. } => source.status_code(), } } diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index d601dac80a5f..c0de61564a12 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -28,14 +28,14 @@ use common_telemetry::logging::TracingOptions; use common_version::{short_version, version}; use flow::{FlownodeBuilder, FlownodeInstance}; use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; -use meta_client::MetaClientOptions; +use meta_client::{MetaClientOptions, MetaClientType}; use servers::Mode; use snafu::{OptionExt, ResultExt}; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ - BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MissingConfigSnafu, Result, - ShutdownFlownodeSnafu, StartFlownodeSnafu, + BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, + MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{log_versions, App}; @@ -217,7 +217,7 @@ impl StartCommand { msg: "'cluster_id'", })?; - let node_id = opts + let member_id = opts .node_id .context(MissingConfigSnafu { msg: "'node_id'" })?; @@ -225,11 +225,13 @@ impl StartCommand { msg: "'meta_client_options'", })?; - let meta_client = Arc::new( - flow::heartbeat::new_meta_client(cluster_id, node_id, meta_config) - .await - .context(StartFlownodeSnafu)?, - ); + let meta_client = meta_client::create_meta_client( + cluster_id, + MetaClientType::Flownode { member_id }, + meta_config, + ) + .await + .context(MetaClientInitSnafu)?; let cache_max_capacity = meta_config.metadata_cache_max_capacity; let cache_ttl = meta_config.metadata_cache_ttl; diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 6b0e835bbc33..e6cc18f02f60 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -34,14 +34,15 @@ use frontend::heartbeat::HeartbeatTask; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance as FeInstance}; use frontend::server::Services; -use meta_client::MetaClientOptions; +use meta_client::{MetaClientOptions, MetaClientType}; use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::{OptionExt, ResultExt}; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ - self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MissingConfigSnafu, Result, StartFrontendSnafu, + self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu, + Result, StartFrontendSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{log_versions, App}; @@ -279,9 +280,14 @@ impl StartCommand { let cache_ttl = meta_client_options.metadata_cache_ttl; let cache_tti = meta_client_options.metadata_cache_tti; - let meta_client = FeInstance::create_meta_client(meta_client_options) - .await - .context(StartFrontendSnafu)?; + let cluster_id = 0; // (TODO: jeremy): It is currently a reserved field and has not been enabled. + let meta_client = meta_client::create_meta_client( + cluster_id, + MetaClientType::Frontend, + meta_client_options, + ) + .await + .context(MetaClientInitSnafu)?; // TODO(discord9): add helper function to ease the creation of cache registry&such let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone()) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 0fe593be95cb..d4ba2f77cc24 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -34,7 +34,7 @@ use file_engine::engine::FileRegionEngine; use futures_util::TryStreamExt; use log_store::kafka::log_store::KafkaLogStore; use log_store::raft_engine::log_store::RaftEngineLogStore; -use meta_client::client::MetaClient; +use meta_client::MetaClientRef; use metric_engine::engine::MetricEngine; use mito2::config::MitoConfig; use mito2::engine::MitoEngine; @@ -155,7 +155,7 @@ impl Datanode { pub struct DatanodeBuilder { opts: DatanodeOptions, plugins: Plugins, - meta_client: Option, + meta_client: Option, kv_backend: Option, } @@ -171,7 +171,7 @@ impl DatanodeBuilder { } } - pub fn with_meta_client(self, meta_client: MetaClient) -> Self { + pub fn with_meta_client(self, meta_client: MetaClientRef) -> Self { Self { meta_client: Some(meta_client), ..self diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 8e8ae8a0cb8d..6c2bca43994f 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -26,7 +26,7 @@ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef}; use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; use common_telemetry::{debug, error, info, trace, warn}; use meta_client::client::{HeartbeatSender, MetaClient}; -use meta_client::{MetaClientOptions, MetaClientType}; +use meta_client::MetaClientRef; use servers::addrs; use snafu::ResultExt; use tokio::sync::{mpsc, Notify}; @@ -49,7 +49,7 @@ pub struct HeartbeatTask { node_epoch: u64, peer_addr: String, running: Arc, - meta_client: Arc, + meta_client: MetaClientRef, region_server: RegionServer, interval: u64, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, @@ -67,7 +67,7 @@ impl HeartbeatTask { pub async fn try_new( opts: &DatanodeOptions, region_server: RegionServer, - meta_client: MetaClient, + meta_client: MetaClientRef, ) -> Result { let region_alive_keeper = Arc::new(RegionAliveKeeper::new( region_server.clone(), @@ -85,7 +85,7 @@ impl HeartbeatTask { node_epoch: common_time::util::current_time_millis() as u64, peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)), running: Arc::new(AtomicBool::new(false)), - meta_client: Arc::new(meta_client), + meta_client, region_server, interval: opts.heartbeat.interval.as_millis() as u64, resp_handler_executor, @@ -340,19 +340,3 @@ impl HeartbeatTask { Ok(()) } } - -/// Create metasrv client instance and spawn heartbeat loop. -pub async fn new_meta_client( - member_id: u64, - meta_config: &MetaClientOptions, -) -> Result { - let cluster_id = 0; // TODO(hl): read from config - - meta_client::create_meta_client( - cluster_id, - MetaClientType::Datanode { member_id }, - meta_config, - ) - .await - .context(MetaClientInitSnafu) -} diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs index 9f8a0566b806..a48230a89883 100644 --- a/src/flow/src/heartbeat.rs +++ b/src/flow/src/heartbeat.rs @@ -233,18 +233,3 @@ impl HeartbeatTask { } } } - -/// Create metasrv client instance and spawn heartbeat loop. -pub async fn new_meta_client( - cluster_id: u64, - member_id: u64, - meta_config: &MetaClientOptions, -) -> Result { - meta_client::create_meta_client( - cluster_id, - MetaClientType::Flownode { member_id }, - meta_config, - ) - .await - .context(MetaClientInitSnafu) -} diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 2e4bd5c3da18..89e223e6d354 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -43,8 +43,6 @@ use common_procedure::ProcedureManagerRef; use common_query::Output; use common_telemetry::{debug, error, tracing}; use log_store::raft_engine::RaftEngineBackend; -use meta_client::client::MetaClient; -use meta_client::{MetaClientOptions, MetaClientType}; use operator::delete::DeleterRef; use operator::insert::InserterRef; use operator::statement::StatementExecutor; @@ -129,16 +127,6 @@ pub struct Instance { } impl Instance { - pub async fn create_meta_client( - meta_client_options: &MetaClientOptions, - ) -> Result> { - let cluster_id = 0; // (TODO: jeremy): It is currently a reserved field and has not been enabled. - meta_client::create_meta_client(cluster_id, MetaClientType::Frontend, meta_client_options) - .await - .map(Arc::new) - .context(error::StartMetaClientSnafu) - } - pub async fn try_build_standalone_components( dir: String, kv_backend_config: KvBackendConfig, diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs index 07cb0249763d..0a19539977e2 100644 --- a/src/meta-client/src/lib.rs +++ b/src/meta-client/src/lib.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; use std::time::Duration; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; @@ -67,11 +68,13 @@ pub enum MetaClientType { Frontend, } +pub type MetaClientRef = Arc; + pub async fn create_meta_client( cluster_id: u64, client_type: MetaClientType, meta_client_options: &MetaClientOptions, -) -> error::Result { +) -> error::Result { info!( "Creating {:?} instance from cluster {} with Metasrv addrs {:?}", client_type, cluster_id, meta_client_options.metasrv_addrs @@ -110,7 +113,7 @@ pub async fn create_meta_client( meta_client.ask_leader().await?; - Ok(meta_client) + Ok(Arc::new(meta_client)) } #[cfg(test)] diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index adc914e56d56..1c01aaf58479 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -318,9 +318,10 @@ impl GreptimeDbClusterBuilder { .channel_manager(metasrv.channel_manager) .build(); meta_client.start(&[&metasrv.server_addr]).await.unwrap(); + let meta_client = Arc::new(meta_client); let meta_backend = Arc::new(MetaKvBackend { - client: Arc::new(meta_client.clone()), + client: meta_client.clone(), }); let mut datanode = DatanodeBuilder::new(opts, Plugins::default()) From 41cecd36c69207e2bbd900271f8f59b762334be7 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Wed, 3 Jul 2024 14:59:11 +0800 Subject: [PATCH 3/3] chore: minor refactor using metaclient --- src/meta-client/examples/lock.rs | 13 ++++++++----- src/meta-client/src/client.rs | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/meta-client/examples/lock.rs b/src/meta-client/examples/lock.rs index 89a50affe86d..c8a8b61d6067 100644 --- a/src/meta-client/examples/lock.rs +++ b/src/meta-client/examples/lock.rs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; use std::time::Duration; use api::v1::meta::Role; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::rpc::lock::{LockRequest, UnlockRequest}; -use meta_client::client::{MetaClient, MetaClientBuilder}; +use meta_client::client::MetaClientBuilder; +use meta_client::MetaClientRef; use tracing::{info, subscriber}; use tracing_subscriber::FmtSubscriber; @@ -39,6 +41,7 @@ async fn run() { .channel_manager(channel_manager) .build(); meta_client.start(&["127.0.0.1:3002"]).await.unwrap(); + let meta_client = Arc::new(meta_client); run_normal(meta_client.clone()).await; @@ -47,7 +50,7 @@ async fn run() { run_multi_thread_with_one_timeout(meta_client).await; } -async fn run_normal(meta_client: MetaClient) { +async fn run_normal(meta_client: MetaClientRef) { let name = "lock_name".as_bytes().to_vec(); let expire_secs = 60; @@ -70,7 +73,7 @@ async fn run_normal(meta_client: MetaClient) { info!("unlock success!"); } -async fn run_multi_thread(meta_client: MetaClient) { +async fn run_multi_thread(meta_client: MetaClientRef) { let meta_client_clone = meta_client.clone(); let join1 = tokio::spawn(async move { run_normal(meta_client_clone.clone()).await; @@ -86,7 +89,7 @@ async fn run_multi_thread(meta_client: MetaClient) { join2.await.unwrap(); } -async fn run_multi_thread_with_one_timeout(meta_client: MetaClient) { +async fn run_multi_thread_with_one_timeout(meta_client: MetaClientRef) { let meta_client_clone = meta_client.clone(); let join1 = tokio::spawn(async move { run_with_timeout(meta_client_clone.clone()).await; @@ -102,7 +105,7 @@ async fn run_multi_thread_with_one_timeout(meta_client: MetaClient) { join2.await.unwrap(); } -async fn run_with_timeout(meta_client: MetaClient) { +async fn run_with_timeout(meta_client: MetaClientRef) { let name = "lock_name".as_bytes().to_vec(); let expire_secs = 5; diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index d43840e73ad2..840fec70a80e 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -213,7 +213,7 @@ impl MetaClientBuilder { } } -#[derive(Clone, Debug, Default)] +#[derive(Debug, Default)] pub struct MetaClient { id: Id, channel_manager: ChannelManager,