updating proxy 6.2 for 2 bugfixes (#137)
This updates the proxy to pick up two fixes: sending ApplyRes with force_send, and replacing the unsafe hard-coded unreachable-store backoff with a configurable one.

* raftstore: use force_send to send ApplyRes (tikv#13168)

close tikv#13160

Use force_send to send ApplyRes

Signed-off-by: 5kbpers <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>

3 people authored Aug 8, 2022
1 parent 2eea34c commit 2685d85
Showing 8 changed files with 1,636 additions and 10 deletions.
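The first fix is easiest to see in miniature. The sketch below uses stand-in types, not TiKV's real RaftRouter API; only the try_send/force_send names are taken from the diff. It shows the semantic difference the patch relies on: try_send refuses a message once the mailbox looks full (and the caller used to drop it), while force_send always enqueues, so an ApplyRes can no longer be lost under channel pressure.

use std::collections::VecDeque;

// Stand-in for a loose-bounded mailbox; the real router is more involved.
struct Mailbox<T> {
    buf: VecDeque<T>,
    limit: usize,
}

impl<T> Mailbox<T> {
    // try_send refuses the message once the soft limit is reached; before
    // this fix, a refused ApplyRes was simply lost.
    fn try_send(&mut self, t: T) -> Result<(), T> {
        if self.buf.len() >= self.limit {
            return Err(t);
        }
        self.buf.push_back(t);
        Ok(())
    }

    // force_send ignores the soft limit: delivery is guaranteed at the
    // cost of letting the queue grow past `limit` under pressure.
    fn force_send(&mut self, t: T) {
        self.buf.push_back(t);
    }
}

fn main() {
    let mut mailbox = Mailbox { buf: VecDeque::new(), limit: 1 };
    assert!(mailbox.try_send("apply-res-1").is_ok());
    assert!(mailbox.try_send("apply-res-2").is_err()); // would have been dropped
    mailbox.force_send("apply-res-2"); // the fix: never drop an ApplyRes
    assert_eq!(mailbox.buf.len(), 2);
}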
3 changes: 3 additions & 0 deletions components/raftstore/src/store/config.rs
@@ -290,6 +290,8 @@ pub struct Config {
 
     #[doc(hidden)]
     pub max_snapshot_file_raw_size: ReadableSize,
+
+    pub unreachable_backoff: ReadableDuration,
 }
 
 impl Default for Config {
@@ -383,6 +385,7 @@ impl Default for Config {
             renew_leader_lease_advance_duration: ReadableDuration::secs(0),
             report_region_buckets_tick_interval: ReadableDuration::secs(10),
             max_snapshot_file_raw_size: ReadableSize::mb(100),
+            unreachable_backoff: ReadableDuration::secs(10),
         }
     }
 }
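The new unreachable_backoff field replaces the hard-coded UNREACHABLE_BACKOFF constant deleted from store.rs below, keeping the same 10-second default. A minimal sketch of how the field is read, assuming only what the diff shows (ReadableDuration as a newtype over std::time::Duration, unwrapped via .0); the real type also carries serde and parsing logic not reproduced here, and the TOML key name mentioned in the comment is an assumption:

use std::time::Duration;

// Stand-in for tikv_util's ReadableDuration: a thin wrapper whose inner
// Duration is reached with `.0`, as in `self.ctx.cfg.unreachable_backoff.0`.
#[derive(Clone, Copy, Debug, PartialEq)]
struct ReadableDuration(Duration);

impl ReadableDuration {
    const fn secs(s: u64) -> Self {
        ReadableDuration(Duration::from_secs(s))
    }
}

struct Config {
    // Presumably surfaced in TOML as `unreachable-backoff` under
    // [raftstore] via kebab-case renaming; that key name is an assumption.
    unreachable_backoff: ReadableDuration,
}

impl Default for Config {
    fn default() -> Self {
        // Mirrors the removed UNREACHABLE_BACKOFF constant of 10s.
        Config { unreachable_backoff: ReadableDuration::secs(10) }
    }
}

fn main() {
    let cfg = Config::default();
    assert_eq!(cfg.unreachable_backoff.0, Duration::from_secs(10));
    // Operators can now shorten the backoff instead of recompiling:
    let tuned = Config { unreachable_backoff: ReadableDuration::secs(3) };
    assert_eq!(tuned.unreachable_backoff.0, Duration::from_secs(3));
}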
1 change: 1 addition & 0 deletions components/raftstore/src/store/fsm/apply.rs
@@ -613,6 +613,7 @@ where
         let is_synced = self.write_to_db();
 
         if !self.apply_res.is_empty() {
+            fail_point!("before_nofity_apply_res");
             let apply_res = mem::take(&mut self.apply_res);
             self.notifier.notify(apply_res);
         }
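This failpoint (the name keeps the upstream spelling, misspelling and all, and the new test below must match it exactly) is driven by the fail crate. A self-contained illustration of the two actions the test uses, assuming a build with the crate's failpoints feature enabled; the failpoint name in the example is made up:

use fail::fail_point;

// Guarded the same way as LooseBoundedSender::try_send further down:
// unconfigured, the failpoint is a no-op and 8 is returned; configured
// with "return", the closure's value (0) is returned instead.
fn check_interval() -> usize {
    fail_point!("demo_check_interval", |_| 0);
    8
}

fn main() {
    assert_eq!(check_interval(), 8); // failpoint not configured
    fail::cfg("demo_check_interval", "return").unwrap();
    assert_eq!(check_interval(), 0); // alternate value injected
    fail::remove("demo_check_interval");
    assert_eq!(check_interval(), 8); // back to normal
    // The "pause" action, used with before_nofity_apply_res and
    // begin_raft_poller in the new test, instead blocks the thread that
    // hits the failpoint until fail::remove is called.
}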
20 changes: 13 additions & 7 deletions components/raftstore/src/store/fsm/store.rs
@@ -105,7 +105,6 @@ use crate::{
 type Key = Vec<u8>;
 
 pub const PENDING_MSG_CAP: usize = 100;
-const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10);
 const ENTRY_CACHE_EVICT_TICK_DURATION: Duration = Duration::from_secs(1);
 pub const MULTI_FILES_SNAPSHOT_FEATURE: Feature = Feature::require(6, 1, 0); // it only makes sense for large region
 
@@ -286,16 +285,21 @@
 {
     fn notify(&self, apply_res: Vec<ApplyRes<EK::Snapshot>>) {
         for r in apply_res {
-            self.router.try_send(
-                r.region_id,
+            let region_id = r.region_id;
+            if let Err(e) = self.router.force_send(
+                region_id,
                 PeerMsg::ApplyRes {
                     res: ApplyTaskRes::Apply(r),
                 },
-            );
+            ) {
+                error!("failed to send apply result"; "region_id" => region_id, "err" => ?e);
+            }
         }
     }
     fn notify_one(&self, region_id: u64, msg: PeerMsg<EK>) {
-        self.router.try_send(region_id, msg);
+        if let Err(e) = self.router.force_send(region_id, msg) {
+            error!("failed to notify apply msg"; "region_id" => region_id, "err" => ?e);
+        }
     }
 
     fn clone_box(&self) -> Box<dyn ApplyNotifier<EK>> {
@@ -786,6 +790,7 @@ impl<EK: KvEngine, ER: RaftEngine, T: Transport> PollHandler<PeerFsm<EK, ER>, St
     where
         for<'a> F: FnOnce(&'a BatchSystemConfig),
     {
+        fail_point!("begin_raft_poller");
         self.previous_metrics = self.poll_ctx.raft_metrics.ready.clone();
         self.poll_ctx.pending_count = 0;
         self.poll_ctx.ready_count = 0;
@@ -2651,13 +2656,14 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
 
     fn on_store_unreachable(&mut self, store_id: u64) {
         let now = Instant::now();
+        let unreachable_backoff = self.ctx.cfg.unreachable_backoff.0;
         if self
             .fsm
             .store
             .last_unreachable_report
             .get(&store_id)
-            .map_or(UNREACHABLE_BACKOFF, |t| now.saturating_duration_since(*t))
-            < UNREACHABLE_BACKOFF
+            .map_or(unreachable_backoff, |t| now.saturating_duration_since(*t))
+            < unreachable_backoff
         {
             return;
         }
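The hunk above only swaps the constant for the configured value; the surrounding rate-limiting pattern is unchanged: a store-unreachable report goes out at most once per backoff window per store. A self-contained sketch of that pattern with plain std types (should_report and last_report are illustrative names):

use std::{
    collections::HashMap,
    time::{Duration, Instant},
};

// Returns true when a report for `store_id` should go out, i.e. when no
// report was sent within the last `backoff`. map_or(backoff, ..) makes a
// store with no prior report pass the check on its first call.
fn should_report(
    last_report: &mut HashMap<u64, Instant>,
    store_id: u64,
    backoff: Duration,
) -> bool {
    let now = Instant::now();
    if last_report
        .get(&store_id)
        .map_or(backoff, |t| now.saturating_duration_since(*t))
        < backoff
    {
        return false;
    }
    last_report.insert(store_id, now);
    true
}

fn main() {
    let mut last_report = HashMap::new();
    let backoff = Duration::from_secs(10);
    assert!(should_report(&mut last_report, 1, backoff)); // first report passes
    assert!(!should_report(&mut last_report, 1, backoff)); // throttled
    // Pretend the last report happened longer than `backoff` ago:
    last_report.insert(1, Instant::now() - Duration::from_secs(11));
    assert!(should_report(&mut last_report, 1, backoff)); // window elapsed
}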
2 changes: 1 addition & 1 deletion components/raftstore/src/store/worker/check_leader.rs
@@ -82,7 +82,7 @@ impl Runner {
         meta.region_ranges
             // get overlapped regions
             .range((Excluded(start_key), Unbounded))
-            .take_while(|(_, id)| end_key > enc_start_key(&meta.regions[id]))
+            .take_while(|(_, id)| end_key > enc_start_key(&meta.regions[*id]))
             // get the min `safe_ts`
             .map(|(_, id)| {
                 registry.get(id).unwrap().safe_ts()
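This one-character change is a borrow-level fix. take_while hands its predicate a reference to the item, so destructuring (_, id) from an iterator of (&K, &V) pairs leaves id as a double reference (&&u64), which HashMap indexing does not accept; one dereference restores the &u64 it expects. A minimal reproduction with stand-in maps:

use std::collections::{BTreeMap, HashMap};

fn main() {
    let mut ranges: BTreeMap<Vec<u8>, u64> = BTreeMap::new();
    ranges.insert(b"k".to_vec(), 7);
    let mut regions: HashMap<u64, &str> = HashMap::new();
    regions.insert(7, "region-7");

    let names: Vec<&str> = ranges
        .iter() // Item = (&Vec<u8>, &u64)
        .take_while(|(_, id)| {
            // take_while passes &Item, so `id` here is &&u64;
            // `regions[id]` would not compile, `regions[*id]` does.
            regions[*id] == "region-7"
        })
        .map(|(_, id)| regions[id]) // map passes Item by value: `id` is &u64
        .collect();
    assert_eq!(names, vec!["region-7"]);
}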
7 changes: 6 additions & 1 deletion components/tikv_util/src/mpsc/mod.rs
@@ -21,6 +21,7 @@ use std::{
 use crossbeam::channel::{
     self, RecvError, RecvTimeoutError, SendError, TryRecvError, TrySendError,
 };
+use fail::fail_point;
 
 struct State {
     sender_cnt: AtomicIsize,
@@ -239,7 +240,11 @@ impl<T> LooseBoundedSender<T> {
     #[inline]
     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
         let cnt = self.tried_cnt.get();
-        if cnt < CHECK_INTERVAL {
+        let check_interval = || {
+            fail_point!("loose_bounded_sender_check_interval", |_| 0);
+            CHECK_INTERVAL
+        };
+        if cnt < check_interval() {
             self.tried_cnt.set(cnt + 1);
         } else if self.len() < self.limit {
             self.tried_cnt.set(1);
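Why the closure dance in try_send: checking len() on every send presumably costs more than bumping a counter, so the sender consults the real capacity only every CHECK_INTERVAL calls and sends optimistically in between. The new failpoint shrinks that interval to 0 so a test can fill the channel deterministically instead of burning warm-up sends. A simplified model of the counter logic, with a plain flag standing in for the failpoint:

use std::cell::Cell;

const CHECK_INTERVAL: usize = 8;

struct LooseSender {
    tried_cnt: Cell<usize>,
    len: Cell<usize>,
    limit: usize,
    force_check: bool, // stands in for the "return" failpoint
}

impl LooseSender {
    fn try_send(&self) -> Result<(), ()> {
        let interval = if self.force_check { 0 } else { CHECK_INTERVAL };
        let cnt = self.tried_cnt.get();
        if cnt < interval {
            self.tried_cnt.set(cnt + 1); // optimistic: skip the capacity check
        } else if self.len.get() < self.limit {
            self.tried_cnt.set(1); // checked: still under the soft limit
        } else {
            return Err(()); // channel is considered full
        }
        self.len.set(self.len.get() + 1);
        Ok(())
    }
}

fn main() {
    let s = LooseSender {
        tried_cnt: Cell::new(0),
        len: Cell::new(0),
        limit: 2,
        force_check: true,
    };
    assert!(s.try_send().is_ok());
    assert!(s.try_send().is_ok());
    assert!(s.try_send().is_err()); // full immediately, no warm-up sends
}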
39 changes: 38 additions & 1 deletion tests/failpoints/cases/test_split_region.rs
@@ -21,7 +21,7 @@ use kvproto::{
 use pd_client::PdClient;
 use raft::eraftpb::MessageType;
 use raftstore::{
-    store::{config::Config as RaftstoreConfig, util::is_vote_msg, Callback},
+    store::{config::Config as RaftstoreConfig, util::is_vote_msg, Callback, PeerMsg},
     Result,
 };
 use test_raftstore::*;
@@ -984,3 +984,40 @@ fn test_split_pessimistic_locks_with_concurrent_prewrite() {
     let resp = resp.join().unwrap();
     assert!(resp.get_region_error().has_epoch_not_match(), "{:?}", resp);
 }
+
+#[test]
+fn test_split_store_channel_full() {
+    let mut cluster = new_node_cluster(0, 1);
+    cluster.cfg.raft_store.notify_capacity = 10;
+    cluster.cfg.raft_store.store_batch_system.max_batch_size = Some(1);
+    cluster.cfg.raft_store.messages_per_tick = 1;
+    let pd_client = cluster.pd_client.clone();
+    pd_client.disable_default_operator();
+    cluster.run();
+    cluster.must_put(b"k1", b"v1");
+    cluster.must_put(b"k2", b"v2");
+    let region = pd_client.get_region(b"k2").unwrap();
+    let apply_fp = "before_nofity_apply_res";
+    fail::cfg(apply_fp, "pause").unwrap();
+    let (tx, rx) = mpsc::channel();
+    cluster.split_region(
+        &region,
+        b"k2",
+        Callback::write(Box::new(move |_| tx.send(()).unwrap())),
+    );
+    rx.recv().unwrap();
+    let sender_fp = "loose_bounded_sender_check_interval";
+    fail::cfg(sender_fp, "return").unwrap();
+    let store_fp = "begin_raft_poller";
+    fail::cfg(store_fp, "pause").unwrap();
+    let raft_router = cluster.sim.read().unwrap().get_router(1).unwrap();
+    for _ in 0..50 {
+        raft_router.force_send(1, PeerMsg::Noop).unwrap();
+    }
+    fail::remove(apply_fp);
+    fail::remove(store_fp);
+    sleep_ms(300);
+    let region = pd_client.get_region(b"k1").unwrap();
+    assert_ne!(region.id, 1);
+    fail::remove(sender_fp);
+}