Skip to content

Commit

Permalink
feat: optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
poltao committed Jun 2, 2024
1 parent f21a207 commit 0c9d3c7
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 47 deletions.
13 changes: 9 additions & 4 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,15 @@
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `rpc_addr` | String | `127.0.0.1:3001` | The gRPC address of the datanode. |
| `rpc_hostname` | String | `None` | The hostname of the datanode. |
| `rpc_runtime_size` | Integer | `8` | The number of gRPC server worker threads. |
| `rpc_max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `rpc_max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | `None` | Certificate file path. |
| `grpc.tls.key_path` | String | `None` | Private key file path. |
| `grpc.tls.watch` | Bool | `false` | Watch for Certificate and key file change and auto reload.<br/>For now, gRPC tls config does not support auto reload. |
| `grpc.tls.enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. |
| `heartbeat` | -- | -- | The heartbeat options. |
| `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. |
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
Expand Down
27 changes: 21 additions & 6 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,29 @@ rpc_addr = "127.0.0.1:3001"
## +toml2docs:none-default
rpc_hostname = "127.0.0.1"

## The number of gRPC server worker threads.
rpc_runtime_size = 8
## The gRPC server options.
[grpc]
## The address to bind the gRPC server.
addr = "127.0.0.1:4001"
## The number of server worker threads.
runtime_size = 8

## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls]
## TLS mode.
mode = "disable"

## Certificate file path.
## +toml2docs:none-default
cert_path = ""

## The maximum receive message size for gRPC server.
rpc_max_recv_message_size = "512MB"
## Private key file path.
## +toml2docs:none-default
key_path = ""

## The maximum send message size for gRPC server.
rpc_max_send_message_size = "512MB"
## Watch for Certificate and key file change and auto reload.
## For now, gRPC tls config does not support auto reload.
watch = false

## Enable telemetry to collect anonymous usage data.
enable_telemetry = true
Expand Down
10 changes: 6 additions & 4 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl StartCommand {
};

if let Some(addr) = &self.rpc_addr {
opts.rpc_addr.clone_from(addr);
opts.grpc.addr.clone_from(addr);
}

if self.rpc_hostname.is_some() {
Expand Down Expand Up @@ -306,9 +306,11 @@ mod tests {
mode = "distributed"
enable_memory_catalog = false
node_id = 42
rpc_addr = "127.0.0.1:3001"
rpc_hostname = "127.0.0.1"
rpc_runtime_size = 8
[grpc]
addr = "127.0.0.1:3001"
runtime_size = 8
[heartbeat]
interval = "300ms"
Expand Down Expand Up @@ -355,7 +357,7 @@ mod tests {

let options = cmd.load_options(&GlobalOptions::default()).unwrap();

assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
assert_eq!("127.0.0.1:3001".to_string(), options.grpc.addr);
assert_eq!(Some(42), options.node_id);

let DatanodeWalConfig::RaftEngine(raft_engine_config) = options.wal else {
Expand Down
22 changes: 11 additions & 11 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ use snafu::ResultExt;
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{
BuildCacheRegistrySnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu,
InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, Result, ShutdownDatanodeSnafu,
BuildCacheRegistrySnafu, CreateDirSnafu, InitDdlManagerSnafu, InitMetadataSnafu,
InitTimezoneSnafu, LoadLayeredConfigSnafu, Result, ShutdownDatanodeSnafu,
ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
};
Expand Down Expand Up @@ -198,7 +198,7 @@ impl StandaloneOptions {
wal: cloned_opts.wal.into(),
storage: cloned_opts.storage,
region_engine: cloned_opts.region_engine,
rpc_addr: cloned_opts.grpc.addr,
grpc: cloned_opts.grpc,
..Default::default()
}
}
Expand Down Expand Up @@ -341,14 +341,14 @@ impl StartCommand {

if let Some(addr) = &self.rpc_addr {
// frontend grpc addr conflict with datanode default grpc addr
let datanode_grpc_addr = DatanodeOptions::default().rpc_addr;
if addr.eq(&datanode_grpc_addr) {
return IllegalConfigSnafu {
msg: format!(
"gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
),
}.fail();
}
// let datanode_grpc_addr = DatanodeOptions::default().grpc.addr;
// if addr.eq(&datanode_grpc_addr) {
// return IllegalConfigSnafu {
// msg: format!(
// "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
// ),
// }.fail();
// }
opts.grpc.addr.clone_from(addr)
}

Expand Down
16 changes: 3 additions & 13 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
use common_base::readable_size::ReadableSize;
use common_base::secrets::SecretString;
use common_config::Configurable;
use common_grpc::channel_manager::{
DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
};
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_wal::config::DatanodeWalConfig;
Expand All @@ -28,6 +25,7 @@ use meta_client::MetaClientOptions;
use mito2::config::MitoConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use servers::Mode;
Expand Down Expand Up @@ -218,13 +216,8 @@ pub struct DatanodeOptions {
pub node_id: Option<u64>,
pub require_lease_before_startup: bool,
pub init_regions_in_background: bool,
pub rpc_addr: String,
pub rpc_hostname: Option<String>,
pub rpc_runtime_size: usize,
// Max gRPC receiving(decoding) message size
pub rpc_max_recv_message_size: ReadableSize,
// Max gRPC sending(encoding) message size
pub rpc_max_send_message_size: ReadableSize,
pub grpc: GrpcOptions,
pub heartbeat: HeartbeatOptions,
pub http: HttpOptions,
pub meta_client: Option<MetaClientOptions>,
Expand All @@ -245,11 +238,8 @@ impl Default for DatanodeOptions {
node_id: None,
require_lease_before_startup: false,
init_regions_in_background: false,
rpc_addr: "127.0.0.1:3001".to_string(),
rpc_hostname: None,
rpc_runtime_size: 8,
rpc_max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
rpc_max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
grpc: GrpcOptions::default(),
http: HttpOptions::default(),
meta_client: None,
wal: DatanodeWalConfig::default(),
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ impl DatanodeBuilder {

let runtime = Arc::new(
Runtime::builder()
.worker_threads(opts.rpc_runtime_size)
.worker_threads(opts.grpc.runtime_size)
.thread_name("io-handlers")
.build()
.context(RuntimeResourceSnafu)?,
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl HeartbeatTask {
node_id: opts.node_id.unwrap_or(0),
// We use datanode's start time millis as the node's epoch.
node_epoch: common_time::util::current_time_millis() as u64,
server_addr: opts.rpc_addr.clone(),
server_addr: opts.grpc.addr.clone(),
server_hostname: opts.rpc_hostname.clone(),
running: Arc::new(AtomicBool::new(false)),
meta_client: Arc::new(meta_client),
Expand Down
11 changes: 5 additions & 6 deletions src/datanode/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::server::{ServerHandler, ServerHandlers};
use servers::tls::TlsOption;
use snafu::ResultExt;

use crate::config::DatanodeOptions;
Expand Down Expand Up @@ -67,8 +66,8 @@ impl<'a> DatanodeServiceBuilder<'a> {
let handlers = ServerHandlers::default();

if let Some(grpc_server) = self.grpc_server.take() {
let addr: SocketAddr = self.opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &self.opts.rpc_addr,
let addr: SocketAddr = self.opts.grpc.addr.parse().context(ParseAddrSnafu {
addr: &self.opts.grpc.addr,
})?;
let handler: ServerHandler = (Box::new(grpc_server), addr);
handlers.insert(handler).await;
Expand All @@ -94,9 +93,9 @@ impl<'a> DatanodeServiceBuilder<'a> {
region_server: &RegionServer,
) -> GrpcServerBuilder {
let config = GrpcServerConfig {
max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize,
tls: TlsOption::default(),
max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
tls: opts.grpc.tls.clone(),
};

GrpcServerBuilder::new(config, region_server.runtime())
Expand Down
2 changes: 1 addition & 1 deletion tests-integration/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use session::context::QueryContext;

use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};

pub const PEER_PLACEHOLDER_ADDR: &str = "127.0.0.1:3001";
pub const PEER_PLACEHOLDER_ADDR: &str = "127.0.0.1:4001";

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StorageType {
Expand Down

0 comments on commit 0c9d3c7

Please sign in to comment.