Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce only-decryption mode #118

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions components/raftstore/src/engine_store_ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<T> UnwrapExternCFunc<T> for std::option::Option<T> {
pub struct RaftStoreProxy {
status: AtomicU8,
key_manager: Option<Arc<DataKeyManager>>,
read_index_client: Box<dyn read_index_helper::ReadIndex>,
read_index_client: Option<Box<dyn read_index_helper::ReadIndex>>,
kv_engine: std::sync::RwLock<Option<engine_rocks::RocksEngine>>,
}

Expand All @@ -78,7 +78,7 @@ impl RaftStoreProxy {
pub fn new(
status: AtomicU8,
key_manager: Option<Arc<DataKeyManager>>,
read_index_client: Box<dyn read_index_helper::ReadIndex>,
read_index_client: Option<Box<dyn read_index_helper::ReadIndex>>,
kv_engine: std::sync::RwLock<Option<engine_rocks::RocksEngine>>,
) -> Self {
RaftStoreProxy {
Expand Down Expand Up @@ -212,6 +212,14 @@ pub extern "C" fn ffi_batch_read_index(
fn_insert_batch_read_index_resp: Option<unsafe extern "C" fn(RawVoidPtr, BaseBuffView, u64)>,
) {
assert!(!proxy_ptr.is_null());
unsafe {
match proxy_ptr.as_ref().read_index_client {
Option::None => {
return;
}
_ => {}
}
}
debug_assert!(fn_insert_batch_read_index_resp.is_some());
if view.len != 0 {
assert_ne!(view.view, std::ptr::null());
Expand All @@ -229,6 +237,8 @@ pub extern "C" fn ffi_batch_read_index(
let resp = proxy_ptr
.as_ref()
.read_index_client
.as_ref()
.unwrap()
.batch_read_index(req_vec, time::Duration::from_millis(timeout_ms));
assert_ne!(res, std::ptr::null_mut());
for (r, region_id) in &resp {
Expand Down Expand Up @@ -301,12 +311,22 @@ pub extern "C" fn ffi_make_read_index_task(
req_view: BaseBuffView,
) -> RawRustPtr {
assert!(!proxy_ptr.is_null());
unsafe {
match proxy_ptr.as_ref().read_index_client {
Option::None => {
return RawRustPtr::default();
}
_ => {}
}
}
let mut req = kvrpcpb::ReadIndexRequest::default();
req.merge_from_bytes(req_view.to_slice()).unwrap();
let task = unsafe {
proxy_ptr
.as_ref()
.read_index_client
.as_ref()
.unwrap()
.make_read_index_task(req)
};
return match task {
Expand Down Expand Up @@ -352,6 +372,14 @@ pub extern "C" fn ffi_poll_read_index_task(
waker: RawVoidPtr,
) -> u8 {
assert!(!proxy_ptr.is_null());
unsafe {
match proxy_ptr.as_ref().read_index_client {
Option::None => {
return 0;
}
_ => {}
}
}
let task = unsafe {
&mut *(task_ptr as *mut crate::engine_store_ffi::read_index_helper::ReadIndexTask)
};
Expand All @@ -364,6 +392,8 @@ pub extern "C" fn ffi_poll_read_index_task(
proxy_ptr
.as_ref()
.read_index_client
.as_ref()
.unwrap()
.poll_read_index_task(task, waker)
} {
get_engine_store_server_helper().set_read_index_resp(resp_data, &res);
Expand Down
11 changes: 10 additions & 1 deletion components/server/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ pub unsafe fn run_proxy(
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("only-decryption")
.long("only-decryption")
.help("Only do decryption in Proxy"),
)
.get_matches_from(args);

if matches.is_present("print-sample-config") {
Expand Down Expand Up @@ -245,7 +250,11 @@ pub unsafe fn run_proxy(
}

config.raft_store.engine_store_server_helper = engine_store_server_helper as *const _ as isize;
crate::server::run_tikv(config, engine_store_server_helper);
if matches.is_present("only-decryption") {
crate::server::run_tikv_only_decryption(config, engine_store_server_helper);
} else {
crate::server::run_tikv(config, engine_store_server_helper);
}
}

fn check_engine_label(matches: &clap::ArgMatches<'_>) {
Expand Down
121 changes: 119 additions & 2 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ fn run_impl<CER: ConfiguredRaftEngine, F: KvFormat>(
let mut proxy = RaftStoreProxy::new(
AtomicU8::new(RaftProxyStatus::Idle as u8),
tikv.encryption_key_manager.clone(),
Box::new(ReadIndexClient::new(
Some(Box::new(ReadIndexClient::new(
tikv.router.clone(),
SysQuota::cpu_cores_quota() as usize * 2,
)),
))),
std::sync::RwLock::new(None),
);

Expand Down Expand Up @@ -221,6 +221,91 @@ fn run_impl<CER: ConfiguredRaftEngine, F: KvFormat>(
info!("engine-store server is stopped");
}

#[inline]
fn run_impl_only_for_decryption<CER: ConfiguredRaftEngine, F: KvFormat>(
config: TiKvConfig,
engine_store_server_helper: &EngineStoreServerHelper,
) {
let encryption_key_manager =
data_key_manager_from_config(&config.security.encryption, &config.storage.data_dir)
.map_err(|e| {
panic!(
"Encryption failed to initialize: {}. code: {}",
e,
e.error_code()
)
})
.unwrap()
.map(Arc::new);

let mut proxy = RaftStoreProxy::new(
AtomicU8::new(RaftProxyStatus::Idle as u8),
encryption_key_manager.clone(),
Option::None,
std::sync::RwLock::new(None),
);

let proxy_helper = {
let mut proxy_helper = RaftStoreProxyFFIHelper::new(&proxy);
proxy_helper.fn_server_info = Some(ffi_server_info);
proxy_helper
};

info!("set raft-store proxy helper");

engine_store_server_helper.handle_set_proxy(&proxy_helper);

info!("wait for engine-store server to start");
while engine_store_server_helper.handle_get_engine_store_server_status()
== EngineStoreServerStatus::Idle
{
thread::sleep(Duration::from_millis(200));
}

if engine_store_server_helper.handle_get_engine_store_server_status()
!= EngineStoreServerStatus::Running
{
info!("engine-store server is not running, make proxy exit");
return;
}

info!("engine-store server is started");

proxy.set_status(RaftProxyStatus::Running);

{
debug_assert!(
engine_store_server_helper.handle_get_engine_store_server_status()
== EngineStoreServerStatus::Running
);
loop {
if engine_store_server_helper.handle_get_engine_store_server_status()
!= EngineStoreServerStatus::Running
{
break;
}
thread::sleep(Duration::from_millis(200));
}
}

info!(
"found engine-store server status is {:?}, start to stop all services",
engine_store_server_helper.handle_get_engine_store_server_status()
);

proxy.set_status(RaftProxyStatus::Stopped);

info!("all services in raft-store proxy are stopped");

info!("wait for engine-store server to stop");
while engine_store_server_helper.handle_get_engine_store_server_status()
!= EngineStoreServerStatus::Terminated
{
thread::sleep(Duration::from_millis(200));
}
info!("engine-store server is stopped");
}

/// Run a TiKV server. Returns when the server is shutdown by the user, in which
/// case the server will be properly stopped.
pub unsafe fn run_tikv(config: TiKvConfig, engine_store_server_helper: &EngineStoreServerHelper) {
Expand Down Expand Up @@ -250,6 +335,38 @@ pub unsafe fn run_tikv(config: TiKvConfig, engine_store_server_helper: &EngineSt
})
}

/// Run a TiKV server only for decryption. Returns when the server is shutdown by the user, in which
/// case the server will be properly stopped.
pub unsafe fn run_tikv_only_decryption(
config: TiKvConfig,
engine_store_server_helper: &EngineStoreServerHelper,
) {
// Sets the global logger ASAP.
// It is okay to use the config w/o `validate()`,
// because `initial_logger()` handles various conditions.
initial_logger(&config);

// Print version information.
crate::log_proxy_info();

// Print resource quota.
SysQuota::log_quota();
CPU_CORES_QUOTA_GAUGE.set(SysQuota::cpu_cores_quota());

// Do some prepare works before start.
pre_start();

let _m = Monitor::default();

dispatch_api_version!(config.storage.api_version(), {
if !config.raft_engine.enable {
run_impl_only_for_decryption::<RocksEngine, API>(config, engine_store_server_helper)
} else {
run_impl_only_for_decryption::<RaftLogEngine, API>(config, engine_store_server_helper)
}
})
}

const RESERVED_OPEN_FDS: u64 = 1000;

const DEFAULT_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000);
Expand Down
4 changes: 2 additions & 2 deletions components/test_raftstore/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,10 @@ impl<T: Simulator> Cluster<T> {
let proxy = Box::new(raftstore::engine_store_ffi::RaftStoreProxy::new(
AtomicU8::new(raftstore::engine_store_ffi::RaftProxyStatus::Idle as u8),
key_mgr.clone(),
Box::new(raftstore::engine_store_ffi::ReadIndexClient::new(
Some(Box::new(raftstore::engine_store_ffi::ReadIndexClient::new(
router.clone(),
SysQuota::cpu_cores_quota() as usize * 2,
)),
))),
std::sync::RwLock::new(Some(engines.kv.clone())),
));

Expand Down