Skip to content

Commit

Permalink
Check full in fold & find_any consume_iter, remove for filter*
Browse files Browse the repository at this point in the history
These adaptors may consume many elements before deferring to a base
folder's fullness checks, and so they need to be performed
manually. For the `filter`s, there's no way to do it manually (rayon-rs#632),
so the specialisations just have to be removed. For `fold` and
`find_any` this can be done with a `take_while`.

This extends the octillion tests to confirm this behaviour.

This makes a program like the following slightly slower compared to
the direct `consume_iter` without a check, but it's still faster than
the non-specialized form.

```
extern crate test;
extern crate rayon;
use rayon::prelude::*;

fn main() {
    let count = (0..std::u32::MAX)
        .into_par_iter()
        .map(test::black_box)
        .find_any(|_| test::black_box(false));
    println!("{:?}", count);
}
```

```
$ hyperfine ./find-original ./find-no-check ./find-check
Benchmark #1: ./find-original
  Time (mean ± σ):     627.6 ms ±  25.7 ms    [User: 7.130 s, System: 0.014 s]
  Range (min … max):   588.4 ms … 656.4 ms    10 runs

Benchmark #2: ./find-no-check
  Time (mean ± σ):     481.5 ms ±  10.8 ms    [User: 5.415 s, System: 0.013 s]
  Range (min … max):   468.9 ms … 498.2 ms    10 runs

Benchmark #3: ./find-check
  Time (mean ± σ):     562.3 ms ±  11.8 ms    [User: 6.363 s, System: 0.013 s]
  Range (min … max):   542.5 ms … 578.2 ms    10 runs
```

(find-original = without specialization, find-no-check = custom
`consume_iter` without `take_while`, find-check = this commit)
  • Loading branch information
huonw committed Feb 14, 2019
1 parent cabe301 commit a346bf0
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 30 deletions.
11 changes: 3 additions & 8 deletions src/iter/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,9 @@ where
}
}

// Feeds the whole input to the base folder in a single `consume_iter` call,
// applying `filter_op` lazily through `Iterator::filter`, then returns `self`
// with the updated base.
// NOTE(review): the base folder only sees items that pass the filter, so a
// long run of rejected items appears to be pulled without any `full()` check
// in between (#632) -- confirm against the base folder's implementation.
fn consume_iter<I>(mut self, iter: I) -> Self
where
I: IntoIterator<Item = T>
{
self.base = self.base.consume_iter(iter.into_iter().filter(self.filter_op));
self
}

// This cannot easily specialize `consume_iter` to be better than
// the default, because that requires checking `self.base.full()`
// during a call to `self.base.consume_iter()`. (#632)

fn complete(self) -> Self::Result {
self.base.complete()
Expand Down
11 changes: 4 additions & 7 deletions src/iter/filter_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,10 @@ where
self
}
}
// Feeds the whole input to the base folder in a single `consume_iter` call,
// applying `filter_op` lazily through `Iterator::filter_map`, then returns
// `self` with the updated base.
// NOTE(review): the base folder only sees items for which `filter_op` yields
// `Some`, so a long run of discarded items appears to be pulled without any
// `full()` check in between (#632) -- confirm against the base folder.
fn consume_iter<I>(mut self, iter: I) -> Self
where
I: IntoIterator<Item = T>
{
self.base = self.base.consume_iter(iter.into_iter().filter_map(self.filter_op));
self
}

// This cannot easily specialize `consume_iter` to be better than
// the default, because that requires checking `self.base.full()`
// during a call to `self.base.consume_iter()`. (#632)

fn complete(self) -> C::Result {
self.base.complete()
Expand Down
6 changes: 5 additions & 1 deletion src/iter/find.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ where
fn consume_iter<I>(mut self, iter: I) -> Self
where I: IntoIterator<Item = T>
{
self.item = iter.into_iter().find(self.find_op);
self.item = iter
.into_iter()
// stop iterating if another thread has found something
.take_while(|_| !self.full())
.find(self.find_op);
if self.item.is_some() {
self.found.store(true, Ordering::Relaxed)
}
Expand Down
12 changes: 9 additions & 3 deletions src/iter/fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,18 @@ where
}
}

fn consume_iter<I>(mut self, iter: I) -> Self
fn consume_iter<I>(self, iter: I) -> Self
where
I: IntoIterator<Item = T>
{
self.item = iter.into_iter().fold(self.item, self.fold_op);
self
let base = self.base;
let item = iter
.into_iter()
// stop iterating if another thread has finished
.take_while(|_| !base.full())
.fold(self.item, self.fold_op);

FoldFolder { base: base, item: item, fold_op: self.fold_op }
}

fn complete(self) -> C::Result {
Expand Down
52 changes: 41 additions & 11 deletions tests/octillion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,57 @@ fn find_first_octillion_flat() {
assert_eq!(x, Some(0));
}

#[test]
fn find_last_octillion() {
fn two_threads<F: Send + FnOnce() -> R, R: Send>(f: F) -> R {
// FIXME: If we don't use at least two threads, then we end up walking
// through the entire iterator sequentially, without the benefit of any
// short-circuiting. We probably don't want testing to wait that long. ;)
// It would be nice if `find_last` could prioritize the later splits,
// basically flipping the `join` args, without needing indexed `rev`.
// (or could we have an unindexed `rev`?)
let builder = rayon::ThreadPoolBuilder::new().num_threads(2);
let pool = builder.build().unwrap();

let x = pool.install(|| octillion().find_last(|_| true));
pool.install(f)
}

#[test]
fn find_last_octillion() {
    // Runs on the two-thread pool so the search does not have to walk the
    // whole iterator sequentially before it can short-circuit.
    //
    // It would be nice if `find_last` could prioritize the later splits,
    // basically flipping the `join` args, without needing indexed `rev`.
    // (or could we have an unindexed `rev`?)
    let last = two_threads(|| {
        octillion().find_last(|_| true)
    });
    assert_eq!(last, Some(OCTILLION - 1));
}

#[test]
fn find_last_octillion_flat() {
// FIXME: Ditto, need two threads.
let builder = rayon::ThreadPoolBuilder::new().num_threads(2);
let pool = builder.build().unwrap();

let x = pool.install(|| octillion_flat().find_last(|_| true));
let x = two_threads(|| octillion_flat().find_last(|_| true));
assert_eq!(x, Some(OCTILLION - 1));
}

// `find_any` with a predicate that only matches values above `OCTILLION / 2`;
// the test only requires that some match is returned.
// NOTE(review): presumably this finishes quickly only if `find_any` stops
// consuming once another thread has found a match -- confirm.
#[test]
fn find_any_octillion() {
    let hit = two_threads(|| {
        octillion().find_any(|value| *value > OCTILLION / 2)
    });
    assert!(hit.is_some());
}

// Same check as the `find_any` test on `octillion()`, but driven through
// `octillion_flat()`; only requires that some match above `OCTILLION / 2`
// is returned.
#[test]
fn find_any_octillion_flat() {
    let hit = two_threads(|| {
        octillion_flat().find_any(|value| *value > OCTILLION / 2)
    });
    assert!(hit.is_some());
}

// `filter` keeps only values above `OCTILLION / 2`, then `find_any` accepts
// the first survivor it sees; the test only requires that one is returned.
// NOTE(review): presumably exercises the `filter` folder deferring to the
// downstream folder's fullness check -- confirm.
#[test]
fn filter_find_any_octillion() {
    let hit = two_threads(|| {
        octillion()
            .filter(|value| *value > OCTILLION / 2)
            .find_any(|_| true)
    });
    assert!(hit.is_some());
}

// `filter` + `find_any` over `octillion_flat()`: keep values above
// `OCTILLION / 2` and accept any survivor; only requires that one is returned.
#[test]
fn filter_find_any_octillion_flat() {
    let hit = two_threads(|| {
        octillion_flat()
            .filter(|value| *value > OCTILLION / 2)
            .find_any(|_| true)
    });
    assert!(hit.is_some());
}

// Folds every item into `()` and then asks `find_any` for any fold result;
// the test only requires that one is returned.
// NOTE(review): presumably relies on the `fold` folder stopping once the
// downstream `find_any` folder reports it is full -- confirm.
#[test]
fn fold_find_any_octillion_flat() {
    let hit = two_threads(|| {
        octillion_flat()
            .fold(|| (), |_, _| ())
            .find_any(|_| true)
    });
    assert!(hit.is_some());
}

0 comments on commit a346bf0

Please sign in to comment.