From 869ec47883e89de89d92efcfc99744c488153d67 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 8 May 2024 13:35:12 -0400 Subject: [PATCH 1/8] Correct defects in getting started (#484) --- mdbook/src/chapter_0/chapter_0_0.md | 4 ++-- mdbook/src/chapter_0/chapter_0_1.md | 4 ++-- mdbook/src/chapter_0/chapter_0_3.md | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/mdbook/src/chapter_0/chapter_0_0.md b/mdbook/src/chapter_0/chapter_0_0.md index f72ef11d3..406bb5089 100644 --- a/mdbook/src/chapter_0/chapter_0_0.md +++ b/mdbook/src/chapter_0/chapter_0_0.md @@ -25,8 +25,8 @@ version = "0.1.0" authors = ["Your Name "] [dependencies] -timely = "0.11.1" -differential-dataflow = "0.11.0" +timely = "0.12.0" +differential-dataflow = "0.12.0" ``` You should only need to add those last two lines there, which bring in dependencies on both [timely dataflow](https://github.com/TimelyDataflow/timely-dataflow) and [differential dataflow](https://github.com/TimelyDataflow/differential-dataflow). We will be using both of those. diff --git a/mdbook/src/chapter_0/chapter_0_1.md b/mdbook/src/chapter_0/chapter_0_1.md index 5e69289a5..e031fb7aa 100644 --- a/mdbook/src/chapter_0/chapter_0_1.md +++ b/mdbook/src/chapter_0/chapter_0_1.md @@ -34,8 +34,8 @@ fn main() { .inspect(|x| println!("{:?}", x)); }); - // Set an arbitrary size for our organization. - let size = 100; + // Set a size for our organization from the input. + let size = std::env::args().nth(1).and_then(|s| s.parse::().ok()).unwrap_or(10); // Load input (a binary tree). input.advance_to(0); diff --git a/mdbook/src/chapter_0/chapter_0_3.md b/mdbook/src/chapter_0/chapter_0_3.md index d47d6f395..0f16ab4f1 100644 --- a/mdbook/src/chapter_0/chapter_0_3.md +++ b/mdbook/src/chapter_0/chapter_0_3.md @@ -107,7 +107,7 @@ We can then use this probe to limit the introduction of new data, by waiting for This starts to print out a mess of data, indicating not only how long it takes to start up the computation, but also how long each individual round of updates takes. ``` ignore - Echidnatron% cargo run --release --example hello 10000000 + Echidnatron% cargo run --release -- 10000000 Finished release [optimized + debuginfo] target(s) in 0.06s Running `target/release/examples/hello 10000000` 4.092895186s data loaded From b215b9a76eaa1f1e2ef0945fce1bf5cdb822bc40 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 13 May 2024 11:45:58 -0400 Subject: [PATCH 2/8] Remove uses of time::Instant from core DD (#485) --- src/algorithms/graphs/sequential.rs | 2 -- src/capture.rs | 43 ++--------------------------- 2 files changed, 2 insertions(+), 43 deletions(-) diff --git a/src/algorithms/graphs/sequential.rs b/src/algorithms/graphs/sequential.rs index 319c02def..f5a730638 100644 --- a/src/algorithms/graphs/sequential.rs +++ b/src/algorithms/graphs/sequential.rs @@ -52,8 +52,6 @@ where F: Fn(&N, &[(&V, isize)])->V+'static { - let _timer = ::std::time::Instant::now(); - // start iteration with None messages for all. state .map(|(node, _state)| (node, None)) diff --git a/src/capture.rs b/src/capture.rs index 33f706b85..c08f3c05b 100644 --- a/src/capture.rs +++ b/src/capture.rs @@ -42,47 +42,6 @@ pub struct Progress { pub counts: Vec<(T, usize)>, } -/// An iterator that yields with a `None` every so often. -pub struct YieldingIter { - /// When set, a time after which we should return `None`. - start: Option, - after: std::time::Duration, - iter: I, -} - -impl YieldingIter { - /// Construct a yielding iterator from an inter-yield duration. - pub fn new_from(iter: I, yield_after: std::time::Duration) -> Self { - Self { - start: None, - after: yield_after, - iter, - } - } -} - -impl Iterator for YieldingIter { - type Item = I::Item; - fn next(&mut self) -> Option { - if self.start.is_none() { - self.start = Some(std::time::Instant::now()); - } - let start = self.start.as_ref().unwrap(); - if start.elapsed() > self.after { - self.start = None; - None - } else { - match self.iter.next() { - Some(x) => Some(x), - None => { - self.start = None; - None - } - } - } - } -} - /// A simple sink for byte slices. pub trait Writer { /// Returns an amount of time to wait before retrying, or `None` for success. @@ -785,6 +744,8 @@ pub mod sink { // { // super::source::build(scope, |activator| { // let source = KafkaSource::new(addr, topic, group, activator); +// // An iterator combinator that yields every "duration" even if more items exist. +// // The implementation of such an iterator exists in the git history, or can be rewritten easily. // super::YieldingIter::new_from(Iter::::new_from(source), std::time::Duration::from_millis(10)) // }) // } From 7063c6fd592fc4bb126ade1feb5ba3c7e060583f Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 15 May 2024 14:30:26 -0400 Subject: [PATCH 3/8] Use consolidating builder in consolidate_stream (#488) This switches from consolidating each input individually to consolidating at the output. The benefits are that it can yield better consolidation performance because it can consolidate across input containers instead of only being able to consolidate each individual input container. Signed-off-by: Moritz Hoffmann --- src/operators/consolidate.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index ac0ef7f8f..2235c4df9 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -9,6 +9,7 @@ use timely::dataflow::Scope; use crate::{Collection, ExchangeData, Hashable}; +use crate::consolidation::ConsolidatingContainerBuilder; use crate::difference::Semigroup; use crate::Data; @@ -92,14 +93,13 @@ where use crate::collection::AsCollection; self.inner - .unary(Pipeline, "ConsolidateStream", |_cap, _info| { + .unary::, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| { let mut vector = Vec::new(); move |input, output| { input.for_each(|time, data| { data.swap(&mut vector); - crate::consolidation::consolidate_updates(&mut vector); - output.session(&time).give_container(&mut vector); + output.session_with_builder(&time).give_container(&mut vector); }) } }) From b0c8f2adea5a8b9a39ada50525f760136ec13000 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 17 May 2024 08:44:10 -0400 Subject: [PATCH 4/8] Use usize and u64 better (#486) --- src/trace/implementations/rhh.rs | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 64cac268b..088055507 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -113,7 +113,9 @@ mod val_batch { /// would most like to end up. The `BatchContainer` trait does not provide a `capacity()` method, /// otherwise we would just use that. pub key_capacity: usize, - pub divisor: usize, + /// A number large enough that when it divides any `u64` the result is at most `self.key_capacity`. + /// When that capacity is zero or one, this is set to zero instead. + pub divisor: u64, /// The number of present keys, distinct from `keys.len()` which contains pub key_count: usize, @@ -194,8 +196,10 @@ mod val_batch { /// Indicates both the desired location and the hash signature of the key. fn desired_location(&self, key: &K) -> usize { - let hash: usize = key.hashed().into().try_into().unwrap(); - hash / self.divisor + if self.divisor == 0 { 0 } + else { + (key.hashed().into() / self.divisor).try_into().expect("divisor not large enough to force u64 into uisze") + } } /// Returns true if one should advance one's index in the search for `key`. @@ -216,11 +220,15 @@ mod val_batch { } } - // I hope this works out; meant to be 2^64 / self.key_capacity, so that dividing - // `signature` by this gives something in `[0, self.key_capacity)`. We could also - // do powers of two and just make this really easy. - fn divisor_for_capacity(capacity: usize) -> usize { - if capacity == 0 { 0 } + /// A value large enough that any `u64` divided by it is less than `capacity`. + /// + /// This is `2^64 / capacity`, except in the cases where `capacity` is zero or one. + /// In those cases, we'll return `0` to communicate the exception, for which we should + /// just return `0` when announcing a target location (and a zero capacity that we insert + /// into becomes a bug). + fn divisor_for_capacity(capacity: usize) -> u64 { + let capacity: u64 = capacity.try_into().expect("usize exceeds u64"); + if capacity == 0 || capacity == 1 { 0 } else { ((1 << 63) / capacity) << 1 } From 9171e73d551109be7afcc5914cab31f06fbe60b8 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 23 May 2024 09:48:34 -0400 Subject: [PATCH 5/8] Semigroup generic over Rhs (#493) Adds support for implementing semigroup for types that can plus_equals from another type, for example vectors and slices. Fixes a bug where the semigroup implementation for `Vec` would add overhanging elements twice, i.e., `[1,2] + [1,1,1]` would result in `[2,3,2`] instead of `[2,3,1]`. This leaves one quirk where `is_zero` does not depend on `Rhs`, so Rust cannot figure out which implementation to use when two are in scope, forcing the caller to phrase it as `::is_zero(&value)`. This only applies if `R` implements `Semigroup + Semigroup`. Signed-off-by: Moritz Hoffmann --- src/difference.rs | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/src/difference.rs b/src/difference.rs index fc6c8680b..46c7ba3eb 100644 --- a/src/difference.rs +++ b/src/difference.rs @@ -22,9 +22,9 @@ pub use self::Abelian as Diff; /// There is a light presumption of commutativity here, in that while we will largely perform addition /// in order of timestamps, for many types of timestamps there is no total order and consequently no /// obvious order to respect. Non-commutative semigroups should be used with care. -pub trait Semigroup : ::std::marker::Sized + Data + Clone { +pub trait Semigroup : Data + Clone { /// The method of `std::ops::AddAssign`, for types that do not implement `AddAssign`. - fn plus_equals(&mut self, rhs: &Self); + fn plus_equals(&mut self, rhs: &Rhs); /// Returns true if the element is the additive identity. /// /// This is primarily used by differential dataflow to know when it is safe to delete an update. @@ -233,22 +233,43 @@ mod vector { impl Semigroup for Vec { fn plus_equals(&mut self, rhs: &Self) { - // Ensure sufficient length to receive addition. + self.plus_equals(&rhs[..]) + } + fn is_zero(&self) -> bool { + self.iter().all(|x| x.is_zero()) + } + } + + impl Semigroup<[R]> for Vec { + fn plus_equals(&mut self, rhs: &[R]) { + // Apply all updates to existing elements + for (index, update) in rhs.iter().enumerate().take(self.len()) { + self[index].plus_equals(update); + } + + // Clone leftover elements from `rhs` while self.len() < rhs.len() { let element = &rhs[self.len()]; self.push(element.clone()); } - - // As other is not longer, apply updates without tests. - for (index, update) in rhs.iter().enumerate() { - self[index].plus_equals(update); - } } fn is_zero(&self) -> bool { self.iter().all(|x| x.is_zero()) } } + #[cfg(test)] + mod tests { + use crate::difference::Semigroup; + + #[test] + fn test_semigroup_vec() { + let mut a = vec![1,2,3]; + a.plus_equals([1,1,1,1].as_slice()); + assert_eq!(vec![2,3,4,1], a); + } + } + impl Monoid for Vec { fn zero() -> Self { Self::new() From 9731d7fc957f22393291ffd38d23666af889cd2b Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 23 May 2024 15:13:49 -0400 Subject: [PATCH 6/8] Cleanup, fix rhh example (#492) Signed-off-by: Moritz Hoffmann --- examples/spines.rs | 21 +++-- src/trace/implementations/mod.rs | 132 +------------------------------ src/trace/implementations/rhh.rs | 9 ++- 3 files changed, 19 insertions(+), 143 deletions(-) diff --git a/examples/spines.rs b/examples/spines.rs index 4ae540e5d..b0f5eddd1 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -41,13 +41,13 @@ fn main() { keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, - // "rhh" => { - // use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine}; - // let data = data.map(|x| HashWrapper { inner: x }).arrange::>(); - // let keys = keys.map(|x| HashWrapper { inner: x }).arrange::>(); - // keys.join_core(&data, |_k, &(), &()| Option::<()>::None) - // .probe_with(&mut probe); - // }, + "rhh" => { + use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine}; + let data = data.map(|x| HashWrapper { inner: x }).arrange::>(); + let keys = keys.map(|x| HashWrapper { inner: x }).arrange::>(); + keys.join_core(&data, |_k, &(), &()| Option::<()>::None) + .probe_with(&mut probe); + }, "slc" => { use differential_dataflow::trace::implementations::ord_neu::PreferredSpine; @@ -61,10 +61,7 @@ fn main() { .arrange::>() .reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1))); - keys.join_core(&data, |k,_v1,_v2| { - println!("{:?}", k.text); - Option::<((),isize,isize)>::None - }) + keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, _ => { @@ -122,4 +119,4 @@ fn main() { println!("{:?}\tshut down", timer2.elapsed()); -} \ No newline at end of file +} diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index cd0b8fd9a..128ec0bf1 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -149,7 +149,7 @@ impl PreferredContainer for T { } impl PreferredContainer for [T] { - type Container = SliceContainer2; + type Container = SliceContainer; } /// An update and layout description based on preferred containers. @@ -320,7 +320,7 @@ impl BatchContainer for OffsetList { } } -pub use self::containers::{BatchContainer, SliceContainer, SliceContainer2}; +pub use self::containers::{BatchContainer, SliceContainer}; /// Containers for data that resemble `Vec`, with leaner implementations. pub mod containers { @@ -559,132 +559,4 @@ pub mod containers { } } } - - /// A container that accepts slices `[B::Item]`. - pub struct SliceContainer2 { - text: String, - /// Offsets that bound each contained slice. - /// - /// The length will be one greater than the number of contained slices, - /// starting with zero and ending with `self.inner.len()`. - offsets: Vec, - /// An inner container for sequences of `B` that dereferences to a slice. - inner: Vec, - } - - /// Welcome to GATs! - pub struct Greetings<'a, B> { - /// Text that decorates the data. - pub text: Option<&'a str>, - /// The data itself. - pub slice: &'a [B], - } - - impl<'a, B> Copy for Greetings<'a, B> { } - impl<'a, B> Clone for Greetings<'a, B> { - fn clone(&self) -> Self { *self } - } - - use std::cmp::Ordering; - impl<'a, 'b, B: Ord> PartialEq> for Greetings<'b, B> { - fn eq(&self, other: &Greetings<'a, B>) -> bool { - self.slice.eq(other.slice) - } - } - impl<'a, B: Ord> Eq for Greetings<'a, B> { } - impl<'a, 'b, B: Ord> PartialOrd> for Greetings<'b, B> { - fn partial_cmp(&self, other: &Greetings<'a, B>) -> Option { - self.slice.partial_cmp(other.slice) - } - } - impl<'a, B: Ord> Ord for Greetings<'a, B> { - fn cmp(&self, other: &Self) -> Ordering { - self.partial_cmp(other).unwrap() - } - } - - impl<'a, B: Ord + Clone> MyTrait<'a> for Greetings<'a, B> { - type Owned = Vec; - fn into_owned(self) -> Self::Owned { self.slice.to_vec() } - fn clone_onto(&self, other: &mut Self::Owned) { - self.slice.clone_into(other); - } - fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering { - self.slice.cmp(&other[..]) - } - fn borrow_as(other: &'a Self::Owned) -> Self { - Self { - text: None, - slice: &other[..], - } - } - } - - impl BatchContainer for SliceContainer2 - where - B: Ord + Clone + Sized + 'static, - { - type PushItem = Vec; - type ReadItem<'a> = Greetings<'a, B>; - fn push(&mut self, item: Vec) { - for x in item.into_iter() { - self.inner.push(x); - } - self.offsets.push(self.inner.len()); - } - fn copy_push(&mut self, item: &Vec) { - self.copy(<_ as MyTrait>::borrow_as(item)); - } - fn copy(&mut self, item: Self::ReadItem<'_>) { - for x in item.slice.iter() { - self.inner.copy(x); - } - self.offsets.push(self.inner.len()); - } - fn copy_range(&mut self, other: &Self, start: usize, end: usize) { - for index in start .. end { - self.copy(other.index(index)); - } - } - fn with_capacity(size: usize) -> Self { - let mut offsets = Vec::with_capacity(size + 1); - offsets.push(0); - Self { - text: format!("Hello!"), - offsets, - inner: Vec::with_capacity(size), - } - } - fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { - let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1); - offsets.push(0); - Self { - text: format!("Hello!"), - offsets, - inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()), - } - } - fn index(&self, index: usize) -> Self::ReadItem<'_> { - let lower = self.offsets[index]; - let upper = self.offsets[index+1]; - Greetings { - text: Some(&self.text), - slice: &self.inner[lower .. upper], - } - } - fn len(&self) -> usize { - self.offsets.len() - 1 - } - } - - /// Default implementation introduces a first offset. - impl Default for SliceContainer2 { - fn default() -> Self { - Self { - text: format!("Hello!"), - offsets: vec![0], - inner: Default::default(), - } - } - } } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 088055507..8389b62a6 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -68,7 +68,14 @@ where ::Output: PartialOrd { impl HashOrdered for HashWrapper { } -impl Hashable for HashWrapper { +impl Hashable for HashWrapper { + type Output = T::Output; + fn hashed(&self) -> Self::Output { self.inner.hashed() } +} + +impl HashOrdered for &HashWrapper { } + +impl Hashable for &HashWrapper { type Output = T::Output; fn hashed(&self) -> Self::Output { self.inner.hashed() } } From 85b126c61a2b41e61268e09e39e5bb7dfce3cfc0 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 23 May 2024 16:17:04 -0400 Subject: [PATCH 7/8] Pass data from batcher to builder by chunk (#491) * Pass data from batcher to builder by chain Currently, the data shared between the batcher and the builder are individual tuples, either moved or by reference. This limits flexibility around what kind of data can be provided to a builder, i.e., it has to be in the form of tuples, either owned or a reference to a fully-formed one. This works fine for vector-like structures, but will not work for containers that like to arrange their data differently. This change alters the contract between the batcher and the builder to provide chunks instead of individual items (it does not require _chains_.) The data in the chunks must be sorted, and subsequent calls must maintain order, too. The input containers need to implement `BuilderInput`, a type that describes how a container's items can be broken into key, value, time, and diff, where key and value can be references or owned data, as long as they can be pushed into the underlying key and value containers. The change has some quirks around comparing keys to keys already in the builder. The types can differ, and the best solution I could come up with was to add two explicit comparison functions to `BuilderInput` to compare keys and values. While it is not elegant, it allows us to move forward with this change, without adding nightmare-inducing trait bounds all-over. Signed-off-by: Moritz Hoffmann * Address feedback Signed-off-by: Moritz Hoffmann --------- Signed-off-by: Moritz Hoffmann --- src/operators/arrange/arrangement.rs | 8 +- src/operators/arrange/upsert.rs | 6 +- src/operators/reduce.rs | 23 ++- src/trace/implementations/merge_batcher.rs | 6 +- .../implementations/merge_batcher_col.rs | 9 +- src/trace/implementations/mod.rs | 169 +++++++++++++++++- src/trace/implementations/ord_neu.rs | 161 +++++++---------- src/trace/implementations/rhh.rs | 89 ++++----- src/trace/mod.rs | 17 +- 9 files changed, 298 insertions(+), 190 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 13bbb915b..d8fb9720d 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -75,6 +75,8 @@ where use ::timely::dataflow::scopes::Child; use ::timely::progress::timestamp::Refines; +use timely::Container; +use timely::container::PushInto; impl Arranged where @@ -292,7 +294,8 @@ where F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, + ::Input: Container, + ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static, { self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| { @@ -311,7 +314,8 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + ::Input: Container, + ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { use crate::operators::reduce::reduce_trace; diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 758ec8df3..65cdc0b4b 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -138,7 +138,7 @@ where F: Fn(Tr::Val<'_>) -> V + 'static, Tr::Time: TotalOrder+ExchangeData, Tr::Batch: Batch, - Tr::Builder: Builder, + Tr::Builder: Builder>, { let mut reader: Option> = None; @@ -282,9 +282,7 @@ where } // Must insert updates in (key, val, time) order. updates.sort(); - for update in updates.drain(..) { - builder.push(update); - } + builder.push(&mut updates); } let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum())); prev_frontier.clone_from(&upper); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index dda549bca..8711e66a0 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -5,6 +5,8 @@ //! to the key and the list of values. //! The function is expected to populate a list of output values. +use timely::Container; +use timely::container::PushInto; use crate::hashable::Hashable; use crate::{Data, ExchangeData, Collection}; use crate::difference::{Semigroup, Abelian}; @@ -252,7 +254,7 @@ pub trait ReduceCore where F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, { self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| { @@ -274,7 +276,7 @@ pub trait ReduceCore where T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, ; } @@ -293,7 +295,7 @@ where F: Fn(T2::Val<'_>) -> V + 'static, T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -312,7 +314,8 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + ::Input: Container, + ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { let mut result_trace = None; @@ -454,6 +457,8 @@ where builders.push(T2::Builder::new()); } + let mut buffer = Default::default(); + // cursors for navigating input and output traces. let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); let source_storage = &source_storage; @@ -531,7 +536,9 @@ where for index in 0 .. buffers.len() { buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); for (val, time, diff) in buffers[index].1.drain(..) { - builders[index].push(((key.into_owned(), val), time, diff)); + ((key.into_owned(), val), time, diff).push_into(&mut buffer); + builders[index].push(&mut buffer); + buffer.clear(); } } } @@ -648,7 +655,7 @@ where where F: Fn(C2::Val<'_>) -> V, L: FnMut( - C1::Key<'a>, + C1::Key<'a>, &[(C1::Val<'a>, C1::Diff)], &mut Vec<(V, C2::Diff)>, &mut Vec<(V, C2::Diff)>, @@ -728,7 +735,7 @@ mod history_replay { where F: Fn(C2::Val<'_>) -> V, L: FnMut( - C1::Key<'a>, + C1::Key<'a>, &[(C1::Val<'a>, C1::Diff)], &mut Vec<(V, C2::Diff)>, &mut Vec<(V, C2::Diff)>, @@ -1020,7 +1027,7 @@ mod history_replay { new_interesting.push(next_time.clone()); debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time))) } - + // Update `meet` to track the meet of each source of times. meet = None;//T::maximum(); diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 45e2a60f8..bb13cf650 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -282,7 +282,7 @@ where type Time = T; type Input = Vec<((K, V), T, R)>; type Chunk = Vec<((K, V), T, R)>; - type Output = ((K, V), T, R); + type Output = Vec<((K, V), T, R)>; fn accept(&mut self, container: RefOrMut, stash: &mut Vec) -> Vec { // Ensure `self.pending` has the desired capacity. We should never have a larger capacity @@ -497,8 +497,8 @@ where } let mut builder = B::with_capacity(keys, vals, upds); - for datum in chain.drain(..).flatten() { - builder.push(datum); + for mut chunk in chain.drain(..) { + builder.push(&mut chunk); } builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index aed0039d8..265f2e649 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -67,7 +67,7 @@ where type Time = T; type Input = Vec<((K, V), T, R)>; type Chunk = TimelyStack<((K, V), T, R)>; - type Output = ((K, V), T, R); + type Output = TimelyStack<((K, V), T, R)>; fn accept(&mut self, container: RefOrMut, stash: &mut Vec) -> Vec { // Ensure `self.pending` has the desired capacity. We should never have a larger capacity @@ -290,11 +290,8 @@ where } } let mut builder = B::with_capacity(keys, vals, upds); - - for chunk in chain.drain(..) { - for datum in chunk.iter() { - builder.copy(datum); - } + for mut chunk in chain.drain(..) { + builder.push(&mut chunk); } builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 128ec0bf1..d0e4a459a 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -54,7 +54,10 @@ pub use self::ord_neu::OrdKeySpine as KeySpine; use std::borrow::{ToOwned}; use std::cmp::Ordering; +use timely::Container; use timely::container::columnation::{Columnation, TimelyStack}; +use timely::container::PushInto; +use timely::progress::Timestamp; use crate::lattice::Lattice; use crate::difference::Semigroup; @@ -138,7 +141,7 @@ where /// A type with a preferred container. /// -/// Examples include types that implement `Clone` who prefer +/// Examples include types that implement `Clone` who prefer pub trait PreferredContainer : ToOwned { /// The preferred container for the type. type Container: BatchContainer; @@ -161,8 +164,8 @@ impl Update for Preferred where K: ToOwned + ?Sized, K::Owned: Ord+Clone+'static, - V: ToOwned + ?Sized + 'static, - V::Owned: Ord+Clone, + V: ToOwned + ?Sized, + V::Owned: Ord+Clone+'static, T: Ord+Lattice+timely::progress::Timestamp+Clone, R: Semigroup+Clone, { @@ -177,8 +180,8 @@ where K: Ord+ToOwned+PreferredContainer + ?Sized, K::Owned: Ord+Clone+'static, // for<'a> K::Container: BatchContainer = &'a K>, - V: Ord+ToOwned+PreferredContainer + ?Sized + 'static, - V::Owned: Ord+Clone, + V: Ord+ToOwned+PreferredContainer + ?Sized, + V::Owned: Ord+Clone+'static, T: Ord+Lattice+timely::progress::Timestamp+Clone, D: Semigroup+Clone, { @@ -195,7 +198,7 @@ use abomonation_derive::Abomonation; use crate::trace::cursor::MyTrait; /// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not. -#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug, Abomonation)] +#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Abomonation)] pub struct OffsetList { /// Length of a prefix of zero elements. pub zero_prefix: usize, @@ -205,6 +208,12 @@ pub struct OffsetList { pub chonk: Vec, } +impl std::fmt::Debug for OffsetList { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_list().entries(self.into_iter()).finish() + } +} + impl OffsetList { /// Allocate a new list with a specified capacity. pub fn with_capacity(cap: usize) -> Self { @@ -222,7 +231,7 @@ impl OffsetList { else if self.chonk.is_empty() { if let Ok(smol) = offset.try_into() { self.smol.push(smol); - } + } else { self.chonk.push(offset.try_into().unwrap()) } @@ -249,6 +258,41 @@ impl OffsetList { } } +impl<'a> IntoIterator for &'a OffsetList { + type Item = usize; + type IntoIter = OffsetListIter<'a>; + + fn into_iter(self) -> Self::IntoIter { + OffsetListIter {list: self, index: 0 } + } +} + +/// An iterator for [`OffsetList`]. +pub struct OffsetListIter<'a> { + list: &'a OffsetList, + index: usize, +} + +impl<'a> Iterator for OffsetListIter<'a> { + type Item = usize; + + fn next(&mut self) -> Option { + if self.index < self.list.len() { + let res = Some(self.list.index(self.index)); + self.index += 1; + res + } else { + None + } + } +} + +impl PushInto for usize { + fn push_into(self, target: &mut OffsetList) { + target.push(self); + } +} + /// Helper struct to provide `MyTrait` for `Copy` types. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] pub struct Wrapper(T); @@ -320,12 +364,111 @@ impl BatchContainer for OffsetList { } } +/// Behavior to split an update into principal components. +pub trait BuilderInput: Container { + /// Key portion + type Key<'a>: Ord; + /// Value portion + type Val<'a>: Ord; + /// Time + type Time; + /// Diff + type Diff; + + /// Split an item into separate parts. + fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff); + + /// Test that the key equals a key in the layout's key container. + fn key_eq(this: &Self::Key<'_>, other: ::ReadItem<'_>) -> bool; + + /// Test that the value equals a key in the layout's value container. + fn val_eq(this: &Self::Val<'_>, other: ::ReadItem<'_>) -> bool; +} + +impl BuilderInput> for Vec<((K, V), T, R)> +where + K: Ord + Clone + 'static, + V: Ord + Clone + 'static, + T: Timestamp + Lattice + Clone + 'static, + R: Semigroup + Clone + 'static, +{ + type Key<'a> = K; + type Val<'a> = V; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time, diff) + } + + fn key_eq(this: &K, other: &K) -> bool { + this == other + } + + fn val_eq(this: &V, other: &V) -> bool { + this == other + } +} + +impl BuilderInput> for TimelyStack<((K, V), T, R)> +where + K: Ord + Columnation + Clone + 'static, + V: Ord + Columnation + Clone + 'static, + T: Timestamp + Lattice + Columnation + Clone + 'static, + R: Semigroup + Columnation + Clone + 'static, +{ + type Key<'a> = &'a K; + type Val<'a> = &'a V; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time.clone(), diff.clone()) + } + + fn key_eq(this: &&K, other: &K) -> bool { + *this == other + } + + fn val_eq(this: &&V, other: &V) -> bool { + *this == other + } +} + +impl BuilderInput> for TimelyStack<((::Owned, ::Owned), T, R)> +where + K: Ord+ToOwned+PreferredContainer + ?Sized, + K::Owned: Columnation + Ord+Clone+'static, + V: Ord+ToOwned+PreferredContainer + ?Sized, + V::Owned: Columnation + Ord+Clone+'static, + T: Columnation + Ord+Lattice+Timestamp+Clone, + R: Columnation + Semigroup+Clone, +{ + type Key<'a> = &'a K::Owned; + type Val<'a> = &'a V::Owned; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time.clone(), diff.clone()) + } + + fn key_eq(this: &&K::Owned, other: <::Container as BatchContainer>::ReadItem<'_>) -> bool { + other.equals(this) + } + + fn val_eq(this: &&V::Owned, other: <::Container as BatchContainer>::ReadItem<'_>) -> bool { + other.equals(this) + } +} + pub use self::containers::{BatchContainer, SliceContainer}; /// Containers for data that resemble `Vec`, with leaner implementations. pub mod containers { use timely::container::columnation::{Columnation, TimelyStack}; + use timely::container::PushInto; use std::borrow::ToOwned; use crate::trace::MyTrait; @@ -498,6 +641,18 @@ pub mod containers { inner: Vec, } + impl PushInto> for &[B] { + fn push_into(self, target: &mut SliceContainer) { + target.copy(self) + } + } + + impl PushInto> for &Vec { + fn push_into(self, target: &mut SliceContainer) { + target.copy(self) + } + } + impl BatchContainer for SliceContainer where B: Ord + Clone + Sized + 'static, diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index a5afee109..ddc8a4409 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -9,6 +9,7 @@ //! and should consume fewer resources (computation and memory) when it applies. use std::rc::Rc; +use timely::container::columnation::{TimelyStack}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; @@ -24,7 +25,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; pub type OrdValSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, Vec<((K,V),T,R)>>>, >; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; @@ -33,14 +34,14 @@ pub type OrdValSpine = Spine< pub type ColValSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, TimelyStack<((K,V),T,R)>>>, >; /// A trace implementation using a spine of ordered lists. pub type OrdKeySpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, Vec<((K,()),T,R)>>>, >; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; @@ -49,28 +50,30 @@ pub type OrdKeySpine = Spine< pub type ColKeySpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, TimelyStack<((K,()),T,R)>>>, >; /// A trace implementation backed by columnar storage. pub type PreferredSpine = Spine< Rc>>, MergeBatcher::Owned,::Owned),T,R)>,T>, - RcBuilder>>, + RcBuilder, TimelyStack<((::Owned,::Owned),T,R)>>>, >; // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; + mod val_batch { use std::marker::PhantomData; use abomonation_derive::Abomonation; + use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::BatchContainer; + use crate::trace::implementations::{BatchContainer, BuilderInput}; use crate::trace::cursor::MyTrait; use super::{Layout, Update}; @@ -148,7 +151,7 @@ mod val_batch { OrdValCursor { key_cursor: 0, val_cursor: 0, - phantom: std::marker::PhantomData, + phantom: PhantomData, } } fn len(&self) -> usize { @@ -189,7 +192,7 @@ mod val_batch { impl Merger> for OrdValMerger where - OrdValBatch: Batch::Time> + OrdValBatch: Batch::Time>, { fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { @@ -498,7 +501,7 @@ mod val_batch { } /// A builder for creating layers from unsorted update tuples. - pub struct OrdValBuilder { + pub struct OrdValBuilder { result: OrdValStorage, singleton: Option<(::Time, ::Diff)>, /// Counts the number of singleton optimizations we performed. @@ -506,9 +509,10 @@ mod val_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. singletons: usize, + _marker: PhantomData, } - impl OrdValBuilder { + impl OrdValBuilder { /// Pushes a single update, which may set `self.singleton` rather than push. /// /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`. @@ -536,9 +540,15 @@ mod val_batch { } } - impl Builder for OrdValBuilder { + impl Builder for OrdValBuilder + where + L: Layout, + CI: for<'a> BuilderInput::Time, Diff=::Diff>, + for<'a> CI::Key<'a>: PushInto, + for<'a> CI::Val<'a>: PushInto, + { - type Input = ((::Key, ::Val), ::Time, ::Diff); + type Input = CI; type Time = ::Time; type Output = OrdValBatch; @@ -554,62 +564,35 @@ mod val_batch { }, singleton: None, singletons: 0, + _marker: PhantomData, } } #[inline] - fn push(&mut self, ((key, val), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { - self.push_update(time, diff); + fn push(&mut self, chunk: &mut Self::Input) { + for item in chunk.drain() { + let (key, val, time, diff) = CI::into_parts(item); + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + val.push_into(&mut self.result.vals); + } } else { - // New value; complete representation of prior value. + // New key; complete representation of prior key. self.result.vals_offs.push(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); - self.result.vals.push(val); - } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time, diff); - self.result.vals.push(val); - self.result.keys.push(key); - } - } - - #[inline] - fn copy(&mut self, ((key, val), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { - // TODO: here we could look for repetition, and not push the update in that case. - // More logic (and state) would be required to correctly wrangle this. - self.push_update(time.clone(), diff.clone()); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); + val.push_into(&mut self.result.vals); + key.push_into(&mut self.result.keys); } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - self.result.keys.copy_push(key); } } @@ -634,10 +617,11 @@ mod key_batch { use std::marker::PhantomData; use abomonation_derive::Abomonation; + use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::BatchContainer; + use crate::trace::implementations::{BatchContainer, BuilderInput}; use crate::trace::cursor::MyTrait; use super::{Layout, Update}; @@ -962,7 +946,7 @@ mod key_batch { } /// A builder for creating layers from unsorted update tuples. - pub struct OrdKeyBuilder { + pub struct OrdKeyBuilder { result: OrdKeyStorage, singleton: Option<(::Time, ::Diff)>, /// Counts the number of singleton optimizations we performed. @@ -970,9 +954,10 @@ mod key_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. singletons: usize, + _marker: PhantomData, } - impl OrdKeyBuilder { + impl OrdKeyBuilder { /// Pushes a single update, which may set `self.singleton` rather than push. /// /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`. @@ -1000,9 +985,14 @@ mod key_batch { } } - impl Builder for OrdKeyBuilder { + impl Builder for OrdKeyBuilder + where + L: Layout, + CI: for<'a> BuilderInput::Time, Diff=::Diff>, + for<'a> CI::Key<'a>: PushInto, + { - type Input = ((::Key, ()), ::Time, ::Diff); + type Input = CI; type Time = ::Time; type Output = OrdKeyBatch; @@ -1016,38 +1006,25 @@ mod key_batch { }, singleton: None, singletons: 0, + _marker: PhantomData, } } #[inline] - fn push(&mut self, ((key, ()), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - self.push_update(time, diff); - } else { - // New key; complete representation of prior key. - self.result.keys_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time, diff); - self.result.keys.push(key); - } - } - - #[inline] - fn copy(&mut self, ((key, ()), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - self.push_update(time.clone(), diff.clone()); - } else { - // New key; complete representation of prior key. - self.result.keys_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.keys.copy_push(key); + fn push(&mut self, chunk: &mut Self::Input) { + for item in chunk.drain() { + let (key, _val, time, diff) = CI::into_parts(item); + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New key; complete representation of prior key. + self.result.keys_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + key.push_into(&mut self.result.keys); + } } } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 8389b62a6..60ed6afd4 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -9,6 +9,7 @@ use std::rc::Rc; use std::cmp::Ordering; use abomonation_derive::Abomonation; +use timely::container::columnation::TimelyStack; use crate::Hashable; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; @@ -24,7 +25,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder}; pub type VecSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, Vec<((K,V),T,R)>>>, >; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; @@ -33,7 +34,7 @@ pub type VecSpine = Spine< pub type ColSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, TimelyStack<((K,V),T,R)>>>, >; // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; @@ -86,12 +87,13 @@ mod val_batch { use std::convert::TryInto; use std::marker::PhantomData; use abomonation_derive::Abomonation; + use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; use crate::hashable::Hashable; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::BatchContainer; + use crate::trace::implementations::{BatchContainer, BuilderInput}; use crate::trace::cursor::MyTrait; use super::{Layout, Update, HashOrdered}; @@ -695,7 +697,7 @@ mod val_batch { } /// A builder for creating layers from unsorted update tuples. - pub struct RhhValBuilder + pub struct RhhValBuilder where ::Key: Default + HashOrdered, { @@ -706,9 +708,10 @@ mod val_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. singletons: usize, + _marker: PhantomData, } - impl RhhValBuilder + impl RhhValBuilder where ::Key: Default + HashOrdered, { @@ -739,12 +742,14 @@ mod val_batch { } } - impl Builder for RhhValBuilder + impl Builder for RhhValBuilder where ::Key: Default + HashOrdered, // RhhValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, + CI: for<'a> BuilderInput = ::Key, Time=::Time, Diff=::Diff>, + for<'a> CI::Val<'a>: PushInto, { - type Input = ((::Key, ::Val), ::Time, ::Diff); + type Input = CI; type Time = ::Time; type Output = RhhValBatch; @@ -772,64 +777,36 @@ mod val_batch { }, singleton: None, singletons: 0, + _marker: PhantomData, } } #[inline] - fn push(&mut self, ((key, val), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { - self.push_update(time, diff); + fn push(&mut self, chunk: &mut Self::Input) { + for item in chunk.drain() { + let (key, val, time, diff) = CI::into_parts(item); + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + val.push_into(&mut self.result.vals); + } } else { - // New value; complete representation of prior value. + // New key; complete representation of prior key. self.result.vals_offs.push(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); - self.result.vals.push(val); + val.push_into(&mut self.result.vals); + // Insert the key, but with no specified offset. + self.result.insert_key(key.borrow(), None); } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time, diff); - self.result.vals.push(val); - // Insert the key, but with no specified offset. - self.result.insert_key(key.borrow(), None); - } - } - - #[inline] - fn copy(&mut self, ((key, val), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { - // TODO: here we could look for repetition, and not push the update in that case. - // More logic (and state) would be required to correctly wrangle this. - self.push_update(time.clone(), diff.clone()); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - // Insert the key, but with no specified offset. - self.result.insert_key(key, None); } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index afee7c22e..00b72fc1d 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -336,15 +336,10 @@ pub trait Builder: Sized { /// /// They represent respectively the number of distinct `key`, `(key, val)`, and total updates. fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self; - /// Adds an element to the batch. + /// Adds a chunk of elements to the batch. /// - /// The default implementation uses `self.copy` with references to the owned arguments. - /// One should override it if the builder can take advantage of owned arguments. - fn push(&mut self, element: Self::Input) { - self.copy(&element); - } - /// Adds an element to the batch. - fn copy(&mut self, element: &Self::Input); + /// Adds all elements from `chunk` to the builder and leaves `chunk` in an undefined state. + fn push(&mut self, chunk: &mut Self::Input); /// Completes building and returns the batch. fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output; } @@ -454,8 +449,7 @@ pub mod rc_blanket_impls { type Time = B::Time; type Output = Rc; fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } } - fn push(&mut self, element: Self::Input) { self.builder.push(element) } - fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) } + fn push(&mut self, input: &mut Self::Input) { self.builder.push(input) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } } @@ -561,8 +555,7 @@ pub mod abomonated_blanket_impls { type Time = B::Time; type Output = Abomonated>; fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { AbomonatedBuilder { builder: B::with_capacity(keys, vals, upds) } } - fn push(&mut self, element: Self::Input) { self.builder.push(element) } - fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) } + fn push(&mut self, input: &mut Self::Input) { self.builder.push(input) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output { let batch = self.builder.done(lower, upper, since); let mut bytes = Vec::with_capacity(measure(&batch)); From 2de0cbd0a7e674a075aa6a758220272101cc99a0 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 24 May 2024 11:32:24 -0400 Subject: [PATCH 8/8] Introduce and integrate `IntoOwned` trait (#495) * Introduce and integrate IntoOwned trait * Respond to feedback --- dogsdogsdogs/src/operators/half_join.rs | 6 +- dogsdogsdogs/src/operators/lookup_map.rs | 6 +- src/operators/arrange/upsert.rs | 6 +- src/operators/consolidate.rs | 2 +- src/operators/count.rs | 2 +- src/operators/reduce.rs | 6 +- src/trace/cursor/mod.rs | 46 ++----- .../implementations/huffman_container.rs | 32 ++--- src/trace/implementations/mod.rs | 119 +++++++++--------- src/trace/implementations/option_container.rs | 62 ++++----- src/trace/implementations/ord_neu.rs | 9 +- src/trace/implementations/rhh.rs | 24 ++-- src/trace/mod.rs | 10 +- 13 files changed, 148 insertions(+), 182 deletions(-) diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 283284525..877b1399e 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -208,9 +208,9 @@ where // Use TOTAL ORDER to allow the release of `time`. yielded = yielded || yield_function(timer, work); if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) { - use differential_dataflow::trace::cursor::MyTrait; - cursor.seek_key(&storage, MyTrait::borrow_as(key)); - if cursor.get_key(&storage) == Some(MyTrait::borrow_as(key)) { + use differential_dataflow::trace::cursor::IntoOwned; + cursor.seek_key(&storage, IntoOwned::borrow_as(key)); + if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) { while let Some(val2) = cursor.get_val(&storage) { cursor.map_times(&storage, |t, d| { if comparison(t, initial) { diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index e8ab279ff..f1369eab1 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -93,9 +93,9 @@ where for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() { if !input2.frontier.less_equal(time) { logic2(prefix, &mut key1); - use differential_dataflow::trace::cursor::MyTrait; - cursor.seek_key(&storage, MyTrait::borrow_as(&key1)); - if cursor.get_key(&storage) == Some(MyTrait::borrow_as(&key1)) { + use differential_dataflow::trace::cursor::IntoOwned; + cursor.seek_key(&storage, IntoOwned::borrow_as(&key1)); + if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(&key1)) { while let Some(value) = cursor.get_val(&storage) { let mut count = Tr::Diff::zero(); cursor.map_times(&storage, |t, d| { diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 65cdc0b4b..a36781a5e 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -244,14 +244,14 @@ where let mut builder = Tr::Builder::new(); for (key, mut list) in to_process.drain(..) { - use trace::cursor::MyTrait; + use trace::cursor::IntoOwned; // The prior value associated with the key. let mut prev_value: Option = None; // Attempt to find the key in the trace. - trace_cursor.seek_key_owned(&trace_storage, &key); - if trace_cursor.get_key(&trace_storage).map(|k| k.equals(&key)).unwrap_or(false) { + trace_cursor.seek_key(&trace_storage, IntoOwned::borrow_as(&key)); + if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&IntoOwned::borrow_as(&key))).unwrap_or(false) { // Determine the prior value associated with the key. while let Some(val) = trace_cursor.get_val(&trace_storage) { let mut count = 0; diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index 2235c4df9..86b7e729c 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -57,7 +57,7 @@ where Tr::Batcher: Batcher>, { use crate::operators::arrange::arrangement::Arrange; - use crate::trace::cursor::MyTrait; + use crate::trace::cursor::IntoOwned; self.map(|k| (k, ())) .arrange_named::(name) .as_collection(|d, _| d.into_owned()) diff --git a/src/operators/count.rs b/src/operators/count.rs index 0779bde06..42a030786 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -70,7 +70,7 @@ where move |input, output| { - use crate::trace::cursor::MyTrait; + use crate::trace::cursor::IntoOwned; input.for_each(|capability, batches| { batches.swap(&mut buffer); let mut session = output.session(&capability); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 8711e66a0..ffaf1652b 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -478,10 +478,10 @@ where while batch_cursor.key_valid(batch_storage) || exposed_position < exposed.len() { use std::borrow::Borrow; - use crate::trace::cursor::MyTrait; + use crate::trace::cursor::IntoOwned; // Determine the next key we will work on; could be synthetic, could be from a batch. - let key1 = exposed.get(exposed_position).map(|x| <_ as MyTrait>::borrow_as(&x.0)); + let key1 = exposed.get(exposed_position).map(|x| <_ as IntoOwned>::borrow_as(&x.0)); let key2 = batch_cursor.get_key(batch_storage); let key = match (key1, key2) { (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2), @@ -497,7 +497,7 @@ where interesting_times.clear(); // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key. - while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.equals(k)).unwrap_or(false) { + while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.eq(& as IntoOwned>::borrow_as(&k))).unwrap_or(false) { interesting_times.push(exposed[exposed_position].1.clone()); exposed_position += 1; } diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index 927dba4b2..2631c2aea 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -14,56 +14,40 @@ pub mod cursor_list; pub use self::cursor_list::CursorList; use std::borrow::Borrow; -use std::cmp::Ordering; -/// A type that may be converted into and compared with another type. +/// A reference type corresponding to an owned type, supporting conversion in each direction. /// -/// The type must also be comparable with itself, and follow the same -/// order as if converting instances to `T` and comparing the results. -pub trait MyTrait<'a> : Ord { +/// This trait can be implemented by a GAT, and enables owned types to be borrowed as a GAT. +/// This trait is analogous to `ToOwned`, but not as prescriptive. Specifically, it avoids the +/// requirement that the other trait implement `Borrow`, for which a borrow must result in a +/// `&'self Borrowed`, which cannot move the lifetime into a GAT borrowed type. +pub trait IntoOwned<'a> { /// Owned type into which this type can be converted. type Owned; /// Conversion from an instance of this type to the owned type. fn into_owned(self) -> Self::Owned; - /// + /// Clones `self` onto an existing instance of the owned type. fn clone_onto(&self, other: &mut Self::Owned); - /// Indicates that `self <= other`; used for sorting. - fn compare(&self, other: &Self::Owned) -> Ordering; - /// `self <= other` - fn less_equals(&self, other: &Self::Owned) -> bool { - self.compare(other) != Ordering::Greater - } - /// `self == other` - fn equals(&self, other: &Self::Owned) -> bool { - self.compare(other) == Ordering::Equal - } - /// `self < other` - fn less_than(&self, other: &Self::Owned) -> bool { - self.compare(other) == Ordering::Less - } - /// Borrows an owned instance as onesself. - fn borrow_as(other: &'a Self::Owned) -> Self; + /// Borrows an owned instance as oneself. + fn borrow_as(owned: &'a Self::Owned) -> Self; } -impl<'a, T: Ord+ToOwned+?Sized> MyTrait<'a> for &'a T { +impl<'a, T: ToOwned+?Sized> IntoOwned<'a> for &'a T { type Owned = T::Owned; fn into_owned(self) -> Self::Owned { self.to_owned() } fn clone_onto(&self, other: &mut Self::Owned) { ::clone_into(self, other) } - fn compare(&self, other: &Self::Owned) -> Ordering { self.cmp(&other.borrow()) } - fn borrow_as(other: &'a Self::Owned) -> Self { - other.borrow() - } + fn borrow_as(owned: &'a Self::Owned) -> Self { owned.borrow() } } /// A cursor for navigating ordered `(key, val, time, diff)` updates. pub trait Cursor { /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + MyTrait<'a, Owned = Self::KeyOwned>; + type Key<'a>: Copy + Clone + Ord + IntoOwned<'a, Owned = Self::KeyOwned>; /// Owned version of the above. type KeyOwned: Ord + Clone; /// Values associated with keys. - type Val<'a>: Copy + Clone + MyTrait<'a> + for<'b> PartialOrd>; + type Val<'a>: Copy + Clone + Ord + IntoOwned<'a> + for<'b> PartialOrd>; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Associated update. @@ -103,10 +87,6 @@ pub trait Cursor { fn step_key(&mut self, storage: &Self::Storage); /// Advances the cursor to the specified key. fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>); - /// Convenience method to get access by reference to an owned key. - fn seek_key_owned(&mut self, storage: &Self::Storage, key: &Self::KeyOwned) { - self.seek_key(storage, MyTrait::borrow_as(key)); - } /// Advances the cursor to the next value. fn step_val(&mut self, storage: &Self::Storage); diff --git a/src/trace/implementations/huffman_container.rs b/src/trace/implementations/huffman_container.rs index 290b5d870..4fa67b2c7 100644 --- a/src/trace/implementations/huffman_container.rs +++ b/src/trace/implementations/huffman_container.rs @@ -32,12 +32,8 @@ where } } -impl BatchContainer for HuffmanContainer -where - B: Ord + Clone + Sized + 'static, -{ - type PushItem = Vec; - type ReadItem<'a> = Wrapped<'a, B>; +use crate::trace::implementations::containers::Push; +impl Push> for HuffmanContainer { fn push(&mut self, item: Vec) { for x in item.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; } match &mut self.inner { @@ -51,10 +47,12 @@ where } } } - fn copy_push(&mut self, item: &Vec) { - use crate::trace::MyTrait; - self.copy(<_ as MyTrait>::borrow_as(item)); - } +} + +impl BatchContainer for HuffmanContainer { + type OwnedItem = Vec; + type ReadItem<'a> = Wrapped<'a, B>; + fn copy(&mut self, item: Self::ReadItem<'_>) { match item.decode() { Ok(decoded) => { @@ -152,7 +150,7 @@ impl Default for HuffmanContainer { mod wrapper { - use crate::trace::MyTrait; + use crate::trace::IntoOwned; use super::Encoded; pub struct Wrapped<'a, B: Ord> { @@ -205,7 +203,7 @@ mod wrapper { self.partial_cmp(other).unwrap() } } - impl<'a, B: Ord+Clone> MyTrait<'a> for Wrapped<'a, B> { + impl<'a, B: Ord+Clone> IntoOwned<'a> for Wrapped<'a, B> { type Owned = Vec; fn into_owned(self) -> Self::Owned { match self.decode() { @@ -220,14 +218,8 @@ mod wrapper { Err(bytes) => other.extend_from_slice(bytes), } } - fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering { - match self.decode() { - Ok(decode) => decode.partial_cmp(other.iter()).unwrap(), - Err(bytes) => bytes.cmp(&other[..]), - } - } - fn borrow_as(other: &'a Self::Owned) -> Self { - Self { inner: Err(&other[..]) } + fn borrow_as(owned: &'a Self::Owned) -> Self { + Self { inner: Err(&owned[..]) } } } } diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index d0e4a459a..ea14b09ce 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -52,7 +52,6 @@ pub use self::ord_neu::OrdValSpine as ValSpine; pub use self::ord_neu::OrdKeySpine as KeySpine; use std::borrow::{ToOwned}; -use std::cmp::Ordering; use timely::Container; use timely::container::columnation::{Columnation, TimelyStack}; @@ -86,21 +85,22 @@ where type Diff = R; } +use crate::trace::implementations::containers::Push; + /// A type with opinions on how updates should be laid out. pub trait Layout { /// The represented update. type Target: Update + ?Sized; /// Container for update keys. - type KeyContainer: - BatchContainer::Key>; + type KeyContainer: BatchContainer::Key> + Push<::Key>; /// Container for update vals. - type ValContainer: - BatchContainer::Val>; + type ValContainer: BatchContainer::Val> + Push<::Val>; /// Container for update vals. type UpdContainer: - for<'a> BatchContainer::Time, ::Diff), ReadItem<'a> = &'a (::Time, ::Diff)>; + Push<(::Time, ::Diff)> + + for<'a> BatchContainer = &'a (::Time, ::Diff), OwnedItem = (::Time, ::Diff)>; /// Container for offsets. - type OffsetContainer: BatchContainer; + type OffsetContainer: BatchContainer + Push; } /// A layout that uses vectors @@ -144,7 +144,7 @@ where /// Examples include types that implement `Clone` who prefer pub trait PreferredContainer : ToOwned { /// The preferred container for the type. - type Container: BatchContainer; + type Container: BatchContainer + Push; } impl PreferredContainer for T { @@ -195,7 +195,7 @@ where use std::convert::TryInto; use std::ops::Deref; use abomonation_derive::Abomonation; -use crate::trace::cursor::MyTrait; +use crate::trace::cursor::IntoOwned; /// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Abomonation)] @@ -293,7 +293,7 @@ impl PushInto for usize { } } -/// Helper struct to provide `MyTrait` for `Copy` types. +/// Helper struct to provide `IntoOwned` for `Copy` types. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] pub struct Wrapper(T); @@ -305,7 +305,7 @@ impl Deref for Wrapper { } } -impl<'a, T: Copy + Ord> MyTrait<'a> for Wrapper { +impl<'a, T: Copy + Ord> IntoOwned<'a> for Wrapper { type Owned = T; fn into_owned(self) -> Self::Owned { @@ -316,27 +316,21 @@ impl<'a, T: Copy + Ord> MyTrait<'a> for Wrapper { *other = self.0; } - fn compare(&self, other: &Self::Owned) -> Ordering { - self.0.cmp(other) + fn borrow_as(owned: &'a Self::Owned) -> Self { + Self(*owned) } +} - fn borrow_as(other: &'a Self::Owned) -> Self { - Self(*other) +impl Push for OffsetList { + fn push(&mut self, item: usize) { + self.push(item); } } impl BatchContainer for OffsetList { - type PushItem = usize; + type OwnedItem = usize; type ReadItem<'a> = Wrapper; - fn push(&mut self, item: Self::PushItem) { - self.push(item); - } - - fn copy_push(&mut self, item: &Self::PushItem) { - self.push(*item); - } - fn copy(&mut self, item: Self::ReadItem<'_>) { self.push(item.0); } @@ -454,11 +448,11 @@ where } fn key_eq(this: &&K::Owned, other: <::Container as BatchContainer>::ReadItem<'_>) -> bool { - other.equals(this) + other.eq(&<<::Container as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(this)) } fn val_eq(this: &&V::Owned, other: <::Container as BatchContainer>::ReadItem<'_>) -> bool { - other.equals(this) + other.eq(&<<::Container as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(this)) } } @@ -471,24 +465,27 @@ pub mod containers { use timely::container::PushInto; use std::borrow::ToOwned; - use crate::trace::MyTrait; + use crate::trace::IntoOwned; + + /// Supports the ability to receive an item of type `T`. + pub trait Push { + /// Pushes the item into `self`. + fn push(&mut self, item: T); + } + + impl Push for TimelyStack { + fn push(&mut self, item: T) { + self.copy(&item); + } + } /// A general-purpose container resembling `Vec`. pub trait BatchContainer: 'static { - /// The type of contained item. - /// - /// The container only supplies references to the item, so it needn't be sized. - type PushItem; + /// An type that all `Self::ReadItem<'_>` can be converted into. + type OwnedItem; /// The type that can be read back out of the container. - type ReadItem<'a>: Copy + MyTrait<'a, Owned = Self::PushItem> + for<'b> PartialOrd>; - /// Inserts an owned item. - fn push(&mut self, item: Self::PushItem) { - self.copy_push(&item); - } - /// Inserts an owned item. - fn copy_push(&mut self, item: &Self::PushItem) { - self.copy(MyTrait::borrow_as(item)); - } + type ReadItem<'a>: Copy + IntoOwned<'a, Owned = Self::OwnedItem> + Ord + for<'b> PartialOrd>; + /// Inserts a borrowed item. fn copy(&mut self, item: Self::ReadItem<'_>); /// Extends from a range of items in another`Self`. @@ -563,18 +560,18 @@ pub mod containers { } } + impl Push for Vec { + fn push(&mut self, item: T) { + self.push(item); + } + } + // All `T: Clone` also implement `ToOwned`, but without the constraint Rust // struggles to understand why the owned type must be `T` (i.e. the one blanket impl). impl BatchContainer for Vec { - type PushItem = T; - type ReadItem<'a> = &'a Self::PushItem; + type OwnedItem = T; + type ReadItem<'a> = &'a Self::OwnedItem; - fn push(&mut self, item: T) { - self.push(item); - } - fn copy_push(&mut self, item: &T) { - self.copy(item); - } fn copy(&mut self, item: &T) { self.push(item.clone()); } @@ -598,12 +595,9 @@ pub mod containers { // The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now // be presented with the actual contained type, rather than a type that borrows into it. impl + 'static> BatchContainer for TimelyStack { - type PushItem = T; - type ReadItem<'a> = &'a Self::PushItem; + type OwnedItem = T; + type ReadItem<'a> = &'a Self::OwnedItem; - fn copy_push(&mut self, item: &Self::PushItem) { - self.copy(item); - } fn copy(&mut self, item: &T) { self.copy(item); } @@ -653,21 +647,22 @@ pub mod containers { } } - impl BatchContainer for SliceContainer - where - B: Ord + Clone + Sized + 'static, - { - type PushItem = Vec; - type ReadItem<'a> = &'a [B]; + impl Push> for SliceContainer { fn push(&mut self, item: Vec) { for x in item.into_iter() { self.inner.push(x); } self.offsets.push(self.inner.len()); } - fn copy_push(&mut self, item: &Vec) { - self.copy(&item[..]); - } + } + + impl BatchContainer for SliceContainer + where + B: Ord + Clone + Sized + 'static, + { + type OwnedItem = Vec; + type ReadItem<'a> = &'a [B]; + fn copy(&mut self, item: Self::ReadItem<'_>) { for x in item.iter() { self.inner.copy(x); diff --git a/src/trace/implementations/option_container.rs b/src/trace/implementations/option_container.rs index 18f93de74..a6d2eff03 100644 --- a/src/trace/implementations/option_container.rs +++ b/src/trace/implementations/option_container.rs @@ -1,6 +1,6 @@ //! A container optimized for identical contents. -use crate::trace::cursor::MyTrait; +use crate::trace::cursor::IntoOwned; use crate::trace::implementations::BatchContainer; /// A container that effectively represents default values. @@ -14,15 +14,13 @@ pub struct OptionContainer { container: C, } -impl BatchContainer for OptionContainer +use crate::trace::implementations::containers::Push; +impl Push for OptionContainer where - C: BatchContainer, - C::PushItem: Default + Ord, + C: BatchContainer + Push, + C::OwnedItem: Default + Ord, { - type PushItem = C::PushItem; - type ReadItem<'a> = OptionWrapper<'a, C>; - - fn push(&mut self, item: Self::PushItem) { + fn push(&mut self, item: C::OwnedItem) { if item == Default::default() && self.container.is_empty() { self.defaults += 1; } @@ -30,8 +28,18 @@ where self.container.push(item) } } +} + +impl BatchContainer for OptionContainer +where + C: BatchContainer , + C::OwnedItem: Default + Ord, +{ + type OwnedItem = C::OwnedItem; + type ReadItem<'a> = OptionWrapper<'a, C>; + fn copy<'a>(&mut self, item: Self::ReadItem<'a>) { - if item.equals(&Default::default()) && self.container.is_empty() { + if item.eq(&IntoOwned::borrow_as(&Default::default())) && self.container.is_empty() { self.defaults += 1; } else { @@ -39,7 +47,7 @@ where self.container.copy(item); } else { - self.container.push(Default::default()); + self.container.copy(IntoOwned::borrow_as(&Default::default())); } } } @@ -82,22 +90,22 @@ impl<'a, C: BatchContainer> Clone for OptionWrapper<'a, C> { use std::cmp::Ordering; impl<'a, 'b, C: BatchContainer> PartialEq> for OptionWrapper<'b, C> where - C::PushItem: Default + Ord, + C::OwnedItem: Default + Ord, { fn eq(&self, other: &OptionWrapper<'a, C>) -> bool { match (&self.inner, &other.inner) { (None, None) => true, - (None, Some(item2)) => item2.equals(&Default::default()), - (Some(item1), None) => item1.equals(&Default::default()), + (None, Some(item2)) => item2.eq(& as IntoOwned>::borrow_as(&Default::default())), + (Some(item1), None) => item1.eq(& as IntoOwned>::borrow_as(&Default::default())), (Some(item1), Some(item2)) => item1.eq(item2) } } } -impl<'a, C: BatchContainer> Eq for OptionWrapper<'a, C> where -C::PushItem: Default + Ord -{ } + +impl<'a, C: BatchContainer> Eq for OptionWrapper<'a, C> where C::OwnedItem: Default + Ord { } + impl<'a, 'b, C: BatchContainer> PartialOrd> for OptionWrapper<'b, C> where -C::PushItem: Default + Ord, +C::OwnedItem: Default + Ord, { fn partial_cmp(&self, other: &OptionWrapper<'a, C>) -> Option { let default = Default::default(); @@ -110,7 +118,7 @@ C::PushItem: Default + Ord, } } impl<'a, C: BatchContainer> Ord for OptionWrapper<'a, C> where -C::PushItem: Default + Ord, +C::OwnedItem: Default + Ord, { fn cmp(&self, other: &Self) -> Ordering { self.partial_cmp(other).unwrap() @@ -118,11 +126,11 @@ C::PushItem: Default + Ord, } -impl<'a, C: BatchContainer> MyTrait<'a> for OptionWrapper<'a, C> +impl<'a, C: BatchContainer> IntoOwned<'a> for OptionWrapper<'a, C> where - C::PushItem : Default + Ord, + C::OwnedItem : Default + Ord, { - type Owned = C::PushItem; + type Owned = C::OwnedItem; fn into_owned(self) -> Self::Owned { self.inner.map(|r| r.into_owned()).unwrap_or_else(Default::default) @@ -135,17 +143,9 @@ where *other = Default::default(); } } - fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering { - if let Some(item) = &self.inner { - item.compare(other) - } - else { - ::default().cmp(other) - } - } - fn borrow_as(other: &'a Self::Owned) -> Self { + fn borrow_as(owned: &'a Self::Owned) -> Self { Self { - inner: Some(<_>::borrow_as(other)) + inner: Some(IntoOwned::borrow_as(owned)) } } } diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index ddc8a4409..a83cf728e 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -72,9 +72,11 @@ mod val_batch { use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; + use crate::trace::implementations::containers::Push; + use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use crate::trace::implementations::{BatchContainer, BuilderInput}; - use crate::trace::cursor::MyTrait; + use crate::trace::cursor::IntoOwned; use super::{Layout, Update}; @@ -416,7 +418,7 @@ mod val_batch { if !self.update_stash.is_empty() { // If there is a single element, equal to a just-prior recorded update, // we push nothing and report an unincremented offset to encode this case. - if self.update_stash.len() == 1 && self.result.updates.last().map(|ud| self.update_stash.last().unwrap().equals(ud)).unwrap_or(false) { + if self.update_stash.len() == 1 && self.result.updates.last().map(|ud| self.update_stash.last().unwrap().eq(IntoOwned::borrow_as(ud))).unwrap_or(false) { // Just clear out update_stash, as we won't drain it here. self.update_stash.clear(); self.singletons += 1; @@ -620,9 +622,10 @@ mod key_batch { use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; + use crate::trace::implementations::containers::Push; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use crate::trace::implementations::{BatchContainer, BuilderInput}; - use crate::trace::cursor::MyTrait; + use crate::trace::cursor::IntoOwned; use super::{Layout, Update}; diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 60ed6afd4..b5cc93ffa 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -83,7 +83,6 @@ impl Hashable for &HashWrapper { mod val_batch { - use std::borrow::Borrow; use std::convert::TryInto; use std::marker::PhantomData; use abomonation_derive::Abomonation; @@ -92,9 +91,11 @@ mod val_batch { use crate::hashable::Hashable; + use crate::trace::implementations::containers::Push; + use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use crate::trace::implementations::{BatchContainer, BuilderInput}; - use crate::trace::cursor::MyTrait; + use crate::trace::cursor::IntoOwned; use super::{Layout, Update, HashOrdered}; @@ -183,8 +184,8 @@ mod val_batch { /// If `offset` is specified, we will insert it at the appropriate location. If it is not specified, /// we leave `keys_offs` ready to receive it as the next `push`. This is so that builders that may /// not know the final offset at the moment of key insertion can prepare for receiving the offset. - fn insert_key(&mut self, key: &::Key, offset: Option) { - let desired = self.desired_location(key); + fn insert_key(&mut self, key: ::Key, offset: Option) { + let desired = self.desired_location(&key); // Were we to push the key now, it would be at `self.keys.len()`, so while that is wrong, // push additional blank entries in. while self.keys.len() < desired { @@ -196,7 +197,7 @@ mod val_batch { // Now we insert the key. Even if it is no longer the desired location because of contention. // If an offset has been supplied we insert it, and otherwise leave it for future determination. - self.keys.copy_push(key); + self.keys.push(key); if let Some(offset) = offset { self.keys_offs.push(offset); } @@ -320,8 +321,6 @@ mod val_batch { /// description description: Description<::Time>, - /// Owned key for copying into. - key_owned: <::Key as ToOwned>::Owned, /// Local stash of updates, to use for consolidation. /// /// We could emulate a `ChangeBatch` here, with related compaction smarts. @@ -375,7 +374,6 @@ mod val_batch { key_cursor2: 0, result: storage, description, - key_owned: Default::default(), update_stash: Vec::new(), singletons: 0, } @@ -451,8 +449,7 @@ mod val_batch { // If we have pushed any values, copy the key as well. if self.result.vals.len() > init_vals { - source.keys.index(cursor).clone_onto(&mut self.key_owned); - self.result.insert_key(&self.key_owned, Some(self.result.vals.len())); + self.result.insert_key(source.keys.index(cursor).into_owned(), Some(self.result.vals.len())); } } /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors. @@ -472,8 +469,7 @@ mod val_batch { let (lower1, upper1) = source1.values_for_key(self.key_cursor1); let (lower2, upper2) = source2.values_for_key(self.key_cursor2); if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) { - source1.keys.index(self.key_cursor1).clone_onto(&mut self.key_owned); - self.result.insert_key(&self.key_owned, Some(off)); + self.result.insert_key(source1.keys.index(self.key_cursor1).into_owned(), Some(off)); } // Increment cursors in either case; the keys are merged. self.key_cursor1 += 1; @@ -578,7 +574,7 @@ mod val_batch { if !self.update_stash.is_empty() { // If there is a single element, equal to a just-prior recorded update, // we push nothing and report an unincremented offset to encode this case. - if self.update_stash.len() == 1 && self.result.updates.last().map(|l| l.equals(self.update_stash.last().unwrap())).unwrap_or(false) { + if self.update_stash.len() == 1 && self.result.updates.last().map(|l| l.eq(IntoOwned::borrow_as(self.update_stash.last().unwrap()))).unwrap_or(false) { // Just clear out update_stash, as we won't drain it here. self.update_stash.clear(); self.singletons += 1; @@ -805,7 +801,7 @@ mod val_batch { self.push_update(time, diff); val.push_into(&mut self.result.vals); // Insert the key, but with no specified offset. - self.result.insert_key(key.borrow(), None); + self.result.insert_key(key, None); } } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 00b72fc1d..f73730a23 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -19,7 +19,7 @@ use timely::progress::{Antichain, frontier::AntichainRef}; use timely::progress::Timestamp; use crate::logging::DifferentialEvent; -use crate::trace::cursor::MyTrait; +use crate::trace::cursor::IntoOwned; use crate::difference::Semigroup; use crate::lattice::Lattice; // use ::difference::Semigroup; @@ -52,11 +52,11 @@ pub type ExertionLogic = std::sync::Arc Fn(&'a [(usize, usize, usize pub trait TraceReader { /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + MyTrait<'a, Owned = Self::KeyOwned>; + type Key<'a>: Copy + Clone + Ord + IntoOwned<'a, Owned = Self::KeyOwned>; /// Owned version of the above. type KeyOwned: Ord + Clone; /// Values associated with keys. - type Val<'a>: Copy + Clone + MyTrait<'a>; + type Val<'a>: Copy + Clone + IntoOwned<'a>; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Associated update. @@ -258,11 +258,11 @@ where Self: ::std::marker::Sized, { /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + MyTrait<'a, Owned = Self::KeyOwned>; + type Key<'a>: Copy + Clone + Ord + IntoOwned<'a, Owned = Self::KeyOwned>; /// Owned version of the above. type KeyOwned: Ord + Clone; /// Values associated with keys. - type Val<'a>: Copy + Clone + MyTrait<'a>; + type Val<'a>: Copy + Clone + IntoOwned<'a>; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Associated update.