Skip to content

Commit

Permalink
remove engine_store_ffi out of raftstore (#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinNeo authored Sep 13, 2022
1 parent 13a534e commit 20d7ef2
Show file tree
Hide file tree
Showing 30 changed files with 255 additions and 103 deletions.
69 changes: 69 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

96 changes: 96 additions & 0 deletions components/engine_store_ffi/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
[package]
name = "engine_store_ffi"
version = "0.0.1"
authors = ["The TiKV Authors"]
license = "Apache-2.0"
edition = "2018"
publish = false

[features]
default = ["test-engine-kv-rocksdb", "test-engine-raft-raft-engine"]
failpoints = ["fail/failpoints"]
testexport = []
test-raftstore-proxy = []
test-engine-kv-rocksdb = [
"engine_test/test-engine-kv-rocksdb"
]
test-engine-raft-raft-engine = [
"engine_test/test-engine-raft-raft-engine"
]
test-engines-rocksdb = [
"engine_test/test-engines-rocksdb",
]
test-engines-panic = [
"engine_test/test-engines-panic",
]

cloud-aws = ["sst_importer/cloud-aws"]
cloud-gcp = ["sst_importer/cloud-gcp"]
cloud-azure = ["sst_importer/cloud-azure"]
compat_new_proxy = ["engine_tiflash/compat_new_proxy"]
compat_old_proxy = ["engine_tiflash/compat_old_proxy"]

[dependencies]
batch-system = { path = "../batch-system", default-features = false }
bitflags = "1.0.1"
byteorder = "1.2"
bytes = "1.0"
collections = { path = "../collections" }
crossbeam = "0.8"
derivative = "2"
encryption = { path = "../encryption", default-features = false }

engine_rocks = { path = "../engine_rocks", default-features = false }
# Should be [dev-dependencies] but we need to control the features
# https://github.com/rust-lang/cargo/issues/6915
engine_test = { path = "../engine_test", default-features = false }
engine_tiflash = { path = "../../engine_tiflash", default-features = false }
engine_traits = { path = "../engine_traits", default-features = false }
error_code = { path = "../error_code", default-features = false }
fail = "0.5"
file_system = { path = "../file_system", default-features = false }
fs2 = "0.4"
futures = "0.3"
futures-util = { version = "0.3.1", default-features = false, features = ["io"] }
getset = "0.1"
grpcio-health = { version = "0.10", default-features = false, features = ["protobuf-codec"] }
into_other = { path = "../into_other", default-features = false }
itertools = "0.10"
keys = { path = "../keys", default-features = false }
kvproto = { git = "https://github.com/pingcap/kvproto.git" }
lazy_static = "1.3"
log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] }
log_wrappers = { path = "../log_wrappers" }
online_config = { path = "../online_config" }
ordered-float = "2.6"
parking_lot = "0.12"
pd_client = { path = "../pd_client", default-features = false }
prometheus = { version = "0.13", features = ["nightly"] }
prometheus-static-metric = "0.5"
protobuf = { version = "2.8", features = ["bytes"] }
raft = { version = "0.7.0", default-features = false, features = ["protobuf-codec"] }
raft-proto = { version = "0.7.0", default-features = false }
raftstore = { path = "../raftstore", default-features = false }
rand = "0.8.3"
serde = "1.0"
serde_derive = "1.0"
serde_with = "1.4"
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "d592f88e4dbba5eb439998463054f1a44fbf17b9" }
smallvec = "1.4"
sst_importer = { path = "../sst_importer", default-features = false }
tempfile = "3.0"
thiserror = "1.0"
tikv_alloc = { path = "../tikv_alloc" }
tikv_util = { path = "../tikv_util", default-features = false }
time = "0.1"
tokio = { version = "1.5", features = ["sync", "rt-multi-thread"] }
tokio-timer = { git = "https://github.com/tikv/tokio", branch = "tokio-timer-hotfix" }
uuid = { version = "0.8.1", features = ["serde", "v4"] }
yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" }

[dev-dependencies]
encryption_export = { path = "../encryption/export", default-features = false }
engine_panic = { path = "../engine_panic", default-features = false }
panic_hook = { path = "../panic_hook" }
test_sst_importer = { path = "../test_sst_importer", default-features = false }
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
#![feature(drain_filter)]

#[allow(dead_code)]
pub mod interfaces;
Expand Down Expand Up @@ -34,14 +35,12 @@ pub use self::interfaces::root::DB::{
KVGetStatus, RaftCmdHeader, RaftProxyStatus, RaftStoreProxyFFIHelper, RawCppPtr,
RawCppStringPtr, RawVoidPtr, SSTReaderPtr, StoreStats, WriteCmdType, WriteCmdsView,
};
use self::{
interfaces::root::DB::{
ConstRawVoidPtr, FileEncryptionInfoRaw, RaftStoreProxyPtr, RawCppPtrType, RawRustPtr,
SSTReaderInterfaces, SSTView, SSTViewVec, RAFT_STORE_PROXY_MAGIC_NUMBER,
RAFT_STORE_PROXY_VERSION,
},
lock_cf_reader::LockCFFileReader,
use self::interfaces::root::DB::{
ConstRawVoidPtr, FileEncryptionInfoRaw, RaftStoreProxyPtr, RawCppPtrType, RawRustPtr,
SSTReaderInterfaces, SSTView, SSTViewVec, RAFT_STORE_PROXY_MAGIC_NUMBER,
RAFT_STORE_PROXY_VERSION,
};
use crate::lock_cf_reader::LockCFFileReader;

pub type TiFlashEngine = engine_tiflash::RocksEngine;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ use std::{

use encryption::DataKeyManager;
use file_system::File;
use raftstore::store::snap::snap_io::get_decrypter_reader;
use tikv_util::codec::bytes::CompactBytesFromFileDecoder;

use crate::{
engine_store_ffi::interfaces::root::DB::{BaseBuffView, RawVoidPtr},
store::snap::snap_io::get_decrypter_reader,
};
use crate::interfaces::root::DB::{BaseBuffView, RawVoidPtr};

type LockCFDecoder = BufReader<Box<dyn Read + Send>>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,31 @@ use kvproto::{
raft_serverpb::RaftApplyState,
};
use raft::{eraftpb, StateRole};
use sst_importer::SstImporter;
use tikv_util::{box_err, debug, error, info, warn};
use yatp::{
pool::{Builder, ThreadPool},
task::future::TaskCell,
};

use crate::{
use raftstore::{
coprocessor,
coprocessor::{
AdminObserver, ApplyCtxInfo, ApplySnapshotObserver, BoxAdminObserver,
BoxApplySnapshotObserver, BoxPdTaskObserver, BoxQueryObserver, BoxRegionChangeObserver,
Cmd, Coprocessor, CoprocessorHost, ObserverContext, PdTaskObserver, QueryObserver,
RegionChangeEvent, RegionChangeObserver, RegionState, StoreSizeInfo,
},
engine_store_ffi::{
gen_engine_store_server_helper,
interfaces::root::{DB as ffi_interfaces, DB::EngineStoreApplyRes},
name_to_cf, ColumnFamilyType, EngineStoreServerHelper, RaftCmdHeader, RawCppPtr,
TiFlashEngine, WriteCmdType, WriteCmds, CF_DEFAULT, CF_LOCK, CF_WRITE,
},
store,
store::{check_sst_for_ingestion, snap::plain_file_used, SnapKey},
Error, Result,
};
use sst_importer::SstImporter;
use tikv_util::{box_err, debug, error, info, warn};
use yatp::{
pool::{Builder, ThreadPool},
task::future::TaskCell,
};

use crate::{
gen_engine_store_server_helper,
interfaces::root::{DB as ffi_interfaces, DB::EngineStoreApplyRes},
name_to_cf, ColumnFamilyType, EngineStoreServerHelper, RaftCmdHeader, RawCppPtr, TiFlashEngine,
WriteCmdType, WriteCmds, CF_DEFAULT, CF_LOCK, CF_WRITE,
};

impl Into<engine_tiflash::FsStatsExt> for ffi_interfaces::StoreStats {
fn into(self) -> FsStatsExt {
Expand Down Expand Up @@ -581,7 +582,7 @@ impl PdTaskObserver for TiFlashObserver {
}
}

fn retrieve_sst_files(snap: &crate::store::Snapshot) -> Vec<(PathBuf, ColumnFamilyType)> {
fn retrieve_sst_files(snap: &store::Snapshot) -> Vec<(PathBuf, ColumnFamilyType)> {
let mut sst_views: Vec<(PathBuf, ColumnFamilyType)> = vec![];
let mut ssts = vec![];
for cf_file in snap.cf_files() {
Expand Down Expand Up @@ -640,8 +641,8 @@ impl ApplySnapshotObserver for TiFlashObserver {
&self,
ob_ctx: &mut ObserverContext<'_>,
peer_id: u64,
snap_key: &crate::store::SnapKey,
snap: Option<&crate::store::Snapshot>,
snap_key: &store::SnapKey,
snap: Option<&store::Snapshot>,
) {
info!("pre apply snapshot";
"peer_id" => peer_id,
Expand Down Expand Up @@ -703,19 +704,19 @@ impl ApplySnapshotObserver for TiFlashObserver {
&self,
ob_ctx: &mut ObserverContext<'_>,
peer_id: u64,
snap_key: &crate::store::SnapKey,
snap: Option<&crate::store::Snapshot>,
) -> std::result::Result<(), coprocessor::Error> {
snap_key: &store::SnapKey,
snap: Option<&store::Snapshot>,
) {
fail::fail_point!("on_ob_post_apply_snapshot", |_| {
return Err(box_err!("on_ob_post_apply_snapshot"));
return;
});
info!("post apply snapshot";
"peer_id" => ?peer_id,
"snap_key" => ?snap_key,
"region" => ?ob_ctx.region(),
);
let snap = match snap {
None => return Ok(()),
None => return,
Some(s) => s,
};
let maybe_snapshot = {
Expand Down Expand Up @@ -788,7 +789,6 @@ impl ApplySnapshotObserver for TiFlashObserver {
"pending" => self.engine.pending_applies_count.load(Ordering::SeqCst),
);
}
Ok(())
}

fn should_pre_apply_snapshot(&self) -> bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ use kvproto::{
kvrpcpb::{ReadIndexRequest, ReadIndexResponse},
raft_cmdpb::{CmdType, RaftCmdRequest, RaftRequestHeader, Request as RaftRequest},
};
use tikv_util::{debug, error, future::paired_future_callback};

use super::utils::ArcNotifyWaker;
use crate::{
use raftstore::{
router::RaftStoreRouter,
store::{Callback, RaftCmdExtraOpts, RaftRouter, ReadResponse},
};
use tikv_util::{debug, error, future::paired_future_callback};

use super::utils::ArcNotifyWaker;

pub trait ReadIndex: Sync + Send {
// To remove
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures_util::{compat::Future01CompatExt, future::BoxFuture, FutureExt};
use tikv_util::timer::start_global_timer;
use tokio_timer::timer::Handle;

use crate::engine_store_ffi::lazy_static;
use crate::lazy_static;

lazy_static! {
pub static ref PROXY_TIMER_HANDLE: Handle = start_global_timer("proxy-timer");
Expand Down
1 change: 1 addition & 0 deletions components/proxy_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ encryption = { path = "../encryption", default-features = false }
encryption_export = { path = "../encryption/export", default-features = false }
engine_rocks = { path = "../engine_rocks", default-features = false }
engine_rocks_helper = { path = "../engine_rocks_helper" }
engine_store_ffi = { path = "../engine_store_ffi", default-features = false }
engine_traits = { path = "../engine_traits", default-features = false }
error_code = { path = "../error_code", default-features = false }
fail = "0.5"
Expand Down
7 changes: 3 additions & 4 deletions components/proxy_server/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ pub unsafe fn run_proxy(
argv: *const *const c_char,
engine_store_server_helper: *const u8,
) {
raftstore::engine_store_ffi::init_engine_store_server_helper(engine_store_server_helper);
let engine_store_server_helper = raftstore::engine_store_ffi::gen_engine_store_server_helper(
engine_store_server_helper as isize,
);
engine_store_ffi::init_engine_store_server_helper(engine_store_server_helper);
let engine_store_server_helper =
engine_store_ffi::gen_engine_store_server_helper(engine_store_server_helper as isize);

let mut args = vec![];

Expand Down
Loading

0 comments on commit 20d7ef2

Please sign in to comment.