diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 283284525..877b1399e 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -208,9 +208,9 @@ where // Use TOTAL ORDER to allow the release of `time`. yielded = yielded || yield_function(timer, work); if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) { - use differential_dataflow::trace::cursor::MyTrait; - cursor.seek_key(&storage, MyTrait::borrow_as(key)); - if cursor.get_key(&storage) == Some(MyTrait::borrow_as(key)) { + use differential_dataflow::trace::cursor::IntoOwned; + cursor.seek_key(&storage, IntoOwned::borrow_as(key)); + if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) { while let Some(val2) = cursor.get_val(&storage) { cursor.map_times(&storage, |t, d| { if comparison(t, initial) { diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index e8ab279ff..f1369eab1 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -93,9 +93,9 @@ where for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() { if !input2.frontier.less_equal(time) { logic2(prefix, &mut key1); - use differential_dataflow::trace::cursor::MyTrait; - cursor.seek_key(&storage, MyTrait::borrow_as(&key1)); - if cursor.get_key(&storage) == Some(MyTrait::borrow_as(&key1)) { + use differential_dataflow::trace::cursor::IntoOwned; + cursor.seek_key(&storage, IntoOwned::borrow_as(&key1)); + if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(&key1)) { while let Some(value) = cursor.get_val(&storage) { let mut count = Tr::Diff::zero(); cursor.map_times(&storage, |t, d| { diff --git a/examples/spines.rs b/examples/spines.rs index 4ae540e5d..b0f5eddd1 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -41,13 +41,13 @@ fn main() { keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, - // "rhh" => { - // use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine}; - // let data = data.map(|x| HashWrapper { inner: x }).arrange::>(); - // let keys = keys.map(|x| HashWrapper { inner: x }).arrange::>(); - // keys.join_core(&data, |_k, &(), &()| Option::<()>::None) - // .probe_with(&mut probe); - // }, + "rhh" => { + use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine}; + let data = data.map(|x| HashWrapper { inner: x }).arrange::>(); + let keys = keys.map(|x| HashWrapper { inner: x }).arrange::>(); + keys.join_core(&data, |_k, &(), &()| Option::<()>::None) + .probe_with(&mut probe); + }, "slc" => { use differential_dataflow::trace::implementations::ord_neu::PreferredSpine; @@ -61,10 +61,7 @@ fn main() { .arrange::>() .reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1))); - keys.join_core(&data, |k,_v1,_v2| { - println!("{:?}", k.text); - Option::<((),isize,isize)>::None - }) + keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, _ => { @@ -122,4 +119,4 @@ fn main() { println!("{:?}\tshut down", timer2.elapsed()); -} \ No newline at end of file +} diff --git a/mdbook/src/chapter_0/chapter_0_0.md b/mdbook/src/chapter_0/chapter_0_0.md index f72ef11d3..406bb5089 100644 --- a/mdbook/src/chapter_0/chapter_0_0.md +++ b/mdbook/src/chapter_0/chapter_0_0.md @@ -25,8 +25,8 @@ version = "0.1.0" authors = ["Your Name "] [dependencies] -timely = "0.11.1" -differential-dataflow 
= "0.11.0" +timely = "0.12.0" +differential-dataflow = "0.12.0" ``` You should only need to add those last two lines there, which bring in dependencies on both [timely dataflow](https://github.com/TimelyDataflow/timely-dataflow) and [differential dataflow](https://github.com/TimelyDataflow/differential-dataflow). We will be using both of those. diff --git a/mdbook/src/chapter_0/chapter_0_1.md b/mdbook/src/chapter_0/chapter_0_1.md index 5e69289a5..e031fb7aa 100644 --- a/mdbook/src/chapter_0/chapter_0_1.md +++ b/mdbook/src/chapter_0/chapter_0_1.md @@ -34,8 +34,8 @@ fn main() { .inspect(|x| println!("{:?}", x)); }); - // Set an arbitrary size for our organization. - let size = 100; + // Set a size for our organization from the input. + let size = std::env::args().nth(1).and_then(|s| s.parse::().ok()).unwrap_or(10); // Load input (a binary tree). input.advance_to(0); diff --git a/mdbook/src/chapter_0/chapter_0_3.md b/mdbook/src/chapter_0/chapter_0_3.md index d47d6f395..0f16ab4f1 100644 --- a/mdbook/src/chapter_0/chapter_0_3.md +++ b/mdbook/src/chapter_0/chapter_0_3.md @@ -107,7 +107,7 @@ We can then use this probe to limit the introduction of new data, by waiting for This starts to print out a mess of data, indicating not only how long it takes to start up the computation, but also how long each individual round of updates takes. ``` ignore - Echidnatron% cargo run --release --example hello 10000000 + Echidnatron% cargo run --release -- 10000000 Finished release [optimized + debuginfo] target(s) in 0.06s Running `target/release/examples/hello 10000000` 4.092895186s data loaded diff --git a/src/algorithms/graphs/sequential.rs b/src/algorithms/graphs/sequential.rs index 319c02def..f5a730638 100644 --- a/src/algorithms/graphs/sequential.rs +++ b/src/algorithms/graphs/sequential.rs @@ -52,8 +52,6 @@ where F: Fn(&N, &[(&V, isize)])->V+'static { - let _timer = ::std::time::Instant::now(); - // start iteration with None messages for all. state .map(|(node, _state)| (node, None)) diff --git a/src/capture.rs b/src/capture.rs index 33f706b85..c08f3c05b 100644 --- a/src/capture.rs +++ b/src/capture.rs @@ -42,47 +42,6 @@ pub struct Progress { pub counts: Vec<(T, usize)>, } -/// An iterator that yields with a `None` every so often. -pub struct YieldingIter { - /// When set, a time after which we should return `None`. - start: Option, - after: std::time::Duration, - iter: I, -} - -impl YieldingIter { - /// Construct a yielding iterator from an inter-yield duration. - pub fn new_from(iter: I, yield_after: std::time::Duration) -> Self { - Self { - start: None, - after: yield_after, - iter, - } - } -} - -impl Iterator for YieldingIter { - type Item = I::Item; - fn next(&mut self) -> Option { - if self.start.is_none() { - self.start = Some(std::time::Instant::now()); - } - let start = self.start.as_ref().unwrap(); - if start.elapsed() > self.after { - self.start = None; - None - } else { - match self.iter.next() { - Some(x) => Some(x), - None => { - self.start = None; - None - } - } - } - } -} - /// A simple sink for byte slices. pub trait Writer { /// Returns an amount of time to wait before retrying, or `None` for success. @@ -785,6 +744,8 @@ pub mod sink { // { // super::source::build(scope, |activator| { // let source = KafkaSource::new(addr, topic, group, activator); +// // An iterator combinator that yields every "duration" even if more items exist. +// // The implementation of such an iterator exists in the git history, or can be rewritten easily. 
// super::YieldingIter::new_from(Iter::::new_from(source), std::time::Duration::from_millis(10)) // }) // } diff --git a/src/difference.rs b/src/difference.rs index fc6c8680b..46c7ba3eb 100644 --- a/src/difference.rs +++ b/src/difference.rs @@ -22,9 +22,9 @@ pub use self::Abelian as Diff; /// There is a light presumption of commutativity here, in that while we will largely perform addition /// in order of timestamps, for many types of timestamps there is no total order and consequently no /// obvious order to respect. Non-commutative semigroups should be used with care. -pub trait Semigroup : ::std::marker::Sized + Data + Clone { +pub trait Semigroup : Data + Clone { /// The method of `std::ops::AddAssign`, for types that do not implement `AddAssign`. - fn plus_equals(&mut self, rhs: &Self); + fn plus_equals(&mut self, rhs: &Rhs); /// Returns true if the element is the additive identity. /// /// This is primarily used by differential dataflow to know when it is safe to delete an update. @@ -233,22 +233,43 @@ mod vector { impl Semigroup for Vec { fn plus_equals(&mut self, rhs: &Self) { - // Ensure sufficient length to receive addition. + self.plus_equals(&rhs[..]) + } + fn is_zero(&self) -> bool { + self.iter().all(|x| x.is_zero()) + } + } + + impl Semigroup<[R]> for Vec { + fn plus_equals(&mut self, rhs: &[R]) { + // Apply all updates to existing elements + for (index, update) in rhs.iter().enumerate().take(self.len()) { + self[index].plus_equals(update); + } + + // Clone leftover elements from `rhs` while self.len() < rhs.len() { let element = &rhs[self.len()]; self.push(element.clone()); } - - // As other is not longer, apply updates without tests. - for (index, update) in rhs.iter().enumerate() { - self[index].plus_equals(update); - } } fn is_zero(&self) -> bool { self.iter().all(|x| x.is_zero()) } } + #[cfg(test)] + mod tests { + use crate::difference::Semigroup; + + #[test] + fn test_semigroup_vec() { + let mut a = vec![1,2,3]; + a.plus_equals([1,1,1,1].as_slice()); + assert_eq!(vec![2,3,4,1], a); + } + } + impl Monoid for Vec { fn zero() -> Self { Self::new() diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 13bbb915b..d8fb9720d 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -75,6 +75,8 @@ where use ::timely::dataflow::scopes::Child; use ::timely::progress::timestamp::Refines; +use timely::Container; +use timely::container::PushInto; impl Arranged where @@ -292,7 +294,8 @@ where F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, + ::Input: Container, + ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static, { self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| { @@ -311,7 +314,8 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + ::Input: Container, + ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { use crate::operators::reduce::reduce_trace; diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 758ec8df3..a36781a5e 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -138,7 +138,7 @@ where F: Fn(Tr::Val<'_>) -> V + 'static, Tr::Time: TotalOrder+ExchangeData, Tr::Batch: Batch, - Tr::Builder: Builder, + 
Tr::Builder: Builder>, { let mut reader: Option> = None; @@ -244,14 +244,14 @@ where let mut builder = Tr::Builder::new(); for (key, mut list) in to_process.drain(..) { - use trace::cursor::MyTrait; + use trace::cursor::IntoOwned; // The prior value associated with the key. let mut prev_value: Option = None; // Attempt to find the key in the trace. - trace_cursor.seek_key_owned(&trace_storage, &key); - if trace_cursor.get_key(&trace_storage).map(|k| k.equals(&key)).unwrap_or(false) { + trace_cursor.seek_key(&trace_storage, IntoOwned::borrow_as(&key)); + if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&IntoOwned::borrow_as(&key))).unwrap_or(false) { // Determine the prior value associated with the key. while let Some(val) = trace_cursor.get_val(&trace_storage) { let mut count = 0; @@ -282,9 +282,7 @@ where } // Must insert updates in (key, val, time) order. updates.sort(); - for update in updates.drain(..) { - builder.push(update); - } + builder.push(&mut updates); } let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum())); prev_frontier.clone_from(&upper); diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index ac0ef7f8f..86b7e729c 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -9,6 +9,7 @@ use timely::dataflow::Scope; use crate::{Collection, ExchangeData, Hashable}; +use crate::consolidation::ConsolidatingContainerBuilder; use crate::difference::Semigroup; use crate::Data; @@ -56,7 +57,7 @@ where Tr::Batcher: Batcher>, { use crate::operators::arrange::arrangement::Arrange; - use crate::trace::cursor::MyTrait; + use crate::trace::cursor::IntoOwned; self.map(|k| (k, ())) .arrange_named::(name) .as_collection(|d, _| d.into_owned()) @@ -92,14 +93,13 @@ where use crate::collection::AsCollection; self.inner - .unary(Pipeline, "ConsolidateStream", |_cap, _info| { + .unary::, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| { let mut vector = Vec::new(); move |input, output| { input.for_each(|time, data| { data.swap(&mut vector); - crate::consolidation::consolidate_updates(&mut vector); - output.session(&time).give_container(&mut vector); + output.session_with_builder(&time).give_container(&mut vector); }) } }) diff --git a/src/operators/count.rs b/src/operators/count.rs index 0779bde06..42a030786 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -70,7 +70,7 @@ where move |input, output| { - use crate::trace::cursor::MyTrait; + use crate::trace::cursor::IntoOwned; input.for_each(|capability, batches| { batches.swap(&mut buffer); let mut session = output.session(&capability); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index dda549bca..ffaf1652b 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -5,6 +5,8 @@ //! to the key and the list of values. //! The function is expected to populate a list of output values. 
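// A minimal usage sketch of the reduce interface described in the module docs above, assuming the
// standard `Reduce` operator and `Input` helpers of this crate; illustrative only, not part of
// this patch. For each key it reports the smallest associated value.
use differential_dataflow::input::Input;
use differential_dataflow::operators::Reduce;

fn main() {
    timely::example(|scope| {
        scope.new_collection_from(1 .. 10).1
             .map(|x| (x / 3, x))
             .reduce(|_key, input, output| output.push((*input[0].0, 1)))
             .inspect(|x| println!("{:?}", x));
    });
}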
+use timely::Container; +use timely::container::PushInto; use crate::hashable::Hashable; use crate::{Data, ExchangeData, Collection}; use crate::difference::{Semigroup, Abelian}; @@ -252,7 +254,7 @@ pub trait ReduceCore where F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, { self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| { @@ -274,7 +276,7 @@ pub trait ReduceCore where T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, ; } @@ -293,7 +295,7 @@ where F: Fn(T2::Val<'_>) -> V + 'static, T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -312,7 +314,8 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + ::Input: Container, + ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { let mut result_trace = None; @@ -454,6 +457,8 @@ where builders.push(T2::Builder::new()); } + let mut buffer = Default::default(); + // cursors for navigating input and output traces. let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); let source_storage = &source_storage; @@ -473,10 +478,10 @@ where while batch_cursor.key_valid(batch_storage) || exposed_position < exposed.len() { use std::borrow::Borrow; - use crate::trace::cursor::MyTrait; + use crate::trace::cursor::IntoOwned; // Determine the next key we will work on; could be synthetic, could be from a batch. - let key1 = exposed.get(exposed_position).map(|x| <_ as MyTrait>::borrow_as(&x.0)); + let key1 = exposed.get(exposed_position).map(|x| <_ as IntoOwned>::borrow_as(&x.0)); let key2 = batch_cursor.get_key(batch_storage); let key = match (key1, key2) { (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2), @@ -492,7 +497,7 @@ where interesting_times.clear(); // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key. - while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.equals(k)).unwrap_or(false) { + while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.eq(& as IntoOwned>::borrow_as(&k))).unwrap_or(false) { interesting_times.push(exposed[exposed_position].1.clone()); exposed_position += 1; } @@ -531,7 +536,9 @@ where for index in 0 .. buffers.len() { buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); for (val, time, diff) in buffers[index].1.drain(..) 
{ - builders[index].push(((key.into_owned(), val), time, diff)); + ((key.into_owned(), val), time, diff).push_into(&mut buffer); + builders[index].push(&mut buffer); + buffer.clear(); } } } @@ -648,7 +655,7 @@ where where F: Fn(C2::Val<'_>) -> V, L: FnMut( - C1::Key<'a>, + C1::Key<'a>, &[(C1::Val<'a>, C1::Diff)], &mut Vec<(V, C2::Diff)>, &mut Vec<(V, C2::Diff)>, @@ -728,7 +735,7 @@ mod history_replay { where F: Fn(C2::Val<'_>) -> V, L: FnMut( - C1::Key<'a>, + C1::Key<'a>, &[(C1::Val<'a>, C1::Diff)], &mut Vec<(V, C2::Diff)>, &mut Vec<(V, C2::Diff)>, @@ -1020,7 +1027,7 @@ mod history_replay { new_interesting.push(next_time.clone()); debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time))) } - + // Update `meet` to track the meet of each source of times. meet = None;//T::maximum(); diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index 927dba4b2..2631c2aea 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -14,56 +14,40 @@ pub mod cursor_list; pub use self::cursor_list::CursorList; use std::borrow::Borrow; -use std::cmp::Ordering; -/// A type that may be converted into and compared with another type. +/// A reference type corresponding to an owned type, supporting conversion in each direction. /// -/// The type must also be comparable with itself, and follow the same -/// order as if converting instances to `T` and comparing the results. -pub trait MyTrait<'a> : Ord { +/// This trait can be implemented by a GAT, and enables owned types to be borrowed as a GAT. +/// This trait is analogous to `ToOwned`, but not as prescriptive. Specifically, it avoids the +/// requirement that the other trait implement `Borrow`, for which a borrow must result in a +/// `&'self Borrowed`, which cannot move the lifetime into a GAT borrowed type. +pub trait IntoOwned<'a> { /// Owned type into which this type can be converted. type Owned; /// Conversion from an instance of this type to the owned type. fn into_owned(self) -> Self::Owned; - /// + /// Clones `self` onto an existing instance of the owned type. fn clone_onto(&self, other: &mut Self::Owned); - /// Indicates that `self <= other`; used for sorting. - fn compare(&self, other: &Self::Owned) -> Ordering; - /// `self <= other` - fn less_equals(&self, other: &Self::Owned) -> bool { - self.compare(other) != Ordering::Greater - } - /// `self == other` - fn equals(&self, other: &Self::Owned) -> bool { - self.compare(other) == Ordering::Equal - } - /// `self < other` - fn less_than(&self, other: &Self::Owned) -> bool { - self.compare(other) == Ordering::Less - } - /// Borrows an owned instance as onesself. - fn borrow_as(other: &'a Self::Owned) -> Self; + /// Borrows an owned instance as oneself. + fn borrow_as(owned: &'a Self::Owned) -> Self; } -impl<'a, T: Ord+ToOwned+?Sized> MyTrait<'a> for &'a T { +impl<'a, T: ToOwned+?Sized> IntoOwned<'a> for &'a T { type Owned = T::Owned; fn into_owned(self) -> Self::Owned { self.to_owned() } fn clone_onto(&self, other: &mut Self::Owned) { ::clone_into(self, other) } - fn compare(&self, other: &Self::Owned) -> Ordering { self.cmp(&other.borrow()) } - fn borrow_as(other: &'a Self::Owned) -> Self { - other.borrow() - } + fn borrow_as(owned: &'a Self::Owned) -> Self { owned.borrow() } } /// A cursor for navigating ordered `(key, val, time, diff)` updates. pub trait Cursor { /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + MyTrait<'a, Owned = Self::KeyOwned>; + type Key<'a>: Copy + Clone + Ord + IntoOwned<'a, Owned = Self::KeyOwned>; /// Owned version of the above. 
type KeyOwned: Ord + Clone; /// Values associated with keys. - type Val<'a>: Copy + Clone + MyTrait<'a> + for<'b> PartialOrd>; + type Val<'a>: Copy + Clone + Ord + IntoOwned<'a> + for<'b> PartialOrd>; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Associated update. @@ -103,10 +87,6 @@ pub trait Cursor { fn step_key(&mut self, storage: &Self::Storage); /// Advances the cursor to the specified key. fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>); - /// Convenience method to get access by reference to an owned key. - fn seek_key_owned(&mut self, storage: &Self::Storage, key: &Self::KeyOwned) { - self.seek_key(storage, MyTrait::borrow_as(key)); - } /// Advances the cursor to the next value. fn step_val(&mut self, storage: &Self::Storage); diff --git a/src/trace/implementations/huffman_container.rs b/src/trace/implementations/huffman_container.rs index 290b5d870..4fa67b2c7 100644 --- a/src/trace/implementations/huffman_container.rs +++ b/src/trace/implementations/huffman_container.rs @@ -32,12 +32,8 @@ where } } -impl BatchContainer for HuffmanContainer -where - B: Ord + Clone + Sized + 'static, -{ - type PushItem = Vec; - type ReadItem<'a> = Wrapped<'a, B>; +use crate::trace::implementations::containers::Push; +impl Push> for HuffmanContainer { fn push(&mut self, item: Vec) { for x in item.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; } match &mut self.inner { @@ -51,10 +47,12 @@ where } } } - fn copy_push(&mut self, item: &Vec) { - use crate::trace::MyTrait; - self.copy(<_ as MyTrait>::borrow_as(item)); - } +} + +impl BatchContainer for HuffmanContainer { + type OwnedItem = Vec; + type ReadItem<'a> = Wrapped<'a, B>; + fn copy(&mut self, item: Self::ReadItem<'_>) { match item.decode() { Ok(decoded) => { @@ -152,7 +150,7 @@ impl Default for HuffmanContainer { mod wrapper { - use crate::trace::MyTrait; + use crate::trace::IntoOwned; use super::Encoded; pub struct Wrapped<'a, B: Ord> { @@ -205,7 +203,7 @@ mod wrapper { self.partial_cmp(other).unwrap() } } - impl<'a, B: Ord+Clone> MyTrait<'a> for Wrapped<'a, B> { + impl<'a, B: Ord+Clone> IntoOwned<'a> for Wrapped<'a, B> { type Owned = Vec; fn into_owned(self) -> Self::Owned { match self.decode() { @@ -220,14 +218,8 @@ mod wrapper { Err(bytes) => other.extend_from_slice(bytes), } } - fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering { - match self.decode() { - Ok(decode) => decode.partial_cmp(other.iter()).unwrap(), - Err(bytes) => bytes.cmp(&other[..]), - } - } - fn borrow_as(other: &'a Self::Owned) -> Self { - Self { inner: Err(&other[..]) } + fn borrow_as(owned: &'a Self::Owned) -> Self { + Self { inner: Err(&owned[..]) } } } } diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 45e2a60f8..bb13cf650 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -282,7 +282,7 @@ where type Time = T; type Input = Vec<((K, V), T, R)>; type Chunk = Vec<((K, V), T, R)>; - type Output = ((K, V), T, R); + type Output = Vec<((K, V), T, R)>; fn accept(&mut self, container: RefOrMut, stash: &mut Vec) -> Vec { // Ensure `self.pending` has the desired capacity. We should never have a larger capacity @@ -497,8 +497,8 @@ where } let mut builder = B::with_capacity(keys, vals, upds); - for datum in chain.drain(..).flatten() { - builder.push(datum); + for mut chunk in chain.drain(..) 
{ + builder.push(&mut chunk); } builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index aed0039d8..265f2e649 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -67,7 +67,7 @@ where type Time = T; type Input = Vec<((K, V), T, R)>; type Chunk = TimelyStack<((K, V), T, R)>; - type Output = ((K, V), T, R); + type Output = TimelyStack<((K, V), T, R)>; fn accept(&mut self, container: RefOrMut, stash: &mut Vec) -> Vec { // Ensure `self.pending` has the desired capacity. We should never have a larger capacity @@ -290,11 +290,8 @@ where } } let mut builder = B::with_capacity(keys, vals, upds); - - for chunk in chain.drain(..) { - for datum in chunk.iter() { - builder.copy(datum); - } + for mut chunk in chain.drain(..) { + builder.push(&mut chunk); } builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index cd0b8fd9a..ea14b09ce 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -52,9 +52,11 @@ pub use self::ord_neu::OrdValSpine as ValSpine; pub use self::ord_neu::OrdKeySpine as KeySpine; use std::borrow::{ToOwned}; -use std::cmp::Ordering; +use timely::Container; use timely::container::columnation::{Columnation, TimelyStack}; +use timely::container::PushInto; +use timely::progress::Timestamp; use crate::lattice::Lattice; use crate::difference::Semigroup; @@ -83,21 +85,22 @@ where type Diff = R; } +use crate::trace::implementations::containers::Push; + /// A type with opinions on how updates should be laid out. pub trait Layout { /// The represented update. type Target: Update + ?Sized; /// Container for update keys. - type KeyContainer: - BatchContainer::Key>; + type KeyContainer: BatchContainer::Key> + Push<::Key>; /// Container for update vals. - type ValContainer: - BatchContainer::Val>; + type ValContainer: BatchContainer::Val> + Push<::Val>; /// Container for update vals. type UpdContainer: - for<'a> BatchContainer::Time, ::Diff), ReadItem<'a> = &'a (::Time, ::Diff)>; + Push<(::Time, ::Diff)> + + for<'a> BatchContainer = &'a (::Time, ::Diff), OwnedItem = (::Time, ::Diff)>; /// Container for offsets. - type OffsetContainer: BatchContainer; + type OffsetContainer: BatchContainer + Push; } /// A layout that uses vectors @@ -138,10 +141,10 @@ where /// A type with a preferred container. /// -/// Examples include types that implement `Clone` who prefer +/// Examples include types that implement `Clone` who prefer pub trait PreferredContainer : ToOwned { /// The preferred container for the type. - type Container: BatchContainer; + type Container: BatchContainer + Push; } impl PreferredContainer for T { @@ -149,7 +152,7 @@ impl PreferredContainer for T { } impl PreferredContainer for [T] { - type Container = SliceContainer2; + type Container = SliceContainer; } /// An update and layout description based on preferred containers. 
@@ -161,8 +164,8 @@ impl Update for Preferred where K: ToOwned + ?Sized, K::Owned: Ord+Clone+'static, - V: ToOwned + ?Sized + 'static, - V::Owned: Ord+Clone, + V: ToOwned + ?Sized, + V::Owned: Ord+Clone+'static, T: Ord+Lattice+timely::progress::Timestamp+Clone, R: Semigroup+Clone, { @@ -177,8 +180,8 @@ where K: Ord+ToOwned+PreferredContainer + ?Sized, K::Owned: Ord+Clone+'static, // for<'a> K::Container: BatchContainer = &'a K>, - V: Ord+ToOwned+PreferredContainer + ?Sized + 'static, - V::Owned: Ord+Clone, + V: Ord+ToOwned+PreferredContainer + ?Sized, + V::Owned: Ord+Clone+'static, T: Ord+Lattice+timely::progress::Timestamp+Clone, D: Semigroup+Clone, { @@ -192,10 +195,10 @@ where use std::convert::TryInto; use std::ops::Deref; use abomonation_derive::Abomonation; -use crate::trace::cursor::MyTrait; +use crate::trace::cursor::IntoOwned; /// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not. -#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug, Abomonation)] +#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Abomonation)] pub struct OffsetList { /// Length of a prefix of zero elements. pub zero_prefix: usize, @@ -205,6 +208,12 @@ pub struct OffsetList { pub chonk: Vec, } +impl std::fmt::Debug for OffsetList { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_list().entries(self.into_iter()).finish() + } +} + impl OffsetList { /// Allocate a new list with a specified capacity. pub fn with_capacity(cap: usize) -> Self { @@ -222,7 +231,7 @@ impl OffsetList { else if self.chonk.is_empty() { if let Ok(smol) = offset.try_into() { self.smol.push(smol); - } + } else { self.chonk.push(offset.try_into().unwrap()) } @@ -249,7 +258,42 @@ impl OffsetList { } } -/// Helper struct to provide `MyTrait` for `Copy` types. +impl<'a> IntoIterator for &'a OffsetList { + type Item = usize; + type IntoIter = OffsetListIter<'a>; + + fn into_iter(self) -> Self::IntoIter { + OffsetListIter {list: self, index: 0 } + } +} + +/// An iterator for [`OffsetList`]. +pub struct OffsetListIter<'a> { + list: &'a OffsetList, + index: usize, +} + +impl<'a> Iterator for OffsetListIter<'a> { + type Item = usize; + + fn next(&mut self) -> Option { + if self.index < self.list.len() { + let res = Some(self.list.index(self.index)); + self.index += 1; + res + } else { + None + } + } +} + +impl PushInto for usize { + fn push_into(self, target: &mut OffsetList) { + target.push(self); + } +} + +/// Helper struct to provide `IntoOwned` for `Copy` types. 
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] pub struct Wrapper(T); @@ -261,7 +305,7 @@ impl Deref for Wrapper { } } -impl<'a, T: Copy + Ord> MyTrait<'a> for Wrapper { +impl<'a, T: Copy + Ord> IntoOwned<'a> for Wrapper { type Owned = T; fn into_owned(self) -> Self::Owned { @@ -272,27 +316,21 @@ impl<'a, T: Copy + Ord> MyTrait<'a> for Wrapper { *other = self.0; } - fn compare(&self, other: &Self::Owned) -> Ordering { - self.0.cmp(other) + fn borrow_as(owned: &'a Self::Owned) -> Self { + Self(*owned) } +} - fn borrow_as(other: &'a Self::Owned) -> Self { - Self(*other) +impl Push for OffsetList { + fn push(&mut self, item: usize) { + self.push(item); } } impl BatchContainer for OffsetList { - type PushItem = usize; + type OwnedItem = usize; type ReadItem<'a> = Wrapper; - fn push(&mut self, item: Self::PushItem) { - self.push(item); - } - - fn copy_push(&mut self, item: &Self::PushItem) { - self.push(*item); - } - fn copy(&mut self, item: Self::ReadItem<'_>) { self.push(item.0); } @@ -320,32 +358,134 @@ impl BatchContainer for OffsetList { } } -pub use self::containers::{BatchContainer, SliceContainer, SliceContainer2}; +/// Behavior to split an update into principal components. +pub trait BuilderInput: Container { + /// Key portion + type Key<'a>: Ord; + /// Value portion + type Val<'a>: Ord; + /// Time + type Time; + /// Diff + type Diff; + + /// Split an item into separate parts. + fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff); + + /// Test that the key equals a key in the layout's key container. + fn key_eq(this: &Self::Key<'_>, other: ::ReadItem<'_>) -> bool; + + /// Test that the value equals a key in the layout's value container. + fn val_eq(this: &Self::Val<'_>, other: ::ReadItem<'_>) -> bool; +} + +impl BuilderInput> for Vec<((K, V), T, R)> +where + K: Ord + Clone + 'static, + V: Ord + Clone + 'static, + T: Timestamp + Lattice + Clone + 'static, + R: Semigroup + Clone + 'static, +{ + type Key<'a> = K; + type Val<'a> = V; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time, diff) + } + + fn key_eq(this: &K, other: &K) -> bool { + this == other + } + + fn val_eq(this: &V, other: &V) -> bool { + this == other + } +} + +impl BuilderInput> for TimelyStack<((K, V), T, R)> +where + K: Ord + Columnation + Clone + 'static, + V: Ord + Columnation + Clone + 'static, + T: Timestamp + Lattice + Columnation + Clone + 'static, + R: Semigroup + Columnation + Clone + 'static, +{ + type Key<'a> = &'a K; + type Val<'a> = &'a V; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time.clone(), diff.clone()) + } + + fn key_eq(this: &&K, other: &K) -> bool { + *this == other + } + + fn val_eq(this: &&V, other: &V) -> bool { + *this == other + } +} + +impl BuilderInput> for TimelyStack<((::Owned, ::Owned), T, R)> +where + K: Ord+ToOwned+PreferredContainer + ?Sized, + K::Owned: Columnation + Ord+Clone+'static, + V: Ord+ToOwned+PreferredContainer + ?Sized, + V::Owned: Columnation + Ord+Clone+'static, + T: Columnation + Ord+Lattice+Timestamp+Clone, + R: Columnation + Semigroup+Clone, +{ + type Key<'a> = &'a K::Owned; + type Val<'a> = &'a V::Owned; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, 
val, time.clone(), diff.clone()) + } + + fn key_eq(this: &&K::Owned, other: <::Container as BatchContainer>::ReadItem<'_>) -> bool { + other.eq(&<<::Container as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(this)) + } + + fn val_eq(this: &&V::Owned, other: <::Container as BatchContainer>::ReadItem<'_>) -> bool { + other.eq(&<<::Container as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(this)) + } +} + +pub use self::containers::{BatchContainer, SliceContainer}; /// Containers for data that resemble `Vec`, with leaner implementations. pub mod containers { use timely::container::columnation::{Columnation, TimelyStack}; + use timely::container::PushInto; use std::borrow::ToOwned; - use crate::trace::MyTrait; + use crate::trace::IntoOwned; + + /// Supports the ability to receive an item of type `T`. + pub trait Push { + /// Pushes the item into `self`. + fn push(&mut self, item: T); + } + + impl Push for TimelyStack { + fn push(&mut self, item: T) { + self.copy(&item); + } + } /// A general-purpose container resembling `Vec`. pub trait BatchContainer: 'static { - /// The type of contained item. - /// - /// The container only supplies references to the item, so it needn't be sized. - type PushItem; + /// An type that all `Self::ReadItem<'_>` can be converted into. + type OwnedItem; /// The type that can be read back out of the container. - type ReadItem<'a>: Copy + MyTrait<'a, Owned = Self::PushItem> + for<'b> PartialOrd>; - /// Inserts an owned item. - fn push(&mut self, item: Self::PushItem) { - self.copy_push(&item); - } - /// Inserts an owned item. - fn copy_push(&mut self, item: &Self::PushItem) { - self.copy(MyTrait::borrow_as(item)); - } + type ReadItem<'a>: Copy + IntoOwned<'a, Owned = Self::OwnedItem> + Ord + for<'b> PartialOrd>; + /// Inserts a borrowed item. fn copy(&mut self, item: Self::ReadItem<'_>); /// Extends from a range of items in another`Self`. @@ -420,18 +560,18 @@ pub mod containers { } } + impl Push for Vec { + fn push(&mut self, item: T) { + self.push(item); + } + } + // All `T: Clone` also implement `ToOwned`, but without the constraint Rust // struggles to understand why the owned type must be `T` (i.e. the one blanket impl). impl BatchContainer for Vec { - type PushItem = T; - type ReadItem<'a> = &'a Self::PushItem; + type OwnedItem = T; + type ReadItem<'a> = &'a Self::OwnedItem; - fn push(&mut self, item: T) { - self.push(item); - } - fn copy_push(&mut self, item: &T) { - self.copy(item); - } fn copy(&mut self, item: &T) { self.push(item.clone()); } @@ -455,12 +595,9 @@ pub mod containers { // The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now // be presented with the actual contained type, rather than a type that borrows into it. 
impl + 'static> BatchContainer for TimelyStack { - type PushItem = T; - type ReadItem<'a> = &'a Self::PushItem; + type OwnedItem = T; + type ReadItem<'a> = &'a Self::OwnedItem; - fn copy_push(&mut self, item: &Self::PushItem) { - self.copy(item); - } fn copy(&mut self, item: &T) { self.copy(item); } @@ -498,145 +635,36 @@ pub mod containers { inner: Vec, } - impl BatchContainer for SliceContainer - where - B: Ord + Clone + Sized + 'static, - { - type PushItem = Vec; - type ReadItem<'a> = &'a [B]; - fn push(&mut self, item: Vec) { - for x in item.into_iter() { - self.inner.push(x); - } - self.offsets.push(self.inner.len()); - } - fn copy_push(&mut self, item: &Vec) { - self.copy(&item[..]); - } - fn copy(&mut self, item: Self::ReadItem<'_>) { - for x in item.iter() { - self.inner.copy(x); - } - self.offsets.push(self.inner.len()); - } - fn copy_range(&mut self, other: &Self, start: usize, end: usize) { - for index in start .. end { - self.copy(other.index(index)); - } - } - fn with_capacity(size: usize) -> Self { - let mut offsets = Vec::with_capacity(size + 1); - offsets.push(0); - Self { - offsets, - inner: Vec::with_capacity(size), - } - } - fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { - let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1); - offsets.push(0); - Self { - offsets, - inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()), - } - } - fn index(&self, index: usize) -> Self::ReadItem<'_> { - let lower = self.offsets[index]; - let upper = self.offsets[index+1]; - &self.inner[lower .. upper] - } - fn len(&self) -> usize { - self.offsets.len() - 1 - } - } - - /// Default implementation introduces a first offset. - impl Default for SliceContainer { - fn default() -> Self { - Self { - offsets: vec![0], - inner: Default::default(), - } + impl PushInto> for &[B] { + fn push_into(self, target: &mut SliceContainer) { + target.copy(self) } } - /// A container that accepts slices `[B::Item]`. - pub struct SliceContainer2 { - text: String, - /// Offsets that bound each contained slice. - /// - /// The length will be one greater than the number of contained slices, - /// starting with zero and ending with `self.inner.len()`. - offsets: Vec, - /// An inner container for sequences of `B` that dereferences to a slice. - inner: Vec, - } - - /// Welcome to GATs! - pub struct Greetings<'a, B> { - /// Text that decorates the data. - pub text: Option<&'a str>, - /// The data itself. 
- pub slice: &'a [B], - } - - impl<'a, B> Copy for Greetings<'a, B> { } - impl<'a, B> Clone for Greetings<'a, B> { - fn clone(&self) -> Self { *self } - } - - use std::cmp::Ordering; - impl<'a, 'b, B: Ord> PartialEq> for Greetings<'b, B> { - fn eq(&self, other: &Greetings<'a, B>) -> bool { - self.slice.eq(other.slice) - } - } - impl<'a, B: Ord> Eq for Greetings<'a, B> { } - impl<'a, 'b, B: Ord> PartialOrd> for Greetings<'b, B> { - fn partial_cmp(&self, other: &Greetings<'a, B>) -> Option { - self.slice.partial_cmp(other.slice) - } - } - impl<'a, B: Ord> Ord for Greetings<'a, B> { - fn cmp(&self, other: &Self) -> Ordering { - self.partial_cmp(other).unwrap() + impl PushInto> for &Vec { + fn push_into(self, target: &mut SliceContainer) { + target.copy(self) } } - impl<'a, B: Ord + Clone> MyTrait<'a> for Greetings<'a, B> { - type Owned = Vec; - fn into_owned(self) -> Self::Owned { self.slice.to_vec() } - fn clone_onto(&self, other: &mut Self::Owned) { - self.slice.clone_into(other); - } - fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering { - self.slice.cmp(&other[..]) - } - fn borrow_as(other: &'a Self::Owned) -> Self { - Self { - text: None, - slice: &other[..], + impl Push> for SliceContainer { + fn push(&mut self, item: Vec) { + for x in item.into_iter() { + self.inner.push(x); } + self.offsets.push(self.inner.len()); } } - impl BatchContainer for SliceContainer2 + impl BatchContainer for SliceContainer where B: Ord + Clone + Sized + 'static, { - type PushItem = Vec; - type ReadItem<'a> = Greetings<'a, B>; - fn push(&mut self, item: Vec) { - for x in item.into_iter() { - self.inner.push(x); - } - self.offsets.push(self.inner.len()); - } - fn copy_push(&mut self, item: &Vec) { - self.copy(<_ as MyTrait>::borrow_as(item)); - } + type OwnedItem = Vec; + type ReadItem<'a> = &'a [B]; + fn copy(&mut self, item: Self::ReadItem<'_>) { - for x in item.slice.iter() { + for x in item.iter() { self.inner.copy(x); } self.offsets.push(self.inner.len()); @@ -650,7 +678,6 @@ pub mod containers { let mut offsets = Vec::with_capacity(size + 1); offsets.push(0); Self { - text: format!("Hello!"), offsets, inner: Vec::with_capacity(size), } @@ -659,7 +686,6 @@ pub mod containers { let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1); offsets.push(0); Self { - text: format!("Hello!"), offsets, inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()), } @@ -667,10 +693,7 @@ pub mod containers { fn index(&self, index: usize) -> Self::ReadItem<'_> { let lower = self.offsets[index]; let upper = self.offsets[index+1]; - Greetings { - text: Some(&self.text), - slice: &self.inner[lower .. upper], - } + &self.inner[lower .. upper] } fn len(&self) -> usize { self.offsets.len() - 1 @@ -678,10 +701,9 @@ pub mod containers { } /// Default implementation introduces a first offset. - impl Default for SliceContainer2 { + impl Default for SliceContainer { fn default() -> Self { Self { - text: format!("Hello!"), offsets: vec![0], inner: Default::default(), } diff --git a/src/trace/implementations/option_container.rs b/src/trace/implementations/option_container.rs index 18f93de74..a6d2eff03 100644 --- a/src/trace/implementations/option_container.rs +++ b/src/trace/implementations/option_container.rs @@ -1,6 +1,6 @@ //! A container optimized for identical contents. -use crate::trace::cursor::MyTrait; +use crate::trace::cursor::IntoOwned; use crate::trace::implementations::BatchContainer; /// A container that effectively represents default values. 
@@ -14,15 +14,13 @@ pub struct OptionContainer { container: C, } -impl BatchContainer for OptionContainer +use crate::trace::implementations::containers::Push; +impl Push for OptionContainer where - C: BatchContainer, - C::PushItem: Default + Ord, + C: BatchContainer + Push, + C::OwnedItem: Default + Ord, { - type PushItem = C::PushItem; - type ReadItem<'a> = OptionWrapper<'a, C>; - - fn push(&mut self, item: Self::PushItem) { + fn push(&mut self, item: C::OwnedItem) { if item == Default::default() && self.container.is_empty() { self.defaults += 1; } @@ -30,8 +28,18 @@ where self.container.push(item) } } +} + +impl BatchContainer for OptionContainer +where + C: BatchContainer , + C::OwnedItem: Default + Ord, +{ + type OwnedItem = C::OwnedItem; + type ReadItem<'a> = OptionWrapper<'a, C>; + fn copy<'a>(&mut self, item: Self::ReadItem<'a>) { - if item.equals(&Default::default()) && self.container.is_empty() { + if item.eq(&IntoOwned::borrow_as(&Default::default())) && self.container.is_empty() { self.defaults += 1; } else { @@ -39,7 +47,7 @@ where self.container.copy(item); } else { - self.container.push(Default::default()); + self.container.copy(IntoOwned::borrow_as(&Default::default())); } } } @@ -82,22 +90,22 @@ impl<'a, C: BatchContainer> Clone for OptionWrapper<'a, C> { use std::cmp::Ordering; impl<'a, 'b, C: BatchContainer> PartialEq> for OptionWrapper<'b, C> where - C::PushItem: Default + Ord, + C::OwnedItem: Default + Ord, { fn eq(&self, other: &OptionWrapper<'a, C>) -> bool { match (&self.inner, &other.inner) { (None, None) => true, - (None, Some(item2)) => item2.equals(&Default::default()), - (Some(item1), None) => item1.equals(&Default::default()), + (None, Some(item2)) => item2.eq(& as IntoOwned>::borrow_as(&Default::default())), + (Some(item1), None) => item1.eq(& as IntoOwned>::borrow_as(&Default::default())), (Some(item1), Some(item2)) => item1.eq(item2) } } } -impl<'a, C: BatchContainer> Eq for OptionWrapper<'a, C> where -C::PushItem: Default + Ord -{ } + +impl<'a, C: BatchContainer> Eq for OptionWrapper<'a, C> where C::OwnedItem: Default + Ord { } + impl<'a, 'b, C: BatchContainer> PartialOrd> for OptionWrapper<'b, C> where -C::PushItem: Default + Ord, +C::OwnedItem: Default + Ord, { fn partial_cmp(&self, other: &OptionWrapper<'a, C>) -> Option { let default = Default::default(); @@ -110,7 +118,7 @@ C::PushItem: Default + Ord, } } impl<'a, C: BatchContainer> Ord for OptionWrapper<'a, C> where -C::PushItem: Default + Ord, +C::OwnedItem: Default + Ord, { fn cmp(&self, other: &Self) -> Ordering { self.partial_cmp(other).unwrap() @@ -118,11 +126,11 @@ C::PushItem: Default + Ord, } -impl<'a, C: BatchContainer> MyTrait<'a> for OptionWrapper<'a, C> +impl<'a, C: BatchContainer> IntoOwned<'a> for OptionWrapper<'a, C> where - C::PushItem : Default + Ord, + C::OwnedItem : Default + Ord, { - type Owned = C::PushItem; + type Owned = C::OwnedItem; fn into_owned(self) -> Self::Owned { self.inner.map(|r| r.into_owned()).unwrap_or_else(Default::default) @@ -135,17 +143,9 @@ where *other = Default::default(); } } - fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering { - if let Some(item) = &self.inner { - item.compare(other) - } - else { - ::default().cmp(other) - } - } - fn borrow_as(other: &'a Self::Owned) -> Self { + fn borrow_as(owned: &'a Self::Owned) -> Self { Self { - inner: Some(<_>::borrow_as(other)) + inner: Some(IntoOwned::borrow_as(owned)) } } } diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index a5afee109..a83cf728e 100644 --- 
a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -9,6 +9,7 @@ //! and should consume fewer resources (computation and memory) when it applies. use std::rc::Rc; +use timely::container::columnation::{TimelyStack}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; @@ -24,7 +25,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; pub type OrdValSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, Vec<((K,V),T,R)>>>, >; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; @@ -33,14 +34,14 @@ pub type OrdValSpine = Spine< pub type ColValSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, TimelyStack<((K,V),T,R)>>>, >; /// A trace implementation using a spine of ordered lists. pub type OrdKeySpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, Vec<((K,()),T,R)>>>, >; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; @@ -49,29 +50,33 @@ pub type OrdKeySpine = Spine< pub type ColKeySpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, TimelyStack<((K,()),T,R)>>>, >; /// A trace implementation backed by columnar storage. pub type PreferredSpine = Spine< Rc>>, MergeBatcher::Owned,::Owned),T,R)>,T>, - RcBuilder>>, + RcBuilder, TimelyStack<((::Owned,::Owned),T,R)>>>, >; // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; + mod val_batch { use std::marker::PhantomData; use abomonation_derive::Abomonation; + use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; + use crate::trace::implementations::containers::Push; + use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::BatchContainer; - use crate::trace::cursor::MyTrait; + use crate::trace::implementations::{BatchContainer, BuilderInput}; + use crate::trace::cursor::IntoOwned; use super::{Layout, Update}; @@ -148,7 +153,7 @@ mod val_batch { OrdValCursor { key_cursor: 0, val_cursor: 0, - phantom: std::marker::PhantomData, + phantom: PhantomData, } } fn len(&self) -> usize { @@ -189,7 +194,7 @@ mod val_batch { impl Merger> for OrdValMerger where - OrdValBatch: Batch::Time> + OrdValBatch: Batch::Time>, { fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { @@ -413,7 +418,7 @@ mod val_batch { if !self.update_stash.is_empty() { // If there is a single element, equal to a just-prior recorded update, // we push nothing and report an unincremented offset to encode this case. - if self.update_stash.len() == 1 && self.result.updates.last().map(|ud| self.update_stash.last().unwrap().equals(ud)).unwrap_or(false) { + if self.update_stash.len() == 1 && self.result.updates.last().map(|ud| self.update_stash.last().unwrap().eq(IntoOwned::borrow_as(ud))).unwrap_or(false) { // Just clear out update_stash, as we won't drain it here. self.update_stash.clear(); self.singletons += 1; @@ -498,7 +503,7 @@ mod val_batch { } /// A builder for creating layers from unsorted update tuples. - pub struct OrdValBuilder { + pub struct OrdValBuilder { result: OrdValStorage, singleton: Option<(::Time, ::Diff)>, /// Counts the number of singleton optimizations we performed. 
@@ -506,9 +511,10 @@ mod val_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. singletons: usize, + _marker: PhantomData, } - impl OrdValBuilder { + impl OrdValBuilder { /// Pushes a single update, which may set `self.singleton` rather than push. /// /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`. @@ -536,9 +542,15 @@ mod val_batch { } } - impl Builder for OrdValBuilder { + impl Builder for OrdValBuilder + where + L: Layout, + CI: for<'a> BuilderInput::Time, Diff=::Diff>, + for<'a> CI::Key<'a>: PushInto, + for<'a> CI::Val<'a>: PushInto, + { - type Input = ((::Key, ::Val), ::Time, ::Diff); + type Input = CI; type Time = ::Time; type Output = OrdValBatch; @@ -554,62 +566,35 @@ mod val_batch { }, singleton: None, singletons: 0, + _marker: PhantomData, } } #[inline] - fn push(&mut self, ((key, val), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { - self.push_update(time, diff); + fn push(&mut self, chunk: &mut Self::Input) { + for item in chunk.drain() { + let (key, val, time, diff) = CI::into_parts(item); + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + val.push_into(&mut self.result.vals); + } } else { - // New value; complete representation of prior value. + // New key; complete representation of prior key. self.result.vals_offs.push(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); - self.result.vals.push(val); + val.push_into(&mut self.result.vals); + key.push_into(&mut self.result.keys); } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time, diff); - self.result.vals.push(val); - self.result.keys.push(key); - } - } - - #[inline] - fn copy(&mut self, ((key, val), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { - // TODO: here we could look for repetition, and not push the update in that case. - // More logic (and state) would be required to correctly wrangle this. - self.push_update(time.clone(), diff.clone()); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. 
- if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - self.result.keys.copy_push(key); } } @@ -634,11 +619,13 @@ mod key_batch { use std::marker::PhantomData; use abomonation_derive::Abomonation; + use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; + use crate::trace::implementations::containers::Push; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::BatchContainer; - use crate::trace::cursor::MyTrait; + use crate::trace::implementations::{BatchContainer, BuilderInput}; + use crate::trace::cursor::IntoOwned; use super::{Layout, Update}; @@ -962,7 +949,7 @@ mod key_batch { } /// A builder for creating layers from unsorted update tuples. - pub struct OrdKeyBuilder { + pub struct OrdKeyBuilder { result: OrdKeyStorage, singleton: Option<(::Time, ::Diff)>, /// Counts the number of singleton optimizations we performed. @@ -970,9 +957,10 @@ mod key_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. singletons: usize, + _marker: PhantomData, } - impl OrdKeyBuilder { + impl OrdKeyBuilder { /// Pushes a single update, which may set `self.singleton` rather than push. /// /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`. @@ -1000,9 +988,14 @@ mod key_batch { } } - impl Builder for OrdKeyBuilder { + impl Builder for OrdKeyBuilder + where + L: Layout, + CI: for<'a> BuilderInput::Time, Diff=::Diff>, + for<'a> CI::Key<'a>: PushInto, + { - type Input = ((::Key, ()), ::Time, ::Diff); + type Input = CI; type Time = ::Time; type Output = OrdKeyBatch; @@ -1016,38 +1009,25 @@ mod key_batch { }, singleton: None, singletons: 0, + _marker: PhantomData, } } #[inline] - fn push(&mut self, ((key, ()), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - self.push_update(time, diff); - } else { - // New key; complete representation of prior key. - self.result.keys_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time, diff); - self.result.keys.push(key); - } - } - - #[inline] - fn copy(&mut self, ((key, ()), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - self.push_update(time.clone(), diff.clone()); - } else { - // New key; complete representation of prior key. - self.result.keys_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. 
- if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.keys.copy_push(key); + fn push(&mut self, chunk: &mut Self::Input) { + for item in chunk.drain() { + let (key, _val, time, diff) = CI::into_parts(item); + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New key; complete representation of prior key. + self.result.keys_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + key.push_into(&mut self.result.keys); + } } } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 64cac268b..b5cc93ffa 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -9,6 +9,7 @@ use std::rc::Rc; use std::cmp::Ordering; use abomonation_derive::Abomonation; +use timely::container::columnation::TimelyStack; use crate::Hashable; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; @@ -24,7 +25,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder}; pub type VecSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, Vec<((K,V),T,R)>>>, >; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; @@ -33,7 +34,7 @@ pub type VecSpine = Spine< pub type ColSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, TimelyStack<((K,V),T,R)>>>, >; // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; @@ -68,24 +69,33 @@ where ::Output: PartialOrd { impl HashOrdered for HashWrapper { } -impl Hashable for HashWrapper { +impl Hashable for HashWrapper { + type Output = T::Output; + fn hashed(&self) -> Self::Output { self.inner.hashed() } +} + +impl HashOrdered for &HashWrapper { } + +impl Hashable for &HashWrapper { type Output = T::Output; fn hashed(&self) -> Self::Output { self.inner.hashed() } } mod val_batch { - use std::borrow::Borrow; use std::convert::TryInto; use std::marker::PhantomData; use abomonation_derive::Abomonation; + use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; use crate::hashable::Hashable; + use crate::trace::implementations::containers::Push; + use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::BatchContainer; - use crate::trace::cursor::MyTrait; + use crate::trace::implementations::{BatchContainer, BuilderInput}; + use crate::trace::cursor::IntoOwned; use super::{Layout, Update, HashOrdered}; @@ -113,7 +123,9 @@ mod val_batch { /// would most like to end up. The `BatchContainer` trait does not provide a `capacity()` method, /// otherwise we would just use that. pub key_capacity: usize, - pub divisor: usize, + /// A number large enough that when it divides any `u64` the result is at most `self.key_capacity`. + /// When that capacity is zero or one, this is set to zero instead. + pub divisor: u64, /// The number of present keys, distinct from `keys.len()` which contains pub key_count: usize, @@ -172,8 +184,8 @@ mod val_batch { /// If `offset` is specified, we will insert it at the appropriate location. If it is not specified, /// we leave `keys_offs` ready to receive it as the next `push`. 
This is so that builders that may /// not know the final offset at the moment of key insertion can prepare for receiving the offset. - fn insert_key(&mut self, key: &::Key, offset: Option) { - let desired = self.desired_location(key); + fn insert_key(&mut self, key: ::Key, offset: Option) { + let desired = self.desired_location(&key); // Were we to push the key now, it would be at `self.keys.len()`, so while that is wrong, // push additional blank entries in. while self.keys.len() < desired { @@ -185,7 +197,7 @@ // Now we insert the key. Even if it is no longer the desired location because of contention. // If an offset has been supplied we insert it, and otherwise leave it for future determination. - self.keys.copy_push(key); + self.keys.push(key); if let Some(offset) = offset { self.keys_offs.push(offset); } @@ -194,8 +206,10 @@ /// Indicates both the desired location and the hash signature of the key. fn desired_location(&self, key: &K) -> usize { - let hash: usize = key.hashed().into().try_into().unwrap(); - hash / self.divisor + if self.divisor == 0 { 0 } + else { + (key.hashed().into() / self.divisor).try_into().expect("divisor not large enough to force u64 into usize") + } } /// Returns true if one should advance one's index in the search for `key`. @@ -216,11 +230,15 @@ } } - // I hope this works out; meant to be 2^64 / self.key_capacity, so that dividing - // `signature` by this gives something in `[0, self.key_capacity)`. We could also - // do powers of two and just make this really easy. - fn divisor_for_capacity(capacity: usize) -> usize { - if capacity == 0 { 0 } + /// A value large enough that any `u64` divided by it is less than `capacity`. + /// + /// This is `2^64 / capacity`, except in the cases where `capacity` is zero or one. + /// In those cases, we'll return `0` to communicate the exception, for which we should + /// just return `0` when announcing a target location (and a zero capacity that we insert + /// into becomes a bug). + fn divisor_for_capacity(capacity: usize) -> u64 { + let capacity: u64 = capacity.try_into().expect("usize exceeds u64"); + if capacity == 0 || capacity == 1 { 0 } else { ((1 << 63) / capacity) << 1 } @@ -303,8 +321,6 @@ /// description description: Description<::Time>, - /// Owned key for copying into. - key_owned: <::Key as ToOwned>::Owned, /// Local stash of updates, to use for consolidation. /// /// We could emulate a `ChangeBatch` here, with related compaction smarts. @@ -358,7 +374,6 @@ key_cursor2: 0, result: storage, description, - key_owned: Default::default(), update_stash: Vec::new(), singletons: 0, } @@ -434,8 +449,7 @@ // If we have pushed any values, copy the key as well. if self.result.vals.len() > init_vals { - source.keys.index(cursor).clone_onto(&mut self.key_owned); - self.result.insert_key(&self.key_owned, Some(self.result.vals.len())); + self.result.insert_key(source.keys.index(cursor).into_owned(), Some(self.result.vals.len())); } } /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
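The divisor arithmetic introduced above is compact enough to misread, so a worked example may help. The sketch below is not part of the patch; it is a minimal standalone mirror of the new `divisor_for_capacity` and `desired_location` arithmetic, assuming 64-bit hash values (which is what the `u64` divisor implies), so the hash-to-slot mapping can be checked by hand.

```rust
/// Roughly 2^64 / capacity, computed as ((1 << 63) / capacity) << 1 so the
/// intermediate value stays within u64. Capacities of zero or one are
/// signalled with a divisor of zero, mirroring the patched code above.
fn divisor_for_capacity(capacity: usize) -> u64 {
    let capacity = capacity as u64;
    if capacity == 0 || capacity == 1 { 0 } else { ((1u64 << 63) / capacity) << 1 }
}

/// Maps a 64-bit hash to a target slot by dividing it by the precomputed divisor,
/// so low hashes land near the front of the allocation and high hashes near the back.
fn desired_location(hash: u64, capacity: usize) -> usize {
    let divisor = divisor_for_capacity(capacity);
    if divisor == 0 { 0 } else { (hash / divisor) as usize }
}

fn main() {
    let capacity = 1024;
    // The mapping is monotone in the hash and stays within the allocation.
    assert_eq!(desired_location(0, capacity), 0);
    assert_eq!(desired_location(1u64 << 63, capacity), capacity / 2);
    assert!(desired_location(u64::MAX, capacity) < capacity);
    // Degenerate capacities fall back to slot zero.
    assert_eq!(desired_location(12345, 1), 0);
    assert_eq!(desired_location(12345, 0), 0);
}
```

For power-of-two capacities the result is always strictly less than `capacity`; for other capacities the rounding in `((1 << 63) / capacity) << 1` can let the very largest hashes land on slot `capacity` itself (for example `capacity = 3`), which appears to be why the `divisor` field above is documented as producing "at most `self.key_capacity`".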
@@ -455,8 +469,7 @@ mod val_batch { let (lower1, upper1) = source1.values_for_key(self.key_cursor1); let (lower2, upper2) = source2.values_for_key(self.key_cursor2); if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) { - source1.keys.index(self.key_cursor1).clone_onto(&mut self.key_owned); - self.result.insert_key(&self.key_owned, Some(off)); + self.result.insert_key(source1.keys.index(self.key_cursor1).into_owned(), Some(off)); } // Increment cursors in either case; the keys are merged. self.key_cursor1 += 1; @@ -561,7 +574,7 @@ mod val_batch { if !self.update_stash.is_empty() { // If there is a single element, equal to a just-prior recorded update, // we push nothing and report an unincremented offset to encode this case. - if self.update_stash.len() == 1 && self.result.updates.last().map(|l| l.equals(self.update_stash.last().unwrap())).unwrap_or(false) { + if self.update_stash.len() == 1 && self.result.updates.last().map(|l| l.eq(IntoOwned::borrow_as(self.update_stash.last().unwrap()))).unwrap_or(false) { // Just clear out update_stash, as we won't drain it here. self.update_stash.clear(); self.singletons += 1; @@ -680,7 +693,7 @@ mod val_batch { } /// A builder for creating layers from unsorted update tuples. - pub struct RhhValBuilder + pub struct RhhValBuilder where ::Key: Default + HashOrdered, { @@ -691,9 +704,10 @@ mod val_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. singletons: usize, + _marker: PhantomData, } - impl RhhValBuilder + impl RhhValBuilder where ::Key: Default + HashOrdered, { @@ -724,12 +738,14 @@ mod val_batch { } } - impl Builder for RhhValBuilder + impl Builder for RhhValBuilder where ::Key: Default + HashOrdered, // RhhValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, + CI: for<'a> BuilderInput = ::Key, Time=::Time, Diff=::Diff>, + for<'a> CI::Val<'a>: PushInto, { - type Input = ((::Key, ::Val), ::Time, ::Diff); + type Input = CI; type Time = ::Time; type Output = RhhValBatch; @@ -757,64 +773,36 @@ mod val_batch { }, singleton: None, singletons: 0, + _marker: PhantomData, } } #[inline] - fn push(&mut self, ((key, val), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { - self.push_update(time, diff); + fn push(&mut self, chunk: &mut Self::Input) { + for item in chunk.drain() { + let (key, val, time, diff) = CI::into_parts(item); + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + val.push_into(&mut self.result.vals); + } } else { - // New value; complete representation of prior value. + // New key; complete representation of prior key. 
self.result.vals_offs.push(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); - self.result.vals.push(val); + val.push_into(&mut self.result.vals); + // Insert the key, but with no specified offset. + self.result.insert_key(key, None); } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time, diff); - self.result.vals.push(val); - // Insert the key, but with no specified offset. - self.result.insert_key(key.borrow(), None); - } - } - - #[inline] - fn copy(&mut self, ((key, val), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { - // TODO: here we could look for repetition, and not push the update in that case. - // More logic (and state) would be required to correctly wrangle this. - self.push_update(time.clone(), diff.clone()); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - // Insert the key, but with no specified offset. - self.result.insert_key(key, None); } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index afee7c22e..f73730a23 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -19,7 +19,7 @@ use timely::progress::{Antichain, frontier::AntichainRef}; use timely::progress::Timestamp; use crate::logging::DifferentialEvent; -use crate::trace::cursor::MyTrait; +use crate::trace::cursor::IntoOwned; use crate::difference::Semigroup; use crate::lattice::Lattice; // use ::difference::Semigroup; @@ -52,11 +52,11 @@ pub type ExertionLogic = std::sync::Arc Fn(&'a [(usize, usize, usize pub trait TraceReader { /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + MyTrait<'a, Owned = Self::KeyOwned>; + type Key<'a>: Copy + Clone + Ord + IntoOwned<'a, Owned = Self::KeyOwned>; /// Owned version of the above. type KeyOwned: Ord + Clone; /// Values associated with keys. - type Val<'a>: Copy + Clone + MyTrait<'a>; + type Val<'a>: Copy + Clone + IntoOwned<'a>; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Associated update. @@ -258,11 +258,11 @@ where Self: ::std::marker::Sized, { /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + MyTrait<'a, Owned = Self::KeyOwned>; + type Key<'a>: Copy + Clone + Ord + IntoOwned<'a, Owned = Self::KeyOwned>; /// Owned version of the above. type KeyOwned: Ord + Clone; /// Values associated with keys. 
- type Val<'a>: Copy + Clone + MyTrait<'a>; + type Val<'a>: Copy + Clone + IntoOwned<'a>; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Associated update. @@ -336,15 +336,10 @@ pub trait Builder: Sized { /// /// They represent respectively the number of distinct `key`, `(key, val)`, and total updates. fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self; - /// Adds an element to the batch. + /// Adds a chunk of elements to the batch. /// - /// The default implementation uses `self.copy` with references to the owned arguments. - /// One should override it if the builder can take advantage of owned arguments. - fn push(&mut self, element: Self::Input) { - self.copy(&element); - } - /// Adds an element to the batch. - fn copy(&mut self, element: &Self::Input); + /// Adds all elements from `chunk` to the builder and leaves `chunk` in an undefined state. + fn push(&mut self, chunk: &mut Self::Input); /// Completes building and returns the batch. fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output; } @@ -454,8 +449,7 @@ pub mod rc_blanket_impls { type Time = B::Time; type Output = Rc; fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } } - fn push(&mut self, element: Self::Input) { self.builder.push(element) } - fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) } + fn push(&mut self, input: &mut Self::Input) { self.builder.push(input) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } } @@ -561,8 +555,7 @@ pub mod abomonated_blanket_impls { type Time = B::Time; type Output = Abomonated>; fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { AbomonatedBuilder { builder: B::with_capacity(keys, vals, upds) } } - fn push(&mut self, element: Self::Input) { self.builder.push(element) } - fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) } + fn push(&mut self, input: &mut Self::Input) { self.builder.push(input) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output { let batch = self.builder.done(lower, upper, since); let mut bytes = Vec::with_capacity(measure(&batch));