Skip to content

Commit

Permalink
Pivot Val<'_> -> V into IntoOwned
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed May 25, 2024
1 parent 0f42bf3 commit 2fbb39b
Show file tree
Hide file tree
Showing 12 changed files with 54 additions and 59 deletions.
6 changes: 3 additions & 3 deletions dogsdogsdogs/examples/delta_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,17 @@ fn main() {

// Prior technology
// dQ/dE1 := dE1(a,b), E2(b,c), E3(a,c)
let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone(), Clone::clone);
let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone());
let changes1 = validate(&changes1, forward_self_neu.clone(), key1.clone());
let changes1 = changes1.map(|((a,b),c)| (a,b,c));

// dQ/dE2 := dE2(b,c), E1(a,b), E3(a,c)
let changes2 = propose(&changes, reverse_key_alt.clone(), key1.clone(), Clone::clone);
let changes2 = propose(&changes, reverse_key_alt.clone(), key1.clone());
let changes2 = validate(&changes2, reverse_self_neu.clone(), key2.clone());
let changes2 = changes2.map(|((b,c),a)| (a,b,c));

// dQ/dE3 := dE3(a,c), E1(a,b), E2(b,c)
let changes3 = propose(&changes, forward_key_alt.clone(), key1.clone(), Clone::clone);
let changes3 = propose(&changes, forward_key_alt.clone(), key1.clone());
let changes3 = validate(&changes3, reverse_self_alt.clone(), key2.clone());
let changes3 = changes3.map(|((a,c),b)| (a,b,c));

Expand Down
2 changes: 1 addition & 1 deletion dogsdogsdogs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ where

fn propose(&mut self, prefixes: &Collection<G, P, R>) -> Collection<G, (P, V), R> {
let propose = self.indices.propose_trace.import(&prefixes.scope());
operators::propose::propose(prefixes, propose, self.key_selector.clone(), |x| x.clone())
operators::propose::propose(prefixes, propose, self.key_selector.clone())
}

fn validate(&mut self, extensions: &Collection<G, (P, V), R>) -> Collection<G, (P, V), R> {
Expand Down
14 changes: 6 additions & 8 deletions dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ use differential_dataflow::trace::cursor::IntoOwned;
/// create a join if the `prefixes` collection is also arranged and responds to changes that
/// `arrangement` undergoes. More complicated patterns are also appropriate, as in the case
/// of delta queries.
pub fn propose<G, Tr, K, F, P, V, VF>(
pub fn propose<G, Tr, K, F, P, V>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
val_from: VF,
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope<Timestamp=Tr::Time>,
Expand All @@ -29,13 +28,13 @@ where
F: Fn(&P)->K+Clone+'static,
P: ExchangeData,
V: Clone + 'static,
VF: Fn(Tr::Val<'_>) -> V + 'static,
for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut K | { *k = key_selector(p); },
move |prefix, diff, value, sum| ((prefix.clone(), val_from(value)), diff.clone().multiply(sum)),
move |prefix, diff, value, sum| ((prefix.clone(), value.into_owned()), diff.clone().multiply(sum)),
Default::default(),
Default::default(),
Default::default(),
Expand All @@ -47,11 +46,10 @@ where
/// Unlike `propose`, this method does not scale the multiplicity of matched
/// prefixes by the number of matches in `arrangement`. This can be useful to
/// avoid the need to prepare an arrangement of distinct extensions.
pub fn propose_distinct<G, Tr, K, F, P, V, VF>(
pub fn propose_distinct<G, Tr, K, F, P, V>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
val_from: VF,
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope<Timestamp=Tr::Time>,
Expand All @@ -62,13 +60,13 @@ where
F: Fn(&P)->K+Clone+'static,
P: ExchangeData,
V: Clone + 'static,
VF: Fn(Tr::Val<'_>) -> V + 'static,
for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut K| { *k = key_selector(p); },
move |prefix, diff, value, _sum| ((prefix.clone(), val_from(value)), diff.clone()),
move |prefix, diff, value, _sum| ((prefix.clone(), value.into_owned()), diff.clone()),
Default::default(),
Default::default(),
Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion examples/cursors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fn main() {

/* Return trace content after the last round. */
let (mut cursor, storage) = graph_trace.cursor();
cursor.to_vec(Clone::clone, &storage)
cursor.to_vec(&storage)
})
.unwrap().join();

Expand Down
2 changes: 1 addition & 1 deletion examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ where G::Timestamp: Lattice+Ord {
.join_map(&edges, |_k,&(),d| *d)
.concat(&roots)
.map(|x| (x,()))
.reduce_core::<_,_,KeySpine<_,_,_>>("Reduce", Clone::clone, |_key, input, output, updates| {
.reduce_core::<_,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| {
if output.is_empty() || input[0].1 < output[0].1 {
updates.push(((), input[0].1));
}
Expand Down
4 changes: 2 additions & 2 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ fn main() {
let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
.arrange::<PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, _, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1)));
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.clone().into_bytes(), 7))
.arrange::<PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, _, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1)));
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));

keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ where
let labels =
proposals
.concat(&nodes)
.reduce_abelian::<_,_,ValSpine<_,_,_,_>>("Propagate", |v| v.clone(), |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));
.reduce_abelian::<_,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));

let propagate: Collection<_, (N, L), R> =
labels
Expand Down
12 changes: 6 additions & 6 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,19 +289,19 @@ where
T1: TraceReader + Clone + 'static,
{
/// A direct implementation of `ReduceCore::reduce_abelian`.
pub fn reduce_abelian<L, K, V, F, T2>(&self, name: &str, from: F, mut logic: L) -> Arranged<G, TraceAgent<T2>>
pub fn reduce_abelian<L, K, V, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
T2: for<'a> Trace<Key<'a>= T1::Key<'a>, Time=T1::Time>+'static,
K: Ord + 'static,
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>,
T2::Diff: Abelian,
T2::Batch: Batch,
<T2::Builder as Builder>::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
{
self.reduce_core::<_,K,V,F,T2>(name, from, move |key, input, output, change| {
self.reduce_core::<_,K,V,T2>(name, move |key, input, output, change| {
if !input.is_empty() {
logic(key, input, change);
}
Expand All @@ -311,19 +311,19 @@ where
}

/// A direct implementation of `ReduceCore::reduce_core`.
pub fn reduce_core<L, K, V, F, T2>(&self, name: &str, from: F, logic: L) -> Arranged<G, TraceAgent<T2>>
pub fn reduce_core<L, K, V, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
where
for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=T1::Time>+'static,
K: Ord + 'static,
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>,
T2::Batch: Batch,
<T2::Builder as Builder>::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
use crate::operators::reduce::reduce_trace;
reduce_trace::<_,_,_,_,V,_,_>(self, name, from, logic)
reduce_trace::<_,_,_,_,V,_>(self, name, logic)
}
}

Expand Down
9 changes: 4 additions & 5 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
//! use differential_dataflow::operators::arrange::upsert;
//!
//! let stream = scope.input_from(&mut input);
//! let arranged = upsert::arrange_from_upsert::<_, _, _, _, ValSpine<Key, Val, _, _>>(&stream, &"test", |v| v.clone());
//! let arranged = upsert::arrange_from_upsert::<_, _, _, ValSpine<Key, Val, _, _>>(&stream, &"test");
//!
//! arranged
//! .as_collection(|k,v| (k.clone(), v.clone()))
Expand Down Expand Up @@ -126,18 +126,17 @@ use super::TraceAgent;
/// This method is only implemented for totally ordered times, as we do not yet
/// understand what a "sequence" of upserts would mean for partially ordered
/// timestamps.
pub fn arrange_from_upsert<G, K, V, F, Tr>(
pub fn arrange_from_upsert<G, K, V, Tr>(
stream: &Stream<G, (K, Option<V>, G::Timestamp)>,
name: &str,
from: F,
) -> Arranged<G, TraceAgent<Tr>>
where
G: Scope<Timestamp=Tr::Time>,
Tr: Trace+TraceReader<Diff=isize>+'static,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
K: ExchangeData+Hashable+std::hash::Hash,
V: ExchangeData,
F: Fn(Tr::Val<'_>) -> V + 'static,
for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>,
Tr::Time: TotalOrder+ExchangeData,
Tr::Batch: Batch,
Tr::Builder: Builder<Input = Vec<((K, V), Tr::Time, Tr::Diff)>>,
Expand Down Expand Up @@ -261,7 +260,7 @@ where
assert!(count == 0 || count == 1);
if count == 1 {
assert!(prev_value.is_none());
prev_value = Some(from(val));
prev_value = Some(val.into_owned());
}
trace_cursor.step_val(&trace_storage);
}
Expand Down
Loading

0 comments on commit 2fbb39b

Please sign in to comment.