Skip to content

Commit

Permalink
*: check region epoch strictly (tikv#4125)
Browse files Browse the repository at this point in the history
Checks region epoch strictly to avoid KeyNotInRegion errors.

A 3 nodes TiKV cluster with merge enabled, after commit merge, TiKV A
tells TiDB with a epoch not match error contains the latest target Region
info, TiDB updates its region cache and sends requests to TiKV B,
and TiKV B has not applied commit merge yet, since the region epoch in
request is higher than TiKV B, the request must be denied due to epoch
not match, so it does not read on a stale snapshot, thus avoid the
KeyNotInRegion error.

Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus authored Feb 1, 2019
1 parent c646691 commit de0cabb
Show file tree
Hide file tree
Showing 18 changed files with 161 additions and 115 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions components/test_raftstore/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ impl<T: Simulator> Cluster<T> {
}

let resp = result.unwrap();
if resp.get_header().get_error().has_stale_epoch() {
if resp.get_header().get_error().has_epoch_not_match() {
warn!("seems split, let's retry");
sleep_ms(100);
continue;
Expand Down Expand Up @@ -832,7 +832,7 @@ impl<T: Simulator> Cluster<T> {
let mut resp = write_resp.response;
if resp.get_header().has_error() {
let error = resp.get_header().get_error();
if error.has_stale_epoch()
if error.has_epoch_not_match()
|| error.has_not_leader()
|| error.has_stale_command()
{
Expand Down
2 changes: 1 addition & 1 deletion components/test_raftstore/src/pd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ fn check_stale_region(region: &metapb::Region, check_region: &metapb::Region) ->
}

Err(box_err!(
"stale epoch {:?}, we are now {:?}",
"epoch not match {:?}, we are now {:?}",
check_epoch,
epoch
))
Expand Down
8 changes: 4 additions & 4 deletions src/import/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ quick_error! {
display("TikvRPC {:?}", err)
}
NotLeader(new_leader: Option<Peer>) {}
StaleEpoch(new_regions: Vec<Region>) {}
EpochNotMatch(current_regions: Vec<Region>) {}
UpdateRegion(new_region: RegionInfo) {}
ImportJobFailed(tag: String) {
display("{}", tag)
Expand All @@ -124,9 +124,9 @@ impl From<errorpb::Error> for Error {
} else {
Error::NotLeader(None)
}
} else if err.has_stale_epoch() {
let mut error = err.take_stale_epoch();
Error::StaleEpoch(error.take_new_regions().to_vec())
} else if err.has_epoch_not_match() {
let mut error = err.take_epoch_not_match();
Error::EpochNotMatch(error.take_current_regions().to_vec())
} else {
Error::TikvRPC(err)
}
Expand Down
21 changes: 12 additions & 9 deletions src/import/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,21 +320,24 @@ impl<Client: ImportClient> ImportSSTJob<Client> {
region.leader = new_leader;
Err(Error::UpdateRegion(region))
}
Err(Error::StaleEpoch(new_regions)) => {
let new_region = new_regions
Err(Error::EpochNotMatch(current_regions)) => {
let current_region = current_regions
.iter()
.find(|&r| self.sst.inside_region(r))
.cloned();
match new_region {
Some(new_region) => {
match current_region {
Some(current_region) => {
let new_leader = region
.leader
.and_then(|p| find_region_peer(&new_region, p.get_store_id()));
Err(Error::UpdateRegion(RegionInfo::new(new_region, new_leader)))
.and_then(|p| find_region_peer(&current_region, p.get_store_id()));
Err(Error::UpdateRegion(RegionInfo::new(
current_region,
new_leader,
)))
}
None => {
warn!("{} stale epoch {:?}", self.tag, new_regions);
Err(Error::StaleEpoch(new_regions))
warn!("{} epoch not match {:?}", self.tag, current_region);
Err(Error::EpochNotMatch(current_regions))
}
}
}
Expand Down Expand Up @@ -373,7 +376,7 @@ impl<Client: ImportClient> ImportSSTJob<Client> {
Ok(())
} else {
match Error::from(resp.take_error()) {
e @ Error::NotLeader(_) | e @ Error::StaleEpoch(_) => return Err(e),
e @ Error::NotLeader(_) | e @ Error::EpochNotMatch(_) => return Err(e),
e => Err(e),
}
}
Expand Down
24 changes: 15 additions & 9 deletions src/import/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,24 @@ impl<Client: ImportClient> PrepareRangeJob<Client> {
region.leader = new_leader;
Err(Error::UpdateRegion(region))
}
Err(Error::StaleEpoch(new_regions)) => {
let new_region = new_regions.iter().find(|&r| self.need_split(r)).cloned();
match new_region {
Some(new_region) => {
Err(Error::EpochNotMatch(current_regions)) => {
let current_region = current_regions
.iter()
.find(|&r| self.need_split(r))
.cloned();
match current_region {
Some(current_region) => {
let new_leader = region
.leader
.and_then(|p| find_region_peer(&new_region, p.get_store_id()));
Err(Error::UpdateRegion(RegionInfo::new(new_region, new_leader)))
.and_then(|p| find_region_peer(&current_region, p.get_store_id()));
Err(Error::UpdateRegion(RegionInfo::new(
current_region,
new_leader,
)))
}
None => {
warn!("{} stale epoch {:?}", self.tag, new_regions);
Err(Error::StaleEpoch(new_regions))
warn!("{} epoch not match {:?}", self.tag, current_regions);
Err(Error::EpochNotMatch(current_regions))
}
}
}
Expand Down Expand Up @@ -231,7 +237,7 @@ impl<Client: ImportClient> PrepareRangeJob<Client> {
Ok(resp)
} else {
match Error::from(resp.take_region_error()) {
e @ Error::NotLeader(_) | e @ Error::StaleEpoch(_) => return Err(e),
e @ Error::NotLeader(_) | e @ Error::EpochNotMatch(_) => return Err(e),
e => Err(e),
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/raftstore/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ quick_error! {
description("request timeout")
display("Timeout {}", msg)
}
StaleEpoch(msg: String, new_regions: Vec<metapb::Region>) {
description("region is stale")
display("StaleEpoch {}", msg)
EpochNotMatch(msg: String, new_regions: Vec<metapb::Region>) {
description("region epoch is not match")
display("EpochNotMatch {}", msg)
}
StaleCommand {
description("stale command")
Expand Down Expand Up @@ -186,10 +186,10 @@ impl Into<errorpb::Error> for Error {
.mut_key_not_in_region()
.set_end_key(region.get_end_key().to_vec());
}
Error::StaleEpoch(_, new_regions) => {
let mut e = errorpb::StaleEpoch::new();
e.set_new_regions(RepeatedField::from_vec(new_regions));
errorpb.set_stale_epoch(e);
Error::EpochNotMatch(_, new_regions) => {
let mut e = errorpb::EpochNotMatch::new();
e.set_current_regions(RepeatedField::from_vec(new_regions));
errorpb.set_epoch_not_match(e);
}
Error::StaleCommand => {
errorpb.set_stale_command(errorpb::StaleCommand::new());
Expand Down
31 changes: 15 additions & 16 deletions src/raftstore/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ impl ApplyDelegate {
///
/// An apply operation can fail in the following situations:
/// 1. it encounters an error that will occur on all stores, it can continue
/// applying next entry safely, like stale epoch for example;
/// applying next entry safely, like epoch not match for example;
/// 2. it encounters an error that may not occur on all stores, in this case
/// we should try to apply the entry again or panic. Considering that this
/// usually due to disk operation fail, which is rare, so just panic is ok.
Expand All @@ -905,7 +905,7 @@ impl ApplyDelegate {
// clear dirty values.
ctx.wb_mut().rollback_to_save_point().unwrap();
match e {
Error::StaleEpoch(..) => debug!("{} stale epoch err: {:?}", self.tag, e),
Error::EpochNotMatch(..) => debug!("{} epoch not match err: {:?}", self.tag, e),
_ => error!("{} execute raft command err: {:?}", self.tag, e),
}
(cmd_resp::new_error(e), ApplyResult::None)
Expand Down Expand Up @@ -986,7 +986,7 @@ impl ApplyDelegate {
ctx: &mut ApplyContext,
req: RaftCmdRequest,
) -> Result<(RaftCmdResponse, ApplyResult)> {
// Include region for stale epoch after merge may cause key not in range.
// Include region for epoch not match after merge may cause key not in range.
let include_region =
req.get_header().get_region_epoch().get_version() >= self.last_merge_version;
check_region_epoch(&req, &self.region, include_region)?;
Expand Down Expand Up @@ -1995,7 +1995,7 @@ fn check_sst_for_ingestion(sst: &SSTMeta, region: &Region) -> Result<()> {
|| epoch.get_version() != region_epoch.get_version()
{
let error = format!("{:?} != {:?}", epoch, region_epoch);
return Err(Error::StaleEpoch(error, vec![region.clone()]));
return Err(Error::EpochNotMatch(error, vec![region.clone()]));
}

let range = sst.get_range();
Expand Down Expand Up @@ -3180,7 +3180,7 @@ mod tests {
.build();
router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry])));
let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert!(resp.get_header().get_error().has_stale_epoch());
assert!(resp.get_header().get_error().has_epoch_not_match());
let apply_res = fetch_apply_res(&rx);
assert_eq!(apply_res.applied_index_term, 2);
assert_eq!(apply_res.apply_state.get_applied_index(), 3);
Expand Down Expand Up @@ -3290,12 +3290,12 @@ mod tests {
.ingest_sst(&meta1)
.epoch(0, 3)
.build();
let ingest_stale_epoch = EntryBuilder::new(11, 3)
let ingest_epoch_not_match = EntryBuilder::new(11, 3)
.capture_resp(&router, 3, 1, capture_tx.clone())
.ingest_sst(&meta2)
.epoch(0, 3)
.build();
let entries = vec![put_ok, ingest_ok, ingest_stale_epoch];
let entries = vec![put_ok, ingest_ok, ingest_epoch_not_match];
router.schedule_task(1, Msg::apply(Apply::new(1, 3, entries)));
let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert!(!resp.get_header().has_error(), "{:?}", resp);
Expand Down Expand Up @@ -3528,9 +3528,6 @@ mod tests {
let resp = exec_split(&router, splits.clone());
// All requests should be checked.
assert!(error_msg(&resp).contains("id count"), "{:?}", resp);

let mut new_version = epoch.borrow().get_version() + 1;
epoch.borrow_mut().set_version(new_version);
let checker = SplitResultChecker {
db: &engines.kv,
origin_peers: &peers,
Expand All @@ -3544,11 +3541,11 @@ mod tests {
let resp = exec_split(&router, splits.clone());
// Split should succeed.
assert!(!resp.get_header().has_error(), "{:?}", resp);
let mut new_version = epoch.borrow().get_version() + 1;
epoch.borrow_mut().set_version(new_version);
checker.check(b"", b"k1", 8, &[9, 10, 11], true);
checker.check(b"k1", b"k5", 1, &[3, 5, 7], false);

new_version = epoch.borrow().get_version() + 1;
epoch.borrow_mut().set_version(new_version);
splits.mut_requests().clear();
splits
.mut_requests()
Expand All @@ -3557,11 +3554,11 @@ mod tests {
let resp = exec_split(&router, splits.clone());
// Right derive should be respected.
assert!(!resp.get_header().has_error(), "{:?}", resp);
new_version = epoch.borrow().get_version() + 1;
epoch.borrow_mut().set_version(new_version);
checker.check(b"k4", b"k5", 12, &[13, 14, 15], true);
checker.check(b"k1", b"k4", 1, &[3, 5, 7], false);

new_version = epoch.borrow().get_version() + 2;
epoch.borrow_mut().set_version(new_version);
splits.mut_requests().clear();
splits
.mut_requests()
Expand All @@ -3573,12 +3570,12 @@ mod tests {
let resp = exec_split(&router, splits.clone());
// Right derive should be respected.
assert!(!resp.get_header().has_error(), "{:?}", resp);
new_version = epoch.borrow().get_version() + 2;
epoch.borrow_mut().set_version(new_version);
checker.check(b"k1", b"k2", 16, &[17, 18, 19], true);
checker.check(b"k2", b"k3", 20, &[21, 22, 23], true);
checker.check(b"k3", b"k4", 1, &[3, 5, 7], false);

new_version = epoch.borrow().get_version() + 2;
epoch.borrow_mut().set_version(new_version);
splits.mut_requests().clear();
splits
.mut_requests()
Expand All @@ -3590,6 +3587,8 @@ mod tests {
let resp = exec_split(&router, splits.clone());
// Right derive should be respected.
assert!(!resp.get_header().has_error(), "{:?}", resp);
new_version = epoch.borrow().get_version() + 2;
epoch.borrow_mut().set_version(new_version);
checker.check(b"k3", b"k31", 1, &[3, 5, 7], false);
checker.check(b"k31", b"k32", 24, &[25, 26, 27], true);
checker.check(b"k32", b"k4", 28, &[29, 30, 31], true);
Expand Down
8 changes: 4 additions & 4 deletions src/raftstore/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1927,7 +1927,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
}

match util::check_region_epoch(msg, self.fsm.peer.region(), true) {
Err(Error::StaleEpoch(msg, mut new_regions)) => {
Err(Error::EpochNotMatch(msg, mut new_regions)) => {
// Attach the region which might be split from the current region. But it doesn't
// matter if the region is not split from the current region. If the region meta
// received by the TiKV driver is newer than the meta cached in the driver, the meta is
Expand All @@ -1936,8 +1936,8 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
if let Some(sibling_region) = sibling_region {
new_regions.push(sibling_region);
}
self.ctx.raft_metrics.invalid_proposal.stale_epoch += 1;
Err(Error::StaleEpoch(msg, new_regions))
self.ctx.raft_metrics.invalid_proposal.epoch_not_match += 1;
Err(Error::EpochNotMatch(msg, new_regions))
}
Err(e) => Err(e),
Ok(()) => Ok(None),
Expand Down Expand Up @@ -2230,7 +2230,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
region.get_region_epoch(),
epoch
);
return Err(Error::StaleEpoch(
return Err(Error::EpochNotMatch(
format!(
"{} epoch changed {:?} != {:?}, retry later",
self.fsm.peer.tag, latest_epoch, epoch
Expand Down
12 changes: 6 additions & 6 deletions src/raftstore/store/local_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ pub struct RaftInvalidProposeMetrics {
pub not_leader: u64,
pub mismatch_peer_id: u64,
pub stale_command: u64,
pub stale_epoch: u64,
pub epoch_not_match: u64,
}

impl Default for RaftInvalidProposeMetrics {
Expand All @@ -323,7 +323,7 @@ impl Default for RaftInvalidProposeMetrics {
not_leader: 0,
mismatch_peer_id: 0,
stale_command: 0,
stale_epoch: 0,
epoch_not_match: 0,
}
}
}
Expand Down Expand Up @@ -360,11 +360,11 @@ impl RaftInvalidProposeMetrics {
.inc_by(self.stale_command as i64);
self.stale_command = 0;
}
if self.stale_epoch > 0 {
if self.epoch_not_match > 0 {
RAFT_INVALID_PROPOSAL_COUNTER_VEC
.with_label_values(&["stale_epoch"])
.inc_by(self.stale_epoch as i64);
self.stale_epoch = 0;
.with_label_values(&["epoch_not_match"])
.inc_by(self.epoch_not_match as i64);
self.epoch_not_match = 0;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/raftstore/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2108,7 +2108,7 @@ impl ReadExecutor {
pub fn execute(&mut self, msg: &RaftCmdRequest, region: &metapb::Region) -> ReadResponse {
if self.check_epoch {
if let Err(e) = check_region_epoch(msg, region, true) {
debug!("[region {}] stale epoch err: {:?}", region.get_id(), e);
debug!("[region {}] epoch not match err: {:?}", region.get_id(), e);
return ReadResponse {
response: cmd_resp::new_error(e),
snapshot: None,
Expand Down
Loading

0 comments on commit de0cabb

Please sign in to comment.