diff --git a/src/consolidation.rs b/src/consolidation.rs index 3801729e8..497d15b9a 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -11,7 +11,7 @@ //! specific behavior you require. use std::collections::VecDeque; -use timely::container::{ContainerBuilder, PushContainer, PushInto}; +use timely::container::{ContainerBuilder, PushInto, SizableContainer}; use crate::Data; use crate::difference::Semigroup; @@ -173,12 +173,12 @@ where /// /// Precondition: `current` is not allocated or has space for at least one element. #[inline] - fn push>(&mut self, item: P) { + fn push

(&mut self, item: P) where Self::Container: PushInto

{ let preferred_capacity = >::preferred_capacity(); if self.current.capacity() < preferred_capacity * 2 { self.current.reserve(preferred_capacity * 2 - self.current.capacity()); } - item.push_into(&mut self.current); + self.current.push_into(item); if self.current.len() == self.current.capacity() { // Flush complete containers. self.consolidate_and_flush_through(preferred_capacity); diff --git a/src/dynamic/pointstamp.rs b/src/dynamic/pointstamp.rs index bfaec29e9..14a256e33 100644 --- a/src/dynamic/pointstamp.rs +++ b/src/dynamic/pointstamp.rs @@ -26,6 +26,38 @@ pub struct PointStamp { vector: Vec, } +impl PartialEq<[T]> for PointStamp { + fn eq(&self, other: &[T]) -> bool { + self.vector.iter() + .zip(other.iter().chain(std::iter::repeat(&T::minimum()))) + .all(|(t1, t2)| t1.eq(t2)) + } +} + +impl PartialEq> for [T] { + fn eq(&self, other: &PointStamp) -> bool { + self.iter() + .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum()))) + .all(|(t1, t2)| t1.eq(t2)) + } +} + +impl PartialOrder<[T]> for PointStamp { + fn less_equal(&self, other: &[T]) -> bool { + self.vector.iter() + .zip(other.iter().chain(std::iter::repeat(&T::minimum()))) + .all(|(t1, t2)| t1.less_equal(t2)) + } +} + +impl PartialOrder> for [T] { + fn less_equal(&self, other: &PointStamp) -> bool { + self.iter() + .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum()))) + .all(|(t1, t2)| t1.less_equal(t2)) + } +} + impl PointStamp { /// Create a new sequence. /// diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index d8fb9720d..cdc3b2759 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -294,8 +294,7 @@ where F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - ::Input: Container, - ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, + ::Input: Container + PushInto<((T1::KeyOwned, V), T2::Time, T2::Diff)>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static, { self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| { @@ -314,8 +313,7 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - ::Input: Container, - ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, + ::Input: Container + PushInto<((T1::KeyOwned, V), T2::Time, T2::Diff)>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { use crate::operators::reduce::reduce_trace; diff --git a/src/operators/join.rs b/src/operators/join.rs index 03bdc1e0d..d5132d606 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -6,7 +6,7 @@ use std::cmp::Ordering; use timely::Container; -use timely::container::{ContainerBuilder, PushContainer, PushInto}; +use timely::container::{ContainerBuilder, SizableContainer, PushInto}; use timely::order::PartialOrder; use timely::progress::Timestamp; use timely::dataflow::{Scope, StreamCore}; @@ -330,7 +330,7 @@ impl ContainerBuilder for EffortBuilder { type Container = CB::Container; #[inline] - fn push>(&mut self, item: T) where Self::Container: PushContainer { + fn push(&mut self, item: T) where Self::Container: SizableContainer + PushInto { self.1.push(item) } diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index ffaf1652b..6586fea8f 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -314,8 +314,7 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - ::Input: Container, - ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, + ::Input: Container + PushInto<((T1::KeyOwned, V), T2::Time, T2::Diff)>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { let mut result_trace = None; @@ -457,7 +456,7 @@ where builders.push(T2::Builder::new()); } - let mut buffer = Default::default(); + let mut buffer = <::Batcher as crate::trace::Batcher>::Output::default(); // cursors for navigating input and output traces. let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); @@ -536,7 +535,7 @@ where for index in 0 .. buffers.len() { buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); for (val, time, diff) in buffers[index].1.drain(..) { - ((key.into_owned(), val), time, diff).push_into(&mut buffer); + buffer.push_into(((key.into_owned(), val), time, diff)); builders[index].push(&mut buffer); buffer.clear(); } diff --git a/src/trace/implementations/huffman_container.rs b/src/trace/implementations/huffman_container.rs index 4fa67b2c7..83f20be0e 100644 --- a/src/trace/implementations/huffman_container.rs +++ b/src/trace/implementations/huffman_container.rs @@ -1,6 +1,7 @@ //! A slice container that Huffman encodes its contents. use std::collections::BTreeMap; +use timely::container::PushInto; use crate::trace::implementations::{BatchContainer, OffsetList}; @@ -32,9 +33,8 @@ where } } -use crate::trace::implementations::containers::Push; -impl Push> for HuffmanContainer { - fn push(&mut self, item: Vec) { +impl PushInto> for HuffmanContainer { + fn push_into(&mut self, item: Vec) { for x in item.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; } match &mut self.inner { Ok((huffman, bytes)) => { @@ -537,4 +537,4 @@ mod huffman { } } -} \ No newline at end of file +} diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index ea14b09ce..a41f6924c 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -85,22 +85,20 @@ where type Diff = R; } -use crate::trace::implementations::containers::Push; - /// A type with opinions on how updates should be laid out. pub trait Layout { /// The represented update. type Target: Update + ?Sized; /// Container for update keys. - type KeyContainer: BatchContainer::Key> + Push<::Key>; + type KeyContainer: BatchContainer::Key> + PushInto<::Key>; /// Container for update vals. - type ValContainer: BatchContainer::Val> + Push<::Val>; + type ValContainer: BatchContainer::Val> + PushInto<::Val>; /// Container for update vals. type UpdContainer: - Push<(::Time, ::Diff)> + + PushInto<(::Time, ::Diff)> + for<'a> BatchContainer = &'a (::Time, ::Diff), OwnedItem = (::Time, ::Diff)>; /// Container for offsets. - type OffsetContainer: BatchContainer + Push; + type OffsetContainer: BatchContainer + PushInto; } /// A layout that uses vectors @@ -144,7 +142,7 @@ where /// Examples include types that implement `Clone` who prefer pub trait PreferredContainer : ToOwned { /// The preferred container for the type. - type Container: BatchContainer + Push; + type Container: BatchContainer + PushInto; } impl PreferredContainer for T { @@ -287,12 +285,6 @@ impl<'a> Iterator for OffsetListIter<'a> { } } -impl PushInto for usize { - fn push_into(self, target: &mut OffsetList) { - target.push(self); - } -} - /// Helper struct to provide `IntoOwned` for `Copy` types. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] pub struct Wrapper(T); @@ -321,8 +313,8 @@ impl<'a, T: Copy + Ord> IntoOwned<'a> for Wrapper { } } -impl Push for OffsetList { - fn push(&mut self, item: usize) { +impl PushInto for OffsetList { + fn push_into(&mut self, item: usize) { self.push(item); } } @@ -467,18 +459,6 @@ pub mod containers { use std::borrow::ToOwned; use crate::trace::IntoOwned; - /// Supports the ability to receive an item of type `T`. - pub trait Push { - /// Pushes the item into `self`. - fn push(&mut self, item: T); - } - - impl Push for TimelyStack { - fn push(&mut self, item: T) { - self.copy(&item); - } - } - /// A general-purpose container resembling `Vec`. pub trait BatchContainer: 'static { /// An type that all `Self::ReadItem<'_>` can be converted into. @@ -486,6 +466,10 @@ pub mod containers { /// The type that can be read back out of the container. type ReadItem<'a>: Copy + IntoOwned<'a, Owned = Self::OwnedItem> + Ord + for<'b> PartialOrd>; + /// Push an item into this container + fn push(&mut self, item: D) where Self: PushInto { + self.push_into(item); + } /// Inserts a borrowed item. fn copy(&mut self, item: Self::ReadItem<'_>); /// Extends from a range of items in another`Self`. @@ -560,12 +544,6 @@ pub mod containers { } } - impl Push for Vec { - fn push(&mut self, item: T) { - self.push(item); - } - } - // All `T: Clone` also implement `ToOwned`, but without the constraint Rust // struggles to understand why the owned type must be `T` (i.e. the one blanket impl). impl BatchContainer for Vec { @@ -635,20 +613,20 @@ pub mod containers { inner: Vec, } - impl PushInto> for &[B] { - fn push_into(self, target: &mut SliceContainer) { - target.copy(self) + impl PushInto<&[B]> for SliceContainer { + fn push_into(&mut self, item: &[B]) { + self.copy(item); } } - impl PushInto> for &Vec { - fn push_into(self, target: &mut SliceContainer) { - target.copy(self) + impl PushInto<&Vec> for SliceContainer { + fn push_into(&mut self, item: &Vec) { + self.copy(item); } } - impl Push> for SliceContainer { - fn push(&mut self, item: Vec) { + impl PushInto> for SliceContainer { + fn push_into(&mut self, item: Vec) { for x in item.into_iter() { self.inner.push(x); } diff --git a/src/trace/implementations/option_container.rs b/src/trace/implementations/option_container.rs index a6d2eff03..abc8ef541 100644 --- a/src/trace/implementations/option_container.rs +++ b/src/trace/implementations/option_container.rs @@ -14,13 +14,13 @@ pub struct OptionContainer { container: C, } -use crate::trace::implementations::containers::Push; -impl Push for OptionContainer +use timely::container::PushInto; +impl PushInto for OptionContainer where - C: BatchContainer + Push, + C: BatchContainer + PushInto, C::OwnedItem: Default + Ord, { - fn push(&mut self, item: C::OwnedItem) { + fn push_into(&mut self, item: C::OwnedItem) { if item == Default::default() && self.container.is_empty() { self.defaults += 1; } @@ -88,7 +88,8 @@ impl<'a, C: BatchContainer> Clone for OptionWrapper<'a, C> { use std::cmp::Ordering; -impl<'a, 'b, C: BatchContainer> PartialEq> for OptionWrapper<'b, C> + +impl<'a, 'b, C: BatchContainer> PartialEq> for OptionWrapper<'b, C> where C::OwnedItem: Default + Ord, { diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index a83cf728e..d519d0d63 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -72,8 +72,6 @@ mod val_batch { use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; - use crate::trace::implementations::containers::Push; - use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use crate::trace::implementations::{BatchContainer, BuilderInput}; use crate::trace::cursor::IntoOwned; @@ -546,8 +544,8 @@ mod val_batch { where L: Layout, CI: for<'a> BuilderInput::Time, Diff=::Diff>, - for<'a> CI::Key<'a>: PushInto, - for<'a> CI::Val<'a>: PushInto, + for<'a> L::KeyContainer: PushInto>, + for<'a> L::ValContainer: PushInto>, { type Input = CI; @@ -584,7 +582,7 @@ mod val_batch { self.result.vals_offs.push(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); - val.push_into(&mut self.result.vals); + self.result.vals.push(val); } } else { // New key; complete representation of prior key. @@ -592,8 +590,8 @@ mod val_batch { if self.singleton.take().is_some() { self.singletons += 1; } self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); - val.push_into(&mut self.result.vals); - key.push_into(&mut self.result.keys); + self.result.vals.push(val); + self.result.keys.push(key); } } } @@ -622,7 +620,6 @@ mod key_batch { use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; - use crate::trace::implementations::containers::Push; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use crate::trace::implementations::{BatchContainer, BuilderInput}; use crate::trace::cursor::IntoOwned; @@ -992,7 +989,7 @@ mod key_batch { where L: Layout, CI: for<'a> BuilderInput::Time, Diff=::Diff>, - for<'a> CI::Key<'a>: PushInto, + for<'a> L::KeyContainer: PushInto>, { type Input = CI; @@ -1026,7 +1023,7 @@ mod key_batch { // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); - key.push_into(&mut self.result.keys); + self.result.keys.push(key); } } } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index b5cc93ffa..b7e6c217e 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -90,9 +90,6 @@ mod val_batch { use timely::progress::{Antichain, frontier::AntichainRef}; use crate::hashable::Hashable; - - use crate::trace::implementations::containers::Push; - use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use crate::trace::implementations::{BatchContainer, BuilderInput}; use crate::trace::cursor::IntoOwned; @@ -743,7 +740,7 @@ mod val_batch { ::Key: Default + HashOrdered, // RhhValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, CI: for<'a> BuilderInput = ::Key, Time=::Time, Diff=::Diff>, - for<'a> CI::Val<'a>: PushInto, + for<'a> L::ValContainer: PushInto>, { type Input = CI; type Time = ::Time; @@ -791,7 +788,7 @@ mod val_batch { self.result.vals_offs.push(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); - val.push_into(&mut self.result.vals); + self.result.vals.push(val); } } else { // New key; complete representation of prior key. @@ -799,7 +796,7 @@ mod val_batch { if self.singleton.take().is_some() { self.singletons += 1; } self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); - val.push_into(&mut self.result.vals); + self.result.vals.push(val); // Insert the key, but with no specified offset. self.result.insert_key(key, None); }