Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #14 from TimelyDataflow/master
Browse files Browse the repository at this point in the history
Pull upstream changes
  • Loading branch information
antiguru authored Jun 13, 2024
2 parents 05a27da + bda68cf commit f051b82
Show file tree
Hide file tree
Showing 18 changed files with 1,402 additions and 621 deletions.
4 changes: 2 additions & 2 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ fn main() {
},
"flat" => {
use differential_dataflow::trace::implementations::ord_neu::FlatKeySpine;
let data = data.arrange::<FlatKeySpine<_,_,_>>();
let keys = keys.arrange::<FlatKeySpine<_,_,_>>();
let data = data.arrange::<FlatKeySpine<String,_,isize,_>>();
let keys = keys.arrange::<FlatKeySpine<String,_,isize,_>>();
keys.join_core(&data, |_k, (), ()| Option::<()>::None)
.probe_with(&mut probe);
}
Expand Down
247 changes: 138 additions & 109 deletions src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
use std::hash::Hash;

use timely::Container;
use timely::Data;
use timely::progress::Timestamp;
use timely::order::Product;
use timely::dataflow::scopes::{Child, child::Iterative};
use timely::dataflow::{Scope, Stream};
use timely::dataflow::Scope;
use timely::dataflow::operators::*;
use timely::dataflow::StreamCore;

use crate::difference::{Semigroup, Abelian, Multiply};
use crate::lattice::Lattice;
Expand All @@ -37,24 +39,142 @@ use crate::hashable::Hashable;
/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
/// defaults to) `isize`, representing changes to the occurrence count of each record.
#[derive(Clone)]
pub struct Collection<G: Scope, D, R: Semigroup = isize> {
pub struct Collection<G: Scope, D, R = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>> {
/// The underlying timely dataflow stream.
///
/// This field is exposed to support direct timely dataflow manipulation when required, but it is
/// not intended to be the idiomatic way to work with the collection.
pub inner: Stream<G, (D, G::Timestamp, R)>
pub inner: timely::dataflow::StreamCore<G, C>,
/// Phantom data for unreferenced `D` and `R` types.
phantom: std::marker::PhantomData<(D, R)>,
}

impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Timestamp: Data {
impl<G: Scope, D, R, C> Collection<G, D, R, C> {
/// Creates a new Collection from a timely dataflow stream.
///
/// This method seems to be rarely used, with the `as_collection` method on streams being a more
/// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
/// provides a `new_collection` method which will create a new collection for you without exposing
/// the underlying timely stream at all.
pub fn new(stream: Stream<G, (D, G::Timestamp, R)>) -> Collection<G, D, R> {
Collection { inner: stream }
pub fn new(stream: StreamCore<G, C>) -> Collection<G, D, R, C> {
Collection { inner: stream, phantom: std::marker::PhantomData }
}
}
impl<G: Scope, D, R, C: Container> Collection<G, D, R, C> {
/// Creates a new collection accumulating the contents of the two collections.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
/// two collections.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.concat(&evens)
/// .assert_eq(&data);
/// });
/// ```
pub fn concat(&self, other: &Self) -> Self {
self.inner
.concat(&other.inner)
.as_collection()
}
/// Creates a new collection accumulating the contents of this collection and all of the supplied collections.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
/// implementation is the concatenation of the streams of updates, but it corresponds to the addition of
/// all of the collections.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.concatenate(Some(evens))
/// .assert_eq(&data);
/// });
/// ```
pub fn concatenate<I>(&self, sources: I) -> Self
where
I: IntoIterator<Item=Self>
{
self.inner
.concatenate(sources.into_iter().map(|x| x.inner))
.as_collection()
}
/// Brings a Collection into a nested region.
///
/// This method is a specialization of `enter` to the case where the nested scope is a region.
/// It removes the need for an operator that adjusts the timestamp.
pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R, C> {
self.inner
.enter(child)
.as_collection()
}
/// Applies a supplied function to each batch of updates.
///
/// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
/// timely dataflow capability associated with the batch of updates. The observed batching depends
/// on how the system executes, and may vary run to run.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
/// scope.new_collection_from(1 .. 10).1
/// .map_in_place(|x| *x *= 2)
/// .filter(|x| x % 2 == 1)
/// .inspect_container(|event| println!("event: {:?}", event));
/// });
/// ```
pub fn inspect_container<F>(&self, func: F) -> Self
where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static {
self.inner
.inspect_container(func)
.as_collection()
}
/// Attaches a timely dataflow probe to the output of a Collection.
///
/// This probe is used to determine when the state of the Collection has stabilized and can
/// be read out.
pub fn probe(&self) -> probe::Handle<G::Timestamp> {
self.inner
.probe()
}
/// Attaches a timely dataflow probe to the output of a Collection.
///
/// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
/// In addition, a probe is also often used to limit the number of rounds of input in flight at any moment; a
/// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
/// avoid swamping the system.
pub fn probe_with(&self, handle: &mut probe::Handle<G::Timestamp>) -> Self {
Self::new(self.inner.probe_with(handle))
}
/// The scope containing the underlying timely dataflow stream.
pub fn scope(&self) -> G {
self.inner.scope()
}
}

impl<G: Scope, D: Clone+'static, R: Clone+'static> Collection<G, D, R> {
/// Creates a new collection by applying the supplied function to each input element.
///
/// # Examples
Expand Down Expand Up @@ -146,63 +266,6 @@ impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Times
.filter(move |(data, _, _)| logic(data))
.as_collection()
}
/// Creates a new collection accumulating the contents of the two collections.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
/// two collections.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.concat(&evens)
/// .assert_eq(&data);
/// });
/// ```
pub fn concat(&self, other: &Collection<G, D, R>) -> Collection<G, D, R> {
self.inner
.concat(&other.inner)
.as_collection()
}
/// Creates a new collection accumulating the contents of the two collections.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
/// two collections.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.concatenate(Some(evens))
/// .assert_eq(&data);
/// });
/// ```
pub fn concatenate<I>(&self, sources: I) -> Collection<G, D, R>
where
I: IntoIterator<Item=Collection<G, D, R>>
{
self.inner
.concatenate(sources.into_iter().map(|x| x.inner))
.as_collection()
}
/// Replaces each record with another, with a new difference type.
///
/// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
Expand Down Expand Up @@ -337,17 +400,6 @@ impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Times
.as_collection()
}

/// Brings a Collection into a nested region.
///
/// This method is a specialization of `enter` to the case where the nested scope is a region.
/// It removes the need for an operator that adjusts the timestamp.
pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R>
{
self.inner
.enter(child)
.as_collection()
}

/// Delays each difference by a supplied function.
///
/// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
Expand Down Expand Up @@ -418,25 +470,6 @@ impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Times
.inspect_batch(move |time, data| func(time, data))
.as_collection()
}
/// Attaches a timely dataflow probe to the output of a Collection.
///
/// This probe is used to determine when the state of the Collection has stabilized and can
/// be read out.
pub fn probe(&self) -> probe::Handle<G::Timestamp> {
self.inner
.probe()
}
/// Attaches a timely dataflow probe to the output of a Collection.
///
/// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
/// In addition, a probe is also often used to limit the number of rounds of input in flight at any moment; a
/// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
/// avoid swamping the system.
pub fn probe_with(&self, handle: &mut probe::Handle<G::Timestamp>) -> Collection<G, D, R> {
self.inner
.probe_with(handle)
.as_collection()
}

/// Assert if the collection is ever non-empty.
///
Expand All @@ -459,24 +492,19 @@ impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Times
/// ```
pub fn assert_empty(&self)
where D: crate::ExchangeData+Hashable,
R: crate::ExchangeData+Hashable,
R: crate::ExchangeData+Hashable + Semigroup,
G::Timestamp: Lattice+Ord,
{
self.consolidate()
.inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
}

/// The scope containing the underlying timely dataflow stream.
pub fn scope(&self) -> G {
self.inner.scope()
}
}

use timely::dataflow::scopes::ScopeParent;
use timely::progress::timestamp::Refines;

/// Methods requiring a nested scope.
impl<'a, G: Scope, T: Timestamp, D: Data, R: Semigroup+'static> Collection<Child<'a, G, T>, D, R>
impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static> Collection<Child<'a, G, T>, D, R>
where
T: Refines<<G as ScopeParent>::Timestamp>,
{
Expand Down Expand Up @@ -509,7 +537,7 @@ where
}

/// Methods requiring a region as the scope.
impl<'a, G: Scope, D: Data, R: Semigroup+'static> Collection<Child<'a, G, G::Timestamp>, D, R>
impl<'a, G: Scope, D: Clone+'static, R: Clone+'static> Collection<Child<'a, G, G::Timestamp>, D, R>
{
/// Returns the value of a Collection from a nested region to its containing scope.
///
Expand All @@ -523,7 +551,7 @@ impl<'a, G: Scope, D: Data, R: Semigroup+'static> Collection<Child<'a, G, G::Tim
}

/// Methods requiring an Abelian difference, to support negation.
impl<G: Scope, D: Data, R: Abelian+'static> Collection<G, D, R> where G::Timestamp: Data {
impl<G: Scope, D: Clone+'static, R: Abelian+'static> Collection<G, D, R> where G::Timestamp: Data {
/// Creates a new collection whose counts are the negation of those in the input.
///
/// This method is most commonly used with `concat` to get those elements in one collection but not another.
Expand Down Expand Up @@ -589,14 +617,14 @@ impl<G: Scope, D: Data, R: Abelian+'static> Collection<G, D, R> where G::Timesta
}

/// Conversion to a differential dataflow Collection.
pub trait AsCollection<G: Scope, D: Data, R: Semigroup> {
pub trait AsCollection<G: Scope, D, R, C> {
/// Converts the type to a differential dataflow collection.
fn as_collection(&self) -> Collection<G, D, R>;
fn as_collection(&self) -> Collection<G, D, R, C>;
}

impl<G: Scope, D: Data, R: Semigroup+'static> AsCollection<G, D, R> for Stream<G, (D, G::Timestamp, R)> {
fn as_collection(&self) -> Collection<G, D, R> {
Collection::new(self.clone())
impl<G: Scope, D, R, C: Clone> AsCollection<G, D, R, C> for StreamCore<G, C> {
fn as_collection(&self) -> Collection<G, D, R, C> {
Collection::<G,D,R,C>::new(self.clone())
}
}

Expand All @@ -621,12 +649,13 @@ impl<G: Scope, D: Data, R: Semigroup+'static> AsCollection<G, D, R> for Stream<G
/// .assert_eq(&data);
/// });
/// ```
pub fn concatenate<G, D, R, I>(scope: &mut G, iterator: I) -> Collection<G, D, R>
pub fn concatenate<G, D, R, C, I>(scope: &mut G, iterator: I) -> Collection<G, D, R, C>
where
G: Scope,
D: Data,
R: Semigroup+'static,
I: IntoIterator<Item=Collection<G, D, R>>,
C: Container,
I: IntoIterator<Item=Collection<G, D, R, C>>,
{
scope
.concatenate(iterator.into_iter().map(|x| x.inner))
Expand Down
Loading

0 comments on commit f051b82

Please sign in to comment.