Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Pull upstream changes #12

Merged
merged 8 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ where
// Use TOTAL ORDER to allow the release of `time`.
yielded = yielded || yield_function(timer, work);
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) {
use differential_dataflow::trace::cursor::MyTrait;
cursor.seek_key(&storage, MyTrait::borrow_as(key));
if cursor.get_key(&storage) == Some(MyTrait::borrow_as(key)) {
use differential_dataflow::trace::cursor::IntoOwned;
cursor.seek_key(&storage, IntoOwned::borrow_as(key));
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
Expand Down
6 changes: 3 additions & 3 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ where
for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() {
if !input2.frontier.less_equal(time) {
logic2(prefix, &mut key1);
use differential_dataflow::trace::cursor::MyTrait;
cursor.seek_key(&storage, MyTrait::borrow_as(&key1));
if cursor.get_key(&storage) == Some(MyTrait::borrow_as(&key1)) {
use differential_dataflow::trace::cursor::IntoOwned;
cursor.seek_key(&storage, IntoOwned::borrow_as(&key1));
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(&key1)) {
while let Some(value) = cursor.get_val(&storage) {
let mut count = Tr::Diff::zero();
cursor.map_times(&storage, |t, d| {
Expand Down
21 changes: 9 additions & 12 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ fn main() {
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
// "rhh" => {
// use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
// let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
// let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
// keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
// .probe_with(&mut probe);
// },
"rhh" => {
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"slc" => {

use differential_dataflow::trace::implementations::ord_neu::PreferredSpine;
Expand All @@ -61,10 +61,7 @@ fn main() {
.arrange::<PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1)));

keys.join_core(&data, |k,_v1,_v2| {
println!("{:?}", k.text);
Option::<((),isize,isize)>::None
})
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
_ => {
Expand Down Expand Up @@ -122,4 +119,4 @@ fn main() {

println!("{:?}\tshut down", timer2.elapsed());

}
}
4 changes: 2 additions & 2 deletions mdbook/src/chapter_0/chapter_0_0.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ version = "0.1.0"
authors = ["Your Name <[email protected]>"]

[dependencies]
timely = "0.11.1"
differential-dataflow = "0.11.0"
timely = "0.12.0"
differential-dataflow = "0.12.0"
```

You should only need to add those last two lines there, which bring in dependencies on both [timely dataflow](https://github.com/TimelyDataflow/timely-dataflow) and [differential dataflow](https://github.com/TimelyDataflow/differential-dataflow). We will be using both of those.
Expand Down
4 changes: 2 additions & 2 deletions mdbook/src/chapter_0/chapter_0_1.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ fn main() {
.inspect(|x| println!("{:?}", x));
});

// Set an arbitrary size for our organization.
let size = 100;
// Set a size for our organization from the input.
let size = std::env::args().nth(1).and_then(|s| s.parse::<u32>().ok()).unwrap_or(10);

// Load input (a binary tree).
input.advance_to(0);
Expand Down
2 changes: 1 addition & 1 deletion mdbook/src/chapter_0/chapter_0_3.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ We can then use this probe to limit the introduction of new data, by waiting for
This starts to print out a mess of data, indicating not only how long it takes to start up the computation, but also how long each individual round of updates takes.

``` ignore
Echidnatron% cargo run --release --example hello 10000000
Echidnatron% cargo run --release -- 10000000
Finished release [optimized + debuginfo] target(s) in 0.06s
Running `target/release/hello 10000000`
4.092895186s data loaded
Expand Down
2 changes: 0 additions & 2 deletions src/algorithms/graphs/sequential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ where
F: Fn(&N, &[(&V, isize)])->V+'static
{

let _timer = ::std::time::Instant::now();

// start iteration with None messages for all.
state
.map(|(node, _state)| (node, None))
Expand Down
43 changes: 2 additions & 41 deletions src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,47 +42,6 @@ pub struct Progress<T> {
pub counts: Vec<(T, usize)>,
}

/// An iterator that yields with a `None` every so often.
pub struct YieldingIter<I> {
    /// When set, a time after which we should return `None`.
    start: Option<std::time::Instant>,
    /// How long we may iterate before yielding with a `None`.
    after: std::time::Duration,
    /// The wrapped iterator.
    iter: I,
}

impl<I> YieldingIter<I> {
    /// Construct a yielding iterator from an inter-yield duration.
    pub fn new_from(iter: I, yield_after: std::time::Duration) -> Self {
        YieldingIter {
            iter,
            after: yield_after,
            start: None,
        }
    }
}

impl<I: Iterator> Iterator for YieldingIter<I> {
    type Item = I::Item;
    fn next(&mut self) -> Option<Self::Item> {
        // Arm (or re-arm) the clock on the first call after a yield.
        let started = *self.start.get_or_insert_with(std::time::Instant::now);
        // Budget exhausted: signal a yield and reset so the next call restarts the clock.
        if started.elapsed() > self.after {
            self.start = None;
            return None;
        }
        match self.iter.next() {
            Some(item) => Some(item),
            None => {
                // Inner iterator is (currently) out of items; reset the clock so a
                // later call measures a fresh interval.
                self.start = None;
                None
            }
        }
    }
}

/// A simple sink for byte slices.
pub trait Writer<T> {
/// Returns an amount of time to wait before retrying, or `None` for success.
Expand Down Expand Up @@ -785,6 +744,8 @@ pub mod sink {
// {
// super::source::build(scope, |activator| {
// let source = KafkaSource::new(addr, topic, group, activator);
// // An iterator combinator that yields every "duration" even if more items exist.
// // The implementation of such an iterator exists in the git history, or can be rewritten easily.
// super::YieldingIter::new_from(Iter::<D,T,R>::new_from(source), std::time::Duration::from_millis(10))
// })
// }
Expand Down
37 changes: 29 additions & 8 deletions src/difference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ pub use self::Abelian as Diff;
/// There is a light presumption of commutativity here, in that while we will largely perform addition
/// in order of timestamps, for many types of timestamps there is no total order and consequently no
/// obvious order to respect. Non-commutative semigroups should be used with care.
pub trait Semigroup : ::std::marker::Sized + Data + Clone {
pub trait Semigroup<Rhs: ?Sized = Self> : Data + Clone {
/// The method of `std::ops::AddAssign`, for types that do not implement `AddAssign`.
fn plus_equals(&mut self, rhs: &Self);
fn plus_equals(&mut self, rhs: &Rhs);
/// Returns true if the element is the additive identity.
///
/// This is primarily used by differential dataflow to know when it is safe to delete an update.
Expand Down Expand Up @@ -233,22 +233,43 @@ mod vector {

impl<R: Semigroup> Semigroup for Vec<R> {
fn plus_equals(&mut self, rhs: &Self) {
// Delegate to the `Semigroup<[R]>` slice implementation, which also
// extends `self` when `rhs` is longer.
self.plus_equals(&rhs[..])
}
fn is_zero(&self) -> bool {
self.iter().all(|x| x.is_zero())
}
}

impl<R: Semigroup> Semigroup<[R]> for Vec<R> {
    /// Adds `rhs` into `self` element-wise, extending `self` if `rhs` is longer.
    fn plus_equals(&mut self, rhs: &[R]) {
        // Apply updates to the positions the two sequences have in common.
        for (index, update) in rhs.iter().enumerate().take(self.len()) {
            self[index].plus_equals(update);
        }

        // Clone any leftover elements from `rhs`; each is its own sum as the
        // corresponding element of `self` is an implicit zero.
        // (The previous trailing pass that re-applied every `rhs` update a
        // second time has been removed: it double-counted all updates.)
        while self.len() < rhs.len() {
            let element = &rhs[self.len()];
            self.push(element.clone());
        }
    }
    /// A vector is zero when all of its elements are zero; in particular,
    /// the empty vector is zero.
    fn is_zero(&self) -> bool {
        self.iter().all(|x| x.is_zero())
    }
}

#[cfg(test)]
mod tests {
    use crate::difference::Semigroup;

    /// Adding a longer slice into a `Vec` updates the overlapping entries
    /// and appends the remainder.
    #[test]
    fn test_semigroup_vec() {
        let mut accum = vec![1, 2, 3];
        let updates = [1, 1, 1, 1];
        accum.plus_equals(&updates[..]);
        assert_eq!(accum, vec![2, 3, 4, 1]);
    }
}

impl<R: Monoid> Monoid for Vec<R> {
fn zero() -> Self {
Self::new()
Expand Down
8 changes: 6 additions & 2 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use timely::dataflow::operators::{Enter, Map};
use timely::order::PartialOrder;
use timely::dataflow::{Scope, Stream, StreamCore};

Check failure on line 22 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / test mdBook

unresolved import `timely::dataflow::StreamCore`

Check failure on line 22 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust stable

unresolved import `timely::dataflow::StreamCore`

Check failure on line 22 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

unresolved import `timely::dataflow::StreamCore`

Check failure on line 22 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust 1.72

unresolved import `timely::dataflow::StreamCore`

Check failure on line 22 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.72

unresolved import `timely::dataflow::StreamCore`
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange};
use timely::progress::Timestamp;
Expand Down Expand Up @@ -75,6 +75,8 @@

use ::timely::dataflow::scopes::Child;
use ::timely::progress::timestamp::Refines;
use timely::Container;

Check failure on line 78 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / test mdBook

unresolved import `timely::Container`

Check failure on line 78 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust stable

unresolved import `timely::Container`

Check failure on line 78 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

unresolved import `timely::Container`

Check failure on line 78 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust 1.72

unresolved import `timely::Container`

Check failure on line 78 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.72

unresolved import `timely::Container`
use timely::container::PushInto;

Check failure on line 79 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / test mdBook

unresolved import `timely::container`

Check failure on line 79 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust stable

unresolved import `timely::container`

Check failure on line 79 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

unresolved import `timely::container`

Check failure on line 79 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust 1.72

unresolved import `timely::container`

Check failure on line 79 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.72

unresolved import `timely::container`

impl<G, Tr> Arranged<G, Tr>
where
Expand Down Expand Up @@ -292,7 +294,8 @@
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
<T2::Builder as Builder>::Input: Container,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
{
self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| {
Expand All @@ -311,7 +314,8 @@
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned,V), T2::Time, T2::Diff)>,
<T2::Builder as Builder>::Input: Container,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
use crate::operators::reduce::reduce_trace;
Expand Down
12 changes: 5 additions & 7 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ where
F: Fn(Tr::Val<'_>) -> V + 'static,
Tr::Time: TotalOrder+ExchangeData,
Tr::Batch: Batch,
Tr::Builder: Builder<Input = ((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>,
Tr::Builder: Builder<Input = Vec<((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down Expand Up @@ -244,14 +244,14 @@ where
let mut builder = Tr::Builder::new();
for (key, mut list) in to_process.drain(..) {

use trace::cursor::MyTrait;
use trace::cursor::IntoOwned;

// The prior value associated with the key.
let mut prev_value: Option<V> = None;

// Attempt to find the key in the trace.
trace_cursor.seek_key_owned(&trace_storage, &key);
if trace_cursor.get_key(&trace_storage).map(|k| k.equals(&key)).unwrap_or(false) {
trace_cursor.seek_key(&trace_storage, IntoOwned::borrow_as(&key));
if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&IntoOwned::borrow_as(&key))).unwrap_or(false) {
// Determine the prior value associated with the key.
while let Some(val) = trace_cursor.get_val(&trace_storage) {
let mut count = 0;
Expand Down Expand Up @@ -282,9 +282,7 @@ where
}
// Must insert updates in (key, val, time) order.
updates.sort();
for update in updates.drain(..) {
builder.push(update);
}
builder.push(&mut updates);
}
let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
prev_frontier.clone_from(&upper);
Expand Down
8 changes: 4 additions & 4 deletions src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use timely::dataflow::Scope;

use crate::{Collection, ExchangeData, Hashable};
use crate::consolidation::ConsolidatingContainerBuilder;
use crate::difference::Semigroup;

use crate::Data;
Expand Down Expand Up @@ -56,7 +57,7 @@ where
Tr::Batcher: Batcher<Input=Vec<((D,()),G::Timestamp,R)>>,
{
use crate::operators::arrange::arrangement::Arrange;
use crate::trace::cursor::MyTrait;
use crate::trace::cursor::IntoOwned;
self.map(|k| (k, ()))
.arrange_named::<Tr>(name)
.as_collection(|d, _| d.into_owned())
Expand Down Expand Up @@ -92,14 +93,13 @@ where
use crate::collection::AsCollection;

self.inner
.unary(Pipeline, "ConsolidateStream", |_cap, _info| {
.unary::<ConsolidatingContainerBuilder<_>, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| {

let mut vector = Vec::new();
move |input, output| {
input.for_each(|time, data| {
data.swap(&mut vector);
crate::consolidation::consolidate_updates(&mut vector);
output.session(&time).give_container(&mut vector);
output.session_with_builder(&time).give_container(&mut vector);
})
}
})
Expand Down
2 changes: 1 addition & 1 deletion src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ where

move |input, output| {

use crate::trace::cursor::MyTrait;
use crate::trace::cursor::IntoOwned;
input.for_each(|capability, batches| {
batches.swap(&mut buffer);
let mut session = output.session(&capability);
Expand Down
Loading
Loading