Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #13 from TimelyDataflow/master
Browse files Browse the repository at this point in the history
Upstream changes
  • Loading branch information
antiguru authored May 29, 2024
2 parents 4ba7bc2 + c8f8917 commit 05a27da
Show file tree
Hide file tree
Showing 46 changed files with 939 additions and 800 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- windows
toolchain:
- stable
- 1.72
- 1.78
name: cargo test on ${{ matrix.os }}, rust ${{ matrix.toolchain }}
runs-on: ${{ matrix.os }}-latest
steps:
Expand Down
6 changes: 3 additions & 3 deletions dogsdogsdogs/examples/delta_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,17 @@ fn main() {

// Prior technology
// dQ/dE1 := dE1(a,b), E2(b,c), E3(a,c)
let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone(), Clone::clone);
let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone());
let changes1 = validate(&changes1, forward_self_neu.clone(), key1.clone());
let changes1 = changes1.map(|((a,b),c)| (a,b,c));

// dQ/dE2 := dE2(b,c), E1(a,b), E3(a,c)
let changes2 = propose(&changes, reverse_key_alt.clone(), key1.clone(), Clone::clone);
let changes2 = propose(&changes, reverse_key_alt.clone(), key1.clone());
let changes2 = validate(&changes2, reverse_self_neu.clone(), key2.clone());
let changes2 = changes2.map(|((b,c),a)| (a,b,c));

// dQ/dE3 := dE3(a,c), E1(a,b), E2(b,c)
let changes3 = propose(&changes, forward_key_alt.clone(), key1.clone(), Clone::clone);
let changes3 = propose(&changes, forward_key_alt.clone(), key1.clone());
let changes3 = validate(&changes3, reverse_self_alt.clone(), key2.clone());
let changes3 = changes3.map(|((a,c),b)| (a,b,c));

Expand Down
8 changes: 5 additions & 3 deletions dogsdogsdogs/src/calculus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ impl<G, D, R> Differentiate<G, D, R> for Collection<G, D, R>
where
G: Scope,
D: Data,
R: Abelian,
R: Abelian + 'static,
{
// For each (data, Alt(time), diff) we add a (data, Neu(time), -diff).
fn differentiate<'a>(&self, child: &Child<'a, G, AltNeu<G::Timestamp>>) -> Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R> {
self.enter(child)
.inner
.flat_map(|(data, time, diff)| {
let neu = (data.clone(), AltNeu::neu(time.time.clone()), diff.clone().negate());
let mut neg_diff = diff.clone();
neg_diff.negate();
let neu = (data.clone(), AltNeu::neu(time.time.clone()), neg_diff);
let alt = (data, time, diff);
Some(alt).into_iter().chain(Some(neu))
})
Expand All @@ -53,7 +55,7 @@ impl<'a, G, D, R> Integrate<G, D, R> for Collection<Child<'a, G, AltNeu<G::Times
where
G: Scope,
D: Data,
R: Abelian,
R: Abelian + 'static,
{
// We discard each `neu` variant and strip off the `alt` wrapper.
fn integrate(&self) -> Collection<G, D, R> {
Expand Down
4 changes: 2 additions & 2 deletions dogsdogsdogs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<G, P, R> ProposeExtensionMethod<G, P, R> for Collection<G, P, R>
where
G: Scope,
P: ExchangeData+Ord,
R: Monoid+Multiply<Output = R>,
R: Monoid+Multiply<Output = R>+'static,
{
fn propose_using<PE>(&self, extender: &mut PE) -> Collection<G, (P, PE::Extension), R>
where
Expand Down Expand Up @@ -199,7 +199,7 @@ where

fn propose(&mut self, prefixes: &Collection<G, P, R>) -> Collection<G, (P, V), R> {
let propose = self.indices.propose_trace.import(&prefixes.scope());
operators::propose::propose(prefixes, propose, self.key_selector.clone(), |x| x.clone())
operators::propose::propose(prefixes, propose, self.key_selector.clone())
}

fn validate(&mut self, extensions: &Collection<G, (P, V), R>) -> Collection<G, (P, V), R> {
Expand Down
13 changes: 8 additions & 5 deletions dogsdogsdogs/src/operators/count.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use timely::dataflow::Scope;

use differential_dataflow::{ExchangeData, Collection, Hashable};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::difference::{Semigroup, Monoid, Multiply};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::cursor::IntoOwned;

/// Reports a number of extensions to a stream of prefixes.
///
/// This method takes as input a stream of `(prefix, count, index)` triples.
/// For each triple, it extracts a key using `key_selector`, and finds the
/// associated count in `arrangement`. If the found count is less than `count`,
/// the `count` and `index` fields are overwritten with their new values.
pub fn count<G, Tr, R, F, P>(
pub fn count<G, Tr, K, R, F, P>(
prefixes: &Collection<G, (P, usize, usize), R>,
arrangement: Arranged<G, Tr>,
key_selector: F,
Expand All @@ -20,15 +21,17 @@ pub fn count<G, Tr, R, F, P>(
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader<Diff=isize>+Clone+'static,
Tr::KeyOwned: Hashable + Default,
for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>,
for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
K: Hashable + Ord + Default + 'static,
R: Monoid+Multiply<Output = R>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
F: Fn(&P)->K+Clone+'static,
P: ExchangeData,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &(P,usize,usize), k: &mut Tr::KeyOwned| { *k = key_selector(&p.0); },
move |p: &(P,usize,usize), k: &mut K| { *k = key_selector(&p.0); },
move |(p,c,i), r, _, s| {
let s = *s as usize;
if *c < s { ((p.clone(), *c, *i), r.clone()) }
Expand Down
35 changes: 20 additions & 15 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader};
use differential_dataflow::consolidation::{consolidate, consolidate_updates};
use differential_dataflow::trace::cursor::IntoOwned;

/// A binary equijoin that responds to updates on only its first input.
///
Expand All @@ -68,27 +69,28 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates};
/// Notice that the time is hoisted up into data. The expectation is that
/// once out of the "delta flow region", the updates will be `delay`d to the
/// times specified in the payloads.
pub fn half_join<G, V, R, Tr, FF, CF, DOut, S>(
stream: &Collection<G, (Tr::KeyOwned, V, G::Timestamp), R>,
pub fn half_join<G, K, V, R, Tr, FF, CF, DOut, S>(
stream: &Collection<G, (K, V, G::Timestamp), R>,
arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
mut output_func: S,
) -> Collection<G, (DOut, G::Timestamp), <R as Mul<Tr::Diff>>::Output>
where
G: Scope<Timestamp = Tr::Time>,
Tr::KeyOwned: Hashable + ExchangeData,
K: Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader+Clone+'static,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
R: Mul<Tr::Diff>,
<R as Mul<Tr::Diff>>::Output: Semigroup,
FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>)->DOut+'static,
S: FnMut(&K, &V, Tr::Val<'_>)->DOut+'static,
{
let output_func = move |k: &Tr::KeyOwned, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| {
let output_func = move |k: &K, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| {
let diff = diff1.clone() * diff2.clone();
let dout = (output_func(k, v1, v2), time.clone());
Some((dout, initial.clone(), diff))
Expand Down Expand Up @@ -120,8 +122,8 @@ where
/// yield control, as a function of the elapsed time and the number of matched
/// records. Note this is not the number of *output* records, owing mainly to
/// the number of matched records being easiest to record with low overhead.
pub fn half_join_internal_unsafe<G, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
stream: &Collection<G, (Tr::KeyOwned, V, G::Timestamp), R>,
pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
stream: &Collection<G, (K, V, G::Timestamp), R>,
mut arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
Expand All @@ -130,17 +132,18 @@ pub fn half_join_internal_unsafe<G, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
) -> Collection<G, DOut, ROut>
where
G: Scope<Timestamp = Tr::Time>,
Tr::KeyOwned: Hashable + ExchangeData,
K: Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader+Clone+'static,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
DOut: Clone+'static,
ROut: Semigroup,
ROut: Semigroup + 'static,
Y: Fn(std::time::Instant, usize) -> bool + 'static,
I: IntoIterator<Item=(DOut, G::Timestamp, ROut)>,
S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static,
S: FnMut(&K, &V, Tr::Val<'_>, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
Expand All @@ -150,7 +153,7 @@ where
let mut stash = HashMap::new();
let mut buffer = Vec::new();

let exchange = Exchange::new(move |update: &((Tr::KeyOwned, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());
let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());

// Stash for (time, diff) accumulation.
let mut output_buffer = Vec::new();
Expand Down Expand Up @@ -207,14 +210,16 @@ where
for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() {
// Use TOTAL ORDER to allow the release of `time`.
yielded = yielded || yield_function(timer, work);
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) {
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(<Tr::TimeGat<'_> as IntoOwned>::borrow_as(t), initial)) {
use differential_dataflow::trace::cursor::IntoOwned;
cursor.seek_key(&storage, IntoOwned::borrow_as(key));
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
output_buffer.push((t.join(time), d.clone()))
let mut t = t.into_owned();
t.join_assign(time);
output_buffer.push((t, d.into_owned()))
}
});
consolidate(&mut output_buffer);
Expand Down
27 changes: 15 additions & 12 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,37 @@ use timely::dataflow::operators::Operator;
use timely::progress::Antichain;

use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
use differential_dataflow::difference::{Semigroup, Monoid};
use differential_dataflow::difference::{IsZero, Semigroup, Monoid};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader};
use differential_dataflow::trace::cursor::IntoOwned;

/// Proposes extensions to a stream of prefixes.
///
/// This method takes a stream of prefixes and for each determines a
/// key with `key_selector` and then proposes all pairs of the prefix
/// and values associated with the key in `arrangement`.
pub fn lookup_map<G, D, R, Tr, F, DOut, ROut, S>(
pub fn lookup_map<G, D, K, R, Tr, F, DOut, ROut, S>(
prefixes: &Collection<G, D, R>,
mut arrangement: Arranged<G, Tr>,
key_selector: F,
mut output_func: S,
supplied_key0: Tr::KeyOwned,
supplied_key1: Tr::KeyOwned,
supplied_key2: Tr::KeyOwned,
supplied_key0: K,
supplied_key1: K,
supplied_key2: K,
) -> Collection<G, DOut, ROut>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable,
for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>,
for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
K: Hashable + Ord + 'static,
Tr::Diff: Monoid+ExchangeData,
F: FnMut(&D, &mut Tr::KeyOwned)+Clone+'static,
F: FnMut(&D, &mut K)+Clone+'static,
D: ExchangeData,
R: ExchangeData+Monoid,
DOut: Clone+'static,
ROut: Monoid,
ROut: Monoid + 'static,
S: FnMut(&D, &R, Tr::Val<'_>, &Tr::Diff)->(DOut, ROut)+'static,
{
// No need to block physical merging for this operator.
Expand All @@ -48,14 +51,14 @@ where

let mut buffer = Vec::new();

let mut key: Tr::KeyOwned = supplied_key0;
let mut key: K = supplied_key0;
let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| {
logic1(&update.0, &mut key);
key.hashed().into()
});

let mut key1: Tr::KeyOwned = supplied_key1;
let mut key2: Tr::KeyOwned = supplied_key2;
let mut key1: K = supplied_key1;
let mut key2: K = supplied_key2;

prefixes.inner.binary_frontier(&propose_stream, exchange, Pipeline, "LookupMap", move |_,_| move |input1, input2, output| {

Expand Down Expand Up @@ -99,7 +102,7 @@ where
while let Some(value) = cursor.get_val(&storage) {
let mut count = Tr::Diff::zero();
cursor.map_times(&storage, |t, d| {
if t.less_equal(time) { count.plus_equals(d); }
if t.into_owned().less_equal(time) { count.plus_equals(&d); }
});
if !count.is_zero() {
let (dout, rout) = output_func(prefix, diff, value, &count);
Expand Down
33 changes: 18 additions & 15 deletions dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use timely::dataflow::Scope;

use differential_dataflow::{ExchangeData, Collection, Hashable};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::difference::{Semigroup, Monoid, Multiply};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::cursor::IntoOwned;

/// Proposes extensions to a prefix stream.
///
Expand All @@ -13,27 +14,28 @@ use differential_dataflow::trace::TraceReader;
/// create a join if the `prefixes` collection is also arranged and responds to changes that
/// `arrangement` undergoes. More complicated patterns are also appropriate, as in the case
/// of delta queries.
pub fn propose<G, Tr, F, P, V, VF>(
pub fn propose<G, Tr, K, F, P, V>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
val_from: VF,
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable + Default,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
K: Hashable + Default + Ord + 'static,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
F: Fn(&P)->K+Clone+'static,
P: ExchangeData,
V: Clone + 'static,
VF: Fn(Tr::Val<'_>) -> V + 'static,
for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut Tr::KeyOwned | { *k = key_selector(p); },
move |prefix, diff, value, sum| ((prefix.clone(), val_from(value)), diff.clone().multiply(sum)),
move |p: &P, k: &mut K | { *k = key_selector(p); },
move |prefix, diff, value, sum| ((prefix.clone(), value.into_owned()), diff.clone().multiply(sum)),
Default::default(),
Default::default(),
Default::default(),
Expand All @@ -45,27 +47,28 @@ where
/// Unlike `propose`, this method does not scale the multiplicity of matched
/// prefixes by the number of matches in `arrangement`. This can be useful to
/// avoid the need to prepare an arrangement of distinct extensions.
pub fn propose_distinct<G, Tr, F, P, V, VF>(
pub fn propose_distinct<G, Tr, K, F, P, V>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
val_from: VF,
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable + Default,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
K: Hashable + Default + Ord + 'static,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
F: Fn(&P)->K+Clone+'static,
P: ExchangeData,
V: Clone + 'static,
VF: Fn(Tr::Val<'_>) -> V + 'static,
for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut Tr::KeyOwned| { *k = key_selector(p); },
move |prefix, diff, value, _sum| ((prefix.clone(), val_from(value)), diff.clone()),
move |p: &P, k: &mut K| { *k = key_selector(p); },
move |prefix, diff, value, _sum| ((prefix.clone(), value.into_owned()), diff.clone()),
Default::default(),
Default::default(),
Default::default(),
Expand Down
Loading

0 comments on commit 05a27da

Please sign in to comment.