Skip to content

Commit

Permalink
cherry-pick more fixes from 3pointer#24
Browse files Browse the repository at this point in the history
Signed-off-by: Yu Juncen <[email protected]>
  • Loading branch information
YuJuncen committed Feb 28, 2022
1 parent 6a0f1d9 commit 6275833
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 16 deletions.
38 changes: 26 additions & 12 deletions components/br-stream/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,16 @@ impl BackupStreamObserver {

/// Test whether a region should be observed by the observer.
fn should_register_region(&self, region: &Region) -> bool {
// If the end key is empty, it actually meant infinity.
// However, this way is a little hacky, maybe we'd better make a
// `RangesBound<R>` version for `is_overlapping`.
let mut end_key = region.get_end_key();
if end_key.is_empty() {
end_key = &[0xffu8; 32];
}
self.ranges
.rl()
.is_overlapping((region.get_start_key(), region.get_end_key()))
.is_overlapping((region.get_start_key(), end_key))
}
}

Expand Down Expand Up @@ -233,6 +240,16 @@ mod tests {

use super::BackupStreamObserver;

macro_rules! assert_let {
(let $p:pat = $e:expr; $cc:tt) => {
if let $p = $e {
$cc
} else {
panic!("{} doesn't matches {}", stringify!($e), stringify!($p))
}
};
}

fn fake_region(id: u64, start: &[u8], end: &[u8]) -> Region {
let mut r = Region::new();
r.set_id(id);
Expand Down Expand Up @@ -278,11 +295,9 @@ mod tests {
o.register_region(&r);
let task = rx.recv_timeout(Duration::from_secs(0)).unwrap().unwrap();
let handle = ObserveHandle::new();
if let Task::ModifyObserve(ObserveOp::Start { region }) = task {
assert_let!(let Task::ModifyObserve(ObserveOp::Start { region }) = task; {
o.subs.register_region(region.get_id(), handle.clone())
} else {
panic!("unexpected message received: it is {}", task);
}
});

// Test events with key in the range can be observed.
let observe_info = CmdObserveInfo::from_handle(handle.clone(), ObserveHandle::new());
Expand All @@ -291,13 +306,11 @@ mod tests {
let mut cmd_batches = vec![cb];
o.on_flush_applied_cmd_batch(ObserveLevel::All, &mut cmd_batches, &mock_engine);
let task = rx.recv_timeout(Duration::from_secs(0)).unwrap().unwrap();
if let Task::BatchEvent(batches) = task {
assert_let!(let Task::BatchEvent(batches) = task; {
assert!(batches.len() == 1);
assert!(batches[0].region_id == 42);
assert!(batches[0].cdc_id == handle.id);
} else {
panic!("unexpected message received: it is {}", task);
}
});

// Test event from other region should not be send.
let observe_info = CmdObserveInfo::from_handle(ObserveHandle::new(), ObserveHandle::new());
Expand All @@ -317,7 +330,7 @@ mod tests {
assert!(task.is_err(), "it is {:?}", task);
assert!(!o.subs.is_observing(43));

// Test region out of range won't be added to observe list.
// Test newly created region out of range won't be added to observe list.
let mut ctx = ObserverContext::new(&r);
o.on_region_changed(&mut ctx, RegionChangeEvent::Create, StateRole::Leader);
let task = rx.recv_timeout(Duration::from_millis(20));
Expand All @@ -329,7 +342,8 @@ mod tests {
let mut ctx = ObserverContext::new(&r);
o.on_role_change(&mut ctx, StateRole::Follower);
let task = rx.recv_timeout(Duration::from_millis(20));
assert!(task.is_err(), "it is {:?}", task);
assert!(!o.subs.should_observe(42));
assert_let!(let Ok(Some(Task::ModifyObserve(ObserveOp::Stop { region, .. }))) = task; {
assert_eq!(region.id, 42);
});
}
}
43 changes: 39 additions & 4 deletions components/br-stream/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,18 @@ impl<K: Ord, V: Default> SegmentMap<K, V> {
}
}

struct RangeToExclusiveRef<'a, T: ?Sized>(&'a T);

impl<'a, T: ?Sized> RangeBounds<T> for RangeToExclusiveRef<'a, T> {
fn start_bound(&self) -> Bound<&T> {
Bound::Unbounded
}

fn end_bound(&self) -> Bound<&T> {
Bound::Excluded(self.0)
}
}

impl<K: Ord, V> SegmentMap<K, V> {
/// Like `add`, but insert a value associated to the key.
pub fn insert(&mut self, (start, end): (K, K), value: V) -> bool {
Expand Down Expand Up @@ -222,10 +234,32 @@ impl<K: Ord, V> SegmentMap<K, V> {
K: Borrow<R>,
R: Ord + ?Sized,
{
self.get_interval_by_point(range.0).is_some()
|| self
.get_interval_by_point(range.1)
.map_or(false, |rng| <K as Borrow<R>>::borrow(rng.0) != range.1)
// o: The Start Key.
// e: The End Key.
// +: The Boundary of Candidate Range.
// |------+-s----+----e----|
// Firstly, we check whether the start point is in some range.
// if true, it must be overlapping.
let overlap_with_start = self.get_interval_by_point(range.0).is_some();
// |--s----+-----+----e----|
// Otherwise, the possibility of being overlapping would be there are some sub range
// of the queried range...
// |--s----+----e----+-----|
// ...Or the end key is contained by some Range.
// For faster query, we merged the two cases together.
let covered_by_the_range = self
.0
// When querying possibility of overlapping by end key,
// we don't want the range [end key, ...) become a candidate.
// (which is impossible to overlapping with the range)
.range(RangeToExclusiveRef(range.1))
.next_back()
.filter(|(start, end)| {
<K as Borrow<R>>::borrow(&end.range_end) > range.1
|| <K as Borrow<R>>::borrow(start) > range.0
})
.is_some();
overlap_with_start || covered_by_the_range
}

pub fn get_inner(&mut self) -> &mut BTreeMap<K, SegmentValue<K, V>> {
Expand Down Expand Up @@ -298,5 +332,6 @@ mod test {
assert!(!tree.is_overlapping((&8, &42)));
assert!(!tree.is_overlapping((&9, &10)));
assert!(tree.is_overlapping((&2, &10)));
assert!(tree.is_overlapping((&0, &9999999)));
}
}

0 comments on commit 6275833

Please sign in to comment.