From b6ca958887973b67e4b4b5427d606712e68e93ba Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 25 May 2024 18:48:12 -0400 Subject: [PATCH 1/5] Remove references to KeyOwned --- dogsdogsdogs/src/operators/count.rs | 10 ++++++---- dogsdogsdogs/src/operators/half_join.rs | 23 ++++++++++++---------- dogsdogsdogs/src/operators/lookup_map.rs | 20 ++++++++++--------- dogsdogsdogs/src/operators/propose.rs | 19 ++++++++++-------- dogsdogsdogs/src/operators/validate.rs | 6 ++++-- examples/spines.rs | 4 ++-- src/operators/arrange/arrangement.rs | 18 +++++++++++------ src/operators/arrange/upsert.rs | 16 ++++++++------- src/operators/consolidate.rs | 5 ++++- src/operators/count.rs | 9 ++++++--- src/operators/reduce.rs | 25 +++++++++++++++--------- src/trace/cursor/mod.rs | 3 ++- 12 files changed, 96 insertions(+), 62 deletions(-) diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs index d77c0b7e4..700801b7b 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -4,6 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; +use differential_dataflow::trace::cursor::IntoOwned; /// Reports a number of extensions to a stream of prefixes. /// @@ -11,7 +12,7 @@ use differential_dataflow::trace::TraceReader; /// For each triple, it extracts a key using `key_selector`, and finds the /// associated count in `arrangement`. If the found count is less than `count`, /// the `count` and `index` fields are overwritten with their new values. -pub fn count( +pub fn count( prefixes: &Collection, arrangement: Arranged, key_selector: F, @@ -20,15 +21,16 @@ pub fn count( where G: Scope, Tr: TraceReader+Clone+'static, - Tr::KeyOwned: Hashable + Default, + for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>, + K: Hashable + Ord + Default + 'static, R: Monoid+Multiply+ExchangeData, - F: Fn(&P)->Tr::KeyOwned+Clone+'static, + F: Fn(&P)->K+Clone+'static, P: ExchangeData, { crate::operators::lookup_map( prefixes, arrangement, - move |p: &(P,usize,usize), k: &mut Tr::KeyOwned| { *k = key_selector(&p.0); }, + move |p: &(P,usize,usize), k: &mut K| { *k = key_selector(&p.0); }, move |(p,c,i), r, _, s| { let s = *s as usize; if *c < s { ((p.clone(), *c, *i), r.clone()) } diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 877b1399e..799febeef 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -45,6 +45,7 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::{Cursor, TraceReader}; use differential_dataflow::consolidation::{consolidate, consolidate_updates}; +use differential_dataflow::trace::cursor::IntoOwned; /// A binary equijoin that responds to updates on only its first input. /// @@ -68,8 +69,8 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates}; /// Notice that the time is hoisted up into data. The expectation is that /// once out of the "delta flow region", the updates will be `delay`d to the /// times specified in the payloads. -pub fn half_join( - stream: &Collection, +pub fn half_join( + stream: &Collection, arrangement: Arranged, frontier_func: FF, comparison: CF, @@ -77,18 +78,19 @@ pub fn half_join( ) -> Collection>::Output> where G: Scope, - Tr::KeyOwned: Hashable + ExchangeData, + K: Hashable + ExchangeData, V: ExchangeData, R: ExchangeData + Monoid, Tr: TraceReader+Clone+'static, + for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>, R: Mul, >::Output: Semigroup, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, DOut: Clone+'static, - S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>)->DOut+'static, + S: FnMut(&K, &V, Tr::Val<'_>)->DOut+'static, { - let output_func = move |k: &Tr::KeyOwned, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| { + let output_func = move |k: &K, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| { let diff = diff1.clone() * diff2.clone(); let dout = (output_func(k, v1, v2), time.clone()); Some((dout, initial.clone(), diff)) @@ -120,8 +122,8 @@ where /// yield control, as a function of the elapsed time and the number of matched /// records. Note this is not the number of *output* records, owing mainly to /// the number of matched records being easiest to record with low overhead. -pub fn half_join_internal_unsafe( - stream: &Collection, +pub fn half_join_internal_unsafe( + stream: &Collection, mut arrangement: Arranged, frontier_func: FF, comparison: CF, @@ -130,17 +132,18 @@ pub fn half_join_internal_unsafe( ) -> Collection where G: Scope, - Tr::KeyOwned: Hashable + ExchangeData, + K: Hashable + ExchangeData, V: ExchangeData, R: ExchangeData + Monoid, Tr: TraceReader+Clone+'static, + for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, DOut: Clone+'static, ROut: Semigroup, Y: Fn(std::time::Instant, usize) -> bool + 'static, I: IntoIterator, - S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static, + S: FnMut(&K, &V, Tr::Val<'_>, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static, { // No need to block physical merging for this operator. arrangement.trace.set_physical_compaction(Antichain::new().borrow()); @@ -150,7 +153,7 @@ where let mut stash = HashMap::new(); let mut buffer = Vec::new(); - let exchange = Exchange::new(move |update: &((Tr::KeyOwned, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into()); + let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into()); // Stash for (time, diff) accumulation. let mut output_buffer = Vec::new(); diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index f1369eab1..c361871ac 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -10,27 +10,29 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; use differential_dataflow::difference::{Semigroup, Monoid}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::{Cursor, TraceReader}; +use differential_dataflow::trace::cursor::IntoOwned; /// Proposes extensions to a stream of prefixes. /// /// This method takes a stream of prefixes and for each determines a /// key with `key_selector` and then proposes all pair af the prefix /// and values associated with the key in `arrangement`. -pub fn lookup_map( +pub fn lookup_map( prefixes: &Collection, mut arrangement: Arranged, key_selector: F, mut output_func: S, - supplied_key0: Tr::KeyOwned, - supplied_key1: Tr::KeyOwned, - supplied_key2: Tr::KeyOwned, + supplied_key0: K, + supplied_key1: K, + supplied_key2: K, ) -> Collection where G: Scope, Tr: TraceReader+Clone+'static, - Tr::KeyOwned: Hashable, + for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>, + K: Hashable + Ord + 'static, Tr::Diff: Monoid+ExchangeData, - F: FnMut(&D, &mut Tr::KeyOwned)+Clone+'static, + F: FnMut(&D, &mut K)+Clone+'static, D: ExchangeData, R: ExchangeData+Monoid, DOut: Clone+'static, @@ -48,14 +50,14 @@ where let mut buffer = Vec::new(); - let mut key: Tr::KeyOwned = supplied_key0; + let mut key: K = supplied_key0; let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| { logic1(&update.0, &mut key); key.hashed().into() }); - let mut key1: Tr::KeyOwned = supplied_key1; - let mut key2: Tr::KeyOwned = supplied_key2; + let mut key1: K = supplied_key1; + let mut key2: K = supplied_key2; prefixes.inner.binary_frontier(&propose_stream, exchange, Pipeline, "LookupMap", move |_,_| move |input1, input2, output| { diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index 36024e6d4..cd0bc4cd0 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -4,6 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; +use differential_dataflow::trace::cursor::IntoOwned; /// Proposes extensions to a prefix stream. /// @@ -13,7 +14,7 @@ use differential_dataflow::trace::TraceReader; /// create a join if the `prefixes` collection is also arranged and responds to changes that /// `arrangement` undergoes. More complicated patterns are also appropriate, as in the case /// of delta queries. -pub fn propose( +pub fn propose( prefixes: &Collection, arrangement: Arranged, key_selector: F, @@ -22,9 +23,10 @@ pub fn propose( where G: Scope, Tr: TraceReader+Clone+'static, - Tr::KeyOwned: Hashable + Default, + for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>, + K: Hashable + Default + Ord + 'static, Tr::Diff: Monoid+Multiply+ExchangeData, - F: Fn(&P)->Tr::KeyOwned+Clone+'static, + F: Fn(&P)->K+Clone+'static, P: ExchangeData, V: Clone + 'static, VF: Fn(Tr::Val<'_>) -> V + 'static, @@ -32,7 +34,7 @@ where crate::operators::lookup_map( prefixes, arrangement, - move |p: &P, k: &mut Tr::KeyOwned | { *k = key_selector(p); }, + move |p: &P, k: &mut K | { *k = key_selector(p); }, move |prefix, diff, value, sum| ((prefix.clone(), val_from(value)), diff.clone().multiply(sum)), Default::default(), Default::default(), @@ -45,7 +47,7 @@ where /// Unlike `propose`, this method does not scale the multiplicity of matched /// prefixes by the number of matches in `arrangement`. This can be useful to /// avoid the need to prepare an arrangement of distinct extensions. -pub fn propose_distinct( +pub fn propose_distinct( prefixes: &Collection, arrangement: Arranged, key_selector: F, @@ -54,9 +56,10 @@ pub fn propose_distinct( where G: Scope, Tr: TraceReader+Clone+'static, - Tr::KeyOwned: Hashable + Default, + for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>, + K: Hashable + Default + Ord + 'static, Tr::Diff: Monoid+Multiply+ExchangeData, - F: Fn(&P)->Tr::KeyOwned+Clone+'static, + F: Fn(&P)->K+Clone+'static, P: ExchangeData, V: Clone + 'static, VF: Fn(Tr::Val<'_>) -> V + 'static, @@ -64,7 +67,7 @@ where crate::operators::lookup_map( prefixes, arrangement, - move |p: &P, k: &mut Tr::KeyOwned| { *k = key_selector(p); }, + move |p: &P, k: &mut K| { *k = key_selector(p); }, move |prefix, diff, value, _sum| ((prefix.clone(), val_from(value)), diff.clone()), Default::default(), Default::default(), diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index 7ddbf38c5..fdadc7ba9 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -6,6 +6,7 @@ use differential_dataflow::{ExchangeData, Collection}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; +use differential_dataflow::trace::cursor::IntoOwned; /// Proposes extensions to a stream of prefixes. /// @@ -19,8 +20,9 @@ pub fn validate( ) -> Collection where G: Scope, - Tr: TraceReader+Clone+'static, - K: Ord+Hash+Clone+Default, + Tr: TraceReader+Clone+'static, + for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = (K, V)>, + K: Ord+Hash+Clone+Default + 'static, V: ExchangeData+Hash+Default, Tr::Diff: Monoid+Multiply+ExchangeData, F: Fn(&P)->K+Clone+'static, diff --git a/examples/spines.rs b/examples/spines.rs index b0f5eddd1..947c7b26d 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -55,11 +55,11 @@ fn main() { let data = data.map(|x| (x.clone().into_bytes(), x.into_bytes())) .arrange::>() - .reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1))); + .reduce_abelian::<_, _, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1))); let keys = keys.map(|x| (x.clone().into_bytes(), 7)) .arrange::>() - .reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1))); + .reduce_abelian::<_, _, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1))); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index cdc3b2759..10e7becca 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -279,6 +279,8 @@ where } } +use crate::trace::cursor::IntoOwned; + // Direct reduce implementations. use crate::difference::Abelian; impl Arranged @@ -287,17 +289,19 @@ where T1: TraceReader + Clone + 'static, { /// A direct implementation of `ReduceCore::reduce_abelian`. - pub fn reduce_abelian(&self, name: &str, from: F, mut logic: L) -> Arranged> + pub fn reduce_abelian(&self, name: &str, from: F, mut logic: L) -> Arranged> where + for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, T2: for<'a> Trace= T1::Key<'a>, Time=T1::Time>+'static, + K: Ord + 'static, V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - ::Input: Container + PushInto<((T1::KeyOwned, V), T2::Time, T2::Diff)>, + ::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static, { - self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| { + self.reduce_core::<_,K,V,F,T2>(name, from, move |key, input, output, change| { if !input.is_empty() { logic(key, input, change); } @@ -307,17 +311,19 @@ where } /// A direct implementation of `ReduceCore::reduce_core`. - pub fn reduce_core(&self, name: &str, from: F, logic: L) -> Arranged> + pub fn reduce_core(&self, name: &str, from: F, logic: L) -> Arranged> where + for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, T2: for<'a> Trace=T1::Key<'a>, Time=T1::Time>+'static, + K: Ord + 'static, V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - ::Input: Container + PushInto<((T1::KeyOwned, V), T2::Time, T2::Diff)>, + ::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { use crate::operators::reduce::reduce_trace; - reduce_trace::<_,_,_,V,_,_>(self, name, from, logic) + reduce_trace::<_,_,_,_,V,_,_>(self, name, from, logic) } } diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index a36781a5e..69f2317e6 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -59,7 +59,7 @@ //! use differential_dataflow::operators::arrange::upsert; //! //! let stream = scope.input_from(&mut input); -//! let arranged = upsert::arrange_from_upsert::<_, _, _, ValSpine>(&stream, &"test", |v| v.clone()); +//! let arranged = upsert::arrange_from_upsert::<_, _, _, _, ValSpine>(&stream, &"test", |v| v.clone()); //! //! arranged //! .as_collection(|k,v| (k.clone(), v.clone())) @@ -111,6 +111,7 @@ use timely::dataflow::operators::Capability; use crate::operators::arrange::arrangement::Arranged; use crate::trace::Builder; use crate::trace::{self, Trace, TraceReader, Batch, Cursor}; +use crate::trace::cursor::IntoOwned; use crate::{ExchangeData, Hashable}; use super::TraceAgent; @@ -125,20 +126,21 @@ use super::TraceAgent; /// This method is only implemented for totally ordered times, as we do not yet /// understand what a "sequence" of upserts would mean for partially ordered /// timestamps. -pub fn arrange_from_upsert( - stream: &Stream, G::Timestamp)>, +pub fn arrange_from_upsert( + stream: &Stream, G::Timestamp)>, name: &str, from: F, ) -> Arranged> where G: Scope, Tr: Trace+TraceReader+'static, - Tr::KeyOwned: ExchangeData+Hashable+std::hash::Hash, + for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>, + K: ExchangeData+Hashable+std::hash::Hash, V: ExchangeData, F: Fn(Tr::Val<'_>) -> V + 'static, Tr::Time: TotalOrder+ExchangeData, Tr::Batch: Batch, - Tr::Builder: Builder>, + Tr::Builder: Builder>, { let mut reader: Option> = None; @@ -147,7 +149,7 @@ where let reader = &mut reader; - let exchange = Exchange::new(move |update: &(Tr::KeyOwned,Option,G::Timestamp)| (update.0).hashed().into()); + let exchange = Exchange::new(move |update: &(K,Option,G::Timestamp)| (update.0).hashed().into()); stream.unary_frontier(exchange, name, move |_capability, info| { @@ -177,7 +179,7 @@ where let mut prev_frontier = Antichain::from_elem(::minimum()); // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap). - let mut priority_queue = BinaryHeap::)>>::new(); + let mut priority_queue = BinaryHeap::)>>::new(); let mut updates = Vec::new(); move |input, output| { diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index 86b7e729c..afe201d4a 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -8,6 +8,8 @@ use timely::dataflow::Scope; +use crate::trace::cursor::IntoOwned; + use crate::{Collection, ExchangeData, Hashable}; use crate::consolidation::ConsolidatingContainerBuilder; use crate::difference::Semigroup; @@ -52,7 +54,8 @@ where /// As `consolidate` but with the ability to name the operator and specify the trace type. pub fn consolidate_named(&self, name: &str) -> Self where - Tr: crate::trace::Trace+'static, + Tr: crate::trace::Trace+'static, + for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = D>, Tr::Batch: crate::trace::Batch, Tr::Batcher: Batcher>, { diff --git a/src/operators/count.rs b/src/operators/count.rs index 42a030786..6c0400179 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -5,6 +5,8 @@ use timely::dataflow::*; use timely::dataflow::operators::Operator; use timely::dataflow::channels::pact::Pipeline; +use crate::trace::cursor::IntoOwned; + use crate::lattice::Lattice; use crate::{ExchangeData, Collection}; use crate::difference::Semigroup; @@ -50,15 +52,16 @@ where G::Timestamp: TotalOrder+Lattice+Ord { } } -impl CountTotal for Arranged +impl CountTotal for Arranged where G: Scope, T1: for<'a> TraceReader=&'a ()>+Clone+'static, - T1::KeyOwned: ExchangeData, + for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, + K: ExchangeData, T1::Time: TotalOrder, T1::Diff: ExchangeData, { - fn count_total_core>(&self) -> Collection { + fn count_total_core>(&self) -> Collection { let mut trace = self.trace.clone(); let mut buffer = Vec::new(); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 6586fea8f..ebec54bf6 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -19,6 +19,8 @@ use timely::dataflow::operators::Operator; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Capability; +use crate::trace::cursor::IntoOwned; + use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent}; use crate::lattice::Lattice; use crate::trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic}; @@ -85,11 +87,12 @@ impl Reduce for Collection impl Reduce for Arranged where G: Scope, - T1: for<'a> TraceReader=&'a K, KeyOwned=K, Val<'a>=&'a V, Diff=R>+Clone+'static, + T1: for<'a> TraceReader=&'a K, Val<'a>=&'a V, Diff=R>+Clone+'static, + for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, { fn reduce_named(&self, name: &str, logic: L) -> Collection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { - self.reduce_abelian::<_,V2,_,ValSpine<_,_,_,_>>(name, |val| val.clone(), logic) + self.reduce_abelian::<_,K,V2,_,ValSpine<_,_,_,_>>(name, |val| val.clone(), logic) .as_collection(|k,v| (k.clone(), v.clone())) } } @@ -162,10 +165,11 @@ where G::Timestamp: Lattice+Ord { impl Threshold for Arranged where G: Scope, - T1: for<'a> TraceReader=&'a K, KeyOwned=K, Val<'a>=&'a (), Diff=R1>+Clone+'static, + T1: for<'a> TraceReader=&'a K, Val<'a>=&'a (), Diff=R1>+Clone+'static, + for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, { fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { - self.reduce_abelian::<_,(),_,KeySpine<_,_,_>>(name, |&()| (), move |k,s,t| t.push(((), thresh(k, &s[0].1)))) + self.reduce_abelian::<_,K,(),_,KeySpine<_,_,_>>(name, |&()| (), move |k,s,t| t.push(((), thresh(k, &s[0].1)))) .as_collection(|k,_| k.clone()) } } @@ -212,10 +216,11 @@ where impl Count for Arranged where G: Scope, - T1: for<'a> TraceReader=&'a K, KeyOwned=K, Val<'a>=&'a (), Diff=R>+Clone+'static, + T1: for<'a> TraceReader=&'a K, Val<'a>=&'a (), Diff=R>+Clone+'static, + for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, { fn count_core>(&self) -> Collection { - self.reduce_abelian::<_,R,_,ValSpine<_,_,_,_>>("Count", |r| r.clone(), |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) + self.reduce_abelian::<_,K,R,_,ValSpine<_,_,_,_>>("Count", |r| r.clone(), |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) .as_collection(|k,c| (k.clone(), c.clone())) } } @@ -306,15 +311,17 @@ where /// A key-wise reduction of values in an input trace. /// /// This method exists to provide reduce functionality without opinions about qualifying trace types. -pub fn reduce_trace(trace: &Arranged, name: &str, from: F, mut logic: L) -> Arranged> +pub fn reduce_trace(trace: &Arranged, name: &str, from: F, mut logic: L) -> Arranged> where G: Scope, T1: TraceReader + Clone + 'static, + for<'a> T1::Key<'a> : IntoOwned<'a, Owned = K>, T2: for<'a> Trace=T1::Key<'a>, Time=T1::Time> + 'static, + K: Ord + 'static, V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - ::Input: Container + PushInto<((T1::KeyOwned, V), T2::Time, T2::Diff)>, + ::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { let mut result_trace = None; @@ -352,7 +359,7 @@ where // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times, // as well as capabilities for these times (or their lower envelope, at least). - let mut interesting = Vec::<(T1::KeyOwned, G::Timestamp)>::new(); + let mut interesting = Vec::<(K, G::Timestamp)>::new(); let mut capabilities = Vec::>::new(); // buffers and logic for computing per-key interesting times "efficiently". diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index 2631c2aea..0ba923be8 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -99,8 +99,9 @@ pub trait Cursor { fn rewind_vals(&mut self, storage: &Self::Storage); /// Rewinds the cursor and outputs its contents to a Vec - fn to_vec(&mut self, from: F, storage: &Self::Storage) -> Vec<((Self::KeyOwned, V), Vec<(Self::Time, Self::Diff)>)> + fn to_vec(&mut self, from: F, storage: &Self::Storage) -> Vec<((K, V), Vec<(Self::Time, Self::Diff)>)> where + for<'a> Self::Key<'a> : IntoOwned<'a, Owned = K>, F: Fn(Self::Val<'_>) -> V, { let mut out = Vec::new(); From d14989969ed522fb0cd12abf93971350eadab01d Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 25 May 2024 18:54:41 -0400 Subject: [PATCH 2/5] Remove definitions of KeyOwned --- src/operators/arrange/agent.rs | 1 - src/trace/cursor/cursor_list.rs | 1 - src/trace/cursor/mod.rs | 4 +--- src/trace/implementations/ord_neu.rs | 4 ---- src/trace/implementations/rhh.rs | 2 -- src/trace/implementations/spine_fueled.rs | 1 - src/trace/mod.rs | 18 +++++------------- src/trace/wrappers/enter.rs | 4 ---- src/trace/wrappers/enter_at.rs | 4 ---- src/trace/wrappers/filter.rs | 4 ---- src/trace/wrappers/freeze.rs | 4 ---- src/trace/wrappers/frontier.rs | 4 ---- src/trace/wrappers/rc.rs | 1 - 13 files changed, 6 insertions(+), 46 deletions(-) diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 012882e2b..670769e65 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -44,7 +44,6 @@ where Tr: TraceReader, { type Key<'a> = Tr::Key<'a>; - type KeyOwned = Tr::KeyOwned; type Val<'a> = Tr::Val<'a>; type Time = Tr::Time; type Diff = Tr::Diff; diff --git a/src/trace/cursor/cursor_list.rs b/src/trace/cursor/cursor_list.rs index 98bf6a808..f1b414590 100644 --- a/src/trace/cursor/cursor_list.rs +++ b/src/trace/cursor/cursor_list.rs @@ -86,7 +86,6 @@ impl CursorList { impl Cursor for CursorList { type Key<'a> = C::Key<'a>; - type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; type Time = C::Time; type Diff = C::Diff; diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index 0ba923be8..ad78c716f 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -43,9 +43,7 @@ impl<'a, T: ToOwned+?Sized> IntoOwned<'a> for &'a T { pub trait Cursor { /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + Ord + IntoOwned<'a, Owned = Self::KeyOwned>; - /// Owned version of the above. - type KeyOwned: Ord + Clone; + type Key<'a>: Copy + Clone + Ord + IntoOwned<'a>; /// Values associated with keys. type Val<'a>: Copy + Clone + Ord + IntoOwned<'a> + for<'b> PartialOrd>; /// Timestamps associated with updates diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index d519d0d63..01b3d7c8c 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -141,7 +141,6 @@ mod val_batch { impl BatchReader for OrdValBatch { type Key<'a> = ::ReadItem<'a>; - type KeyOwned = ::Key; type Val<'a> = ::ReadItem<'a>; type Time = ::Time; type Diff = ::Diff; @@ -447,7 +446,6 @@ mod val_batch { impl Cursor for OrdValCursor { type Key<'a> = ::ReadItem<'a>; - type KeyOwned = ::Key; type Val<'a> = ::ReadItem<'a>; type Time = ::Time; type Diff = ::Diff; @@ -680,7 +678,6 @@ mod key_batch { impl BatchReader for OrdKeyBatch { type Key<'a> = ::ReadItem<'a>; - type KeyOwned = ::Key; type Val<'a> = &'a (); type Time = ::Time; type Diff = ::Diff; @@ -897,7 +894,6 @@ mod key_batch { impl Cursor for OrdKeyCursor { type Key<'a> = ::ReadItem<'a>; - type KeyOwned = ::Key; type Val<'a> = &'a (); type Time = ::Time; type Diff = ::Diff; diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index b7e6c217e..e5279cb8b 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -269,7 +269,6 @@ mod val_batch { for<'a> ::ReadItem<'a>: HashOrdered, { type Key<'a> = ::ReadItem<'a>; - type KeyOwned = ::Key; type Val<'a> = ::ReadItem<'a>; type Time = ::Time; type Diff = ::Diff; @@ -615,7 +614,6 @@ mod val_batch { for<'a> ::ReadItem<'a>: HashOrdered, { type Key<'a> = ::ReadItem<'a>; - type KeyOwned = ::Key; type Val<'a> = ::ReadItem<'a>; type Time = ::Time; type Diff = ::Diff; diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index faea1dfdb..215fe791d 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -110,7 +110,6 @@ where B: Batch+Clone+'static, { type Key<'a> = B::Key<'a>; - type KeyOwned = B::KeyOwned; type Val<'a> = B::Val<'a>; type Time = B::Time; type Diff = B::Diff; diff --git a/src/trace/mod.rs b/src/trace/mod.rs index f73730a23..8fb1075b2 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -52,9 +52,7 @@ pub type ExertionLogic = std::sync::Arc Fn(&'a [(usize, usize, usize pub trait TraceReader { /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + Ord + IntoOwned<'a, Owned = Self::KeyOwned>; - /// Owned version of the above. - type KeyOwned: Ord + Clone; + type Key<'a>: Copy + Clone + Ord + IntoOwned<'a>; /// Values associated with keys. type Val<'a>: Copy + Clone + IntoOwned<'a>; /// Timestamps associated with updates @@ -63,13 +61,13 @@ pub trait TraceReader { type Diff: Semigroup; /// The type of an immutable collection of updates. - type Batch: for<'a> BatchReader = Self::Key<'a>, KeyOwned = Self::KeyOwned, Val<'a> = Self::Val<'a>, Time = Self::Time, Diff = Self::Diff>+Clone+'static; + type Batch: for<'a> BatchReader = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, Diff = Self::Diff>+Clone+'static; /// Storage type for `Self::Cursor`. Likely related to `Self::Batch`. type Storage; /// The type used to enumerate the collections contents. - type Cursor: for<'a> Cursor = Self::Key<'a>, KeyOwned = Self::KeyOwned, Val<'a> = Self::Val<'a>, Time = Self::Time, Diff = Self::Diff>; + type Cursor: for<'a> Cursor = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, Diff = Self::Diff>; /// Provides a cursor over updates contained in the trace. fn cursor(&mut self) -> (Self::Cursor, Self::Storage) { @@ -258,9 +256,7 @@ where Self: ::std::marker::Sized, { /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + Ord + IntoOwned<'a, Owned = Self::KeyOwned>; - /// Owned version of the above. - type KeyOwned: Ord + Clone; + type Key<'a>: Copy + Clone + Ord + IntoOwned<'a>; /// Values associated with keys. type Val<'a>: Copy + Clone + IntoOwned<'a>; /// Timestamps associated with updates @@ -269,7 +265,7 @@ where type Diff: Semigroup; /// The type used to enumerate the batch's contents. - type Cursor: for<'a> Cursor = Self::Key<'a>, KeyOwned = Self::KeyOwned, Val<'a> = Self::Val<'a>, Time = Self::Time, Diff = Self::Diff>; + type Cursor: for<'a> Cursor = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, Diff = Self::Diff>; /// Acquires a cursor to the batch's contents. fn cursor(&self) -> Self::Cursor; /// The number of updates in the batch. @@ -373,7 +369,6 @@ pub mod rc_blanket_impls { impl BatchReader for Rc { type Key<'a> = B::Key<'a>; - type KeyOwned = B::KeyOwned; type Val<'a> = B::Val<'a>; type Time = B::Time; type Diff = B::Diff; @@ -407,7 +402,6 @@ pub mod rc_blanket_impls { impl Cursor for RcBatchCursor { type Key<'a> = C::Key<'a>; - type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; type Time = C::Time; type Diff = C::Diff; @@ -476,7 +470,6 @@ pub mod abomonated_blanket_impls { impl BatchReader for Abomonated> { type Key<'a> = B::Key<'a>; - type KeyOwned = B::KeyOwned; type Val<'a> = B::Val<'a>; type Time = B::Time; type Diff = B::Diff; @@ -510,7 +503,6 @@ pub mod abomonated_blanket_impls { impl Cursor for AbomonatedBatchCursor where C::Storage: Abomonation { type Key<'a> = C::Key<'a>; - type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; type Time = C::Time; type Diff = C::Diff; diff --git a/src/trace/wrappers/enter.rs b/src/trace/wrappers/enter.rs index ce59e955e..64a21af99 100644 --- a/src/trace/wrappers/enter.rs +++ b/src/trace/wrappers/enter.rs @@ -32,7 +32,6 @@ where TInner: Refines+Lattice, { type Key<'a> = Tr::Key<'a>; - type KeyOwned = Tr::KeyOwned; type Val<'a> = Tr::Val<'a>; type Time = TInner; type Diff = Tr::Diff; @@ -115,7 +114,6 @@ where TInner: Refines+Lattice, { type Key<'a> = B::Key<'a>; - type KeyOwned = B::KeyOwned; type Val<'a> = B::Val<'a>; type Time = TInner; type Diff = B::Diff; @@ -168,7 +166,6 @@ where TInner: Refines+Lattice, { type Key<'a> = C::Key<'a>; - type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; type Time = TInner; type Diff = C::Diff; @@ -220,7 +217,6 @@ where TInner: Refines+Lattice, { type Key<'a> = C::Key<'a>; - type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; type Time = TInner; type Diff = C::Diff; diff --git a/src/trace/wrappers/enter_at.rs b/src/trace/wrappers/enter_at.rs index 7e157c409..f5a1b3990 100644 --- a/src/trace/wrappers/enter_at.rs +++ b/src/trace/wrappers/enter_at.rs @@ -50,7 +50,6 @@ where G: FnMut(&TInner)->Tr::Time+Clone+'static, { type Key<'a> = Tr::Key<'a>; - type KeyOwned = Tr::KeyOwned; type Val<'a> = Tr::Val<'a>; type Time = TInner; type Diff = Tr::Diff; @@ -138,7 +137,6 @@ where F: FnMut(B::Key<'_>, ::Val<'_>, &B::Time)->TInner+Clone, { type Key<'a> = B::Key<'a>; - type KeyOwned = B::KeyOwned; type Val<'a> = B::Val<'a>; type Time = TInner; type Diff = B::Diff; @@ -195,7 +193,6 @@ where F: FnMut(C::Key<'_>, C::Val<'_>, &C::Time)->TInner, { type Key<'a> = C::Key<'a>; - type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; type Time = TInner; type Diff = C::Diff; @@ -253,7 +250,6 @@ where F: FnMut(C::Key<'_>, C::Val<'_>, &C::Time)->TInner, { type Key<'a> = C::Key<'a>; - type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; type Time = TInner; type Diff = C::Diff; diff --git a/src/trace/wrappers/filter.rs b/src/trace/wrappers/filter.rs index b4f320700..dbac502e9 100644 --- a/src/trace/wrappers/filter.rs +++ b/src/trace/wrappers/filter.rs @@ -31,7 +31,6 @@ where F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static, { type Key<'a> = Tr::Key<'a>; - type KeyOwned = Tr::KeyOwned; type Val<'a> = Tr::Val<'a>; type Time = Tr::Time; type Diff = Tr::Diff; @@ -84,7 +83,6 @@ where F: FnMut(B::Key<'_>, B::Val<'_>)->bool+Clone+'static { type Key<'a> = B::Key<'a>; - type KeyOwned = B::KeyOwned; type Val<'a> = B::Val<'a>; type Time = B::Time; type Diff = B::Diff; @@ -132,7 +130,6 @@ where F: FnMut(C::Key<'_>, C::Val<'_>)->bool+'static { type Key<'a> = C::Key<'a>; - type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; type Time = C::Time; type Diff = C::Diff; @@ -186,7 +183,6 @@ where F: FnMut(C::Key<'_>, C::Val<'_>)->bool+'static, { type Key<'a> = C::Key<'a>; - type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; type Time = C::Time; type Diff = C::Diff; diff --git a/src/trace/wrappers/freeze.rs b/src/trace/wrappers/freeze.rs index 310ef3157..07006d5f9 100644 --- a/src/trace/wrappers/freeze.rs +++ b/src/trace/wrappers/freeze.rs @@ -76,7 +76,6 @@ where F: Fn(&Tr::Time)->Option+'static, { type Key<'a> = Tr::Key<'a>; - type KeyOwned = Tr::KeyOwned; type Val<'a> = Tr::Val<'a>; type Time = Tr::Time; type Diff = Tr::Diff; @@ -139,7 +138,6 @@ where F: Fn(&B::Time)->Option, { type Key<'a> = B::Key<'a>; - type KeyOwned = B::KeyOwned; type Val<'a> = B::Val<'a>; type Time = B::Time; type Diff = B::Diff; @@ -182,7 +180,6 @@ where F: Fn(&C::Time)->Option, { type Key<'a> = C::Key<'a>; - type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; type Time = C::Time; type Diff = C::Diff; @@ -233,7 +230,6 @@ where F: Fn(&C::Time)->Option, { type Key<'a> = C::Key<'a>; - type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; type Time = C::Time; type Diff = C::Diff; diff --git a/src/trace/wrappers/frontier.rs b/src/trace/wrappers/frontier.rs index 02cce0dd1..727e8ca1c 100644 --- a/src/trace/wrappers/frontier.rs +++ b/src/trace/wrappers/frontier.rs @@ -33,7 +33,6 @@ impl Clone for TraceFrontier { impl TraceReader for TraceFrontier { type Key<'a> = Tr::Key<'a>; - type KeyOwned = Tr::KeyOwned; type Val<'a> = Tr::Val<'a>; type Time = Tr::Time; type Diff = Tr::Diff; @@ -83,7 +82,6 @@ pub struct BatchFrontier { impl BatchReader for BatchFrontier { type Key<'a> = B::Key<'a>; - type KeyOwned = B::KeyOwned; type Val<'a> = B::Val<'a>; type Time = B::Time; type Diff = B::Diff; @@ -127,7 +125,6 @@ impl CursorFrontier where T: Clone { impl Cursor for CursorFrontier { type Key<'a> = C::Key<'a>; - type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; type Time = C::Time; type Diff = C::Diff; @@ -188,7 +185,6 @@ where C::Storage: BatchReader, { type Key<'a> = C::Key<'a>; - type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; type Time = C::Time; type Diff = C::Diff; diff --git a/src/trace/wrappers/rc.rs b/src/trace/wrappers/rc.rs index 29d91d969..d94dcf264 100644 --- a/src/trace/wrappers/rc.rs +++ b/src/trace/wrappers/rc.rs @@ -80,7 +80,6 @@ pub struct TraceRc { impl TraceReader for TraceRc { type Key<'a> = Tr::Key<'a>; - type KeyOwned = Tr::KeyOwned; type Val<'a> = Tr::Val<'a>; type Time = Tr::Time; type Diff = Tr::Diff; From 0f42bf3f20d40439ee7f5c111a5a6533dd9f290e Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 25 May 2024 18:58:11 -0400 Subject: [PATCH 3/5] Remove IntoOwned bounds --- src/trace/cursor/mod.rs | 4 ++-- src/trace/mod.rs | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index ad78c716f..14a36f4fd 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -43,9 +43,9 @@ impl<'a, T: ToOwned+?Sized> IntoOwned<'a> for &'a T { pub trait Cursor { /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + Ord + IntoOwned<'a>; + type Key<'a>: Copy + Clone + Ord; /// Values associated with keys. - type Val<'a>: Copy + Clone + Ord + IntoOwned<'a> + for<'b> PartialOrd>; + type Val<'a>: Copy + Clone + Ord + for<'b> PartialOrd>; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Associated update. diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 8fb1075b2..8980c7397 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -52,9 +52,9 @@ pub type ExertionLogic = std::sync::Arc Fn(&'a [(usize, usize, usize pub trait TraceReader { /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + Ord + IntoOwned<'a>; + type Key<'a>: Copy + Clone + Ord; /// Values associated with keys. - type Val<'a>: Copy + Clone + IntoOwned<'a>; + type Val<'a>: Copy + Clone; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Associated update. @@ -256,9 +256,9 @@ where Self: ::std::marker::Sized, { /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + Ord + IntoOwned<'a>; + type Key<'a>: Copy + Clone + Ord; /// Values associated with keys. - type Val<'a>: Copy + Clone + IntoOwned<'a>; + type Val<'a>: Copy + Clone; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Associated update. From 2fbb39ba252add4ab165d0608a39938f1ef5db23 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 25 May 2024 19:22:08 -0400 Subject: [PATCH 4/5] Pivot Val<'_> -> V into IntoOwned --- dogsdogsdogs/examples/delta_query.rs | 6 ++-- dogsdogsdogs/src/lib.rs | 2 +- dogsdogsdogs/src/operators/propose.rs | 14 ++++---- examples/cursors.rs | 2 +- examples/monoid-bfs.rs | 2 +- examples/spines.rs | 4 +-- src/algorithms/graphs/propagate.rs | 2 +- src/operators/arrange/arrangement.rs | 12 +++---- src/operators/arrange/upsert.rs | 9 +++--- src/operators/reduce.rs | 46 +++++++++++++-------------- src/trace/cursor/mod.rs | 6 ++-- tests/trace.rs | 8 ++--- 12 files changed, 54 insertions(+), 59 deletions(-) diff --git a/dogsdogsdogs/examples/delta_query.rs b/dogsdogsdogs/examples/delta_query.rs index dab7cc655..2f2a9ba10 100644 --- a/dogsdogsdogs/examples/delta_query.rs +++ b/dogsdogsdogs/examples/delta_query.rs @@ -79,17 +79,17 @@ fn main() { // Prior technology // dQ/dE1 := dE1(a,b), E2(b,c), E3(a,c) - let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone(), Clone::clone); + let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone()); let changes1 = validate(&changes1, forward_self_neu.clone(), key1.clone()); let changes1 = changes1.map(|((a,b),c)| (a,b,c)); // dQ/dE2 := dE2(b,c), E1(a,b), E3(a,c) - let changes2 = propose(&changes, reverse_key_alt.clone(), key1.clone(), Clone::clone); + let changes2 = propose(&changes, reverse_key_alt.clone(), key1.clone()); let changes2 = validate(&changes2, reverse_self_neu.clone(), key2.clone()); let changes2 = changes2.map(|((b,c),a)| (a,b,c)); // dQ/dE3 := dE3(a,c), E1(a,b), E2(b,c) - let changes3 = propose(&changes, forward_key_alt.clone(), key1.clone(), Clone::clone); + let changes3 = propose(&changes, forward_key_alt.clone(), key1.clone()); let changes3 = validate(&changes3, reverse_self_alt.clone(), key2.clone()); let changes3 = changes3.map(|((a,c),b)| (a,b,c)); diff --git a/dogsdogsdogs/src/lib.rs b/dogsdogsdogs/src/lib.rs index 1d90feb51..262e09609 100644 --- a/dogsdogsdogs/src/lib.rs +++ b/dogsdogsdogs/src/lib.rs @@ -199,7 +199,7 @@ where fn propose(&mut self, prefixes: &Collection) -> Collection { let propose = self.indices.propose_trace.import(&prefixes.scope()); - operators::propose::propose(prefixes, propose, self.key_selector.clone(), |x| x.clone()) + operators::propose::propose(prefixes, propose, self.key_selector.clone()) } fn validate(&mut self, extensions: &Collection) -> Collection { diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index cd0bc4cd0..a7cee8b52 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -14,11 +14,10 @@ use differential_dataflow::trace::cursor::IntoOwned; /// create a join if the `prefixes` collection is also arranged and responds to changes that /// `arrangement` undergoes. More complicated patterns are also appropriate, as in the case /// of delta queries. -pub fn propose( +pub fn propose( prefixes: &Collection, arrangement: Arranged, key_selector: F, - val_from: VF, ) -> Collection where G: Scope, @@ -29,13 +28,13 @@ where F: Fn(&P)->K+Clone+'static, P: ExchangeData, V: Clone + 'static, - VF: Fn(Tr::Val<'_>) -> V + 'static, + for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>, { crate::operators::lookup_map( prefixes, arrangement, move |p: &P, k: &mut K | { *k = key_selector(p); }, - move |prefix, diff, value, sum| ((prefix.clone(), val_from(value)), diff.clone().multiply(sum)), + move |prefix, diff, value, sum| ((prefix.clone(), value.into_owned()), diff.clone().multiply(sum)), Default::default(), Default::default(), Default::default(), @@ -47,11 +46,10 @@ where /// Unlike `propose`, this method does not scale the multiplicity of matched /// prefixes by the number of matches in `arrangement`. This can be useful to /// avoid the need to prepare an arrangement of distinct extensions. -pub fn propose_distinct( +pub fn propose_distinct( prefixes: &Collection, arrangement: Arranged, key_selector: F, - val_from: VF, ) -> Collection where G: Scope, @@ -62,13 +60,13 @@ where F: Fn(&P)->K+Clone+'static, P: ExchangeData, V: Clone + 'static, - VF: Fn(Tr::Val<'_>) -> V + 'static, + for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>, { crate::operators::lookup_map( prefixes, arrangement, move |p: &P, k: &mut K| { *k = key_selector(p); }, - move |prefix, diff, value, _sum| ((prefix.clone(), val_from(value)), diff.clone()), + move |prefix, diff, value, _sum| ((prefix.clone(), value.into_owned()), diff.clone()), Default::default(), Default::default(), Default::default(), diff --git a/examples/cursors.rs b/examples/cursors.rs index e4da79e47..1f91f615d 100644 --- a/examples/cursors.rs +++ b/examples/cursors.rs @@ -95,7 +95,7 @@ fn main() { /* Return trace content after the last round. */ let (mut cursor, storage) = graph_trace.cursor(); - cursor.to_vec(Clone::clone, &storage) + cursor.to_vec(&storage) }) .unwrap().join(); diff --git a/examples/monoid-bfs.rs b/examples/monoid-bfs.rs index 47563a065..f9835cef7 100644 --- a/examples/monoid-bfs.rs +++ b/examples/monoid-bfs.rs @@ -145,7 +145,7 @@ where G::Timestamp: Lattice+Ord { .join_map(&edges, |_k,&(),d| *d) .concat(&roots) .map(|x| (x,())) - .reduce_core::<_,_,KeySpine<_,_,_>>("Reduce", Clone::clone, |_key, input, output, updates| { + .reduce_core::<_,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| { if output.is_empty() || input[0].1 < output[0].1 { updates.push(((), input[0].1)); } diff --git a/examples/spines.rs b/examples/spines.rs index 947c7b26d..7151dc266 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -55,11 +55,11 @@ fn main() { let data = data.map(|x| (x.clone().into_bytes(), x.into_bytes())) .arrange::>() - .reduce_abelian::<_, _, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1))); + .reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); let keys = keys.map(|x| (x.clone().into_bytes(), 7)) .arrange::>() - .reduce_abelian::<_, _, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1))); + .reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); diff --git a/src/algorithms/graphs/propagate.rs b/src/algorithms/graphs/propagate.rs index b73ff598b..9aef8e08b 100644 --- a/src/algorithms/graphs/propagate.rs +++ b/src/algorithms/graphs/propagate.rs @@ -96,7 +96,7 @@ where let labels = proposals .concat(&nodes) - .reduce_abelian::<_,_,ValSpine<_,_,_,_>>("Propagate", |v| v.clone(), |_, s, t| t.push((s[0].0.clone(), R::from(1_i8)))); + .reduce_abelian::<_,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8)))); let propagate: Collection<_, (N, L), R> = labels diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 10e7becca..83231d58c 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -289,19 +289,19 @@ where T1: TraceReader + Clone + 'static, { /// A direct implementation of `ReduceCore::reduce_abelian`. - pub fn reduce_abelian(&self, name: &str, from: F, mut logic: L) -> Arranged> + pub fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> where for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, T2: for<'a> Trace= T1::Key<'a>, Time=T1::Time>+'static, K: Ord + 'static, V: Data, - F: Fn(T2::Val<'_>) -> V + 'static, + for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, T2::Diff: Abelian, T2::Batch: Batch, ::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static, { - self.reduce_core::<_,K,V,F,T2>(name, from, move |key, input, output, change| { + self.reduce_core::<_,K,V,T2>(name, move |key, input, output, change| { if !input.is_empty() { logic(key, input, change); } @@ -311,19 +311,19 @@ where } /// A direct implementation of `ReduceCore::reduce_core`. - pub fn reduce_core(&self, name: &str, from: F, logic: L) -> Arranged> + pub fn reduce_core(&self, name: &str, logic: L) -> Arranged> where for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, T2: for<'a> Trace=T1::Key<'a>, Time=T1::Time>+'static, K: Ord + 'static, V: Data, - F: Fn(T2::Val<'_>) -> V + 'static, + for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, T2::Batch: Batch, ::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { use crate::operators::reduce::reduce_trace; - reduce_trace::<_,_,_,_,V,_,_>(self, name, from, logic) + reduce_trace::<_,_,_,_,V,_>(self, name, logic) } } diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 69f2317e6..3ffc1d6c7 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -59,7 +59,7 @@ //! use differential_dataflow::operators::arrange::upsert; //! //! let stream = scope.input_from(&mut input); -//! let arranged = upsert::arrange_from_upsert::<_, _, _, _, ValSpine>(&stream, &"test", |v| v.clone()); +//! let arranged = upsert::arrange_from_upsert::<_, _, _, ValSpine>(&stream, &"test"); //! //! arranged //! .as_collection(|k,v| (k.clone(), v.clone())) @@ -126,10 +126,9 @@ use super::TraceAgent; /// This method is only implemented for totally ordered times, as we do not yet /// understand what a "sequence" of upserts would mean for partially ordered /// timestamps. -pub fn arrange_from_upsert( +pub fn arrange_from_upsert( stream: &Stream, G::Timestamp)>, name: &str, - from: F, ) -> Arranged> where G: Scope, @@ -137,7 +136,7 @@ where for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>, K: ExchangeData+Hashable+std::hash::Hash, V: ExchangeData, - F: Fn(Tr::Val<'_>) -> V + 'static, + for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>, Tr::Time: TotalOrder+ExchangeData, Tr::Batch: Batch, Tr::Builder: Builder>, @@ -261,7 +260,7 @@ where assert!(count == 0 || count == 1); if count == 1 { assert!(prev_value.is_none()); - prev_value = Some(from(val)); + prev_value = Some(val.into_owned()); } trace_cursor.step_val(&trace_storage); } diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index ebec54bf6..34e8fef96 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -88,11 +88,12 @@ impl Reduce for Arranged, T1: for<'a> TraceReader=&'a K, Val<'a>=&'a V, Diff=R>+Clone+'static, - for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, + for<'a> T1::Key<'a> : IntoOwned<'a, Owned = K>, + for<'a> T1::Val<'a> : IntoOwned<'a, Owned = V>, { fn reduce_named(&self, name: &str, logic: L) -> Collection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { - self.reduce_abelian::<_,K,V2,_,ValSpine<_,_,_,_>>(name, |val| val.clone(), logic) + self.reduce_abelian::<_,K,V2,ValSpine<_,_,_,_>>(name, logic) .as_collection(|k,v| (k.clone(), v.clone())) } } @@ -169,7 +170,7 @@ where for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, { fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { - self.reduce_abelian::<_,K,(),_,KeySpine<_,_,_>>(name, |&()| (), move |k,s,t| t.push(((), thresh(k, &s[0].1)))) + self.reduce_abelian::<_,K,(),KeySpine<_,_,_>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) .as_collection(|k,_| k.clone()) } } @@ -220,7 +221,7 @@ where for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, { fn count_core>(&self) -> Collection { - self.reduce_abelian::<_,K,R,_,ValSpine<_,_,_,_>>("Count", |r| r.clone(), |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) + self.reduce_abelian::<_,K,R,ValSpine<_,R,_,_>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) .as_collection(|k,c| (k.clone(), c.clone())) } } @@ -245,24 +246,23 @@ pub trait ReduceCore where /// let trace = /// scope.new_collection_from(1 .. 10u32).1 /// .map(|x| (x, x)) - /// .reduce_abelian::<_,_,ValSpine<_,_,_,_>>( + /// .reduce_abelian::<_,ValSpine<_,_,_,_>>( /// "Example", - /// Clone::clone, /// move |_key, src, dst| dst.push((*src[0].0, 1)) /// ) /// .trace; /// }); /// ``` - fn reduce_abelian(&self, name: &str, from: F, mut logic: L) -> Arranged> + fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> where T2: for<'a> Trace= &'a K, Time=G::Timestamp>+'static, - F: Fn(T2::Val<'_>) -> V + 'static, + for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, T2::Diff: Abelian, T2::Batch: Batch, T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, { - self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| { + self.reduce_core::<_,T2>(name, move |key, input, output, change| { if !input.is_empty() { logic(key, input, change); } @@ -276,10 +276,10 @@ pub trait ReduceCore where /// Unlike `reduce_arranged`, this method may be called with an empty `input`, /// and it may not be safe to index into the first element. /// At least one of the two collections will be non-empty. - fn reduce_core(&self, name: &str, from: F, logic: L) -> Arranged> + fn reduce_core(&self, name: &str, logic: L) -> Arranged> where T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, - F: Fn(T2::Val<'_>) -> V + 'static, + for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, T2::Batch: Batch, T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, @@ -294,24 +294,24 @@ where V: ExchangeData, R: ExchangeData+Semigroup, { - fn reduce_core(&self, name: &str, from: F, logic: L) -> Arranged> + fn reduce_core(&self, name: &str, logic: L) -> Arranged> where V: Data, - F: Fn(T2::Val<'_>) -> V + 'static, T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, + for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, T2::Batch: Batch, T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) - .reduce_core(name, from, logic) + .reduce_core(name, logic) } } /// A key-wise reduction of values in an input trace. /// /// This method exists to provide reduce functionality without opinions about qualifying trace types. -pub fn reduce_trace(trace: &Arranged, name: &str, from: F, mut logic: L) -> Arranged> +pub fn reduce_trace(trace: &Arranged, name: &str, mut logic: L) -> Arranged> where G: Scope, T1: TraceReader + Clone + 'static, @@ -319,7 +319,7 @@ where T2: for<'a> Trace=T1::Key<'a>, Time=T1::Time> + 'static, K: Ord + 'static, V: Data, - F: Fn(T2::Val<'_>) -> V + 'static, + for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, T2::Batch: Batch, ::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, @@ -518,7 +518,6 @@ where (&mut output_cursor, output_storage), (&mut batch_cursor, batch_storage), &mut interesting_times, - &from, &mut logic, &upper_limit, &mut buffers[..], @@ -644,22 +643,21 @@ where C2: Cursor = C1::Key<'a>, Time = C1::Time>, C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, V: Clone + Ord, + for<'b> C2::Val<'b> : IntoOwned<'b, Owned = V>, { fn new() -> Self; - fn compute( + fn compute( &mut self, key: C1::Key<'a>, source_cursor: (&mut C1, &'a C1::Storage), output_cursor: (&mut C2, &'a C2::Storage), batch_cursor: (&mut C3, &'a C3::Storage), times: &mut Vec, - from: &F, logic: &mut L, upper_limit: &Antichain, outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)], new_interesting: &mut Vec) -> (usize, usize) where - F: Fn(C2::Val<'_>) -> V, L: FnMut( C1::Key<'a>, &[(C1::Val<'a>, C1::Diff)], @@ -674,6 +672,7 @@ mod history_replay { use crate::lattice::Lattice; use crate::trace::Cursor; + use crate::trace::cursor::IntoOwned; use crate::operators::ValueHistory; use timely::progress::Antichain; @@ -709,6 +708,7 @@ mod history_replay { C2: Cursor = C1::Key<'a>, Time = C1::Time>, C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, V: Clone + Ord, + for<'b> C2::Val<'b> : IntoOwned<'b, Owned = V>, { fn new() -> Self { HistoryReplayer { @@ -726,20 +726,18 @@ mod history_replay { } } #[inline(never)] - fn compute( + fn compute( &mut self, key: C1::Key<'a>, (source_cursor, source_storage): (&mut C1, &'a C1::Storage), (output_cursor, output_storage): (&mut C2, &'a C2::Storage), (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage), times: &mut Vec, - from: &F, logic: &mut L, upper_limit: &Antichain, outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)], new_interesting: &mut Vec) -> (usize, usize) where - F: Fn(C2::Val<'_>) -> V, L: FnMut( C1::Key<'a>, &[(C1::Val<'a>, C1::Diff)], @@ -907,7 +905,7 @@ mod history_replay { meet.as_ref().map(|meet| output_replay.advance_buffer_by(meet)); for &((value, ref time), ref diff) in output_replay.buffer().iter() { if time.less_equal(&next_time) { - self.output_buffer.push((from(value), diff.clone())); + self.output_buffer.push((value.into_owned(), diff.clone())); } else { self.temporary.push(next_time.join(time)); diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index 14a36f4fd..8c3cc057d 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -97,10 +97,10 @@ pub trait Cursor { fn rewind_vals(&mut self, storage: &Self::Storage); /// Rewinds the cursor and outputs its contents to a Vec - fn to_vec(&mut self, from: F, storage: &Self::Storage) -> Vec<((K, V), Vec<(Self::Time, Self::Diff)>)> + fn to_vec(&mut self, storage: &Self::Storage) -> Vec<((K, V), Vec<(Self::Time, Self::Diff)>)> where for<'a> Self::Key<'a> : IntoOwned<'a, Owned = K>, - F: Fn(Self::Val<'_>) -> V, + for<'a> Self::Val<'a> : IntoOwned<'a, Owned = V>, { let mut out = Vec::new(); self.rewind_keys(storage); @@ -111,7 +111,7 @@ pub trait Cursor { self.map_times(storage, |ts, r| { kv_out.push((ts.clone(), r.clone())); }); - out.push(((self.key(storage).into_owned(), from(self.val(storage))), kv_out)); + out.push(((self.key(storage).into_owned(), self.val(storage).into_owned()), kv_out)); self.step_val(storage); } self.step_key(storage); diff --git a/tests/trace.rs b/tests/trace.rs index acf1db350..5a9050b93 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -35,11 +35,11 @@ fn test_trace() { let mut trace = get_trace(); let (mut cursor1, storage1) = trace.cursor_through(AntichainRef::new(&[1])).unwrap(); - let vec_1 = cursor1.to_vec(|v| v.clone(), &storage1); + let vec_1 = cursor1.to_vec(&storage1); assert_eq!(vec_1, vec![((1, 2), vec![(0, 1)])]); let (mut cursor2, storage2) = trace.cursor_through(AntichainRef::new(&[2])).unwrap(); - let vec_2 = cursor2.to_vec(|v| v.clone(), &storage2); + let vec_2 = cursor2.to_vec(&storage2); println!("--> {:?}", vec_2); assert_eq!(vec_2, vec![ ((1, 2), vec![(0, 1)]), @@ -47,13 +47,13 @@ fn test_trace() { ]); let (mut cursor3, storage3) = trace.cursor_through(AntichainRef::new(&[3])).unwrap(); - let vec_3 = cursor3.to_vec(|v| v.clone(), &storage3); + let vec_3 = cursor3.to_vec(&storage3); assert_eq!(vec_3, vec![ ((1, 2), vec![(0, 1)]), ((2, 3), vec![(1, 1), (2, -1)]), ]); let (mut cursor4, storage4) = trace.cursor(); - let vec_4 = cursor4.to_vec(|v| v.clone(), &storage4); + let vec_4 = cursor4.to_vec(&storage4); assert_eq!(vec_4, vec_3); } From 7f1a1ca587847ffa02d895b84fa3181daa0fe63b Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 26 May 2024 06:55:06 -0400 Subject: [PATCH 5/5] Update min CI version --- .github/workflows/test.yml | 2 +- src/operators/arrange/upsert.rs | 2 -- src/operators/consolidate.rs | 1 - 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e3c9b6087..fbf40b2ca 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,7 +15,7 @@ jobs: - windows toolchain: - stable - - 1.72 + - 1.78 name: cargo test on ${{ matrix.os }}, rust ${{ matrix.toolchain }} runs-on: ${{ matrix.os }}-latest steps: diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 3ffc1d6c7..c8012e770 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -245,8 +245,6 @@ where let mut builder = Tr::Builder::new(); for (key, mut list) in to_process.drain(..) { - use trace::cursor::IntoOwned; - // The prior value associated with the key. let mut prev_value: Option = None; diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index afe201d4a..715b96efc 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -60,7 +60,6 @@ where Tr::Batcher: Batcher>, { use crate::operators::arrange::arrangement::Arrange; - use crate::trace::cursor::IntoOwned; self.map(|k| (k, ())) .arrange_named::(name) .as_collection(|d, _| d.into_owned())