Support can_apply_snapshot #166

Merged
4 commits merged on Sep 13, 2022
4 changes: 4 additions & 0 deletions components/engine_rocks/src/engine.rs
@@ -62,6 +62,10 @@ impl RocksEngine {
pub fn set_shared_block_cache(&mut self, enable: bool) {
self.shared_block_cache = enable;
}

pub fn shared_block_cache(&self) -> bool {
self.shared_block_cache
}
}

impl KvEngine for RocksEngine {
8 changes: 8 additions & 0 deletions components/engine_traits/src/engine.rs
@@ -56,6 +56,14 @@ pub trait KvEngine:
/// This only exists as a temporary hack during refactoring.
/// It cannot be used forever.
fn bad_downcast<T: 'static>(&self) -> &T;

/// Returns false if the KvEngine can't apply a snapshot for this region now.
/// Some KvEngines need to perform transforms before applying data from a
/// snapshot. These procedures can be batched in the background when there is
/// more than one incoming snapshot, so the apply thread is not blocked.
fn can_apply_snapshot(&self, _is_timeout: bool, _new_batch: bool, _region_id: u64) -> bool {
true
}
}

/// A factory trait to create new engine.
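The default implementation returns `true`, so engines that apply snapshots directly are unaffected. Below is a minimal standalone sketch of how an engine that pre-handles snapshots in the background might gate applies; the `MockEngine` type and its `pending_prehandles` counter are illustrative assumptions, not code from this PR.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical engine state, used only to illustrate the hook's contract.
struct MockEngine {
    // Snapshots still being transformed (e.g. row -> column) in the background.
    pending_prehandles: AtomicUsize,
}

impl MockEngine {
    // Mirrors the shape of `KvEngine::can_apply_snapshot`: refuse to apply while
    // background pre-handling is still running, unless the caller has already
    // waited past its timeout and wants to force progress.
    fn can_apply_snapshot(&self, is_timeout: bool, _new_batch: bool, _region_id: u64) -> bool {
        is_timeout || self.pending_prehandles.load(Ordering::SeqCst) == 0
    }
}

fn main() {
    let engine = MockEngine { pending_prehandles: AtomicUsize::new(2) };
    // Two snapshots are still being pre-handled, so the caller should wait.
    assert!(!engine.can_apply_snapshot(false, true, 1));
    // After a timeout the apply is allowed to proceed anyway.
    assert!(engine.can_apply_snapshot(true, false, 1));
}
```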
7 changes: 7 additions & 0 deletions components/engine_traits/src/misc.rs
@@ -30,6 +30,13 @@ pub trait MiscExt: CFNamesExt + FlowControlFactorsExt {

fn flush_cf(&self, cf: &str, sync: bool) -> Result<()>;

fn delete_ranges_cfs(&self, strategy: DeleteStrategy, ranges: &[Range<'_>]) -> Result<()> {
for cf in self.cf_names() {
self.delete_ranges_cf(cf, strategy.clone(), ranges)?;
}
Ok(())
}

fn delete_all_in_range(&self, strategy: DeleteStrategy, ranges: &[Range<'_>]) -> Result<()> {
for cf in self.cf_names() {
self.delete_ranges_cf(cf, strategy.clone(), ranges)?;
8 changes: 4 additions & 4 deletions components/proxy_server/src/config.rs
@@ -145,9 +145,9 @@ pub fn setup_default_tikv_config(default: &mut TiKvConfig) {
default.server.status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string();
default.server.advertise_status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string();
default.raft_store.region_worker_tick_interval = ReadableDuration::millis(500);
let stale_peer_check_tick =
let clean_stale_ranges_tick =
(10_000 / default.raft_store.region_worker_tick_interval.as_millis()) as usize;
default.raft_store.stale_peer_check_tick = stale_peer_check_tick;
default.raft_store.clean_stale_ranges_tick = clean_stale_ranges_tick;

// Unlike TiKV, in TiFlash, we use the low-priority pool to both decode snapshots
// (transform from row to column) and ingest SST. These operations are pretty heavy.
@@ -170,9 +170,9 @@ pub fn address_proxy_config(config: &mut TiKvConfig) {
.server
.labels
.insert(DEFAULT_ENGINE_LABEL_KEY.to_owned(), engine_name);
let stale_peer_check_tick =
let clean_stale_ranges_tick =
(10_000 / config.raft_store.region_worker_tick_interval.as_millis()) as usize;
config.raft_store.stale_peer_check_tick = stale_peer_check_tick;
config.raft_store.clean_stale_ranges_tick = clean_stale_ranges_tick;
}

pub fn validate_and_persist_config(config: &mut TiKvConfig, persist: bool) {
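A quick check of the arithmetic behind the renamed tick: the count is derived so that stale-range cleanup still happens roughly every 10 seconds regardless of the worker tick interval, assuming the 500 ms default set above.

```rust
fn main() {
    // region_worker_tick_interval is 500 ms in the TiFlash proxy defaults above.
    let region_worker_tick_interval_ms: u64 = 500;
    // Same formula as in setup_default_tikv_config / address_proxy_config.
    let clean_stale_ranges_tick = (10_000 / region_worker_tick_interval_ms) as usize;
    // 20 ticks of 500 ms each: stale ranges are cleaned about every 10 seconds.
    assert_eq!(clean_stale_ranges_tick, 20);
}
```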
8 changes: 2 additions & 6 deletions components/raftstore/src/coprocessor/dispatcher.rs
@@ -537,16 +537,12 @@ impl<E: KvEngine> CoprocessorHost<E> {
peer_id: u64,
snap_key: &crate::store::SnapKey,
snap: Option<&crate::store::Snapshot>,
) -> Result<()> {
) {
let mut ctx = ObserverContext::new(region);
for observer in &self.registry.apply_snapshot_observers {
let observer = observer.observer.inner();
let res = observer.post_apply_snapshot(&mut ctx, peer_id, snap_key, snap);
if res.is_err() {
return res;
}
observer.post_apply_snapshot(&mut ctx, peer_id, snap_key, snap);
}
Ok(())
}

pub fn new_split_checker_host<'a>(
19 changes: 9 additions & 10 deletions components/raftstore/src/engine_store_ffi/observer.rs
@@ -21,7 +21,7 @@ use kvproto::{
};
use raft::{eraftpb, StateRole};
use sst_importer::SstImporter;
use tikv_util::{box_err, debug, error, info};
use tikv_util::{box_err, debug, error, info, warn};
use yatp::{
pool::{Builder, ThreadPool},
task::future::TaskCell,
@@ -668,11 +668,11 @@ impl ApplySnapshotObserver for TiFlashObserver {
let region = ob_ctx.region().clone();
let snap_key = snap_key.clone();
let ssts = retrieve_sst_files(snap);
self.engine
.pending_applies_count
.fetch_add(1, Ordering::SeqCst);
match self.apply_snap_pool.as_ref() {
Some(p) => {
self.engine
.pending_applies_count
.fetch_add(1, Ordering::SeqCst);
p.spawn(async move {
// The original implementation is in `Snapshot`, so we don't need to care about the lifetime.
fail::fail_point!("before_actually_pre_handle", |_| {});
@@ -690,10 +690,8 @@ impl ApplySnapshotObserver for TiFlashObserver {
});
}
None => {
self.engine
.pending_applies_count
.fetch_sub(1, Ordering::SeqCst);
error!("apply_snap_pool is not initialized, quit background pre apply";
// quit background pre handling
warn!("apply_snap_pool is not initialized";
"peer_id" => peer_id,
"region_id" => ob_ctx.region().get_id()
);
@@ -758,8 +756,9 @@ impl ApplySnapshotObserver for TiFlashObserver {
neer_retry
}
None => {
// We can't find background pre-handle task,
// maybe we can't get snapshot at that time.
// We can't find the background pre-handle task, possibly because:
// 1. we couldn't get the snapshot from the snap manager at that time, or
// 2. background pre-handling is disabled.
info!("pre-handled snapshot not found";
"snap_key" => ?snap_key,
"region" => ?ob_ctx.region(),
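Moving the `fetch_add` into the `Some(pool)` branch keeps `pending_applies_count` balanced: the counter is only incremented once a background pre-handle task is actually spawned, so the `None` branch no longer needs a compensating `fetch_sub`. Below is a standalone sketch of that discipline; the `maybe_spawn_prehandle` helper is hypothetical, not part of this PR.

```rust
use std::{
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    thread,
};

// Increment the pending counter only when a background task really starts; the
// task itself decrements it when done. If no pool is available we just warn and
// fall back, and there is nothing to roll back.
fn maybe_spawn_prehandle(pool_available: bool, pending: Arc<AtomicUsize>) {
    if pool_available {
        pending.fetch_add(1, Ordering::SeqCst);
        let pending = pending.clone();
        thread::spawn(move || {
            // ... pre-handle the snapshot here ...
            pending.fetch_sub(1, Ordering::SeqCst);
        });
    } else {
        eprintln!("apply_snap_pool is not initialized, skip background pre-handling");
    }
}

fn main() {
    let pending = Arc::new(AtomicUsize::new(0));
    maybe_spawn_prehandle(false, pending.clone());
    // The counter was never touched because no task was spawned.
    assert_eq!(pending.load(Ordering::SeqCst), 0);
}
```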
15 changes: 5 additions & 10 deletions components/raftstore/src/store/config.rs
@@ -147,16 +147,16 @@ pub struct Config {
#[online_config(skip)]
pub snap_apply_batch_size: ReadableSize,

#[online_config(skip)]
pub snap_handle_pool_size: usize,

// used to periodically check whether to schedule pending applies in the region runner
#[doc(hidden)]
#[online_config(skip)]
pub region_worker_tick_interval: ReadableDuration,

// used to periodically check whether we should delete a stale peer's range in
// region runner
#[doc(hidden)]
#[online_config(skip)]
pub stale_peer_check_tick: usize,
pub clean_stale_ranges_tick: usize,

// Interval (ms) to check region whether the data is consistent.
pub consistency_check_interval: ReadableDuration,
@@ -349,13 +349,12 @@ impl Default for Config {
peer_stale_state_check_interval: ReadableDuration::minutes(5),
leader_transfer_max_log_lag: 128,
snap_apply_batch_size: ReadableSize::mb(10),
snap_handle_pool_size: 2,
region_worker_tick_interval: if cfg!(feature = "test") {
ReadableDuration::millis(200)
} else {
ReadableDuration::millis(1000)
},
stale_peer_check_tick: if cfg!(feature = "test") { 1 } else { 10 },
clean_stale_ranges_tick: if cfg!(feature = "test") { 1 } else { 10 },
lock_cf_compact_interval: ReadableDuration::minutes(10),
lock_cf_compact_bytes_threshold: ReadableSize::mb(256),
// Disable consistency check by default as it will hurt performance.
@@ -834,10 +833,6 @@ impl Config {
.with_label_values(&["snap_apply_batch_size"])
.set(self.snap_apply_batch_size.0 as f64);

CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["snap_handle_pool_size"])
.set(self.snap_handle_pool_size as f64);

CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["store_batch_retry_recv_timeout"])
.set(self.store_batch_retry_recv_timeout.as_millis() as f64 / 1000.0);
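A hedged sketch of how the two settings are intended to interact in the region worker loop: pending applies are re-checked on every `region_worker_tick_interval`, while stale-range cleanup only runs every `clean_stale_ranges_tick` ticks. The `RegionWorkerSketch` type and its method names are illustrative; the real logic lives in the region runner, which is not shown in this diff.

```rust
// Illustrative only: a counter-based tick loop matching the config comments above.
struct RegionWorkerSketch {
    clean_stale_ranges_tick: usize,
    ticks: usize,
}

impl RegionWorkerSketch {
    // Called once per region_worker_tick_interval.
    fn on_tick(&mut self) {
        // Pending snapshot applies are re-checked on every tick ...
        self.schedule_pending_applies();
        // ... while stale ranges are only cleaned every N ticks.
        self.ticks += 1;
        if self.ticks % self.clean_stale_ranges_tick == 0 {
            self.clean_stale_ranges();
        }
    }

    fn schedule_pending_applies(&self) { /* check can_apply_snapshot, apply if allowed */ }
    fn clean_stale_ranges(&self) { /* delete ranges left by destroyed peers */ }
}

fn main() {
    let mut worker = RegionWorkerSketch { clean_stale_ranges_tick: 10, ticks: 0 };
    for _ in 0..25 {
        worker.on_tick();
    }
    // With the non-test default of 10, cleanup ran on ticks 10 and 20.
}
```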
2 changes: 1 addition & 1 deletion components/raftstore/src/store/fsm/apply.rs
@@ -3170,7 +3170,7 @@ impl GenSnapTask {
region_id: self.region_id,
notifier: self.snap_notifier,
for_balance: self.for_balance,
last_applied_index_term,
last_applied_term: last_applied_index_term,
last_applied_state,
canceled: self.canceled,
// This snapshot may be held for a long time, which may cause too many
7 changes: 1 addition & 6 deletions components/raftstore/src/store/fsm/store.rs
@@ -1443,14 +1443,9 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
};
mgr.init()?;
let region_runner = RegionRunner::new(
&*cfg.value(),
engines.kv.clone(),
mgr.clone(),
cfg.value().snap_apply_batch_size.0 as usize,
cfg.value().region_worker_tick_interval.as_millis(),
cfg.value().stale_peer_check_tick,
cfg.value().use_delete_range,
cfg.value().snap_generator_pool_size,
cfg.clone(),
workers.coprocessor_host.clone(),
self.router(),
Some(Arc::clone(&pd_client)),
20 changes: 10 additions & 10 deletions components/raftstore/src/store/peer_storage.rs
@@ -2095,7 +2095,10 @@ mod tests {
bootstrap_store,
fsm::apply::compact_raft_log,
initial_region, prepare_bootstrap_cluster,
worker::{RaftlogFetchRunner, RegionRunner, RegionTask},
worker::{
make_region_worker_raftstore_cfg, FetchedLogs, LogFetchedNotifier,
RaftlogFetchRunner, RegionRunner, RegionTask,
},
},
};

@@ -2809,12 +2812,11 @@
let (dummy_scheduler, _) = dummy_scheduler();
let mut s = new_storage_from_ents(sched.clone(), dummy_scheduler, &td, &ents);
let (router, _) = mpsc::sync_channel(100);
let cfg = make_region_worker_raftstore_cfg(true);
let runner = RegionRunner::new(
s.engines.kv.clone(),
mgr,
0,
true,
2,
cfg,
CoprocessorHost::<KvTestEngine>::default(),
router,
Option::<Arc<TestPdClient>>::None,
@@ -2958,12 +2960,11 @@
let store = new_store(1, labels);
pd_client.add_store(store);
let pd_mock = Arc::new(pd_client);
let cfg = make_region_worker_raftstore_cfg(true);
let runner = RegionRunner::new(
s.engines.kv.clone(),
mgr,
0,
true,
2,
cfg,
CoprocessorHost::<KvTestEngine>::default(),
router,
Some(pd_mock),
@@ -3264,12 +3265,11 @@
let (dummy_scheduler, _) = dummy_scheduler();
let s1 = new_storage_from_ents(sched.clone(), dummy_scheduler.clone(), &td1, &ents);
let (router, _) = mpsc::sync_channel(100);
let cfg = make_region_worker_raftstore_cfg(true);
let runner = RegionRunner::new(
s1.engines.kv.clone(),
mgr,
0,
true,
2,
cfg,
CoprocessorHost::<KvTestEngine>::default(),
router,
Option::<Arc<TestPdClient>>::None,
2 changes: 1 addition & 1 deletion components/raftstore/src/store/snap.rs
@@ -1213,7 +1213,7 @@ impl Snapshot {
}

pub fn cf_files(&self) -> &[CfFile] {
self.cf_files.as_slice()
&self.cf_files
}
}

2 changes: 2 additions & 0 deletions components/raftstore/src/store/worker/mod.rs
@@ -18,6 +18,8 @@ mod split_check;
mod split_config;
mod split_controller;

#[cfg(test)]
pub use self::region::tests::make_raftstore_cfg as make_region_worker_raftstore_cfg;
pub use self::{
check_leader::{Runner as CheckLeaderRunner, Task as CheckLeaderTask},
cleanup::{Runner as CleanupRunner, Task as CleanupTask},