diff --git a/components/raftstore/src/engine_store_ffi/mod.rs b/components/raftstore/src/engine_store_ffi/mod.rs index 4e08db899e7..efbc6fb8845 100644 --- a/components/raftstore/src/engine_store_ffi/mod.rs +++ b/components/raftstore/src/engine_store_ffi/mod.rs @@ -62,7 +62,7 @@ impl UnwrapExternCFunc for std::option::Option { pub struct RaftStoreProxy { status: AtomicU8, key_manager: Option>, - read_index_client: Box, + read_index_client: Option>, kv_engine: std::sync::RwLock>, } @@ -78,7 +78,7 @@ impl RaftStoreProxy { pub fn new( status: AtomicU8, key_manager: Option>, - read_index_client: Box, + read_index_client: Option>, kv_engine: std::sync::RwLock>, ) -> Self { RaftStoreProxy { @@ -212,6 +212,14 @@ pub extern "C" fn ffi_batch_read_index( fn_insert_batch_read_index_resp: Option, ) { assert!(!proxy_ptr.is_null()); + unsafe { + match proxy_ptr.as_ref().read_index_client { + Option::None => { + return; + } + _ => {} + } + } debug_assert!(fn_insert_batch_read_index_resp.is_some()); if view.len != 0 { assert_ne!(view.view, std::ptr::null()); @@ -229,6 +237,8 @@ pub extern "C" fn ffi_batch_read_index( let resp = proxy_ptr .as_ref() .read_index_client + .as_ref() + .unwrap() .batch_read_index(req_vec, time::Duration::from_millis(timeout_ms)); assert_ne!(res, std::ptr::null_mut()); for (r, region_id) in &resp { @@ -301,12 +311,22 @@ pub extern "C" fn ffi_make_read_index_task( req_view: BaseBuffView, ) -> RawRustPtr { assert!(!proxy_ptr.is_null()); + unsafe { + match proxy_ptr.as_ref().read_index_client { + Option::None => { + return RawRustPtr::default(); + } + _ => {} + } + } let mut req = kvrpcpb::ReadIndexRequest::default(); req.merge_from_bytes(req_view.to_slice()).unwrap(); let task = unsafe { proxy_ptr .as_ref() .read_index_client + .as_ref() + .unwrap() .make_read_index_task(req) }; return match task { @@ -352,6 +372,14 @@ pub extern "C" fn ffi_poll_read_index_task( waker: RawVoidPtr, ) -> u8 { assert!(!proxy_ptr.is_null()); + unsafe { + match proxy_ptr.as_ref().read_index_client { + Option::None => { + return 0; + } + _ => {} + } + } let task = unsafe { &mut *(task_ptr as *mut crate::engine_store_ffi::read_index_helper::ReadIndexTask) }; @@ -364,6 +392,8 @@ pub extern "C" fn ffi_poll_read_index_task( proxy_ptr .as_ref() .read_index_client + .as_ref() + .unwrap() .poll_read_index_task(task, waker) } { get_engine_store_server_helper().set_read_index_resp(resp_data, &res); diff --git a/components/server/src/proxy.rs b/components/server/src/proxy.rs index db95da8d0a1..ececf00e615 100644 --- a/components/server/src/proxy.rs +++ b/components/server/src/proxy.rs @@ -201,6 +201,11 @@ pub unsafe fn run_proxy( .required(true) .takes_value(true), ) + .arg( + Arg::with_name("only-decryption") + .long("only-decryption") + .help("Only do decryption in Proxy"), + ) .get_matches_from(args); if matches.is_present("print-sample-config") { @@ -245,7 +250,11 @@ pub unsafe fn run_proxy( } config.raft_store.engine_store_server_helper = engine_store_server_helper as *const _ as isize; - crate::server::run_tikv(config, engine_store_server_helper); + if matches.is_present("only-decryption") { + crate::server::run_tikv_only_decryption(config, engine_store_server_helper); + } else { + crate::server::run_tikv(config, engine_store_server_helper); + } } fn check_engine_label(matches: &clap::ArgMatches<'_>) { diff --git a/components/server/src/server.rs b/components/server/src/server.rs index d2036e99570..d7fe533c46d 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -136,10 +136,10 @@ fn run_impl( let mut proxy = RaftStoreProxy::new( AtomicU8::new(RaftProxyStatus::Idle as u8), tikv.encryption_key_manager.clone(), - Box::new(ReadIndexClient::new( + Some(Box::new(ReadIndexClient::new( tikv.router.clone(), SysQuota::cpu_cores_quota() as usize * 2, - )), + ))), std::sync::RwLock::new(None), ); @@ -221,6 +221,91 @@ fn run_impl( info!("engine-store server is stopped"); } +#[inline] +fn run_impl_only_for_decryption( + config: TiKvConfig, + engine_store_server_helper: &EngineStoreServerHelper, +) { + let encryption_key_manager = + data_key_manager_from_config(&config.security.encryption, &config.storage.data_dir) + .map_err(|e| { + panic!( + "Encryption failed to initialize: {}. code: {}", + e, + e.error_code() + ) + }) + .unwrap() + .map(Arc::new); + + let mut proxy = RaftStoreProxy::new( + AtomicU8::new(RaftProxyStatus::Idle as u8), + encryption_key_manager.clone(), + Option::None, + std::sync::RwLock::new(None), + ); + + let proxy_helper = { + let mut proxy_helper = RaftStoreProxyFFIHelper::new(&proxy); + proxy_helper.fn_server_info = Some(ffi_server_info); + proxy_helper + }; + + info!("set raft-store proxy helper"); + + engine_store_server_helper.handle_set_proxy(&proxy_helper); + + info!("wait for engine-store server to start"); + while engine_store_server_helper.handle_get_engine_store_server_status() + == EngineStoreServerStatus::Idle + { + thread::sleep(Duration::from_millis(200)); + } + + if engine_store_server_helper.handle_get_engine_store_server_status() + != EngineStoreServerStatus::Running + { + info!("engine-store server is not running, make proxy exit"); + return; + } + + info!("engine-store server is started"); + + proxy.set_status(RaftProxyStatus::Running); + + { + debug_assert!( + engine_store_server_helper.handle_get_engine_store_server_status() + == EngineStoreServerStatus::Running + ); + loop { + if engine_store_server_helper.handle_get_engine_store_server_status() + != EngineStoreServerStatus::Running + { + break; + } + thread::sleep(Duration::from_millis(200)); + } + } + + info!( + "found engine-store server status is {:?}, start to stop all services", + engine_store_server_helper.handle_get_engine_store_server_status() + ); + + proxy.set_status(RaftProxyStatus::Stopped); + + info!("all services in raft-store proxy are stopped"); + + info!("wait for engine-store server to stop"); + while engine_store_server_helper.handle_get_engine_store_server_status() + != EngineStoreServerStatus::Terminated + { + thread::sleep(Duration::from_millis(200)); + } + info!("engine-store server is stopped"); +} + /// Run a TiKV server. Returns when the server is shutdown by the user, in which /// case the server will be properly stopped. pub unsafe fn run_tikv(config: TiKvConfig, engine_store_server_helper: &EngineStoreServerHelper) { @@ -250,6 +335,38 @@ pub unsafe fn run_tikv(config: TiKvConfig, engine_store_server_helper: &EngineSt }) } +/// Run a TiKV server only for decryption. Returns when the server is shutdown by the user, in which +/// case the server will be properly stopped. +pub unsafe fn run_tikv_only_decryption( + config: TiKvConfig, + engine_store_server_helper: &EngineStoreServerHelper, +) { + // Sets the global logger ASAP. + // It is okay to use the config w/o `validate()`, + // because `initial_logger()` handles various conditions. + initial_logger(&config); + + // Print version information. + crate::log_proxy_info(); + + // Print resource quota. + SysQuota::log_quota(); + CPU_CORES_QUOTA_GAUGE.set(SysQuota::cpu_cores_quota()); + + // Do some prepare works before start. + pre_start(); + + let _m = Monitor::default(); + + dispatch_api_version!(config.storage.api_version(), { + if !config.raft_engine.enable { + run_impl_only_for_decryption::(config, engine_store_server_helper) + } else { + run_impl_only_for_decryption::(config, engine_store_server_helper) + } + }) +} + const RESERVED_OPEN_FDS: u64 = 1000; const DEFAULT_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000); diff --git a/components/test_raftstore/src/cluster.rs b/components/test_raftstore/src/cluster.rs index 5ce6f300903..7334de02bf1 100644 --- a/components/test_raftstore/src/cluster.rs +++ b/components/test_raftstore/src/cluster.rs @@ -320,10 +320,10 @@ impl Cluster { let proxy = Box::new(raftstore::engine_store_ffi::RaftStoreProxy::new( AtomicU8::new(raftstore::engine_store_ffi::RaftProxyStatus::Idle as u8), key_mgr.clone(), - Box::new(raftstore::engine_store_ffi::ReadIndexClient::new( + Some(Box::new(raftstore::engine_store_ffi::ReadIndexClient::new( router.clone(), SysQuota::cpu_cores_quota() as usize * 2, - )), + ))), std::sync::RwLock::new(Some(engines.kv.clone())), ));