Skip to content

Commit

Permalink
chore: minor refactor using metaclient
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Jul 3, 2024
1 parent a7997c8 commit db94781
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 74 deletions.
20 changes: 13 additions & 7 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ use common_version::{short_version, version};
use common_wal::config::DatanodeWalConfig;
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::service::DatanodeServiceBuilder;
use meta_client::MetaClientOptions;
use meta_client::{MetaClientOptions, MetaClientType};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{
LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu,
StartDatanodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
Expand Down Expand Up @@ -275,20 +276,25 @@ impl StartCommand {
.await
.context(StartDatanodeSnafu)?;

let node_id = opts
let cluster_id = 0; // TODO(hl): read from config
let member_id = opts
.node_id
.context(MissingConfigSnafu { msg: "'node_id'" })?;

let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
msg: "'meta_client_options'",
})?;

let meta_client = datanode::heartbeat::new_meta_client(node_id, meta_config)
.await
.context(StartDatanodeSnafu)?;
let meta_client = meta_client::create_meta_client(
cluster_id,
MetaClientType::Datanode { member_id },
meta_config,
)
.await
.context(MetaClientInitSnafu)?;

let meta_backend = Arc::new(MetaKvBackend {
client: Arc::new(meta_client.clone()),
client: meta_client.clone(),
});

let mut datanode = DatanodeBuilder::new(opts.clone(), plugins)
Expand Down
8 changes: 8 additions & 0 deletions src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,13 @@ pub enum Error {
location: Location,
source: cache::error::Error,
},

#[snafu(display("Failed to initialize meta client"))]
MetaClientInit {
#[snafu(implicit)]
location: Location,
source: meta_client::error::Error,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -397,6 +404,7 @@ impl ErrorExt for Error {
Self::StartFlownode { source, .. } | Self::ShutdownFlownode { source, .. } => {
source.status_code()
}
Error::MetaClientInit { source, .. } => source.status_code(),
}
}

Expand Down
20 changes: 11 additions & 9 deletions src/cmd/src/flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use flow::{FlownodeBuilder, FlownodeInstance};
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use meta_client::MetaClientOptions;
use meta_client::{MetaClientOptions, MetaClientType};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{
BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MissingConfigSnafu, Result,
ShutdownFlownodeSnafu, StartFlownodeSnafu,
BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
Expand Down Expand Up @@ -217,19 +217,21 @@ impl StartCommand {
msg: "'cluster_id'",
})?;

let node_id = opts
let member_id = opts
.node_id
.context(MissingConfigSnafu { msg: "'node_id'" })?;

let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
msg: "'meta_client_options'",
})?;

let meta_client = Arc::new(
flow::heartbeat::new_meta_client(cluster_id, node_id, meta_config)
.await
.context(StartFlownodeSnafu)?,
);
let meta_client = meta_client::create_meta_client(
cluster_id,
MetaClientType::Flownode { member_id },
meta_config,
)
.await
.context(MetaClientInitSnafu)?;

let cache_max_capacity = meta_config.metadata_cache_max_capacity;
let cache_ttl = meta_config.metadata_cache_ttl;
Expand Down
16 changes: 11 additions & 5 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use frontend::server::Services;
use meta_client::MetaClientOptions;
use meta_client::{MetaClientOptions, MetaClientType};
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{
self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MissingConfigSnafu, Result, StartFrontendSnafu,
self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu,
Result, StartFrontendSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
Expand Down Expand Up @@ -279,9 +280,14 @@ impl StartCommand {
let cache_ttl = meta_client_options.metadata_cache_ttl;
let cache_tti = meta_client_options.metadata_cache_tti;

let meta_client = FeInstance::create_meta_client(meta_client_options)
.await
.context(StartFrontendSnafu)?;
let cluster_id = 0; // (TODO: jeremy): It is currently a reserved field and has not been enabled.
let meta_client = meta_client::create_meta_client(
cluster_id,
MetaClientType::Frontend,
meta_client_options,
)
.await
.context(MetaClientInitSnafu)?;

// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
Expand Down
6 changes: 3 additions & 3 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use file_engine::engine::FileRegionEngine;
use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::client::MetaClient;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
Expand Down Expand Up @@ -155,7 +155,7 @@ impl Datanode {
pub struct DatanodeBuilder {
opts: DatanodeOptions,
plugins: Plugins,
meta_client: Option<MetaClient>,
meta_client: Option<MetaClientRef>,
kv_backend: Option<KvBackendRef>,
}

Expand All @@ -171,7 +171,7 @@ impl DatanodeBuilder {
}
}

pub fn with_meta_client(self, meta_client: MetaClient) -> Self {
pub fn with_meta_client(self, meta_client: MetaClientRef) -> Self {
Self {
meta_client: Some(meta_client),
..self
Expand Down
24 changes: 4 additions & 20 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info, trace, warn};
use meta_client::client::{HeartbeatSender, MetaClient};
use meta_client::{MetaClientOptions, MetaClientType};
use meta_client::MetaClientRef;
use servers::addrs;
use snafu::ResultExt;
use tokio::sync::{mpsc, Notify};
Expand All @@ -49,7 +49,7 @@ pub struct HeartbeatTask {
node_epoch: u64,
peer_addr: String,
running: Arc<AtomicBool>,
meta_client: Arc<MetaClient>,
meta_client: MetaClientRef,
region_server: RegionServer,
interval: u64,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
Expand All @@ -67,7 +67,7 @@ impl HeartbeatTask {
pub async fn try_new(
opts: &DatanodeOptions,
region_server: RegionServer,
meta_client: MetaClient,
meta_client: MetaClientRef,
) -> Result<Self> {
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
Expand All @@ -85,7 +85,7 @@ impl HeartbeatTask {
node_epoch: common_time::util::current_time_millis() as u64,
peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)),
running: Arc::new(AtomicBool::new(false)),
meta_client: Arc::new(meta_client),
meta_client,
region_server,
interval: opts.heartbeat.interval.as_millis() as u64,
resp_handler_executor,
Expand Down Expand Up @@ -340,19 +340,3 @@ impl HeartbeatTask {
Ok(())
}
}

/// Create metasrv client instance and spawn heartbeat loop.
pub async fn new_meta_client(
member_id: u64,
meta_config: &MetaClientOptions,
) -> Result<MetaClient> {
let cluster_id = 0; // TODO(hl): read from config

meta_client::create_meta_client(
cluster_id,
MetaClientType::Datanode { member_id },
meta_config,
)
.await
.context(MetaClientInitSnafu)
}
15 changes: 0 additions & 15 deletions src/flow/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,18 +233,3 @@ impl HeartbeatTask {
}
}
}

/// Create metasrv client instance and spawn heartbeat loop.
pub async fn new_meta_client(
cluster_id: u64,
member_id: u64,
meta_config: &MetaClientOptions,
) -> Result<MetaClient, Error> {
meta_client::create_meta_client(
cluster_id,
MetaClientType::Flownode { member_id },
meta_config,
)
.await
.context(MetaClientInitSnafu)
}
12 changes: 0 additions & 12 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ use common_procedure::ProcedureManagerRef;
use common_query::Output;
use common_telemetry::{debug, error, tracing};
use log_store::raft_engine::RaftEngineBackend;
use meta_client::client::MetaClient;
use meta_client::{MetaClientOptions, MetaClientType};
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
Expand Down Expand Up @@ -129,16 +127,6 @@ pub struct Instance {
}

impl Instance {
pub async fn create_meta_client(
meta_client_options: &MetaClientOptions,
) -> Result<Arc<MetaClient>> {
let cluster_id = 0; // (TODO: jeremy): It is currently a reserved field and has not been enabled.
meta_client::create_meta_client(cluster_id, MetaClientType::Frontend, meta_client_options)
.await
.map(Arc::new)
.context(error::StartMetaClientSnafu)
}

pub async fn try_build_standalone_components(
dir: String,
kv_backend_config: KvBackendConfig,
Expand Down
7 changes: 5 additions & 2 deletions src/meta-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
Expand Down Expand Up @@ -67,11 +68,13 @@ pub enum MetaClientType {
Frontend,
}

pub type MetaClientRef = Arc<client::MetaClient>;

pub async fn create_meta_client(
cluster_id: u64,
client_type: MetaClientType,
meta_client_options: &MetaClientOptions,
) -> error::Result<client::MetaClient> {
) -> error::Result<MetaClientRef> {
info!(
"Creating {:?} instance from cluster {} with Metasrv addrs {:?}",
client_type, cluster_id, meta_client_options.metasrv_addrs
Expand Down Expand Up @@ -110,7 +113,7 @@ pub async fn create_meta_client(

meta_client.ask_leader().await?;

Ok(meta_client)
Ok(Arc::new(meta_client))
}

#[cfg(test)]
Expand Down
3 changes: 2 additions & 1 deletion tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,10 @@ impl GreptimeDbClusterBuilder {
.channel_manager(metasrv.channel_manager)
.build();
meta_client.start(&[&metasrv.server_addr]).await.unwrap();
let meta_client = Arc::new(meta_client);

let meta_backend = Arc::new(MetaKvBackend {
client: Arc::new(meta_client.clone()),
client: meta_client.clone(),
});

let mut datanode = DatanodeBuilder::new(opts, Plugins::default())
Expand Down

0 comments on commit db94781

Please sign in to comment.