From 3e81b8b6a8d6160fe3ba0fa0f78af66babc36aad Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 23 Apr 2024 10:44:43 -0400 Subject: [PATCH 1/5] Allow joins to produce arbitrary containers (#477) * Allow joins to produce arbitrary containers Add a `join_traces_core` function that allows the caller to specify the output stream container type. The existing `join_traces` function forces it to be vectors, by virtue of wrapping the output in a collection. Signed-off-by: Moritz Hoffmann * Remove `join_traces_core` Signed-off-by: Moritz Hoffmann --------- Signed-off-by: Moritz Hoffmann --- src/operators/arrange/arrangement.rs | 1 + src/operators/join.rs | 24 +++++++++++++++++------- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index bffc2bdaa..9a5d926c0 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -427,6 +427,7 @@ where { use crate::operators::join::join_traces; join_traces(self, other, result) + .as_collection() } } diff --git a/src/operators/join.rs b/src/operators/join.rs index a31e682a5..311f06802 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -5,16 +5,17 @@ //! + (b * c), and if this is not equal to the former term, little is known about the actual output. use std::cmp::Ordering; +use timely::container::{PushContainer, PushInto}; use timely::order::PartialOrder; use timely::progress::Timestamp; -use timely::dataflow::Scope; -use timely::dataflow::operators::generic::{Operator, OutputHandle}; +use timely::dataflow::{Scope, StreamCore}; +use timely::dataflow::operators::generic::{Operator, OutputHandleCore}; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Capability; use timely::dataflow::channels::pushers::tee::Tee; use crate::hashable::Hashable; -use crate::{Data, ExchangeData, Collection, AsCollection}; +use crate::{Data, ExchangeData, Collection}; use crate::difference::{Semigroup, Abelian, Multiply}; use crate::lattice::Lattice; use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf}; @@ -322,8 +323,14 @@ where /// even potentially unrelated to the input collection data. Importantly, the key and value types could be generic /// associated types (GATs) of the traces, and we would seemingly struggle to frame these types as trait arguments. /// +/// The implementation produces a caller-specified container, with the requirement that the container +/// can absorb `(D, G::Timestamp, R)` triples. Implementations can use [`AsCollection`] to wrap the +/// output stream in a collection. +/// /// The "correctness" of this method depends heavily on the behavior of the supplied `result` function. -pub fn join_traces(arranged1: &Arranged, arranged2: &Arranged, mut result: L) -> Collection +/// +/// [`AsCollection`]: crate::collection::AsCollection +pub fn join_traces(arranged1: &Arranged, arranged2: &Arranged, mut result: L) -> StreamCore where G: Scope, T1: TraceReader+Clone+'static, @@ -332,6 +339,8 @@ where R: Semigroup, I: IntoIterator, L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static, + C: PushContainer, + (D, G::Timestamp, R): PushInto, { // Rename traces for symmetry from here on out. let mut trace1 = arranged1.trace.clone(); @@ -565,7 +574,6 @@ where } } }) - .as_collection() } @@ -617,10 +625,12 @@ where /// Process keys until at least `fuel` output tuples produced, or the work is exhausted. #[inline(never)] - fn work(&mut self, output: &mut OutputHandle>>, mut logic: L, fuel: &mut usize) - where + fn work(&mut self, output: &mut OutputHandleCore>, mut logic: L, fuel: &mut usize) + where I: IntoIterator, L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff)->I, + C: PushContainer, + (D, T, R): PushInto, { let meet = self.capability.time(); From d18497c41add25471c469c9d0e8fa94e0462c414 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 24 Apr 2024 15:03:28 -0400 Subject: [PATCH 2/5] Remove `ValOwned` (#476) * Remove use of ValOwned * Respond to feedback --- Cargo.toml | 2 +- dogsdogsdogs/examples/delta_query.rs | 6 +- dogsdogsdogs/src/lib.rs | 2 +- dogsdogsdogs/src/operators/count.rs | 2 +- dogsdogsdogs/src/operators/propose.rs | 19 ++- dogsdogsdogs/src/operators/validate.rs | 2 +- examples/cursors.rs | 19 +-- examples/monoid-bfs.rs | 2 +- examples/spines.rs | 4 +- src/algorithms/graphs/propagate.rs | 2 +- src/operators/arrange/agent.rs | 1 - src/operators/arrange/arrangement.rs | 186 ++-------------------- src/operators/arrange/upsert.rs | 20 +-- src/operators/consolidate.rs | 2 +- src/operators/reduce.rs | 93 ++++++----- src/trace/cursor/cursor_list.rs | 1 - src/trace/cursor/mod.rs | 11 +- src/trace/implementations/ord_neu.rs | 4 - src/trace/implementations/rhh.rs | 2 - src/trace/implementations/spine_fueled.rs | 1 - src/trace/mod.rs | 18 +-- src/trace/wrappers/enter.rs | 4 - src/trace/wrappers/enter_at.rs | 4 - src/trace/wrappers/filter.rs | 4 - src/trace/wrappers/freeze.rs | 4 - src/trace/wrappers/frontier.rs | 4 - src/trace/wrappers/rc.rs | 1 - tests/trace.rs | 8 +- 28 files changed, 115 insertions(+), 313 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index dc9ab0c0e..0cca0a4bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ members = [ # "advent_of_code_2017", "dogsdogsdogs", "experiments", - "interactive", + #"interactive", "server", "server/dataflows/degr_dist", "server/dataflows/neighborhood", diff --git a/dogsdogsdogs/examples/delta_query.rs b/dogsdogsdogs/examples/delta_query.rs index 2f2a9ba10..dab7cc655 100644 --- a/dogsdogsdogs/examples/delta_query.rs +++ b/dogsdogsdogs/examples/delta_query.rs @@ -79,17 +79,17 @@ fn main() { // Prior technology // dQ/dE1 := dE1(a,b), E2(b,c), E3(a,c) - let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone()); + let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone(), Clone::clone); let changes1 = validate(&changes1, forward_self_neu.clone(), key1.clone()); let changes1 = changes1.map(|((a,b),c)| (a,b,c)); // dQ/dE2 := dE2(b,c), E1(a,b), E3(a,c) - let changes2 = propose(&changes, reverse_key_alt.clone(), key1.clone()); + let changes2 = propose(&changes, reverse_key_alt.clone(), key1.clone(), Clone::clone); let changes2 = validate(&changes2, reverse_self_neu.clone(), key2.clone()); let changes2 = changes2.map(|((b,c),a)| (a,b,c)); // dQ/dE3 := dE3(a,c), E1(a,b), E2(b,c) - let changes3 = propose(&changes, forward_key_alt.clone(), key1.clone()); + let changes3 = propose(&changes, forward_key_alt.clone(), key1.clone(), Clone::clone); let changes3 = validate(&changes3, reverse_self_alt.clone(), key2.clone()); let changes3 = changes3.map(|((a,c),b)| (a,b,c)); diff --git a/dogsdogsdogs/src/lib.rs b/dogsdogsdogs/src/lib.rs index 262e09609..1d90feb51 100644 --- a/dogsdogsdogs/src/lib.rs +++ b/dogsdogsdogs/src/lib.rs @@ -199,7 +199,7 @@ where fn propose(&mut self, prefixes: &Collection) -> Collection { let propose = self.indices.propose_trace.import(&prefixes.scope()); - operators::propose::propose(prefixes, propose, self.key_selector.clone()) + operators::propose::propose(prefixes, propose, self.key_selector.clone(), |x| x.clone()) } fn validate(&mut self, extensions: &Collection) -> Collection { diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs index 68c39f1fa..d77c0b7e4 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -19,7 +19,7 @@ pub fn count( ) -> Collection where G: Scope, - Tr: TraceReader+Clone+'static, + Tr: TraceReader+Clone+'static, Tr::KeyOwned: Hashable + Default, R: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::KeyOwned+Clone+'static, diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index 164e42987..36024e6d4 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -4,7 +4,6 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; -use differential_dataflow::trace::cursor::MyTrait; /// Proposes extensions to a prefix stream. /// @@ -14,11 +13,12 @@ use differential_dataflow::trace::cursor::MyTrait; /// create a join if the `prefixes` collection is also arranged and responds to changes that /// `arrangement` undergoes. More complicated patterns are also appropriate, as in the case /// of delta queries. -pub fn propose( +pub fn propose( prefixes: &Collection, arrangement: Arranged, key_selector: F, -) -> Collection + val_from: VF, +) -> Collection where G: Scope, Tr: TraceReader+Clone+'static, @@ -26,12 +26,14 @@ where Tr::Diff: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::KeyOwned+Clone+'static, P: ExchangeData, + V: Clone + 'static, + VF: Fn(Tr::Val<'_>) -> V + 'static, { crate::operators::lookup_map( prefixes, arrangement, move |p: &P, k: &mut Tr::KeyOwned | { *k = key_selector(p); }, - |prefix, diff, value, sum| ((prefix.clone(), value.into_owned()), diff.clone().multiply(sum)), + move |prefix, diff, value, sum| ((prefix.clone(), val_from(value)), diff.clone().multiply(sum)), Default::default(), Default::default(), Default::default(), @@ -43,11 +45,12 @@ where /// Unlike `propose`, this method does not scale the multiplicity of matched /// prefixes by the number of matches in `arrangement`. This can be useful to /// avoid the need to prepare an arrangement of distinct extensions. -pub fn propose_distinct( +pub fn propose_distinct( prefixes: &Collection, arrangement: Arranged, key_selector: F, -) -> Collection + val_from: VF, +) -> Collection where G: Scope, Tr: TraceReader+Clone+'static, @@ -55,12 +58,14 @@ where Tr::Diff: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::KeyOwned+Clone+'static, P: ExchangeData, + V: Clone + 'static, + VF: Fn(Tr::Val<'_>) -> V + 'static, { crate::operators::lookup_map( prefixes, arrangement, move |p: &P, k: &mut Tr::KeyOwned| { *k = key_selector(p); }, - |prefix, diff, value, _sum| ((prefix.clone(), value.into_owned()), diff.clone()), + move |prefix, diff, value, _sum| ((prefix.clone(), val_from(value)), diff.clone()), Default::default(), Default::default(), Default::default(), diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index f1f0c273f..7ddbf38c5 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -19,7 +19,7 @@ pub fn validate( ) -> Collection where G: Scope, - Tr: TraceReader+Clone+'static, + Tr: TraceReader+Clone+'static, K: Ord+Hash+Clone+Default, V: ExchangeData+Hash+Default, Tr::Diff: Monoid+Multiply+ExchangeData, diff --git a/examples/cursors.rs b/examples/cursors.rs index a9632570c..e4da79e47 100644 --- a/examples/cursors.rs +++ b/examples/cursors.rs @@ -31,7 +31,6 @@ //! Final graph: {(2, 1): 1, (3, 2): 1, (3, 4): 1, (4, 3): 1} //! ``` -use std::fmt::Debug; use std::collections::BTreeMap; use timely::dataflow::operators::probe::Handle; @@ -77,7 +76,6 @@ fn main() { graph_trace.set_physical_compaction(AntichainRef::new(&[i])); graph_trace.set_logical_compaction(AntichainRef::new(&[i])); worker.step_while(|| probe.less_than(&i)); - dump_cursor(i, worker.index(), &mut graph_trace); } } else { /* Only worker 0 feeds inputs to the dataflow. */ @@ -92,13 +90,12 @@ fn main() { graph_trace.set_physical_compaction(AntichainRef::new(&[i])); graph_trace.set_logical_compaction(AntichainRef::new(&[i])); worker.step_while(|| probe.less_than(graph.time())); - dump_cursor(i, worker.index(), &mut graph_trace); } } /* Return trace content after the last round. */ let (mut cursor, storage) = graph_trace.cursor(); - cursor.to_vec(&storage) + cursor.to_vec(Clone::clone, &storage) }) .unwrap().join(); @@ -128,17 +125,3 @@ fn main() { expected_graph_content.insert((rounds, rounds+1), 1); assert_eq!(graph_content, expected_graph_content); } - -fn dump_cursor(round: u32, index: usize, trace: &mut Tr) -where - Tr: TraceReader, - Tr::KeyOwned: Debug, - Tr::ValOwned: Debug, - Tr::Time: Debug, - Tr::Diff: Debug, -{ - let (mut cursor, storage) = trace.cursor(); - for ((k, v), diffs) in cursor.to_vec(&storage).iter() { - println!("round {}, w{} {:?}:{:?}: {:?}", round, index, *k, *v, diffs); - } -} diff --git a/examples/monoid-bfs.rs b/examples/monoid-bfs.rs index f9835cef7..47563a065 100644 --- a/examples/monoid-bfs.rs +++ b/examples/monoid-bfs.rs @@ -145,7 +145,7 @@ where G::Timestamp: Lattice+Ord { .join_map(&edges, |_k,&(),d| *d) .concat(&roots) .map(|x| (x,())) - .reduce_core::<_,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| { + .reduce_core::<_,_,KeySpine<_,_,_>>("Reduce", Clone::clone, |_key, input, output, updates| { if output.is_empty() || input[0].1 < output[0].1 { updates.push(((), input[0].1)); } diff --git a/examples/spines.rs b/examples/spines.rs index 0dba1630f..d4d2e64bc 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -55,11 +55,11 @@ fn main() { let data = data.map(|x| (x.clone().into_bytes(), x.into_bytes())) .arrange::>() - .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); + .reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |v| v.clone(), |_,_,output| output.push(((), 1))); let keys = keys.map(|x| (x.clone().into_bytes(), 7)) .arrange::>() - .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); + .reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |v| v.clone(), |_,_,output| output.push(((), 1))); keys.join_core(&data, |k,_v1,_v2| { println!("{:?}", k.text); diff --git a/src/algorithms/graphs/propagate.rs b/src/algorithms/graphs/propagate.rs index 9aef8e08b..b73ff598b 100644 --- a/src/algorithms/graphs/propagate.rs +++ b/src/algorithms/graphs/propagate.rs @@ -96,7 +96,7 @@ where let labels = proposals .concat(&nodes) - .reduce_abelian::<_,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8)))); + .reduce_abelian::<_,_,ValSpine<_,_,_,_>>("Propagate", |v| v.clone(), |_, s, t| t.push((s[0].0.clone(), R::from(1_i8)))); let propagate: Collection<_, (N, L), R> = labels diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index c18b7e30e..012882e2b 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -46,7 +46,6 @@ where type Key<'a> = Tr::Key<'a>; type KeyOwned = Tr::KeyOwned; type Val<'a> = Tr::Val<'a>; - type ValOwned = Tr::ValOwned; type Time = Tr::Time; type Diff = Tr::Diff; diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 9a5d926c0..c22749a27 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -23,7 +23,7 @@ use timely::dataflow::{Scope, Stream, StreamCore}; use timely::dataflow::operators::generic::Operator; use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange}; use timely::progress::Timestamp; -use timely::progress::{Antichain, frontier::AntichainRef}; +use timely::progress::Antichain; use timely::dataflow::operators::Capability; use crate::{Data, ExchangeData, Collection, AsCollection, Hashable}; @@ -37,8 +37,6 @@ use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; use trace::wrappers::enter_at::BatchEnter as BatchEnterAt; use trace::wrappers::filter::{TraceFilter, BatchFilter}; -use trace::cursor::MyTrait; - use super::TraceAgent; /// An arranged collection of `(K,V)` values. @@ -229,166 +227,6 @@ where }) .as_collection() } - - /// Report values associated with keys at certain times. - /// - /// This method consumes a stream of (key, time) queries and reports the corresponding stream of - /// (key, value, time, diff) accumulations in the `self` trace. - pub fn lookup(&self, queries: &Stream) -> Stream - where - Tr::KeyOwned: ExchangeData+Hashable, - Tr::ValOwned: ExchangeData, - Tr::Diff: ExchangeData, - Tr: 'static, - { - // while the arrangement is already correctly distributed, the query stream may not be. - let exchange = Exchange::new(move |update: &(Tr::KeyOwned,G::Timestamp)| update.0.hashed().into()); - queries.binary_frontier(&self.stream, exchange, Pipeline, "TraceQuery", move |_capability, _info| { - - let mut trace = Some(self.trace.clone()); - // release `set_physical_compaction` capability. - trace.as_mut().unwrap().set_physical_compaction(Antichain::new().borrow()); - - let mut stash = Vec::new(); - let mut capability: Option> = None; - - let mut active = Vec::new(); - let mut retain = Vec::new(); - - let mut working: Vec<(G::Timestamp, Tr::ValOwned, Tr::Diff)> = Vec::new(); - let mut working2: Vec<(Tr::ValOwned, Tr::Diff)> = Vec::new(); - - move |input1, input2, output| { - - input1.for_each(|time, data| { - // if the minimum capability "improves" retain it. - if capability.is_none() || time.time().less_than(capability.as_ref().unwrap().time()) { - capability = Some(time.retain()); - } - stash.extend(data.iter().cloned()); - }); - - // drain input2; we will consult `trace` directly. - input2.for_each(|_time, _data| { }); - - assert_eq!(capability.is_none(), stash.is_empty()); - - let mut drained = false; - if let Some(capability) = capability.as_mut() { - if !input2.frontier().less_equal(capability.time()) { - for datum in stash.drain(..) { - if !input2.frontier().less_equal(&datum.1) { - active.push(datum); - } - else { - retain.push(datum); - } - } - drained = !active.is_empty(); - - ::std::mem::swap(&mut stash, &mut retain); // retain now the stashed queries. - - // sort temp1 by key and then by time. - active.sort_unstable_by(|x,y| x.0.cmp(&y.0)); - - let (mut cursor, storage) = trace.as_mut().unwrap().cursor(); - let mut session = output.session(&capability); - - // // V0: Potentially quadratic under load. - // for (key, time) in active.drain(..) { - // cursor.seek_key(&storage, &key); - // if cursor.get_key(&storage) == Some(&key) { - // while let Some(val) = cursor.get_val(&storage) { - // let mut count = R::zero(); - // cursor.map_times(&storage, |t, d| if t.less_equal(&time) { - // count = count + d; - // }); - // if !count.is_zero() { - // session.give((key.clone(), val.clone(), time.clone(), count)); - // } - // cursor.step_val(&storage); - // } - // } - // } - - // V1: Stable under load - let mut active_finger = 0; - while active_finger < active.len() { - - let key = &active[active_finger].0; - let mut same_key = active_finger; - while active.get(same_key).map(|x| &x.0) == Some(key) { - same_key += 1; - } - - cursor.seek_key_owned(&storage, key); - if cursor.get_key(&storage).map(|k| k.equals(key)).unwrap_or(false) { - - let mut active = &active[active_finger .. same_key]; - - while let Some(val) = cursor.get_val(&storage) { - cursor.map_times(&storage, |t,d| working.push((t.clone(), val.into_owned(), d.clone()))); - cursor.step_val(&storage); - } - - working.sort_by(|x,y| x.0.cmp(&y.0)); - for (time, val, diff) in working.drain(..) { - if !active.is_empty() && active[0].1.less_than(&time) { - crate::consolidation::consolidate(&mut working2); - while !active.is_empty() && active[0].1.less_than(&time) { - for (val, count) in working2.iter() { - session.give((key.clone(), val.clone(), active[0].1.clone(), count.clone())); - } - active = &active[1..]; - } - } - working2.push((val, diff)); - } - if !active.is_empty() { - crate::consolidation::consolidate(&mut working2); - while !active.is_empty() { - for (val, count) in working2.iter() { - session.give((key.clone(), val.clone(), active[0].1.clone(), count.clone())); - } - active = &active[1..]; - } - } - } - active_finger = same_key; - } - active.clear(); - } - } - - if drained { - if stash.is_empty() { capability = None; } - if let Some(capability) = capability.as_mut() { - let mut min_time = stash[0].1.clone(); - for datum in stash[1..].iter() { - if datum.1.less_than(&min_time) { - min_time = datum.1.clone(); - } - } - capability.downgrade(&min_time); - } - } - - // Determine new frontier on queries that may be issued. - // TODO: This code looks very suspect; explain better or fix. - let frontier = IntoIterator::into_iter([ - capability.as_ref().map(|c| c.time().clone()), - input1.frontier().frontier().get(0).cloned(), - ]).flatten().min(); - - if let Some(frontier) = frontier { - trace.as_mut().map(|t| t.set_logical_compaction(AntichainRef::new(&[frontier]))); - } - else { - trace = None; - } - } - }) - } } @@ -439,16 +277,17 @@ where T1: TraceReader + Clone + 'static, { /// A direct implementation of `ReduceCore::reduce_abelian`. - pub fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> + pub fn reduce_abelian(&self, name: &str, from: F, mut logic: L) -> Arranged> where T2: for<'a> Trace= T1::Key<'a>, Time=T1::Time>+'static, - T2::ValOwned: Data, + V: Data, + F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, - L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(::ValOwned, T2::Diff)>)+'static, + T2::Builder: Builder, + L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static, { - self.reduce_core::<_,T2>(name, move |key, input, output, change| { + self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| { if !input.is_empty() { logic(key, input, change); } @@ -458,16 +297,17 @@ where } /// A direct implementation of `ReduceCore::reduce_core`. - pub fn reduce_core(&self, name: &str, logic: L) -> Arranged> + pub fn reduce_core(&self, name: &str, from: F, logic: L) -> Arranged> where T2: for<'a> Trace=T1::Key<'a>, Time=T1::Time>+'static, - T2::ValOwned: Data, + V: Data, + F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, - L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, + T2::Builder: Builder, + L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { use crate::operators::reduce::reduce_trace; - reduce_trace(self, name, logic) + reduce_trace::<_,_,_,V,_,_>(self, name, from, logic) } } diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 4ca9dc9b3..758ec8df3 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -59,7 +59,7 @@ //! use differential_dataflow::operators::arrange::upsert; //! //! let stream = scope.input_from(&mut input); -//! let arranged = upsert::arrange_from_upsert::<_, ValSpine>(&stream, &"test"); +//! let arranged = upsert::arrange_from_upsert::<_, _, _, ValSpine>(&stream, &"test", |v| v.clone()); //! //! arranged //! .as_collection(|k,v| (k.clone(), v.clone())) @@ -125,18 +125,20 @@ use super::TraceAgent; /// This method is only implemented for totally ordered times, as we do not yet /// understand what a "sequence" of upserts would mean for partially ordered /// timestamps. -pub fn arrange_from_upsert( - stream: &Stream, G::Timestamp)>, +pub fn arrange_from_upsert( + stream: &Stream, G::Timestamp)>, name: &str, + from: F, ) -> Arranged> where G: Scope, Tr: Trace+TraceReader+'static, Tr::KeyOwned: ExchangeData+Hashable+std::hash::Hash, - Tr::ValOwned: ExchangeData, + V: ExchangeData, + F: Fn(Tr::Val<'_>) -> V + 'static, Tr::Time: TotalOrder+ExchangeData, Tr::Batch: Batch, - Tr::Builder: Builder, + Tr::Builder: Builder, { let mut reader: Option> = None; @@ -145,7 +147,7 @@ where let reader = &mut reader; - let exchange = Exchange::new(move |update: &(Tr::KeyOwned,Option,G::Timestamp)| (update.0).hashed().into()); + let exchange = Exchange::new(move |update: &(Tr::KeyOwned,Option,G::Timestamp)| (update.0).hashed().into()); stream.unary_frontier(exchange, name, move |_capability, info| { @@ -175,7 +177,7 @@ where let mut prev_frontier = Antichain::from_elem(::minimum()); // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap). - let mut priority_queue = BinaryHeap::)>>::new(); + let mut priority_queue = BinaryHeap::)>>::new(); let mut updates = Vec::new(); move |input, output| { @@ -245,7 +247,7 @@ where use trace::cursor::MyTrait; // The prior value associated with the key. - let mut prev_value: Option = None; + let mut prev_value: Option = None; // Attempt to find the key in the trace. trace_cursor.seek_key_owned(&trace_storage, &key); @@ -257,7 +259,7 @@ where assert!(count == 0 || count == 1); if count == 1 { assert!(prev_value.is_none()); - prev_value = Some(val.into_owned()); + prev_value = Some(from(val)); } trace_cursor.step_val(&trace_storage); } diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index b67c1c4b8..46b96528c 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -51,7 +51,7 @@ where /// As `consolidate` but with the ability to name the operator and specify the trace type. pub fn consolidate_named(&self, name: &str) -> Self where - Tr: crate::trace::Trace+'static, + Tr: crate::trace::Trace+'static, Tr::Batch: crate::trace::Batch, Tr::Batcher: Batcher>, { diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 37d98d8ca..db80a5d44 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -87,7 +87,7 @@ where { fn reduce_named(&self, name: &str, logic: L) -> Collection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { - self.reduce_abelian::<_,ValSpine<_,_,_,_>>(name, logic) + self.reduce_abelian::<_,V2,_,ValSpine<_,_,_,_>>(name, |val| val.clone(), logic) .as_collection(|k,v| (k.clone(), v.clone())) } } @@ -163,7 +163,7 @@ where T1: for<'a> TraceReader=&'a K, KeyOwned=K, Val<'a>=&'a (), Diff=R1>+Clone+'static, { fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { - self.reduce_abelian::<_,KeySpine<_,_,_>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) + self.reduce_abelian::<_,(),_,KeySpine<_,_,_>>(name, |&()| (), move |k,s,t| t.push(((), thresh(k, &s[0].1)))) .as_collection(|k,_| k.clone()) } } @@ -213,13 +213,13 @@ where T1: for<'a> TraceReader=&'a K, KeyOwned=K, Val<'a>=&'a (), Diff=R>+Clone+'static, { fn count_core>(&self) -> Collection { - self.reduce_abelian::<_,ValSpine<_,_,_,_>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) + self.reduce_abelian::<_,R,_,ValSpine<_,_,_,_>>("Count", |r| r.clone(), |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) .as_collection(|k,c| (k.clone(), c.clone())) } } /// Extension trait for the `reduce_core` differential dataflow method. -pub trait ReduceCore where G::Timestamp: Lattice+Ord { +pub trait ReduceCore where G::Timestamp: Lattice+Ord { /// Applies `reduce` to arranged data, and returns an arrangement of output data. /// /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although @@ -238,23 +238,24 @@ pub trait ReduceCore>( + /// .reduce_abelian::<_,_,ValSpine<_,_,_,_>>( /// "Example", + /// Clone::clone, /// move |_key, src, dst| dst.push((*src[0].0, 1)) /// ) /// .trace; /// }); /// ``` - fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> + fn reduce_abelian(&self, name: &str, from: F, mut logic: L) -> Arranged> where T2: for<'a> Trace= &'a K, Time=G::Timestamp>+'static, - T2::ValOwned: Data, + F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, - L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned, T2::Diff)>)+'static, + T2::Builder: Builder, + L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, { - self.reduce_core::<_,T2>(name, move |key, input, output, change| { + self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| { if !input.is_empty() { logic(key, input, change); } @@ -268,13 +269,13 @@ pub trait ReduceCore(&self, name: &str, logic: L) -> Arranged> + fn reduce_core(&self, name: &str, from: F, logic: L) -> Arranged> where T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, - T2::ValOwned: Data, + F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, - L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, + T2::Builder: Builder, + L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, ; } @@ -283,35 +284,36 @@ where G: Scope, G::Timestamp: Lattice+Ord, K: ExchangeData+Hashable, - K: ToOwned, V: ExchangeData, R: ExchangeData+Semigroup, { - fn reduce_core(&self, name: &str, logic: L) -> Arranged> + fn reduce_core(&self, name: &str, from: F, logic: L) -> Arranged> where - T2::ValOwned: Data, + V: Data, + F: Fn(T2::Val<'_>) -> V + 'static, T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, T2::Batch: Batch, - T2::Builder: Builder, - L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, + T2::Builder: Builder, + L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) - .reduce_core(name, logic) + .reduce_core(name, from, logic) } } /// A key-wise reduction of values in an input trace. /// /// This method exists to provide reduce functionality without opinions about qualifying trace types. -pub fn reduce_trace(trace: &Arranged, name: &str, mut logic: L) -> Arranged> +pub fn reduce_trace(trace: &Arranged, name: &str, from: F, mut logic: L) -> Arranged> where G: Scope, T1: TraceReader + Clone + 'static, T2: for<'a> Trace=T1::Key<'a>, Time=T1::Time> + 'static, - T2::ValOwned: Data, + V: Data, + F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, - L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static, + T2::Builder: Builder, + L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { let mut result_trace = None; @@ -445,7 +447,7 @@ where // // TODO: It would be better if all updates went into one batch, but timely dataflow prevents // this as long as it requires that there is only one capability for each message. - let mut buffers = Vec::<(G::Timestamp, Vec<(::ValOwned, G::Timestamp, T2::Diff)>)>::new(); + let mut buffers = Vec::<(G::Timestamp, Vec<(V, G::Timestamp, T2::Diff)>)>::new(); let mut builders = Vec::new(); for cap in capabilities.iter() { buffers.push((cap.time().clone(), Vec::new())); @@ -472,7 +474,7 @@ where use std::borrow::Borrow; use crate::trace::cursor::MyTrait; - + // Determine the next key we will work on; could be synthetic, could be from a batch. let key1 = exposed.get(exposed_position).map(|x| <_ as MyTrait>::borrow_as(&x.0)); let key2 = batch_cursor.get_key(batch_storage); @@ -505,6 +507,7 @@ where (&mut output_cursor, output_storage), (&mut batch_cursor, batch_storage), &mut interesting_times, + &from, &mut logic, &upper_limit, &mut buffers[..], @@ -622,30 +625,33 @@ fn sort_dedup(list: &mut Vec) { list.dedup(); } -trait PerKeyCompute<'a, C1, C2, C3> +trait PerKeyCompute<'a, C1, C2, C3, V> where C1: Cursor, C2: Cursor = C1::Key<'a>, Time = C1::Time>, C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, + V: Clone + Ord, { fn new() -> Self; - fn compute( + fn compute( &mut self, key: C1::Key<'a>, source_cursor: (&mut C1, &'a C1::Storage), output_cursor: (&mut C2, &'a C2::Storage), batch_cursor: (&mut C3, &'a C3::Storage), times: &mut Vec, + from: &F, logic: &mut L, upper_limit: &Antichain, - outputs: &mut [(C2::Time, Vec<(C2::ValOwned, C2::Time, C2::Diff)>)], + outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)], new_interesting: &mut Vec) -> (usize, usize) where + F: Fn(C2::Val<'_>) -> V, L: FnMut( C1::Key<'a>, &[(C1::Val<'a>, C1::Diff)], - &mut Vec<(C2::ValOwned, C2::Diff)>, - &mut Vec<(C2::ValOwned, C2::Diff)>, + &mut Vec<(V, C2::Diff)>, + &mut Vec<(V, C2::Diff)>, ); } @@ -664,30 +670,32 @@ mod history_replay { /// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in /// time order, maintaining consolidated representations of updates with respect to future interesting times. - pub struct HistoryReplayer<'a, C1, C2, C3> + pub struct HistoryReplayer<'a, C1, C2, C3, V> where C1: Cursor, C2: Cursor = C1::Key<'a>, Time = C1::Time>, C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, + V: Clone + Ord, { input_history: ValueHistory<'a, C1>, output_history: ValueHistory<'a, C2>, batch_history: ValueHistory<'a, C3>, input_buffer: Vec<(C1::Val<'a>, C1::Diff)>, - output_buffer: Vec<(C2::ValOwned, C2::Diff)>, - update_buffer: Vec<(C2::ValOwned, C2::Diff)>, - output_produced: Vec<((C2::ValOwned, C2::Time), C2::Diff)>, + output_buffer: Vec<(V, C2::Diff)>, + update_buffer: Vec<(V, C2::Diff)>, + output_produced: Vec<((V, C2::Time), C2::Diff)>, synth_times: Vec, meets: Vec, times_current: Vec, temporary: Vec, } - impl<'a, C1, C2, C3> PerKeyCompute<'a, C1, C2, C3> for HistoryReplayer<'a, C1, C2, C3> + impl<'a, C1, C2, C3, V> PerKeyCompute<'a, C1, C2, C3, V> for HistoryReplayer<'a, C1, C2, C3, V> where C1: Cursor, C2: Cursor = C1::Key<'a>, Time = C1::Time>, C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, + V: Clone + Ord, { fn new() -> Self { HistoryReplayer { @@ -705,23 +713,25 @@ mod history_replay { } } #[inline(never)] - fn compute( + fn compute( &mut self, key: C1::Key<'a>, (source_cursor, source_storage): (&mut C1, &'a C1::Storage), (output_cursor, output_storage): (&mut C2, &'a C2::Storage), (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage), times: &mut Vec, + from: &F, logic: &mut L, upper_limit: &Antichain, - outputs: &mut [(C2::Time, Vec<(C2::ValOwned, C2::Time, C2::Diff)>)], + outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)], new_interesting: &mut Vec) -> (usize, usize) where + F: Fn(C2::Val<'_>) -> V, L: FnMut( C1::Key<'a>, &[(C1::Val<'a>, C1::Diff)], - &mut Vec<(C2::ValOwned, C2::Diff)>, - &mut Vec<(C2::ValOwned, C2::Diff)>, + &mut Vec<(V, C2::Diff)>, + &mut Vec<(V, C2::Diff)>, ) { @@ -884,8 +894,7 @@ mod history_replay { meet.as_ref().map(|meet| output_replay.advance_buffer_by(meet)); for &((value, ref time), ref diff) in output_replay.buffer().iter() { if time.less_equal(&next_time) { - use crate::trace::cursor::MyTrait; - self.output_buffer.push((<_ as MyTrait>::into_owned(value), diff.clone())); + self.output_buffer.push((from(value), diff.clone())); } else { self.temporary.push(next_time.join(time)); diff --git a/src/trace/cursor/cursor_list.rs b/src/trace/cursor/cursor_list.rs index e884ca55b..98bf6a808 100644 --- a/src/trace/cursor/cursor_list.rs +++ b/src/trace/cursor/cursor_list.rs @@ -88,7 +88,6 @@ impl Cursor for CursorList { type Key<'a> = C::Key<'a>; type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; - type ValOwned = C::ValOwned; type Time = C::Time; type Diff = C::Diff; diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index 46fb4a6ba..927dba4b2 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -63,9 +63,7 @@ pub trait Cursor { /// Owned version of the above. type KeyOwned: Ord + Clone; /// Values associated with keys. - type Val<'a>: Copy + Clone + MyTrait<'a, Owned = Self::ValOwned> + for<'b> PartialOrd>; - /// Owned version of the above. - type ValOwned: Ord + Clone; + type Val<'a>: Copy + Clone + MyTrait<'a> + for<'b> PartialOrd>; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Associated update. @@ -121,7 +119,10 @@ pub trait Cursor { fn rewind_vals(&mut self, storage: &Self::Storage); /// Rewinds the cursor and outputs its contents to a Vec - fn to_vec(&mut self, storage: &Self::Storage) -> Vec<((Self::KeyOwned, Self::ValOwned), Vec<(Self::Time, Self::Diff)>)> { + fn to_vec(&mut self, from: F, storage: &Self::Storage) -> Vec<((Self::KeyOwned, V), Vec<(Self::Time, Self::Diff)>)> + where + F: Fn(Self::Val<'_>) -> V, + { let mut out = Vec::new(); self.rewind_keys(storage); self.rewind_vals(storage); @@ -131,7 +132,7 @@ pub trait Cursor { self.map_times(storage, |ts, r| { kv_out.push((ts.clone(), r.clone())); }); - out.push(((self.key(storage).into_owned(), self.val(storage).into_owned()), kv_out)); + out.push(((self.key(storage).into_owned(), from(self.val(storage))), kv_out)); self.step_val(storage); } self.step_key(storage); diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index ce0c6532a..42e45aefe 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -140,7 +140,6 @@ mod val_batch { type Key<'a> = ::ReadItem<'a>; type KeyOwned = ::Key; type Val<'a> = ::ReadItem<'a>; - type ValOwned = ::Val; type Time = ::Time; type Diff = ::Diff; @@ -447,7 +446,6 @@ mod val_batch { type Key<'a> = ::ReadItem<'a>; type KeyOwned = ::Key; type Val<'a> = ::ReadItem<'a>; - type ValOwned = ::Val; type Time = ::Time; type Diff = ::Diff; @@ -700,7 +698,6 @@ mod key_batch { type Key<'a> = ::ReadItem<'a>; type KeyOwned = ::Key; type Val<'a> = &'a (); - type ValOwned = (); type Time = ::Time; type Diff = ::Diff; @@ -918,7 +915,6 @@ mod key_batch { type Key<'a> = ::ReadItem<'a>; type KeyOwned = ::Key; type Val<'a> = &'a (); - type ValOwned = (); type Time = ::Time; type Diff = ::Diff; diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index f29d39a8b..b01d1dae3 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -256,7 +256,6 @@ mod val_batch { type Key<'a> = ::ReadItem<'a>; type KeyOwned = ::Key; type Val<'a> = ::ReadItem<'a>; - type ValOwned = ::Val; type Time = ::Time; type Diff = ::Diff; @@ -608,7 +607,6 @@ mod val_batch { type Key<'a> = ::ReadItem<'a>; type KeyOwned = ::Key; type Val<'a> = ::ReadItem<'a>; - type ValOwned = ::Val; type Time = ::Time; type Diff = ::Diff; diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index c616e2b7d..faea1dfdb 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -112,7 +112,6 @@ where type Key<'a> = B::Key<'a>; type KeyOwned = B::KeyOwned; type Val<'a> = B::Val<'a>; - type ValOwned = B::ValOwned; type Time = B::Time; type Diff = B::Diff; diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 145c2990f..d1f878342 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -56,22 +56,20 @@ pub trait TraceReader { /// Owned version of the above. type KeyOwned: Ord + Clone; /// Values associated with keys. - type Val<'a>: Copy + Clone + MyTrait<'a, Owned = Self::ValOwned>; - /// Owned version of the above. - type ValOwned: Ord + Clone; + type Val<'a>: Copy + Clone + MyTrait<'a>; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Associated update. type Diff: Semigroup; /// The type of an immutable collection of updates. - type Batch: for<'a> BatchReader = Self::Key<'a>, KeyOwned = Self::KeyOwned, Val<'a> = Self::Val<'a>, ValOwned = Self::ValOwned, Time = Self::Time, Diff = Self::Diff>+Clone+'static; + type Batch: for<'a> BatchReader = Self::Key<'a>, KeyOwned = Self::KeyOwned, Val<'a> = Self::Val<'a>, Time = Self::Time, Diff = Self::Diff>+Clone+'static; /// Storage type for `Self::Cursor`. Likely related to `Self::Batch`. type Storage; /// The type used to enumerate the collections contents. - type Cursor: for<'a> Cursor = Self::Key<'a>, KeyOwned = Self::KeyOwned, Val<'a> = Self::Val<'a>, ValOwned = Self::ValOwned, Time = Self::Time, Diff = Self::Diff>; + type Cursor: for<'a> Cursor = Self::Key<'a>, KeyOwned = Self::KeyOwned, Val<'a> = Self::Val<'a>, Time = Self::Time, Diff = Self::Diff>; /// Provides a cursor over updates contained in the trace. fn cursor(&mut self) -> (Self::Cursor, Self::Storage) { @@ -264,16 +262,14 @@ where /// Owned version of the above. type KeyOwned: Ord + Clone; /// Values associated with keys. - type Val<'a>: Copy + Clone + MyTrait<'a, Owned = Self::ValOwned>; - /// Owned version of the above. - type ValOwned: Ord + Clone; + type Val<'a>: Copy + Clone + MyTrait<'a>; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Associated update. type Diff: Semigroup; /// The type used to enumerate the batch's contents. - type Cursor: for<'a> Cursor = Self::Key<'a>, KeyOwned = Self::KeyOwned, Val<'a> = Self::Val<'a>, ValOwned = Self::ValOwned, Time = Self::Time, Diff = Self::Diff>; + type Cursor: for<'a> Cursor = Self::Key<'a>, KeyOwned = Self::KeyOwned, Val<'a> = Self::Val<'a>, Time = Self::Time, Diff = Self::Diff>; /// Acquires a cursor to the batch's contents. fn cursor(&self) -> Self::Cursor; /// The number of updates in the batch. @@ -384,7 +380,6 @@ pub mod rc_blanket_impls { type Key<'a> = B::Key<'a>; type KeyOwned = B::KeyOwned; type Val<'a> = B::Val<'a>; - type ValOwned = B::ValOwned; type Time = B::Time; type Diff = B::Diff; @@ -419,7 +414,6 @@ pub mod rc_blanket_impls { type Key<'a> = C::Key<'a>; type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; - type ValOwned = C::ValOwned; type Time = C::Time; type Diff = C::Diff; @@ -490,7 +484,6 @@ pub mod abomonated_blanket_impls { type Key<'a> = B::Key<'a>; type KeyOwned = B::KeyOwned; type Val<'a> = B::Val<'a>; - type ValOwned = B::ValOwned; type Time = B::Time; type Diff = B::Diff; @@ -525,7 +518,6 @@ pub mod abomonated_blanket_impls { type Key<'a> = C::Key<'a>; type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; - type ValOwned = C::ValOwned; type Time = C::Time; type Diff = C::Diff; diff --git a/src/trace/wrappers/enter.rs b/src/trace/wrappers/enter.rs index 49f3a1823..ce59e955e 100644 --- a/src/trace/wrappers/enter.rs +++ b/src/trace/wrappers/enter.rs @@ -34,7 +34,6 @@ where type Key<'a> = Tr::Key<'a>; type KeyOwned = Tr::KeyOwned; type Val<'a> = Tr::Val<'a>; - type ValOwned = Tr::ValOwned; type Time = TInner; type Diff = Tr::Diff; @@ -118,7 +117,6 @@ where type Key<'a> = B::Key<'a>; type KeyOwned = B::KeyOwned; type Val<'a> = B::Val<'a>; - type ValOwned = B::ValOwned; type Time = TInner; type Diff = B::Diff; @@ -172,7 +170,6 @@ where type Key<'a> = C::Key<'a>; type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; - type ValOwned = C::ValOwned; type Time = TInner; type Diff = C::Diff; @@ -225,7 +222,6 @@ where type Key<'a> = C::Key<'a>; type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; - type ValOwned = C::ValOwned; type Time = TInner; type Diff = C::Diff; diff --git a/src/trace/wrappers/enter_at.rs b/src/trace/wrappers/enter_at.rs index 7f67234d9..7e157c409 100644 --- a/src/trace/wrappers/enter_at.rs +++ b/src/trace/wrappers/enter_at.rs @@ -52,7 +52,6 @@ where type Key<'a> = Tr::Key<'a>; type KeyOwned = Tr::KeyOwned; type Val<'a> = Tr::Val<'a>; - type ValOwned = Tr::ValOwned; type Time = TInner; type Diff = Tr::Diff; @@ -141,7 +140,6 @@ where type Key<'a> = B::Key<'a>; type KeyOwned = B::KeyOwned; type Val<'a> = B::Val<'a>; - type ValOwned = B::ValOwned; type Time = TInner; type Diff = B::Diff; @@ -199,7 +197,6 @@ where type Key<'a> = C::Key<'a>; type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; - type ValOwned = C::ValOwned; type Time = TInner; type Diff = C::Diff; @@ -258,7 +255,6 @@ where type Key<'a> = C::Key<'a>; type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; - type ValOwned = C::ValOwned; type Time = TInner; type Diff = C::Diff; diff --git a/src/trace/wrappers/filter.rs b/src/trace/wrappers/filter.rs index 385285529..b4f320700 100644 --- a/src/trace/wrappers/filter.rs +++ b/src/trace/wrappers/filter.rs @@ -33,7 +33,6 @@ where type Key<'a> = Tr::Key<'a>; type KeyOwned = Tr::KeyOwned; type Val<'a> = Tr::Val<'a>; - type ValOwned = Tr::ValOwned; type Time = Tr::Time; type Diff = Tr::Diff; @@ -87,7 +86,6 @@ where type Key<'a> = B::Key<'a>; type KeyOwned = B::KeyOwned; type Val<'a> = B::Val<'a>; - type ValOwned = B::ValOwned; type Time = B::Time; type Diff = B::Diff; @@ -136,7 +134,6 @@ where type Key<'a> = C::Key<'a>; type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; - type ValOwned = C::ValOwned; type Time = C::Time; type Diff = C::Diff; @@ -191,7 +188,6 @@ where type Key<'a> = C::Key<'a>; type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; - type ValOwned = C::ValOwned; type Time = C::Time; type Diff = C::Diff; diff --git a/src/trace/wrappers/freeze.rs b/src/trace/wrappers/freeze.rs index 9115cd244..310ef3157 100644 --- a/src/trace/wrappers/freeze.rs +++ b/src/trace/wrappers/freeze.rs @@ -78,7 +78,6 @@ where type Key<'a> = Tr::Key<'a>; type KeyOwned = Tr::KeyOwned; type Val<'a> = Tr::Val<'a>; - type ValOwned = Tr::ValOwned; type Time = Tr::Time; type Diff = Tr::Diff; @@ -142,7 +141,6 @@ where type Key<'a> = B::Key<'a>; type KeyOwned = B::KeyOwned; type Val<'a> = B::Val<'a>; - type ValOwned = B::ValOwned; type Time = B::Time; type Diff = B::Diff; @@ -186,7 +184,6 @@ where type Key<'a> = C::Key<'a>; type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; - type ValOwned = C::ValOwned; type Time = C::Time; type Diff = C::Diff; @@ -238,7 +235,6 @@ where type Key<'a> = C::Key<'a>; type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; - type ValOwned = C::ValOwned; type Time = C::Time; type Diff = C::Diff; diff --git a/src/trace/wrappers/frontier.rs b/src/trace/wrappers/frontier.rs index 9d9d50307..02cce0dd1 100644 --- a/src/trace/wrappers/frontier.rs +++ b/src/trace/wrappers/frontier.rs @@ -35,7 +35,6 @@ impl TraceReader for TraceFrontier { type Key<'a> = Tr::Key<'a>; type KeyOwned = Tr::KeyOwned; type Val<'a> = Tr::Val<'a>; - type ValOwned = Tr::ValOwned; type Time = Tr::Time; type Diff = Tr::Diff; @@ -86,7 +85,6 @@ impl BatchReader for BatchFrontier { type Key<'a> = B::Key<'a>; type KeyOwned = B::KeyOwned; type Val<'a> = B::Val<'a>; - type ValOwned = B::ValOwned; type Time = B::Time; type Diff = B::Diff; @@ -131,7 +129,6 @@ impl Cursor for CursorFrontier { type Key<'a> = C::Key<'a>; type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; - type ValOwned = C::ValOwned; type Time = C::Time; type Diff = C::Diff; @@ -193,7 +190,6 @@ where type Key<'a> = C::Key<'a>; type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; - type ValOwned = C::ValOwned; type Time = C::Time; type Diff = C::Diff; diff --git a/src/trace/wrappers/rc.rs b/src/trace/wrappers/rc.rs index ceda2e479..29d91d969 100644 --- a/src/trace/wrappers/rc.rs +++ b/src/trace/wrappers/rc.rs @@ -82,7 +82,6 @@ impl TraceReader for TraceRc { type Key<'a> = Tr::Key<'a>; type KeyOwned = Tr::KeyOwned; type Val<'a> = Tr::Val<'a>; - type ValOwned = Tr::ValOwned; type Time = Tr::Time; type Diff = Tr::Diff; diff --git a/tests/trace.rs b/tests/trace.rs index dac7eff22..a637b1700 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -35,11 +35,11 @@ fn test_trace() { let mut trace = get_trace(); let (mut cursor1, storage1) = trace.cursor_through(AntichainRef::new(&[1])).unwrap(); - let vec_1 = cursor1.to_vec(&storage1); + let vec_1 = cursor1.to_vec(|v| v.clone(), &storage1); assert_eq!(vec_1, vec![((1, 2), vec![(0, 1)])]); let (mut cursor2, storage2) = trace.cursor_through(AntichainRef::new(&[2])).unwrap(); - let vec_2 = cursor2.to_vec(&storage2); + let vec_2 = cursor2.to_vec(|v| v.clone(), &storage2); println!("--> {:?}", vec_2); assert_eq!(vec_2, vec![ ((1, 2), vec![(0, 1)]), @@ -47,13 +47,13 @@ fn test_trace() { ]); let (mut cursor3, storage3) = trace.cursor_through(AntichainRef::new(&[3])).unwrap(); - let vec_3 = cursor3.to_vec(&storage3); + let vec_3 = cursor3.to_vec(|v| v.clone(), &storage3); assert_eq!(vec_3, vec![ ((1, 2), vec![(0, 1)]), ((2, 3), vec![(1, 1), (2, -1)]), ]); let (mut cursor4, storage4) = trace.cursor(); - let vec_4 = cursor4.to_vec(&storage4); + let vec_4 = cursor4.to_vec(|v| v.clone(), &storage4); assert_eq!(vec_4, vec_3); } From b281e50565c26d14fcbf4bcdab8e8ad01e1f4baa Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 26 Apr 2024 17:10:26 -0400 Subject: [PATCH 3/5] Merge batcher generic over containers (#474) * Split default merge batcher This change splits the default merge batcher implementation into a type that maintains the outer part of its algorithm, specifically knows how to maintain chains, and an inner part that knows how to maintain the individual batches in chains. The benefit is that the outer part does not need to know about the contents of the containers it holds on to because that's encapsulated in the inner trait's implementation. Signed-off-by: Moritz Hoffmann * Undo some changes, rip out old columnated batcher Signed-off-by: Moritz Hoffmann * Address feedback Signed-off-by: Moritz Hoffmann * Formatting and renaming Signed-off-by: Moritz Hoffmann * Address feedback Signed-off-by: Moritz Hoffmann --------- Signed-off-by: Moritz Hoffmann --- src/operators/arrange/arrangement.rs | 2 +- src/operators/reduce.rs | 2 +- src/trace/implementations/merge_batcher.rs | 589 +++++++++++------- .../implementations/merge_batcher_col.rs | 570 +++++++---------- src/trace/implementations/mod.rs | 3 - src/trace/implementations/ord_neu.rs | 14 +- src/trace/implementations/rhh.rs | 16 +- src/trace/mod.rs | 6 +- tests/trace.rs | 2 +- 9 files changed, 630 insertions(+), 574 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index c22749a27..ba7bf2045 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -464,7 +464,7 @@ where input.for_each(|cap, data| { capabilities.insert(cap.retain()); - batcher.push_batch(data); + batcher.push_container(data); }); // The frontier may have advanced by multiple elements, which is an issue because diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index db80a5d44..dda549bca 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -240,7 +240,7 @@ pub trait ReduceCore where /// .map(|x| (x, x)) /// .reduce_abelian::<_,_,ValSpine<_,_,_,_>>( /// "Example", - /// Clone::clone, + /// Clone::clone, /// move |_key, src, dst| dst.push((*src[0].0, 1)) /// ) /// .trace; diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 2cf27ed1b..45e2a60f8 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -5,165 +5,243 @@ use std::collections::VecDeque; use timely::communication::message::RefOrMut; use timely::logging::WorkerIdentifier; use timely::logging_core::Logger; +use timely::progress::frontier::AntichainRef; use timely::progress::{frontier::Antichain, Timestamp}; +use timely::{Container, PartialOrder}; +use crate::consolidation::consolidate_updates; use crate::difference::Semigroup; use crate::logging::{BatcherEvent, DifferentialEvent}; use crate::trace::{Batcher, Builder}; +use crate::Data; /// Creates batches from unordered tuples. -pub struct MergeBatcher { - sorter: MergeSorter<(K, V), T, D>, +pub struct MergeBatcher +where + M: Merger, +{ + /// each power-of-two length list of allocations. + /// Do not push/pop directly but use the corresponding functions + /// ([`Self::chain_push`]/[`Self::chain_pop`]). + chains: Vec>, + /// Stash of empty chunks + stash: Vec, + /// Thing to accept data, merge chains, and talk to the builder. + merger: M, + /// Logger for size accounting. + logger: Option>, + /// Timely operator ID. + operator_id: usize, + /// Current lower frontier, we sealed up to here. lower: Antichain, + /// The lower-bound frontier of the data, after the last call to seal. frontier: Antichain, } -impl Batcher for MergeBatcher +impl Batcher for MergeBatcher where - K: Ord + Clone, - V: Ord + Clone, + M: Merger