Skip to content

Commit

Permalink
br-stream: use raft router to apply kv files for sst_importer
Browse files Browse the repository at this point in the history
Signed-off-by: 3pointer <[email protected]>
  • Loading branch information
3pointer committed Feb 24, 2022
1 parent 43a0d8b commit 6b09a8b
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 81 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/br-stream/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ where
async fn starts_flush_ticks(router: Router) {
let ticker = tick(Duration::from_secs(FLUSH_STORAGE_INTERVAL / 5));
loop {
// wait 10s to trigger tick
// wait 1min to trigger tick
let _ = ticker.recv().unwrap();
debug!("backup stream trigger flush tick");
router.tick().await;
Expand Down
1 change: 1 addition & 0 deletions components/br-stream/src/event_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl<S: Snapshot> EventLoader<S> {
/// Like [`cdc::Initializer`], but supports initialize over range.
/// Note: maybe we can merge those two structures?
#[derive(Clone)]
#[allow(dead_code)]
pub struct InitialDataLoader<E, R, RT> {
router: RT,
regions: R,
Expand Down
1 change: 0 additions & 1 deletion components/br-stream/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
#![feature(inherent_ascii_escape)]
pub mod config;
mod endpoint;
pub mod errors;
Expand Down
3 changes: 2 additions & 1 deletion components/br-stream/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ impl RouterInner {
}

pub async fn unregister_task(&self, task_name: &str) {
if let Some(_) = self.tasks.lock().await.remove(task_name) {
if self.tasks.lock().await.remove(task_name).is_some() {
info!(
"backup stream unregister task";
"task" => task_name,
Expand Down Expand Up @@ -985,6 +985,7 @@ impl std::fmt::Debug for DataFile {
struct KeyRange(Vec<u8>);

#[derive(Clone, Debug)]
#[allow(dead_code)]
struct TaskRange {
end: Vec<u8>,
task_name: String,
Expand Down
83 changes: 27 additions & 56 deletions components/sst_importer/src/sst_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ use kvproto::import_sstpb::*;
use encryption::{encryption_method_to_db_encryption_method, DataKeyManager};
use engine_rocks::{get_env, RocksSstReader};
use engine_traits::{
name_to_cf, CfName, EncryptionKeyManager, FileEncryptionInfo, Iterator, KvEngine, Mutable,
SSTMetaInfo, SeekKey, SstCompressionType, SstExt, SstReader, SstWriter, SstWriterBuilder,
WriteBatch, CF_DEFAULT, CF_WRITE,
name_to_cf, CfName, EncryptionKeyManager, FileEncryptionInfo, Iterator, KvEngine, SSTMetaInfo,
SeekKey, SstCompressionType, SstExt, SstReader, SstWriter, SstWriterBuilder, CF_DEFAULT,
CF_WRITE,
};
use file_system::{get_io_rate_limiter, OpenOptions};
use kvproto::kvrpcpb::ApiVersion;
Expand Down Expand Up @@ -141,38 +141,6 @@ impl SSTImporter {
self.dir.exist(meta).unwrap_or(false)
}

// Donwloads and apply a KV file from an external storage.
pub fn apply<E: KvEngine>(
&self,
meta: &KvMeta,
backend: &StorageBackend,
rewrite_rule: &RewriteRule,
speed_limiter: Limiter,
engine: E,
) -> Result<Option<Range>> {
debug!("apply start";
"url" => ?backend,
"meta" => ?meta,
"rewrite_rule" => ?rewrite_rule,
);
match self.do_download_and_apply::<E>(
meta,
backend,
rewrite_rule,
&speed_limiter,
engine,
) {
Ok(r) => {
info!("apply"; "meta" => ?meta, "range" => ?r);
Ok(r)
}
Err(e) => {
error!(%e; "apply failed"; "meta" => ?meta,);
Err(e)
}
}
}

// Downloads an SST file from an external storage.
//
// This method is blocking. It performs the following transformations before
Expand Down Expand Up @@ -294,20 +262,17 @@ impl SSTImporter {
Ok(())
}

fn do_download_and_apply<E: KvEngine>(
pub fn do_download_kv_file(
&self,
meta: &KvMeta,
backend: &StorageBackend,
rewrite_rule: &RewriteRule,
speed_limiter: &Limiter,
engine: E,
) -> Result<Option<Range>> {
) -> Result<PathBuf> {
let name = meta.get_name();
let cf = meta.get_cf();
let path = self.dir.get_import_path(name)?;
let start = Instant::now();
self.download_file_from_external_storage(
// current length is 0. which means won't check the file length.
// don't check file length after download file for now.
meta.get_length(),
name,
path.temp.clone(),
Expand All @@ -322,8 +287,18 @@ impl SSTImporter {
.with_label_values(&["download"])
.observe(start.saturating_elapsed().as_secs_f64());

// iterator `path.temp` file and performs rewrites and apply.
let file = File::open(path.temp)?;
Ok(path.temp)
}

pub fn do_apply_kv_file<P: AsRef<Path>>(
&self,
restore_ts: u64,
file_path: P,
rewrite_rule: &RewriteRule,
build_fn: &mut dyn FnMut(Vec<u8>, Vec<u8>),
) -> Result<Option<Range>> {
// iterator file and performs rewrites and apply.
let file = File::open(file_path)?;
let mut reader = BufReader::new(file);
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer)?;
Expand All @@ -336,13 +311,12 @@ impl SSTImporter {
let perform_rewrite = old_prefix != new_prefix;

// perform iteration and key rewrite.
let mut key = keys::data_key(new_prefix);
let mut key = new_prefix.to_vec();
let new_prefix_data_key_len = key.len();
let mut smallest_key = None;
let mut largest_key = None;

let start = Instant::now();
let mut wb = engine.write_batch();
loop {
if !event_iter.valid() {
break;
Expand All @@ -351,11 +325,11 @@ impl SSTImporter {
let iter_key = event_iter.key().to_vec();

let ts = Key::decode_ts_from(&iter_key)?;
if ts > TimeStamp::new(meta.get_restore_ts()) {
// we assume the keys in file are sorted by ts.
// so if we met the key not satisfy the ts.
if ts > TimeStamp::new(restore_ts) {
// we assume the keys in file are sorted by ts.
// so if we met the key not satisfy the ts.
// we can easily filter the remain keys.
break
break;
}

smallest_key = smallest_key.map_or_else(
Expand Down Expand Up @@ -383,20 +357,17 @@ impl SSTImporter {

debug!(
"perform rewrite new key: {:?}, new key prefix: {:?}, old key prefix: {:?}",
log_wrappers::Value::key(keys::origin_key(&key)),
log_wrappers::Value::key(&key),
log_wrappers::Value::key(new_prefix),
log_wrappers::Value::key(old_prefix),
);
} else {
key = keys::data_key(event_iter.key());
key = event_iter.key().to_vec();
}
let value = Cow::Borrowed(event_iter.value());
// TODO handle delete cf
wb.put_cf(cf, &key, &value)?;
let value = event_iter.value().to_vec();
build_fn(key.clone(), value);
}
wb.write()?;
let label = if perform_rewrite { "rewrite" } else { "normal" };
info!("apply file finished {}", name);
IMPORTER_APPLY_DURATION
.with_label_values(&[label])
.observe(start.saturating_elapsed().as_secs_f64());
Expand Down
Loading

0 comments on commit 6b09a8b

Please sign in to comment.