Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #12 from TimelyDataflow/master
Browse files Browse the repository at this point in the history
Pull upstream changes
  • Loading branch information
antiguru authored May 24, 2024
2 parents 8d077e5 + 2de0cbd commit 4ba7bc2
Show file tree
Hide file tree
Showing 23 changed files with 513 additions and 575 deletions.
6 changes: 3 additions & 3 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ where
// Use TOTAL ORDER to allow the release of `time`.
yielded = yielded || yield_function(timer, work);
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) {
use differential_dataflow::trace::cursor::MyTrait;
cursor.seek_key(&storage, MyTrait::borrow_as(key));
if cursor.get_key(&storage) == Some(MyTrait::borrow_as(key)) {
use differential_dataflow::trace::cursor::IntoOwned;
cursor.seek_key(&storage, IntoOwned::borrow_as(key));
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
Expand Down
6 changes: 3 additions & 3 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ where
for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() {
if !input2.frontier.less_equal(time) {
logic2(prefix, &mut key1);
use differential_dataflow::trace::cursor::MyTrait;
cursor.seek_key(&storage, MyTrait::borrow_as(&key1));
if cursor.get_key(&storage) == Some(MyTrait::borrow_as(&key1)) {
use differential_dataflow::trace::cursor::IntoOwned;
cursor.seek_key(&storage, IntoOwned::borrow_as(&key1));
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(&key1)) {
while let Some(value) = cursor.get_val(&storage) {
let mut count = Tr::Diff::zero();
cursor.map_times(&storage, |t, d| {
Expand Down
21 changes: 9 additions & 12 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ fn main() {
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
// "rhh" => {
// use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
// let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
// let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
// keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
// .probe_with(&mut probe);
// },
"rhh" => {
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"slc" => {

use differential_dataflow::trace::implementations::ord_neu::PreferredSpine;
Expand All @@ -61,10 +61,7 @@ fn main() {
.arrange::<PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1)));

keys.join_core(&data, |k,_v1,_v2| {
println!("{:?}", k.text);
Option::<((),isize,isize)>::None
})
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
_ => {
Expand Down Expand Up @@ -122,4 +119,4 @@ fn main() {

println!("{:?}\tshut down", timer2.elapsed());

}
}
4 changes: 2 additions & 2 deletions mdbook/src/chapter_0/chapter_0_0.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ version = "0.1.0"
authors = ["Your Name <[email protected]>"]

[dependencies]
timely = "0.11.1"
differential-dataflow = "0.11.0"
timely = "0.12.0"
differential-dataflow = "0.12.0"
```

You should only need to add those last two lines there, which bring in dependencies on both [timely dataflow](https://github.com/TimelyDataflow/timely-dataflow) and [differential dataflow](https://github.com/TimelyDataflow/differential-dataflow). We will be using both of those.
Expand Down
4 changes: 2 additions & 2 deletions mdbook/src/chapter_0/chapter_0_1.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ fn main() {
.inspect(|x| println!("{:?}", x));
});

// Set an arbitrary size for our organization.
let size = 100;
// Set a size for our organization from the input.
let size = std::env::args().nth(1).and_then(|s| s.parse::<u32>().ok()).unwrap_or(10);

// Load input (a binary tree).
input.advance_to(0);
Expand Down
2 changes: 1 addition & 1 deletion mdbook/src/chapter_0/chapter_0_3.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ We can then use this probe to limit the introduction of new data, by waiting for
This starts to print out a mess of data, indicating not only how long it takes to start up the computation, but also how long each individual round of updates takes.

``` ignore
Echidnatron% cargo run --release --example hello 10000000
Echidnatron% cargo run --release -- 10000000
Finished release [optimized + debuginfo] target(s) in 0.06s
Running `target/release/examples/hello 10000000`
4.092895186s data loaded
Expand Down
2 changes: 0 additions & 2 deletions src/algorithms/graphs/sequential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ where
F: Fn(&N, &[(&V, isize)])->V+'static
{

let _timer = ::std::time::Instant::now();

// start iteration with None messages for all.
state
.map(|(node, _state)| (node, None))
Expand Down
43 changes: 2 additions & 41 deletions src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,47 +42,6 @@ pub struct Progress<T> {
pub counts: Vec<(T, usize)>,
}

/// An iterator that yields with a `None` every so often.
pub struct YieldingIter<I> {
    /// When set, a time after which we should return `None`.
    start: Option<std::time::Instant>,
    after: std::time::Duration,
    iter: I,
}

impl<I> YieldingIter<I> {
    /// Construct a yielding iterator from an inter-yield duration.
    pub fn new_from(iter: I, yield_after: std::time::Duration) -> Self {
        Self {
            start: None,
            after: yield_after,
            iter,
        }
    }
}

impl<I: Iterator> Iterator for YieldingIter<I> {
    type Item = I::Item;
    fn next(&mut self) -> Option<Self::Item> {
        // Start (or restart) the clock on the first call after a yield.
        let started = *self.start.get_or_insert_with(std::time::Instant::now);
        if started.elapsed() > self.after {
            // Time budget spent: yield a `None`, and re-time on the next call.
            self.start = None;
            None
        } else {
            let item = self.iter.next();
            if item.is_none() {
                // Underlying iterator produced nothing; reset so a later call re-times.
                self.start = None;
            }
            item
        }
    }
}

/// A simple sink for byte slices.
pub trait Writer<T> {
/// Returns an amount of time to wait before retrying, or `None` for success.
Expand Down Expand Up @@ -785,6 +744,8 @@ pub mod sink {
// {
// super::source::build(scope, |activator| {
// let source = KafkaSource::new(addr, topic, group, activator);
// // An iterator combinator that yields every "duration" even if more items exist.
// // The implementation of such an iterator exists in the git history, or can be rewritten easily.
// super::YieldingIter::new_from(Iter::<D,T,R>::new_from(source), std::time::Duration::from_millis(10))
// })
// }
Expand Down
37 changes: 29 additions & 8 deletions src/difference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ pub use self::Abelian as Diff;
/// There is a light presumption of commutativity here, in that while we will largely perform addition
/// in order of timestamps, for many types of timestamps there is no total order and consequently no
/// obvious order to respect. Non-commutative semigroups should be used with care.
pub trait Semigroup : ::std::marker::Sized + Data + Clone {
pub trait Semigroup<Rhs: ?Sized = Self> : Data + Clone {
/// The method of `std::ops::AddAssign`, for types that do not implement `AddAssign`.
fn plus_equals(&mut self, rhs: &Self);
fn plus_equals(&mut self, rhs: &Rhs);
/// Returns true if the element is the additive identity.
///
/// This is primarily used by differential dataflow to know when it is safe to delete an update.
Expand Down Expand Up @@ -233,22 +233,43 @@ mod vector {

impl<R: Semigroup> Semigroup for Vec<R> {
    fn plus_equals(&mut self, rhs: &Self) {
        // Delegate to the slice implementation, `Semigroup<[R]>`, which
        // adds element-wise and extends `self` with any surplus of `rhs`.
        self.plus_equals(&rhs[..])
    }
    fn is_zero(&self) -> bool {
        // A vector is zero iff all of its elements are zero
        // (vacuously true for the empty vector).
        self.iter().all(|x| x.is_zero())
    }
}

impl<R: Semigroup> Semigroup<[R]> for Vec<R> {
    /// Adds `rhs` element-wise into `self`, extending `self` with clones of
    /// any trailing elements of `rhs` beyond `self`'s current length.
    ///
    /// Fix: the previous text applied every `rhs` update a second time in a
    /// trailing loop, double-counting each element; the unit test below
    /// (`[1,2,3] + [1,1,1,1] == [2,3,4,1]`) pins the intended behavior.
    fn plus_equals(&mut self, rhs: &[R]) {
        // Apply updates to the positions both sides share.
        for (index, update) in rhs.iter().enumerate().take(self.len()) {
            self[index].plus_equals(update);
        }

        // Clone any leftover elements of `rhs` onto the end of `self`.
        while self.len() < rhs.len() {
            let element = &rhs[self.len()];
            self.push(element.clone());
        }
    }
    fn is_zero(&self) -> bool {
        // Zero iff every element is zero (vacuously true when empty).
        self.iter().all(|x| x.is_zero())
    }
}

#[cfg(test)]
mod tests {
    use crate::difference::Semigroup;

    /// `Vec<R>` plus a longer slice: shared positions add, surplus is appended.
    #[test]
    fn test_semigroup_vec() {
        let mut accumulated = vec![1, 2, 3];
        let addend: &[i32] = &[1, 1, 1, 1];
        accumulated.plus_equals(addend);
        assert_eq!(accumulated, vec![2, 3, 4, 1]);
    }
}

impl<R: Monoid> Monoid for Vec<R> {
fn zero() -> Self {
Self::new()
Expand Down
8 changes: 6 additions & 2 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ where

use ::timely::dataflow::scopes::Child;
use ::timely::progress::timestamp::Refines;
use timely::Container;

Check failure on line 78 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / test mdBook

unresolved import `timely::Container`

Check failure on line 78 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

unresolved import `timely::Container`

Check failure on line 78 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.72

unresolved import `timely::Container`

Check failure on line 78 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust stable

unresolved import `timely::Container`
use timely::container::PushInto;

Check failure on line 79 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / test mdBook

unresolved import `timely::container`

Check failure on line 79 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

unresolved import `timely::container`

Check failure on line 79 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.72

unresolved import `timely::container`

Check failure on line 79 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust stable

unresolved import `timely::container`

impl<G, Tr> Arranged<G, Tr>
where
Expand Down Expand Up @@ -292,7 +294,8 @@ where
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
<T2::Builder as Builder>::Input: Container,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
{
self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| {
Expand All @@ -311,7 +314,8 @@ where
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned,V), T2::Time, T2::Diff)>,
<T2::Builder as Builder>::Input: Container,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
use crate::operators::reduce::reduce_trace;
Expand Down
12 changes: 5 additions & 7 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ where
F: Fn(Tr::Val<'_>) -> V + 'static,
Tr::Time: TotalOrder+ExchangeData,
Tr::Batch: Batch,
Tr::Builder: Builder<Input = ((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>,
Tr::Builder: Builder<Input = Vec<((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down Expand Up @@ -244,14 +244,14 @@ where
let mut builder = Tr::Builder::new();
for (key, mut list) in to_process.drain(..) {

use trace::cursor::MyTrait;
use trace::cursor::IntoOwned;

// The prior value associated with the key.
let mut prev_value: Option<V> = None;

// Attempt to find the key in the trace.
trace_cursor.seek_key_owned(&trace_storage, &key);
if trace_cursor.get_key(&trace_storage).map(|k| k.equals(&key)).unwrap_or(false) {
trace_cursor.seek_key(&trace_storage, IntoOwned::borrow_as(&key));
if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&IntoOwned::borrow_as(&key))).unwrap_or(false) {
// Determine the prior value associated with the key.
while let Some(val) = trace_cursor.get_val(&trace_storage) {
let mut count = 0;
Expand Down Expand Up @@ -282,9 +282,7 @@ where
}
// Must insert updates in (key, val, time) order.
updates.sort();
for update in updates.drain(..) {
builder.push(update);
}
builder.push(&mut updates);
}
let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
prev_frontier.clone_from(&upper);
Expand Down
8 changes: 4 additions & 4 deletions src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use timely::dataflow::Scope;

use crate::{Collection, ExchangeData, Hashable};
use crate::consolidation::ConsolidatingContainerBuilder;
use crate::difference::Semigroup;

use crate::Data;
Expand Down Expand Up @@ -56,7 +57,7 @@ where
Tr::Batcher: Batcher<Input=Vec<((D,()),G::Timestamp,R)>>,
{
use crate::operators::arrange::arrangement::Arrange;
use crate::trace::cursor::MyTrait;
use crate::trace::cursor::IntoOwned;
self.map(|k| (k, ()))
.arrange_named::<Tr>(name)
.as_collection(|d, _| d.into_owned())
Expand Down Expand Up @@ -92,14 +93,13 @@ where
use crate::collection::AsCollection;

self.inner
.unary(Pipeline, "ConsolidateStream", |_cap, _info| {
.unary::<ConsolidatingContainerBuilder<_>, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| {

let mut vector = Vec::new();
move |input, output| {
input.for_each(|time, data| {
data.swap(&mut vector);
crate::consolidation::consolidate_updates(&mut vector);
output.session(&time).give_container(&mut vector);
output.session_with_builder(&time).give_container(&mut vector);
})
}
})
Expand Down
2 changes: 1 addition & 1 deletion src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ where

move |input, output| {

use crate::trace::cursor::MyTrait;
use crate::trace::cursor::IntoOwned;
input.for_each(|capability, batches| {
batches.swap(&mut buffer);
let mut session = output.session(&capability);
Expand Down
Loading

0 comments on commit 4ba7bc2

Please sign in to comment.