Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove KeyOwned #498

Merged
merged 5 commits into from
May 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- windows
toolchain:
- stable
- 1.72
- 1.78
name: cargo test on ${{ matrix.os }}, rust ${{ matrix.toolchain }}
runs-on: ${{ matrix.os }}-latest
steps:
Expand Down
6 changes: 3 additions & 3 deletions dogsdogsdogs/examples/delta_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,17 @@ fn main() {

// Prior technology
// dQ/dE1 := dE1(a,b), E2(b,c), E3(a,c)
let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone(), Clone::clone);
let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone());
let changes1 = validate(&changes1, forward_self_neu.clone(), key1.clone());
let changes1 = changes1.map(|((a,b),c)| (a,b,c));

// dQ/dE2 := dE2(b,c), E1(a,b), E3(a,c)
let changes2 = propose(&changes, reverse_key_alt.clone(), key1.clone(), Clone::clone);
let changes2 = propose(&changes, reverse_key_alt.clone(), key1.clone());
let changes2 = validate(&changes2, reverse_self_neu.clone(), key2.clone());
let changes2 = changes2.map(|((b,c),a)| (a,b,c));

// dQ/dE3 := dE3(a,c), E1(a,b), E2(b,c)
let changes3 = propose(&changes, forward_key_alt.clone(), key1.clone(), Clone::clone);
let changes3 = propose(&changes, forward_key_alt.clone(), key1.clone());
let changes3 = validate(&changes3, reverse_self_alt.clone(), key2.clone());
let changes3 = changes3.map(|((a,c),b)| (a,b,c));

Expand Down
2 changes: 1 addition & 1 deletion dogsdogsdogs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ where

fn propose(&mut self, prefixes: &Collection<G, P, R>) -> Collection<G, (P, V), R> {
let propose = self.indices.propose_trace.import(&prefixes.scope());
operators::propose::propose(prefixes, propose, self.key_selector.clone(), |x| x.clone())
operators::propose::propose(prefixes, propose, self.key_selector.clone())
}

fn validate(&mut self, extensions: &Collection<G, (P, V), R>) -> Collection<G, (P, V), R> {
Expand Down
10 changes: 6 additions & 4 deletions dogsdogsdogs/src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ use differential_dataflow::{ExchangeData, Collection, Hashable};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::cursor::IntoOwned;

/// Reports a number of extensions to a stream of prefixes.
///
/// This method takes as input a stream of `(prefix, count, index)` triples.
/// For each triple, it extracts a key using `key_selector`, and finds the
/// associated count in `arrangement`. If the found count is less than `count`,
/// the `count` and `index` fields are overwritten with their new values.
pub fn count<G, Tr, R, F, P>(
pub fn count<G, Tr, K, R, F, P>(
prefixes: &Collection<G, (P, usize, usize), R>,
arrangement: Arranged<G, Tr>,
key_selector: F,
Expand All @@ -20,15 +21,16 @@ pub fn count<G, Tr, R, F, P>(
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader<Diff=isize>+Clone+'static,
Tr::KeyOwned: Hashable + Default,
for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>,
K: Hashable + Ord + Default + 'static,
R: Monoid+Multiply<Output = R>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
F: Fn(&P)->K+Clone+'static,
P: ExchangeData,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &(P,usize,usize), k: &mut Tr::KeyOwned| { *k = key_selector(&p.0); },
move |p: &(P,usize,usize), k: &mut K| { *k = key_selector(&p.0); },
move |(p,c,i), r, _, s| {
let s = *s as usize;
if *c < s { ((p.clone(), *c, *i), r.clone()) }
Expand Down
23 changes: 13 additions & 10 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader};
use differential_dataflow::consolidation::{consolidate, consolidate_updates};
use differential_dataflow::trace::cursor::IntoOwned;

/// A binary equijoin that responds to updates on only its first input.
///
Expand All @@ -68,27 +69,28 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates};
/// Notice that the time is hoisted up into data. The expectation is that
/// once out of the "delta flow region", the updates will be `delay`d to the
/// times specified in the payloads.
pub fn half_join<G, V, R, Tr, FF, CF, DOut, S>(
stream: &Collection<G, (Tr::KeyOwned, V, G::Timestamp), R>,
pub fn half_join<G, K, V, R, Tr, FF, CF, DOut, S>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it correct that the K is also implied by the IntoOwned implementation? If yes, would it be possible to remove the type parameter? Just asking because it still shows up in many places, but better than what we had before, but I can imagine that Rust has a hard time to figure out the associated types.

Copy link
Member Author

@frankmcsherry frankmcsherry May 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think generally the issue is that we need to assert that there is one type that all lifetimes can be taken to. Each implements IntoOwned, but nothing ensures that there is a common target. If we could say for example for<'a, 'b> Key<'a>::Owned = Key<'b>::Owned or something like, I think you could avoid the K. So, it's less that it is a degree of freedom, and more that it is the rallying point for the common type.

stream: &Collection<G, (K, V, G::Timestamp), R>,
arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
mut output_func: S,
) -> Collection<G, (DOut, G::Timestamp), <R as Mul<Tr::Diff>>::Output>
where
G: Scope<Timestamp = Tr::Time>,
Tr::KeyOwned: Hashable + ExchangeData,
K: Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader+Clone+'static,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
R: Mul<Tr::Diff>,
<R as Mul<Tr::Diff>>::Output: Semigroup,
FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>)->DOut+'static,
S: FnMut(&K, &V, Tr::Val<'_>)->DOut+'static,
{
let output_func = move |k: &Tr::KeyOwned, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| {
let output_func = move |k: &K, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| {
let diff = diff1.clone() * diff2.clone();
let dout = (output_func(k, v1, v2), time.clone());
Some((dout, initial.clone(), diff))
Expand Down Expand Up @@ -120,8 +122,8 @@ where
/// yield control, as a function of the elapsed time and the number of matched
/// records. Note this is not the number of *output* records, owing mainly to
/// the number of matched records being easiest to record with low overhead.
pub fn half_join_internal_unsafe<G, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
stream: &Collection<G, (Tr::KeyOwned, V, G::Timestamp), R>,
pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
stream: &Collection<G, (K, V, G::Timestamp), R>,
mut arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
Expand All @@ -130,17 +132,18 @@ pub fn half_join_internal_unsafe<G, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
) -> Collection<G, DOut, ROut>
where
G: Scope<Timestamp = Tr::Time>,
Tr::KeyOwned: Hashable + ExchangeData,
K: Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader+Clone+'static,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
ROut: Semigroup,
Y: Fn(std::time::Instant, usize) -> bool + 'static,
I: IntoIterator<Item=(DOut, G::Timestamp, ROut)>,
S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static,
S: FnMut(&K, &V, Tr::Val<'_>, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
Expand All @@ -150,7 +153,7 @@ where
let mut stash = HashMap::new();
let mut buffer = Vec::new();

let exchange = Exchange::new(move |update: &((Tr::KeyOwned, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());
let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());

// Stash for (time, diff) accumulation.
let mut output_buffer = Vec::new();
Expand Down
20 changes: 11 additions & 9 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,29 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
use differential_dataflow::difference::{Semigroup, Monoid};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader};
use differential_dataflow::trace::cursor::IntoOwned;

/// Proposes extensions to a stream of prefixes.
///
/// This method takes a stream of prefixes and for each determines a
/// key with `key_selector` and then proposes all pair af the prefix
/// and values associated with the key in `arrangement`.
pub fn lookup_map<G, D, R, Tr, F, DOut, ROut, S>(
pub fn lookup_map<G, D, K, R, Tr, F, DOut, ROut, S>(
prefixes: &Collection<G, D, R>,
mut arrangement: Arranged<G, Tr>,
key_selector: F,
mut output_func: S,
supplied_key0: Tr::KeyOwned,
supplied_key1: Tr::KeyOwned,
supplied_key2: Tr::KeyOwned,
supplied_key0: K,
supplied_key1: K,
supplied_key2: K,
) -> Collection<G, DOut, ROut>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable,
for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>,
K: Hashable + Ord + 'static,
Tr::Diff: Monoid+ExchangeData,
F: FnMut(&D, &mut Tr::KeyOwned)+Clone+'static,
F: FnMut(&D, &mut K)+Clone+'static,
D: ExchangeData,
R: ExchangeData+Monoid,
DOut: Clone+'static,
Expand All @@ -48,14 +50,14 @@ where

let mut buffer = Vec::new();

let mut key: Tr::KeyOwned = supplied_key0;
let mut key: K = supplied_key0;
let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| {
logic1(&update.0, &mut key);
key.hashed().into()
});

let mut key1: Tr::KeyOwned = supplied_key1;
let mut key2: Tr::KeyOwned = supplied_key2;
let mut key1: K = supplied_key1;
let mut key2: K = supplied_key2;

prefixes.inner.binary_frontier(&propose_stream, exchange, Pipeline, "LookupMap", move |_,_| move |input1, input2, output| {

Expand Down
29 changes: 15 additions & 14 deletions dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::cursor::IntoOwned;

/// Proposes extensions to a prefix stream.
///
Expand All @@ -13,27 +14,27 @@ use differential_dataflow::trace::TraceReader;
/// create a join if the `prefixes` collection is also arranged and responds to changes that
/// `arrangement` undergoes. More complicated patterns are also appropriate, as in the case
/// of delta queries.
pub fn propose<G, Tr, F, P, V, VF>(
pub fn propose<G, Tr, K, F, P, V>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
val_from: VF,
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable + Default,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
K: Hashable + Default + Ord + 'static,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
F: Fn(&P)->K+Clone+'static,
P: ExchangeData,
V: Clone + 'static,
VF: Fn(Tr::Val<'_>) -> V + 'static,
for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut Tr::KeyOwned | { *k = key_selector(p); },
move |prefix, diff, value, sum| ((prefix.clone(), val_from(value)), diff.clone().multiply(sum)),
move |p: &P, k: &mut K | { *k = key_selector(p); },
move |prefix, diff, value, sum| ((prefix.clone(), value.into_owned()), diff.clone().multiply(sum)),
Default::default(),
Default::default(),
Default::default(),
Expand All @@ -45,27 +46,27 @@ where
/// Unlike `propose`, this method does not scale the multiplicity of matched
/// prefixes by the number of matches in `arrangement`. This can be useful to
/// avoid the need to prepare an arrangement of distinct extensions.
pub fn propose_distinct<G, Tr, F, P, V, VF>(
pub fn propose_distinct<G, Tr, K, F, P, V>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
val_from: VF,
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable + Default,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
K: Hashable + Default + Ord + 'static,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
F: Fn(&P)->K+Clone+'static,
P: ExchangeData,
V: Clone + 'static,
VF: Fn(Tr::Val<'_>) -> V + 'static,
for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut Tr::KeyOwned| { *k = key_selector(p); },
move |prefix, diff, value, _sum| ((prefix.clone(), val_from(value)), diff.clone()),
move |p: &P, k: &mut K| { *k = key_selector(p); },
move |prefix, diff, value, _sum| ((prefix.clone(), value.into_owned()), diff.clone()),
Default::default(),
Default::default(),
Default::default(),
Expand Down
6 changes: 4 additions & 2 deletions dogsdogsdogs/src/operators/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use differential_dataflow::{ExchangeData, Collection};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::cursor::IntoOwned;

/// Proposes extensions to a stream of prefixes.
///
Expand All @@ -19,8 +20,9 @@ pub fn validate<G, K, V, Tr, F, P>(
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader<KeyOwned=(K,V)>+Clone+'static,
K: Ord+Hash+Clone+Default,
Tr: TraceReader+Clone+'static,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = (K, V)>,
K: Ord+Hash+Clone+Default + 'static,
V: ExchangeData+Hash+Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->K+Clone+'static,
Expand Down
2 changes: 1 addition & 1 deletion examples/cursors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fn main() {

/* Return trace content after the last round. */
let (mut cursor, storage) = graph_trace.cursor();
cursor.to_vec(Clone::clone, &storage)
cursor.to_vec(&storage)
})
.unwrap().join();

Expand Down
2 changes: 1 addition & 1 deletion examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ where G::Timestamp: Lattice+Ord {
.join_map(&edges, |_k,&(),d| *d)
.concat(&roots)
.map(|x| (x,()))
.reduce_core::<_,_,KeySpine<_,_,_>>("Reduce", Clone::clone, |_key, input, output, updates| {
.reduce_core::<_,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| {
if output.is_empty() || input[0].1 < output[0].1 {
updates.push(((), input[0].1));
}
Expand Down
4 changes: 2 additions & 2 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ fn main() {
let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
.arrange::<PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1)));
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.clone().into_bytes(), 7))
.arrange::<PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1)));
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));

keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ where
let labels =
proposals
.concat(&nodes)
.reduce_abelian::<_,_,ValSpine<_,_,_,_>>("Propagate", |v| v.clone(), |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));
.reduce_abelian::<_,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));

let propagate: Collection<_, (N, L), R> =
labels
Expand Down
1 change: 0 additions & 1 deletion src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ where
Tr: TraceReader,
{
type Key<'a> = Tr::Key<'a>;
type KeyOwned = Tr::KeyOwned;
type Val<'a> = Tr::Val<'a>;
type Time = Tr::Time;
type Diff = Tr::Diff;
Expand Down
Loading