diff --git a/components/engine_panic/src/misc.rs b/components/engine_panic/src/misc.rs index 9a5cc310fc3..cacf08af6f5 100644 --- a/components/engine_panic/src/misc.rs +++ b/components/engine_panic/src/misc.rs @@ -34,10 +34,6 @@ impl MiscExt for PanicEngine { panic!() } - fn roughly_cleanup_ranges(&self, ranges: &[(Vec, Vec)]) -> Result<()> { - panic!() - } - fn path(&self) -> &str { panic!() } diff --git a/components/engine_rocks/src/misc.rs b/components/engine_rocks/src/misc.rs index 0ae93fe34df..553ab14a403 100644 --- a/components/engine_rocks/src/misc.rs +++ b/components/engine_rocks/src/misc.rs @@ -4,7 +4,6 @@ use engine_traits::{ CFNamesExt, DeleteStrategy, ImportExt, IterOptions, Iterable, Iterator, MiscExt, Mutable, Range, Result, SstWriter, SstWriterBuilder, WriteBatch, WriteBatchExt, ALL_CFS, }; -use rocksdb::Range as RocksRange; use tikv_util::{box_try, keybuilder::KeyBuilder}; use crate::{ @@ -28,17 +27,6 @@ impl RocksEngine { ) -> Result<()> { let mut ranges = ranges.to_owned(); ranges.sort_by(|a, b| a.start_key.cmp(b.start_key)); - let max_end_key = ranges - .iter() - .fold(ranges[0].end_key, |x, y| std::cmp::max(x, y.end_key)); - let start = KeyBuilder::from_slice(ranges[0].start_key, 0, 0); - let end = KeyBuilder::from_slice(max_end_key, 0, 0); - let mut opts = IterOptions::new(Some(start), Some(end), false); - if self.is_titan() { - // Cause DeleteFilesInRange may expose old blob index keys, setting key only for Titan - // to avoid referring to missing blob files. - opts.set_key_only(true); - } let mut writer_wrapper: Option = None; let mut data: Vec> = vec![]; @@ -54,7 +42,17 @@ impl RocksEngine { } last_end_key = Some(r.end_key.to_owned()); - let mut it = self.iterator_cf_opt(cf, opts.clone())?; + let mut opts = IterOptions::new( + Some(KeyBuilder::from_slice(r.start_key, 0, 0)), + Some(KeyBuilder::from_slice(r.end_key, 0, 0)), + false, + ); + if self.is_titan() { + // Cause DeleteFilesInRange may expose old blob index keys, setting key only for + // Titan to avoid referring to missing blob files. + opts.set_key_only(true); + } + let mut it = self.iterator_cf_opt(cf, opts)?; let mut it_valid = it.seek(r.start_key.into())?; while it_valid { if it.key() >= r.end_key { @@ -225,28 +223,6 @@ impl MiscExt for RocksEngine { Ok(used_size) } - fn roughly_cleanup_ranges(&self, ranges: &[(Vec, Vec)]) -> Result<()> { - let db = self.as_inner(); - let mut delete_ranges = Vec::new(); - for &(ref start, ref end) in ranges { - if start == end { - continue; - } - assert!(start < end); - delete_ranges.push(RocksRange::new(start, end)); - } - if delete_ranges.is_empty() { - return Ok(()); - } - - for cf in db.cf_names() { - let handle = util::get_cf_handle(db, cf)?; - db.delete_files_in_ranges_cf(handle, &delete_ranges, /* include_end */ false)?; - } - - Ok(()) - } - fn path(&self) -> &str { self.as_inner().path() } @@ -364,13 +340,9 @@ mod tests { } } - fn test_delete_all_in_range( - strategy: DeleteStrategy, - origin_keys: &[Vec], - ranges: &[Range<'_>], - ) { + fn test_delete_ranges(strategy: DeleteStrategy, origin_keys: &[Vec], ranges: &[Range<'_>]) { let path = Builder::new() - .prefix("engine_delete_all_in_range") + .prefix("engine_delete_ranges") .tempdir() .unwrap(); let path_str = path.path().to_str().unwrap(); @@ -406,8 +378,7 @@ mod tests { wb.write().unwrap(); check_data(&db, ALL_CFS, kvs.as_slice()); - // Delete all in ranges. 
- db.delete_all_in_range(strategy, ranges).unwrap(); + db.delete_ranges_cfs(strategy, ranges).unwrap(); let mut kvs_left: Vec<_> = kvs; for r in ranges { @@ -429,25 +400,25 @@ mod tests { b"k4".to_vec(), ]; // Single range. - test_delete_all_in_range( + test_delete_ranges( DeleteStrategy::DeleteByRange, &data, &[Range::new(b"k1", b"k4")], ); // Two ranges without overlap. - test_delete_all_in_range( + test_delete_ranges( DeleteStrategy::DeleteByRange, &data, &[Range::new(b"k0", b"k1"), Range::new(b"k3", b"k4")], ); // Two ranges with overlap. - test_delete_all_in_range( + test_delete_ranges( DeleteStrategy::DeleteByRange, &data, &[Range::new(b"k1", b"k3"), Range::new(b"k2", b"k4")], ); // One range contains the other range. - test_delete_all_in_range( + test_delete_ranges( DeleteStrategy::DeleteByRange, &data, &[Range::new(b"k1", b"k4"), Range::new(b"k2", b"k3")], @@ -464,25 +435,25 @@ mod tests { b"k4".to_vec(), ]; // Single range. - test_delete_all_in_range( + test_delete_ranges( DeleteStrategy::DeleteByKey, &data, &[Range::new(b"k1", b"k4")], ); // Two ranges without overlap. - test_delete_all_in_range( + test_delete_ranges( DeleteStrategy::DeleteByKey, &data, &[Range::new(b"k0", b"k1"), Range::new(b"k3", b"k4")], ); // Two ranges with overlap. - test_delete_all_in_range( + test_delete_ranges( DeleteStrategy::DeleteByKey, &data, &[Range::new(b"k1", b"k3"), Range::new(b"k2", b"k4")], ); // One range contains the other range. - test_delete_all_in_range( + test_delete_ranges( DeleteStrategy::DeleteByKey, &data, &[Range::new(b"k1", b"k4"), Range::new(b"k2", b"k3")], @@ -501,7 +472,7 @@ mod tests { for i in 1000..5000 { data.push(i.to_string().as_bytes().to_vec()); } - test_delete_all_in_range( + test_delete_ranges( DeleteStrategy::DeleteByWriter { sst_path }, &data, &[ @@ -550,9 +521,9 @@ mod tests { } check_data(&db, ALL_CFS, kvs.as_slice()); - db.delete_all_in_range(DeleteStrategy::DeleteFiles, &[Range::new(b"k2", b"k4")]) + db.delete_ranges_cfs(DeleteStrategy::DeleteFiles, &[Range::new(b"k2", b"k4")]) .unwrap(); - db.delete_all_in_range(DeleteStrategy::DeleteBlobs, &[Range::new(b"k2", b"k4")]) + db.delete_ranges_cfs(DeleteStrategy::DeleteBlobs, &[Range::new(b"k2", b"k4")]) .unwrap(); check_data(&db, ALL_CFS, kvs_left.as_slice()); } @@ -598,7 +569,7 @@ mod tests { check_data(&db, &[cf], kvs.as_slice()); // Delete all in ["k2", "k4"). - db.delete_all_in_range( + db.delete_ranges_cfs( DeleteStrategy::DeleteByRange, &[Range::new(b"kabcdefg2", b"kabcdefg4")], ) diff --git a/components/engine_traits/src/misc.rs b/components/engine_traits/src/misc.rs index bc2c3a2b547..a95d381fdb5 100644 --- a/components/engine_traits/src/misc.rs +++ b/components/engine_traits/src/misc.rs @@ -11,8 +11,18 @@ use crate::{ #[derive(Clone, Debug)] pub enum DeleteStrategy { - /// Delete the SST files that are fullly fit in range. However, the SST files that are partially - /// overlapped with the range will not be touched. + /// Delete the SST files that are fullly fit in range. However, the SST + /// files that are partially overlapped with the range will not be + /// touched. + /// + /// Note: + /// - After this operation, some keys in the range might still exist in + /// the database. + /// - After this operation, some keys in the range might be removed from + /// existing snapshot, so you shouldn't expect to be able to read data + /// from the range using existing snapshots any more. + /// + /// Ref: DeleteFiles, /// Delete the data stored in Titan. 
DeleteBlobs, @@ -30,7 +40,7 @@ pub trait MiscExt: CFNamesExt + FlowControlFactorsExt { fn flush_cf(&self, cf: &str, sync: bool) -> Result<()>; - fn delete_all_in_range(&self, strategy: DeleteStrategy, ranges: &[Range<'_>]) -> Result<()> { + fn delete_ranges_cfs(&self, strategy: DeleteStrategy, ranges: &[Range<'_>]) -> Result<()> { for cf in self.cf_names() { self.delete_ranges_cf(cf, strategy.clone(), ranges)?; } @@ -56,17 +66,6 @@ pub trait MiscExt: CFNamesExt + FlowControlFactorsExt { /// fn get_engine_used_size(&self) -> Result; - /// Roughly deletes files in multiple ranges. - /// - /// Note: - /// - After this operation, some keys in the range might still exist in the database. - /// - After this operation, some keys in the range might be removed from existing snapshot, - /// so you shouldn't expect to be able to read data from the range using existing snapshots - /// any more. - /// - /// Ref: - fn roughly_cleanup_ranges(&self, ranges: &[(Vec, Vec)]) -> Result<()>; - /// The path to the directory on the filesystem where the database is stored fn path(&self) -> &str; diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index b8e22135a61..066f53276a6 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -1221,8 +1221,14 @@ impl RaftPollerBuilder { last_start_key = keys::enc_end_key(region); } ranges.push((last_start_key, keys::DATA_MAX_KEY.to_vec())); + let ranges: Vec<_> = ranges + .iter() + .map(|(start, end)| Range::new(start, end)) + .collect(); - self.engines.kv.roughly_cleanup_ranges(&ranges)?; + self.engines + .kv + .delete_ranges_cfs(DeleteStrategy::DeleteFiles, &ranges)?; info!( "cleans up garbage data"; @@ -2779,7 +2785,7 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER } drop(meta); - if let Err(e) = self.ctx.engines.kv.delete_all_in_range( + if let Err(e) = self.ctx.engines.kv.delete_ranges_cfs( DeleteStrategy::DeleteByKey, &[Range::new(&start_key, &end_key)], ) { diff --git a/components/raftstore/src/store/worker/region.rs b/components/raftstore/src/store/worker/region.rs index 0ac92103129..a5257578aa5 100644 --- a/components/raftstore/src/store/worker/region.rs +++ b/components/raftstore/src/store/worker/region.rs @@ -48,11 +48,10 @@ use crate::{ }, }; -// used to periodically check whether we should delete a stale peer's range in region runner - +// used to periodically check whether we should delete a stale peer's range in +// region runner #[cfg(test)] pub const STALE_PEER_CHECK_TICK: usize = 1; // 1000 milliseconds - #[cfg(not(test))] pub const STALE_PEER_CHECK_TICK: usize = 10; // 10000 milliseconds @@ -86,7 +85,8 @@ pub enum Task { }, /// Destroy data between [start_key, end_key). /// - /// The deletion may and may not succeed. + /// The actual deletion may be delayed if the engine is overloaded or a + /// reader is still referencing the data. Destroy { region_id: u64, start_key: Vec, @@ -131,8 +131,8 @@ struct StalePeerInfo { pub region_id: u64, pub end_key: Vec, // Once the oldest snapshot sequence exceeds this, it ensures that no one is - // reading on this peer anymore. So we can safely call `delete_files_in_range` - // , which may break the consistency of snapshot, of this peer range. + // reading on this peer anymore. So we can safely call `delete_files_in_range`, + // which may break the consistency of snapshot, of this peer range. 
pub stale_sequence: u64, } @@ -202,22 +202,29 @@ impl PendingDeleteRanges { /// Inserts a new range waiting to be deleted. /// - /// Before an insert is called, it must call drain_overlap_ranges to clean the overlapping range. - fn insert(&mut self, region_id: u64, start_key: &[u8], end_key: &[u8], stale_sequence: u64) { - if !self.find_overlap_ranges(start_key, end_key).is_empty() { + /// Before an insert is called, it must call drain_overlap_ranges to clean + /// the overlapping range. + fn insert( + &mut self, + region_id: u64, + start_key: Vec, + end_key: Vec, + stale_sequence: u64, + ) { + if !self.find_overlap_ranges(&start_key, &end_key).is_empty() { panic!( "[region {}] register deleting data in [{}, {}) failed due to overlap", region_id, - log_wrappers::Value::key(start_key), - log_wrappers::Value::key(end_key), + log_wrappers::Value::key(&start_key), + log_wrappers::Value::key(&end_key), ); } let info = StalePeerInfo { region_id, - end_key: end_key.to_owned(), + end_key, stale_sequence, }; - self.ranges.insert(start_key.to_owned(), info); + self.ranges.insert(start_key, info); } /// Gets all stale ranges info. @@ -239,21 +246,13 @@ impl PendingDeleteRanges { } } -#[derive(Clone)] -struct SnapContext -where - EK: KvEngine, -{ +struct SnapGenContext { engine: EK, - batch_size: usize, mgr: SnapManager, - use_delete_range: bool, - pending_delete_ranges: PendingDeleteRanges, - coprocessor_host: CoprocessorHost, router: R, } -impl SnapContext +impl SnapGenContext where EK: KvEngine, R: CasualRouter, @@ -341,6 +340,74 @@ where .generate .observe(start.saturating_elapsed_secs()); } +} + +pub struct Runner +where + EK: KvEngine, + T: PdClient + 'static, +{ + batch_size: usize, + use_delete_range: bool, + clean_stale_tick: usize, + clean_stale_check_interval: Duration, + + tiflash_stores: HashMap, + // we may delay some apply tasks if level 0 files to write stall threshold, + // pending_applies records all delayed apply task, and will check again later + pending_applies: VecDeque>, + // Ranges that have been logically destroyed at a specific sequence number. We can + // assume there will be no reader (engine snapshot) newer than that sequence number. Therefore, + // they can be physically deleted with `DeleteFiles` when we're sure there is no older + // reader as well. + // To protect this assumption, before a new snapshot is applied, the overlapping pending ranges + // must first be removed. + // The sole purpose of maintaining this list is to optimize deletion with `DeleteFiles` + // whenever we can. Errors while processing them can be ignored. 
+ pending_delete_ranges: PendingDeleteRanges, + + engine: EK, + mgr: SnapManager, + coprocessor_host: CoprocessorHost, + router: R, + pd_client: Option>, + pool: ThreadPool, +} + +impl Runner +where + EK: KvEngine, + R: CasualRouter, + T: PdClient + 'static, +{ + pub fn new( + engine: EK, + mgr: SnapManager, + batch_size: usize, + use_delete_range: bool, + snap_generator_pool_size: usize, + coprocessor_host: CoprocessorHost, + router: R, + pd_client: Option>, + ) -> Runner { + Runner { + batch_size, + use_delete_range, + clean_stale_tick: 0, + clean_stale_check_interval: Duration::from_millis(PENDING_APPLY_CHECK_INTERVAL), + tiflash_stores: HashMap::default(), + pending_applies: VecDeque::new(), + pending_delete_ranges: PendingDeleteRanges::default(), + engine, + mgr, + coprocessor_host, + router, + pd_client, + pool: Builder::new(thd_name!("snap-generator")) + .max_thread_count(snap_generator_pool_size) + .build_future_pool(), + } + } /// Applies snapshot data of the Region. fn apply_snap(&mut self, region_id: u64, abort: Arc) -> Result<()> { @@ -364,16 +431,7 @@ where let start_key = keys::enc_start_key(®ion); let end_key = keys::enc_end_key(®ion); check_abort(&abort)?; - let overlap_ranges = self - .pending_delete_ranges - .drain_overlap_ranges(&start_key, &end_key); - if !overlap_ranges.is_empty() { - CLEAN_COUNTER_VEC - .with_label_values(&["overlap-with-apply"]) - .inc(); - self.cleanup_overlap_regions(overlap_ranges)?; - } - self.delete_all_in_range(&[Range::new(&start_key, &end_key)])?; + self.clean_overlap_ranges(start_key, end_key)?; check_abort(&abort)?; fail_point!("apply_snap_cleanup_range"); @@ -464,79 +522,77 @@ where let _ = self.router.send(region_id, CasualMessage::SnapshotApplied); } - /// Cleans up the data within the range. - fn cleanup_range(&self, ranges: &[Range<'_>]) -> Result<()> { - self.engine - .delete_all_in_range(DeleteStrategy::DeleteFiles, ranges) - .unwrap_or_else(|e| { - error!("failed to delete files in range"; "err" => %e); - }); - self.delete_all_in_range(ranges)?; - self.engine - .delete_all_in_range(DeleteStrategy::DeleteBlobs, ranges) - .unwrap_or_else(|e| { - error!("failed to delete files in range"; "err" => %e); - }); - Ok(()) - } - - /// Gets the overlapping ranges and cleans them up. - fn cleanup_overlap_regions( + /// Tries to clean up files in pending ranges overlapping with the given + /// bounds. These pending ranges will be removed. Returns an updated range + /// that also includes these ranges. Caller must ensure the remaining keys + /// in the returning range will be deleted properly. + fn clean_overlap_ranges_roughly( &mut self, - overlap_ranges: Vec<(u64, Vec, Vec, u64)>, - ) -> Result<()> { + mut start_key: Vec, + mut end_key: Vec, + ) -> (Vec, Vec) { + let overlap_ranges = self + .pending_delete_ranges + .drain_overlap_ranges(&start_key, &end_key); + if overlap_ranges.is_empty() { + return (start_key, end_key); + } + CLEAN_COUNTER_VEC.with_label_values(&["overlap"]).inc(); let oldest_sequence = self .engine .get_oldest_snapshot_sequence_number() .unwrap_or(u64::MAX); - let mut ranges = Vec::with_capacity(overlap_ranges.len()); - let mut df_ranges = Vec::with_capacity(overlap_ranges.len()); - for (region_id, start_key, end_key, stale_sequence) in overlap_ranges.iter() { - // `DeleteFiles` may break current rocksdb snapshots consistency, - // so do not use it unless we can make sure there is no reader of the destroyed peer anymore. 
- if *stale_sequence < oldest_sequence { - df_ranges.push(Range::new(start_key, end_key)); - } else { - SNAP_COUNTER_VEC - .with_label_values(&["overlap", "not_delete_files"]) - .inc(); - } - info!("delete data in range because of overlap"; "region_id" => region_id, - "start_key" => log_wrappers::Value::key(start_key), - "end_key" => log_wrappers::Value::key(end_key)); - ranges.push(Range::new(start_key, end_key)); - } + let df_ranges: Vec<_> = overlap_ranges + .iter() + .filter_map(|(region_id, cur_start, cur_end, stale_sequence)| { + info!( + "delete data in range because of overlap"; "region_id" => region_id, + "start_key" => log_wrappers::Value::key(cur_start), + "end_key" => log_wrappers::Value::key(cur_end) + ); + if &start_key > cur_start { + start_key = cur_start.clone(); + } + if &end_key < cur_end { + end_key = cur_end.clone(); + } + if *stale_sequence < oldest_sequence { + Some(Range::new(cur_start, cur_end)) + } else { + SNAP_COUNTER_VEC + .with_label_values(&["overlap", "not_delete_files"]) + .inc(); + None + } + }) + .collect(); self.engine - .delete_all_in_range(DeleteStrategy::DeleteFiles, &df_ranges) + .delete_ranges_cfs(DeleteStrategy::DeleteFiles, &df_ranges) .unwrap_or_else(|e| { error!("failed to delete files in range"; "err" => %e); }); + (start_key, end_key) + } - self.delete_all_in_range(&ranges) + /// Cleans up data in the given range and all pending ranges overlapping + /// with it. + fn clean_overlap_ranges(&mut self, start_key: Vec, end_key: Vec) -> Result<()> { + let (start_key, end_key) = self.clean_overlap_ranges_roughly(start_key, end_key); + self.delete_all_in_range(&[Range::new(&start_key, &end_key)]) } /// Inserts a new pending range, and it will be cleaned up with some delay. - fn insert_pending_delete_range(&mut self, region_id: u64, start_key: &[u8], end_key: &[u8]) { - let overlap_ranges = self - .pending_delete_ranges - .drain_overlap_ranges(start_key, end_key); - if !overlap_ranges.is_empty() { - CLEAN_COUNTER_VEC - .with_label_values(&["overlap-with-destroy"]) - .inc(); - if let Err(e) = self.cleanup_overlap_regions(overlap_ranges) { - warn!("cleanup_overlap_ranges failed"; - "region_id" => region_id, - "start_key" => log_wrappers::Value::key(start_key), - "end_key" => log_wrappers::Value::key(end_key), - "err" => %e, - ); - } - } + fn insert_pending_delete_range( + &mut self, + region_id: u64, + start_key: Vec, + end_key: Vec, + ) { + let (start_key, end_key) = self.clean_overlap_ranges_roughly(start_key, end_key); info!("register deleting data in range"; "region_id" => region_id, - "start_key" => log_wrappers::Value::key(start_key), - "end_key" => log_wrappers::Value::key(end_key), + "start_key" => log_wrappers::Value::key(&start_key), + "end_key" => log_wrappers::Value::key(&end_key), ); let seq = self.engine.get_latest_sequence_number(); self.pending_delete_ranges @@ -553,33 +609,43 @@ where .engine .get_oldest_snapshot_sequence_number() .unwrap_or(u64::MAX); - let mut cleanup_ranges: Vec<(u64, Vec, Vec)> = self + let mut region_ranges: Vec<(u64, Vec, Vec)> = self .pending_delete_ranges .stale_ranges(oldest_sequence) .map(|(region_id, s, e)| (region_id, s.to_vec(), e.to_vec())) .collect(); - if cleanup_ranges.is_empty() { + if region_ranges.is_empty() { return; } CLEAN_COUNTER_VEC.with_label_values(&["destroy"]).inc_by(1); - cleanup_ranges.sort_by(|a, b| a.1.cmp(&b.1)); - while cleanup_ranges.len() > CLEANUP_MAX_REGION_COUNT { - cleanup_ranges.pop(); - } - let ranges: Vec> = cleanup_ranges + region_ranges.sort_by(|a, b| a.1.cmp(&b.1)); + 
region_ranges.truncate(CLEANUP_MAX_REGION_COUNT); + let ranges: Vec<_> = region_ranges .iter() .map(|(region_id, start, end)| { info!("delete data in range because of stale"; "region_id" => region_id, - "start_key" => log_wrappers::Value::key(start), - "end_key" => log_wrappers::Value::key(end)); + "start_key" => log_wrappers::Value::key(start), + "end_key" => log_wrappers::Value::key(end)); Range::new(start, end) }) .collect(); - if let Err(e) = self.cleanup_range(&ranges) { + + self.engine + .delete_ranges_cfs(DeleteStrategy::DeleteFiles, &ranges) + .unwrap_or_else(|e| { + error!("failed to delete files in range"; "err" => %e); + }); + if let Err(e) = self.delete_all_in_range(&ranges) { error!("failed to cleanup stale range"; "err" => %e); return; } - for (_, key, _) in cleanup_ranges { + self.engine + .delete_ranges_cfs(DeleteStrategy::DeleteBlobs, &ranges) + .unwrap_or_else(|e| { + error!("failed to delete blobs in range"; "err" => %e); + }); + + for (_, key, _) in region_ranges { assert!( self.pending_delete_ranges.remove(&key).is_some(), "cleanup pending_delete_ranges {} should exist", @@ -620,72 +686,19 @@ where Ok(()) } -} - -pub struct Runner -where - EK: KvEngine, - T: PdClient + 'static, -{ - pool: ThreadPool, - ctx: SnapContext, - // we may delay some apply tasks if level 0 files to write stall threshold, - // pending_applies records all delayed apply task, and will check again later - pending_applies: VecDeque>, - clean_stale_tick: usize, - clean_stale_check_interval: Duration, - tiflash_stores: HashMap, - pd_client: Option>, -} - -impl Runner -where - EK: KvEngine, - R: CasualRouter, - T: PdClient + 'static, -{ - pub fn new( - engine: EK, - mgr: SnapManager, - batch_size: usize, - use_delete_range: bool, - snap_generator_pool_size: usize, - coprocessor_host: CoprocessorHost, - router: R, - pd_client: Option>, - ) -> Runner { - Runner { - pool: Builder::new(thd_name!("snap-generator")) - .max_thread_count(snap_generator_pool_size) - .build_future_pool(), - ctx: SnapContext { - engine, - mgr, - batch_size, - use_delete_range, - pending_delete_ranges: PendingDeleteRanges::default(), - coprocessor_host, - router, - }, - pending_applies: VecDeque::new(), - clean_stale_tick: 0, - clean_stale_check_interval: Duration::from_millis(PENDING_APPLY_CHECK_INTERVAL), - tiflash_stores: HashMap::default(), - pd_client, - } - } /// Tries to apply pending tasks if there is some. fn handle_pending_applies(&mut self) { fail_point!("apply_pending_snapshot", |_| {}); while !self.pending_applies.is_empty() { - // should not handle too many applies than the number of files that can be ingested. - // check level 0 every time because we can not make sure how does the number of level 0 files change. - if self.ctx.ingest_maybe_stall() { + // should not handle too many applies than the number of files that can be + // ingested. check level 0 every time because we can not make sure + // how does the number of level 0 files change. + if self.ingest_maybe_stall() { break; } if let Some(Task::Apply { region_id, status }) = self.pending_applies.pop_front() { - self.ctx.handle_apply(region_id, status); + self.handle_apply(region_id, status); } } } @@ -713,7 +726,6 @@ where } => { // It is safe for now to handle generating and applying snapshot concurrently, // but it may not when merge is implemented. 
- let ctx = self.ctx.clone(); let mut allow_multi_files_snapshot = false; // if to_store_id is 0, it means the to_store_id cannot be found if to_store_id != 0 { @@ -738,6 +750,11 @@ where } } + let ctx = SnapGenContext { + engine: self.engine.clone(), + mgr: self.mgr.clone(), + router: self.router.clone(), + }; self.pool.spawn(async move { tikv_alloc::add_thread_memory_accessor(); ctx.handle_gen( @@ -771,9 +788,8 @@ where fail_point!("on_region_worker_destroy", true, |_| {}); // try to delay the range deletion because // there might be a coprocessor request related to this range - self.ctx - .insert_pending_delete_range(region_id, &start_key, &end_key); - self.ctx.clean_stale_ranges(); + self.insert_pending_delete_range(region_id, start_key, end_key); + self.clean_stale_ranges(); } } } @@ -793,7 +809,7 @@ where self.handle_pending_applies(); self.clean_stale_tick += 1; if self.clean_stale_tick >= STALE_PEER_CHECK_TICK { - self.ctx.clean_stale_ranges(); + self.clean_stale_ranges(); self.clean_stale_tick = 0; } } @@ -842,7 +858,12 @@ mod tests { e: &str, stale_sequence: u64, ) { - pending_delete_ranges.insert(id, s.as_bytes(), e.as_bytes(), stale_sequence); + pending_delete_ranges.insert( + id, + s.as_bytes().to_owned(), + e.as_bytes().to_owned(), + stale_sequence, + ); } #[test] diff --git a/tests/integrations/storage/test_titan.rs b/tests/integrations/storage/test_titan.rs index cfc250a8e15..1126288ad39 100644 --- a/tests/integrations/storage/test_titan.rs +++ b/tests/integrations/storage/test_titan.rs @@ -310,11 +310,11 @@ fn test_delete_files_in_range_for_titan() { // blob4: (b_7, b_value) // `delete_files_in_range` may expose some old keys. - // For Titan it may encounter `missing blob file` in `delete_all_in_range`, + // For Titan it may encounter `missing blob file` in `delete_ranges_cfs`, // so we set key_only for Titan. engines .kv - .delete_all_in_range( + .delete_ranges_cfs( DeleteStrategy::DeleteFiles, &[Range::new( &data_key(Key::from_raw(b"a").as_encoded()), @@ -324,7 +324,7 @@ fn test_delete_files_in_range_for_titan() { .unwrap(); engines .kv - .delete_all_in_range( + .delete_ranges_cfs( DeleteStrategy::DeleteByKey, &[Range::new( &data_key(Key::from_raw(b"a").as_encoded()), @@ -334,7 +334,7 @@ fn test_delete_files_in_range_for_titan() { .unwrap(); engines .kv - .delete_all_in_range( + .delete_ranges_cfs( DeleteStrategy::DeleteBlobs, &[Range::new( &data_key(Key::from_raw(b"a").as_encoded()),
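
Note on the trait-level change: `delete_all_in_range` is renamed to `delete_ranges_cfs` (it fans the strategy out over every column family), and `roughly_cleanup_ranges` is removed, so the caller in `RaftPollerBuilder` now builds `Range` values from its `(Vec<u8>, Vec<u8>)` pairs and asks for `DeleteStrategy::DeleteFiles` explicitly. The sketch below is a minimal model of that call-site migration; `MiscExt`, `Range`, and `DeleteStrategy` here are simplified stand-ins (error handling omitted), not the real `engine_traits` definitions.

```rust
// Minimal stand-ins; the real trait lives in components/engine_traits/src/misc.rs.
#[derive(Clone, Debug)]
pub enum DeleteStrategy {
    DeleteFiles,
    DeleteBlobs,
    DeleteByRange,
    DeleteByKey,
}

#[derive(Clone, Copy)]
pub struct Range<'a> {
    pub start_key: &'a [u8],
    pub end_key: &'a [u8],
}

impl<'a> Range<'a> {
    pub fn new(start_key: &'a [u8], end_key: &'a [u8]) -> Self {
        Range { start_key, end_key }
    }
}

pub trait MiscExt {
    fn cf_names(&self) -> Vec<&str>;
    fn delete_ranges_cf(&self, cf: &str, strategy: DeleteStrategy, ranges: &[Range<'_>]);

    // The renamed entry point: fan the strategy out over every column family.
    fn delete_ranges_cfs(&self, strategy: DeleteStrategy, ranges: &[Range<'_>]) {
        for cf in self.cf_names() {
            self.delete_ranges_cf(cf, strategy.clone(), ranges);
        }
    }
}

// Old shape: kv.roughly_cleanup_ranges(&[(start, end), ...]).
// New shape, mirroring the store.rs hunk above: build `Range`s over the owned
// key pairs and request file-level deletion explicitly.
pub fn cleanup_garbage<E: MiscExt>(kv: &E, key_ranges: &[(Vec<u8>, Vec<u8>)]) {
    let ranges: Vec<_> = key_ranges
        .iter()
        .map(|(start, end)| Range::new(start, end))
        .collect();
    kv.delete_ranges_cfs(DeleteStrategy::DeleteFiles, &ranges);
}
```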
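
`clean_stale_ranges` now inlines what `cleanup_range` used to do, and the ordering matters: file-level deletion is a cheap best-effort first pass (errors are only logged), the precise key deletion must succeed before the pending ranges are dropped, and Titan blob cleanup is again best effort. A sketch of that ordering with simplified stand-in types; the real runner picks the precise strategy from its `use_delete_range` setting, so plain `DeleteByKey` and `String` errors below are assumptions for illustration.

```rust
// Simplified stand-ins for the engine trait and strategies used by the runner.
#[derive(Clone, Debug)]
pub enum DeleteStrategy {
    DeleteFiles, // drop SST files fully covered by the range (coarse, best effort)
    DeleteBlobs, // drop Titan blob files fully covered by the range (best effort)
    DeleteByKey, // precise deletion of the remaining keys
}

pub struct Range<'a> {
    pub start_key: &'a [u8],
    pub end_key: &'a [u8],
}

pub trait Engine {
    fn delete_ranges_cfs(&self, strategy: DeleteStrategy, ranges: &[Range<'_>]) -> Result<(), String>;
}

/// Mirrors the order used by `clean_stale_ranges` in the diff.
pub fn cleanup_stale<E: Engine>(engine: &E, ranges: &[Range<'_>]) -> Result<(), String> {
    // 1. Coarse pass: whole files inside the ranges. Failures are tolerated;
    //    the precise pass below still removes the keys.
    if let Err(e) = engine.delete_ranges_cfs(DeleteStrategy::DeleteFiles, ranges) {
        eprintln!("failed to delete files in range: {}", e);
    }
    // 2. Precise pass: must succeed, otherwise the caller keeps the pending
    //    ranges and retries them on a later tick.
    engine.delete_ranges_cfs(DeleteStrategy::DeleteByKey, ranges)?;
    // 3. Blob files that step 1 may have left behind (Titan only); best effort.
    if let Err(e) = engine.delete_ranges_cfs(DeleteStrategy::DeleteBlobs, ranges) {
        eprintln!("failed to delete blobs in range: {}", e);
    }
    Ok(())
}
```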
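
`PendingDeleteRanges::insert` now takes owned keys, and its contract is unchanged: callers must drain overlapping ranges first (now through `clean_overlap_ranges_roughly`), and an overlap at insert time is treated as a bug. Below is a self-contained, simplified model of the structure: a map keyed by `start_key` whose values carry the region id, end key, and stale sequence, as in `StalePeerInfo`; the overlap scan is a simplified version of the real lookup.

```rust
use std::collections::BTreeMap;

// Mirrors the fields of StalePeerInfo in the diff.
pub struct StalePeerInfo {
    pub region_id: u64,
    pub end_key: Vec<u8>,
    pub stale_sequence: u64,
}

#[derive(Default)]
pub struct PendingDeleteRanges {
    // Keyed by start_key; each value records the range's end and the sequence
    // number after which no reader can still see the stale peer's data.
    ranges: BTreeMap<Vec<u8>, StalePeerInfo>,
}

impl PendingDeleteRanges {
    /// Returns (region_id, start_key, end_key, stale_sequence) for every
    /// pending range overlapping [start_key, end_key).
    pub fn find_overlap_ranges(
        &self,
        start_key: &[u8],
        end_key: &[u8],
    ) -> Vec<(u64, Vec<u8>, Vec<u8>, u64)> {
        let mut result = Vec::new();
        // A pending range [s, e) overlaps iff s < end_key && e > start_key.
        for (s, info) in self.ranges.range::<[u8], _>(..end_key) {
            if info.end_key.as_slice() > start_key {
                result.push((info.region_id, s.clone(), info.end_key.clone(), info.stale_sequence));
            }
        }
        result
    }

    /// As in the diff: the caller must have drained overlaps already, so any
    /// overlap found here is a programming error.
    pub fn insert(&mut self, region_id: u64, start_key: Vec<u8>, end_key: Vec<u8>, stale_sequence: u64) {
        assert!(
            self.find_overlap_ranges(&start_key, &end_key).is_empty(),
            "register deleting data in [{:?}, {:?}) failed due to overlap",
            start_key,
            end_key,
        );
        let info = StalePeerInfo { region_id, end_key, stale_sequence };
        self.ranges.insert(start_key, info);
    }
}
```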
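
The new `clean_overlap_ranges_roughly` does two things at once: it widens the requested range so it covers every drained overlapping pending range (the caller then deletes the whole widened range precisely), and it selects, among those drained ranges, the ones whose stale sequence is already older than the oldest live engine snapshot, because only those can safely go through `DeleteFiles`. A self-contained sketch of just that merge-and-filter step, with the drained ranges represented as plain tuples:

```rust
// (region_id, start_key, end_key, stale_sequence), as drained from the pending list.
type OverlapRange = (u64, Vec<u8>, Vec<u8>, u64);

/// Widens [start_key, end_key) to cover every drained overlapping range and
/// returns the sub-ranges that are safe for `DeleteFiles`, i.e. those whose
/// stale_sequence is older than the oldest live snapshot sequence.
fn merge_and_filter(
    mut start_key: Vec<u8>,
    mut end_key: Vec<u8>,
    overlap_ranges: &[OverlapRange],
    oldest_snapshot_sequence: u64,
) -> (Vec<u8>, Vec<u8>, Vec<(Vec<u8>, Vec<u8>)>) {
    let mut df_ranges = Vec::new();
    for (_region_id, cur_start, cur_end, stale_sequence) in overlap_ranges {
        if *cur_start < start_key {
            start_key = cur_start.clone();
        }
        if *cur_end > end_key {
            end_key = cur_end.clone();
        }
        // `DeleteFiles` may break snapshot consistency, so use it only when no
        // reader can still reference the stale peer's data.
        if *stale_sequence < oldest_snapshot_sequence {
            df_ranges.push((cur_start.clone(), cur_end.clone()));
        }
    }
    (start_key, end_key, df_ranges)
}

fn main() {
    let (start, end, df_ranges) = merge_and_filter(
        b"k2".to_vec(),
        b"k4".to_vec(),
        &[
            (1, b"k1".to_vec(), b"k3".to_vec(), 5),  // old enough for DeleteFiles
            (2, b"k3".to_vec(), b"k5".to_vec(), 99), // a snapshot may still read it
        ],
        10, // oldest snapshot sequence number
    );
    assert_eq!((start.as_slice(), end.as_slice()), (&b"k1"[..], &b"k5"[..]));
    assert_eq!(df_ranges.len(), 1);
    println!("widened range: {:?} .. {:?}", start, end);
}
```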