Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Upstream changes #13

Merged
merged 9 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- windows
toolchain:
- stable
- 1.72
- 1.78
name: cargo test on ${{ matrix.os }}, rust ${{ matrix.toolchain }}
runs-on: ${{ matrix.os }}-latest
steps:
Expand Down
6 changes: 3 additions & 3 deletions dogsdogsdogs/examples/delta_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,17 @@ fn main() {

// Prior technology
// dQ/dE1 := dE1(a,b), E2(b,c), E3(a,c)
let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone(), Clone::clone);
let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone());
let changes1 = validate(&changes1, forward_self_neu.clone(), key1.clone());
let changes1 = changes1.map(|((a,b),c)| (a,b,c));

// dQ/dE2 := dE2(b,c), E1(a,b), E3(a,c)
let changes2 = propose(&changes, reverse_key_alt.clone(), key1.clone(), Clone::clone);
let changes2 = propose(&changes, reverse_key_alt.clone(), key1.clone());
let changes2 = validate(&changes2, reverse_self_neu.clone(), key2.clone());
let changes2 = changes2.map(|((b,c),a)| (a,b,c));

// dQ/dE3 := dE3(a,c), E1(a,b), E2(b,c)
let changes3 = propose(&changes, forward_key_alt.clone(), key1.clone(), Clone::clone);
let changes3 = propose(&changes, forward_key_alt.clone(), key1.clone());
let changes3 = validate(&changes3, reverse_self_alt.clone(), key2.clone());
let changes3 = changes3.map(|((a,c),b)| (a,b,c));

Expand Down
8 changes: 5 additions & 3 deletions dogsdogsdogs/src/calculus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ impl<G, D, R> Differentiate<G, D, R> for Collection<G, D, R>
where
G: Scope,
D: Data,
R: Abelian,
R: Abelian + 'static,
{
// For each (data, Alt(time), diff) we add a (data, Neu(time), -diff).
fn differentiate<'a>(&self, child: &Child<'a, G, AltNeu<G::Timestamp>>) -> Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R> {
self.enter(child)
.inner
.flat_map(|(data, time, diff)| {
let neu = (data.clone(), AltNeu::neu(time.time.clone()), diff.clone().negate());
let mut neg_diff = diff.clone();
neg_diff.negate();
let neu = (data.clone(), AltNeu::neu(time.time.clone()), neg_diff);
let alt = (data, time, diff);
Some(alt).into_iter().chain(Some(neu))
})
Expand All @@ -53,7 +55,7 @@ impl<'a, G, D, R> Integrate<G, D, R> for Collection<Child<'a, G, AltNeu<G::Times
where
G: Scope,
D: Data,
R: Abelian,
R: Abelian + 'static,
{
// We discard each `neu` variant and strip off the `alt` wrapper.
fn integrate(&self) -> Collection<G, D, R> {
Expand Down
4 changes: 2 additions & 2 deletions dogsdogsdogs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<G, P, R> ProposeExtensionMethod<G, P, R> for Collection<G, P, R>
where
G: Scope,
P: ExchangeData+Ord,
R: Monoid+Multiply<Output = R>,
R: Monoid+Multiply<Output = R>+'static,
{
fn propose_using<PE>(&self, extender: &mut PE) -> Collection<G, (P, PE::Extension), R>
where
Expand Down Expand Up @@ -199,7 +199,7 @@ where

fn propose(&mut self, prefixes: &Collection<G, P, R>) -> Collection<G, (P, V), R> {
let propose = self.indices.propose_trace.import(&prefixes.scope());
operators::propose::propose(prefixes, propose, self.key_selector.clone(), |x| x.clone())
operators::propose::propose(prefixes, propose, self.key_selector.clone())
}

fn validate(&mut self, extensions: &Collection<G, (P, V), R>) -> Collection<G, (P, V), R> {
Expand Down
13 changes: 8 additions & 5 deletions dogsdogsdogs/src/operators/count.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use timely::dataflow::Scope;

use differential_dataflow::{ExchangeData, Collection, Hashable};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::difference::{Semigroup, Monoid, Multiply};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::cursor::IntoOwned;

/// Reports a number of extensions to a stream of prefixes.
///
/// This method takes as input a stream of `(prefix, count, index)` triples.
/// For each triple, it extracts a key using `key_selector`, and finds the
/// associated count in `arrangement`. If the found count is less than `count`,
/// the `count` and `index` fields are overwritten with their new values.
pub fn count<G, Tr, R, F, P>(
pub fn count<G, Tr, K, R, F, P>(
prefixes: &Collection<G, (P, usize, usize), R>,
arrangement: Arranged<G, Tr>,
key_selector: F,
Expand All @@ -20,15 +21,17 @@ pub fn count<G, Tr, R, F, P>(
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader<Diff=isize>+Clone+'static,
Tr::KeyOwned: Hashable + Default,
for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>,
for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
K: Hashable + Ord + Default + 'static,
R: Monoid+Multiply<Output = R>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
F: Fn(&P)->K+Clone+'static,
P: ExchangeData,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &(P,usize,usize), k: &mut Tr::KeyOwned| { *k = key_selector(&p.0); },
move |p: &(P,usize,usize), k: &mut K| { *k = key_selector(&p.0); },
move |(p,c,i), r, _, s| {
let s = *s as usize;
if *c < s { ((p.clone(), *c, *i), r.clone()) }
Expand Down
35 changes: 20 additions & 15 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader};
use differential_dataflow::consolidation::{consolidate, consolidate_updates};
use differential_dataflow::trace::cursor::IntoOwned;

/// A binary equijoin that responds to updates on only its first input.
///
Expand All @@ -68,27 +69,28 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates};
/// Notice that the time is hoisted up into data. The expectation is that
/// once out of the "delta flow region", the updates will be `delay`d to the
/// times specified in the payloads.
pub fn half_join<G, V, R, Tr, FF, CF, DOut, S>(
stream: &Collection<G, (Tr::KeyOwned, V, G::Timestamp), R>,
pub fn half_join<G, K, V, R, Tr, FF, CF, DOut, S>(
stream: &Collection<G, (K, V, G::Timestamp), R>,
arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
mut output_func: S,
) -> Collection<G, (DOut, G::Timestamp), <R as Mul<Tr::Diff>>::Output>
where
G: Scope<Timestamp = Tr::Time>,
Tr::KeyOwned: Hashable + ExchangeData,
K: Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader+Clone+'static,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
R: Mul<Tr::Diff>,
<R as Mul<Tr::Diff>>::Output: Semigroup,
FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>)->DOut+'static,
S: FnMut(&K, &V, Tr::Val<'_>)->DOut+'static,
{
let output_func = move |k: &Tr::KeyOwned, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| {
let output_func = move |k: &K, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| {
let diff = diff1.clone() * diff2.clone();
let dout = (output_func(k, v1, v2), time.clone());
Some((dout, initial.clone(), diff))
Expand Down Expand Up @@ -120,8 +122,8 @@ where
/// yield control, as a function of the elapsed time and the number of matched
/// records. Note this is not the number of *output* records, owing mainly to
/// the number of matched records being easiest to record with low overhead.
pub fn half_join_internal_unsafe<G, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
stream: &Collection<G, (Tr::KeyOwned, V, G::Timestamp), R>,
pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
stream: &Collection<G, (K, V, G::Timestamp), R>,
mut arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
Expand All @@ -130,17 +132,18 @@ pub fn half_join_internal_unsafe<G, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
) -> Collection<G, DOut, ROut>
where
G: Scope<Timestamp = Tr::Time>,
Tr::KeyOwned: Hashable + ExchangeData,
K: Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader+Clone+'static,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
DOut: Clone+'static,
ROut: Semigroup,
ROut: Semigroup + 'static,
Y: Fn(std::time::Instant, usize) -> bool + 'static,
I: IntoIterator<Item=(DOut, G::Timestamp, ROut)>,
S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static,
S: FnMut(&K, &V, Tr::Val<'_>, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
Expand All @@ -150,7 +153,7 @@ where
let mut stash = HashMap::new();
let mut buffer = Vec::new();

let exchange = Exchange::new(move |update: &((Tr::KeyOwned, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());
let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());

// Stash for (time, diff) accumulation.
let mut output_buffer = Vec::new();
Expand Down Expand Up @@ -207,14 +210,16 @@ where
for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() {
// Use TOTAL ORDER to allow the release of `time`.
yielded = yielded || yield_function(timer, work);
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) {
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(<Tr::TimeGat<'_> as IntoOwned>::borrow_as(t), initial)) {
use differential_dataflow::trace::cursor::IntoOwned;
cursor.seek_key(&storage, IntoOwned::borrow_as(key));
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
output_buffer.push((t.join(time), d.clone()))
let mut t = t.into_owned();
t.join_assign(time);
output_buffer.push((t, d.into_owned()))
}
});
consolidate(&mut output_buffer);
Expand Down
27 changes: 15 additions & 12 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,37 @@ use timely::dataflow::operators::Operator;
use timely::progress::Antichain;

use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
use differential_dataflow::difference::{Semigroup, Monoid};
use differential_dataflow::difference::{IsZero, Semigroup, Monoid};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader};
use differential_dataflow::trace::cursor::IntoOwned;

/// Proposes extensions to a stream of prefixes.
///
/// This method takes a stream of prefixes and for each determines a
/// key with `key_selector` and then proposes all pairs of the prefix
/// and values associated with the key in `arrangement`.
pub fn lookup_map<G, D, R, Tr, F, DOut, ROut, S>(
pub fn lookup_map<G, D, K, R, Tr, F, DOut, ROut, S>(
prefixes: &Collection<G, D, R>,
mut arrangement: Arranged<G, Tr>,
key_selector: F,
mut output_func: S,
supplied_key0: Tr::KeyOwned,
supplied_key1: Tr::KeyOwned,
supplied_key2: Tr::KeyOwned,
supplied_key0: K,
supplied_key1: K,
supplied_key2: K,
) -> Collection<G, DOut, ROut>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable,
for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>,
for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
K: Hashable + Ord + 'static,
Tr::Diff: Monoid+ExchangeData,
F: FnMut(&D, &mut Tr::KeyOwned)+Clone+'static,
F: FnMut(&D, &mut K)+Clone+'static,
D: ExchangeData,
R: ExchangeData+Monoid,
DOut: Clone+'static,
ROut: Monoid,
ROut: Monoid + 'static,
S: FnMut(&D, &R, Tr::Val<'_>, &Tr::Diff)->(DOut, ROut)+'static,
{
// No need to block physical merging for this operator.
Expand All @@ -48,14 +51,14 @@ where

let mut buffer = Vec::new();

let mut key: Tr::KeyOwned = supplied_key0;
let mut key: K = supplied_key0;
let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| {
logic1(&update.0, &mut key);
key.hashed().into()
});

let mut key1: Tr::KeyOwned = supplied_key1;
let mut key2: Tr::KeyOwned = supplied_key2;
let mut key1: K = supplied_key1;
let mut key2: K = supplied_key2;

prefixes.inner.binary_frontier(&propose_stream, exchange, Pipeline, "LookupMap", move |_,_| move |input1, input2, output| {

Expand Down Expand Up @@ -99,7 +102,7 @@ where
while let Some(value) = cursor.get_val(&storage) {
let mut count = Tr::Diff::zero();
cursor.map_times(&storage, |t, d| {
if t.less_equal(time) { count.plus_equals(d); }
if t.into_owned().less_equal(time) { count.plus_equals(&d); }
});
if !count.is_zero() {
let (dout, rout) = output_func(prefix, diff, value, &count);
Expand Down
33 changes: 18 additions & 15 deletions dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use timely::dataflow::Scope;

use differential_dataflow::{ExchangeData, Collection, Hashable};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::difference::{Semigroup, Monoid, Multiply};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::cursor::IntoOwned;

/// Proposes extensions to a prefix stream.
///
Expand All @@ -13,27 +14,28 @@ use differential_dataflow::trace::TraceReader;
/// create a join if the `prefixes` collection is also arranged and responds to changes that
/// `arrangement` undergoes. More complicated patterns are also appropriate, as in the case
/// of delta queries.
pub fn propose<G, Tr, F, P, V, VF>(
pub fn propose<G, Tr, K, F, P, V>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
val_from: VF,
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable + Default,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
K: Hashable + Default + Ord + 'static,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
F: Fn(&P)->K+Clone+'static,
P: ExchangeData,
V: Clone + 'static,
VF: Fn(Tr::Val<'_>) -> V + 'static,
for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut Tr::KeyOwned | { *k = key_selector(p); },
move |prefix, diff, value, sum| ((prefix.clone(), val_from(value)), diff.clone().multiply(sum)),
move |p: &P, k: &mut K | { *k = key_selector(p); },
move |prefix, diff, value, sum| ((prefix.clone(), value.into_owned()), diff.clone().multiply(sum)),
Default::default(),
Default::default(),
Default::default(),
Expand All @@ -45,27 +47,28 @@ where
/// Unlike `propose`, this method does not scale the multiplicity of matched
/// prefixes by the number of matches in `arrangement`. This can be useful to
/// avoid the need to prepare an arrangement of distinct extensions.
pub fn propose_distinct<G, Tr, F, P, V, VF>(
pub fn propose_distinct<G, Tr, K, F, P, V>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
val_from: VF,
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable + Default,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
K: Hashable + Default + Ord + 'static,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
F: Fn(&P)->K+Clone+'static,
P: ExchangeData,
V: Clone + 'static,
VF: Fn(Tr::Val<'_>) -> V + 'static,
for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut Tr::KeyOwned| { *k = key_selector(p); },
move |prefix, diff, value, _sum| ((prefix.clone(), val_from(value)), diff.clone()),
move |p: &P, k: &mut K| { *k = key_selector(p); },
move |prefix, diff, value, _sum| ((prefix.clone(), value.into_owned()), diff.clone()),
Default::default(),
Default::default(),
Default::default(),
Expand Down
Loading
Loading