Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br-stream: remove duplicate entry in apply kv file #28

Merged
merged 5 commits into from
Mar 7, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions src/import/sst_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -33,6 +34,7 @@ use tikv_util::config::ReadableSize;
use tikv_util::future::create_stream_with_buffer;
use tikv_util::future::paired_future_callback;
use tikv_util::time::{Instant, Limiter};
use txn_types::Key;

use crate::import::duplicate_detect::DuplicateDetector;
use sst_importer::metrics::*;
Expand Down Expand Up @@ -161,9 +163,12 @@ where
None
}

// we need to remove duplicate keys in here, since
// in https://github.com/tikv/tikv/blob/a401f78bc86f7e6ea6a55ad9f453ae31be835b55/components/resolved_ts/src/cmd.rs#L204
// will panic if found duplicated entry during Vec<PutRequest>.
fn build_apply_request<'a, 'b>(
&self,
reqs: &'a mut Vec<Request>,
reqs: &'a mut HashMap<Vec<u8>, Request>,
cmd_reqs: &'a mut Vec<RaftCmdRequest>,
is_delete: bool,
cf: &'b str,
Expand All @@ -180,12 +185,14 @@ where
Box::new(move |k: Vec<u8>, _v: Vec<u8>| {
let mut req = Request::default();
let mut del = DeleteRequest::default();

let hk = Key::truncate_ts_for(&k).expect("key without ts").to_vec();
del.set_key(k);
del.set_cf(cf.to_string());
req.set_cmd_type(CmdType::Delete);
req.set_delete(del);
req_size += req.compute_size() as u64;
reqs.push(req);
reqs.insert(hk, req);
if req_size > raft_size / 2 {
req_size = 0;
let cmd = make_request(reqs, context.clone());
Expand All @@ -196,13 +203,14 @@ where
Box::new(move |k: Vec<u8>, v: Vec<u8>| {
let mut req = Request::default();
let mut put = PutRequest::default();
let hk = Key::truncate_ts_for(&k).expect("key without ts").to_vec();
put.set_key(k);
put.set_value(v);
put.set_cf(cf.to_string());
req.set_cmd_type(CmdType::Put);
req.set_put(put);
req_size += req.compute_size() as u64;
reqs.push(req);
reqs.insert(hk, req);
if req_size > raft_size / 2 {
req_size = 0;
let cmd = make_request(reqs, context.clone());
Expand Down Expand Up @@ -452,10 +460,10 @@ where
let result = (|| -> Result<()> {
let temp_file =
importer.do_download_kv_file(meta, req.get_storage_backend(), &limiter)?;
let mut reqs = vec![];
let mut reqs: HashMap<Vec<u8>, Request> = HashMap::default();
3pointer marked this conversation as resolved.
Show resolved Hide resolved
let mut cmd_reqs = vec![];
let mut build_req_fn = self.build_apply_request(
reqs.as_mut(),
&mut reqs,
cmd_reqs.as_mut(),
meta.get_is_delete(),
meta.get_cf(),
Expand All @@ -469,7 +477,7 @@ where
)?;
drop(build_req_fn);
if !reqs.is_empty() {
let cmd = make_request(reqs.as_mut(), context);
let cmd = make_request(&mut reqs, context);
cmd_reqs.push(cmd);
}
for cmd in cmd_reqs {
Expand Down Expand Up @@ -852,10 +860,15 @@ fn make_request_header(mut context: Context) -> RaftRequestHeader {
header
}

fn make_request(reqs: &mut Vec<Request>, context: Context) -> RaftCmdRequest {
fn make_request(reqs: &mut HashMap<Vec<u8>, Request>, context: Context) -> RaftCmdRequest {
let mut cmd = RaftCmdRequest::default();
let header = make_request_header(context);
cmd.set_header(header);
cmd.set_requests(std::mem::take(reqs).into());
cmd.set_requests(
std::mem::take(reqs)
.into_values()
.collect::<Vec<Request>>()
.into(),
);
cmd
}