Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: 3pointer <[email protected]>
  • Loading branch information
3pointer committed Feb 24, 2022
1 parent 77a93dc commit bb116e2
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 60 deletions.
1 change: 1 addition & 0 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
// Import SST service.
let import_service = ImportSSTService::new(
self.config.import.clone(),
self.config.raft_store.raft_entry_max_size,
self.router.clone(),
engines.engines.kv.clone(),
servers.importer.clone(),
Expand Down
1 change: 1 addition & 0 deletions components/test_raftstore/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ impl Simulator for ServerCluster {
};
let import_service = ImportSSTService::new(
cfg.import.clone(),
cfg.raft_store.raft_entry_max_size,
sim_router.clone(),
engines.kv.clone(),
Arc::clone(&importer),
Expand Down
107 changes: 47 additions & 60 deletions src/import/sst_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use protobuf::Message;

use crate::server::CONFIG_ROCKSDB_GAUGE;
use raftstore::router::RaftStoreRouter;
use raftstore::store::{Callback, Config as RaftConfig, RaftCmdExtraOpts, RegionSnapshot};
use raftstore::store::{Callback, RaftCmdExtraOpts, RegionSnapshot};
use tikv_util::config::ReadableSize;
use tikv_util::future::create_stream_with_buffer;
use tikv_util::future::paired_future_callback;
Expand Down Expand Up @@ -69,6 +69,7 @@ where
{
pub fn new(
cfg: Config,
raft_entry_max_size: ReadableSize,
router: Router,
engine: E,
importer: Arc<SSTImporter>,
Expand All @@ -94,7 +95,7 @@ where
importer,
limiter: Limiter::new(f64::INFINITY),
task_slots: Arc::new(Mutex::new(HashSet::default())),
raft_entry_max_size: RaftConfig::default().raft_entry_max_size,
raft_entry_max_size,
}
}

Expand Down Expand Up @@ -448,76 +449,62 @@ where
let context = req.take_context();
let meta = req.get_meta();

match importer.do_download_kv_file(meta, req.get_storage_backend(), &limiter) {
Ok(temp_file) => {
let mut reqs = vec![];
let mut cmd_reqs = vec![];
let mut build_req_fn = self.build_apply_request(
reqs.as_mut(),
cmd_reqs.as_mut(),
meta.get_is_delete(),
meta.get_cf(),
context.clone(),
);
match importer.do_apply_kv_file(
meta.get_restore_ts(),
temp_file,
req.get_rewrite_rule(),
&mut build_req_fn,
) {
Ok(range) => {
drop(build_req_fn);
if !reqs.is_empty() {
let cmd = make_request(reqs.as_mut(), context);
cmd_reqs.push(cmd);
}
for cmd in cmd_reqs {
let (cb, future) = paired_future_callback();
match router.send_command(
cmd,
Callback::write(cb),
RaftCmdExtraOpts::default(),
) {
Ok(_) => futs.push(future),
Err(_e) => {
let mut import_err = kvproto::import_sstpb::Error::default();
import_err
.set_message("failed to send raft command".to_string());
apply_resp.set_error(import_err);
}
}
}
if let Some(r) = range {
apply_resp.set_range(r);
}
}
let result = (|| -> Result<()> {
let temp_file =
importer.do_download_kv_file(meta, req.get_storage_backend(), &limiter)?;
let mut reqs = vec![];
let mut cmd_reqs = vec![];
let mut build_req_fn = self.build_apply_request(
reqs.as_mut(),
cmd_reqs.as_mut(),
meta.get_is_delete(),
meta.get_cf(),
context.clone(),
);
let range = importer.do_apply_kv_file(
meta.get_restore_ts(),
temp_file,
req.get_rewrite_rule(),
&mut build_req_fn,
)?;
drop(build_req_fn);
if !reqs.is_empty() {
let cmd = make_request(reqs.as_mut(), context);
cmd_reqs.push(cmd);
}
for cmd in cmd_reqs {
let (cb, future) = paired_future_callback();
match router.send_command(cmd, Callback::write(cb), RaftCmdExtraOpts::default()) {
Ok(_) => futs.push(future),
Err(e) => {
apply_resp.set_error(e.into());
let mut import_err = kvproto::import_sstpb::Error::default();
import_err.set_message(format!("failed to send raft command: {}", e));
apply_resp.set_error(import_err);
}
}
}
Err(e) => {
apply_resp.set_error(e.into());
if let Some(r) = range {
apply_resp.set_range(r);
}
Ok(())
})();
if let Err(e) = result {
apply_resp.set_error(e.into());
}

let handle_task = async move {
// Records how long the apply task waits to be scheduled.
sst_importer::metrics::IMPORTER_APPLY_DURATION
.with_label_values(&["queue"])
.observe(start.saturating_elapsed().as_secs_f64());
let resp = if !futs.is_empty() {
Ok(join_all(futs).await.iter().fold(apply_resp, |mut resp, x| {
if x.is_err() {
let mut import_err = kvproto::import_sstpb::Error::default();
import_err.set_message("failed to complete raft command".to_string());
resp.set_error(import_err);
}
resp
}))
} else {
Ok(apply_resp)
};
let resp = Ok(join_all(futs).await.iter().fold(apply_resp, |mut resp, x| {
if let Err(e) = x {
let mut import_err = kvproto::import_sstpb::Error::default();
import_err.set_message(format!("failed to complete raft command: {}", e));
resp.set_error(import_err);
}
resp
}));
// Records how long the apply task waits to be scheduled.
sst_importer::metrics::IMPORTER_APPLY_DURATION
.with_label_values(&["finish"])
Expand Down

0 comments on commit bb116e2

Please sign in to comment.