Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update timely dependency #496

Merged
merged 1 commit into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
//! specific behavior you require.

use std::collections::VecDeque;
use timely::container::{ContainerBuilder, PushContainer, PushInto};
use timely::container::{ContainerBuilder, PushInto, SizableContainer};
use crate::Data;
use crate::difference::Semigroup;

Expand Down Expand Up @@ -173,12 +173,12 @@ where
///
/// Precondition: `current` is not allocated or has space for at least one element.
#[inline]
fn push<P: PushInto<Self::Container>>(&mut self, item: P) {
fn push<P>(&mut self, item: P) where Self::Container: PushInto<P> {
let preferred_capacity = <Vec<(D,T,R)>>::preferred_capacity();
if self.current.capacity() < preferred_capacity * 2 {
self.current.reserve(preferred_capacity * 2 - self.current.capacity());
}
item.push_into(&mut self.current);
self.current.push_into(item);
if self.current.len() == self.current.capacity() {
// Flush complete containers.
self.consolidate_and_flush_through(preferred_capacity);
Expand Down
32 changes: 32 additions & 0 deletions src/dynamic/pointstamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,38 @@ pub struct PointStamp<T> {
vector: Vec<T>,
}

impl<T: Timestamp> PartialEq<[T]> for PointStamp<T> {
fn eq(&self, other: &[T]) -> bool {
self.vector.iter()
.zip(other.iter().chain(std::iter::repeat(&T::minimum())))
.all(|(t1, t2)| t1.eq(t2))
}
}

impl<T: Timestamp> PartialEq<PointStamp<T>> for [T] {
fn eq(&self, other: &PointStamp<T>) -> bool {
self.iter()
.zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
.all(|(t1, t2)| t1.eq(t2))
}
}

impl<T: Timestamp> PartialOrder<[T]> for PointStamp<T> {
fn less_equal(&self, other: &[T]) -> bool {
self.vector.iter()
.zip(other.iter().chain(std::iter::repeat(&T::minimum())))
.all(|(t1, t2)| t1.less_equal(t2))
}
}

impl<T: Timestamp> PartialOrder<PointStamp<T>> for [T] {
fn less_equal(&self, other: &PointStamp<T>) -> bool {
self.iter()
.zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
.all(|(t1, t2)| t1.less_equal(t2))
}
}

impl<T: Timestamp> PointStamp<T> {
/// Create a new sequence.
///
Expand Down
6 changes: 2 additions & 4 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,7 @@ where
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::Batch: Batch,
<T2::Builder as Builder>::Input: Container,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
<T2::Builder as Builder>::Input: Container + PushInto<((T1::KeyOwned, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
{
self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| {
Expand All @@ -314,8 +313,7 @@ where
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
<T2::Builder as Builder>::Input: Container,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
<T2::Builder as Builder>::Input: Container + PushInto<((T1::KeyOwned, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
use crate::operators::reduce::reduce_trace;
Expand Down
4 changes: 2 additions & 2 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use std::cmp::Ordering;
use timely::Container;

use timely::container::{ContainerBuilder, PushContainer, PushInto};
use timely::container::{ContainerBuilder, SizableContainer, PushInto};
use timely::order::PartialOrder;
use timely::progress::Timestamp;
use timely::dataflow::{Scope, StreamCore};
Expand Down Expand Up @@ -330,7 +330,7 @@ impl<CB: ContainerBuilder> ContainerBuilder for EffortBuilder<CB> {
type Container = CB::Container;

#[inline]
fn push<T: PushInto<Self::Container>>(&mut self, item: T) where Self::Container: PushContainer {
fn push<T>(&mut self, item: T) where Self::Container: SizableContainer + PushInto<T> {
self.1.push(item)
}

Expand Down
7 changes: 3 additions & 4 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,7 @@ where
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
<T2::Builder as Builder>::Input: Container,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
<T2::Builder as Builder>::Input: Container + PushInto<((T1::KeyOwned, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
let mut result_trace = None;
Expand Down Expand Up @@ -457,7 +456,7 @@ where
builders.push(T2::Builder::new());
}

let mut buffer = Default::default();
let mut buffer = <<T2 as Trace>::Batcher as crate::trace::Batcher>::Output::default();

// cursors for navigating input and output traces.
let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
Expand Down Expand Up @@ -536,7 +535,7 @@ where
for index in 0 .. buffers.len() {
buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
for (val, time, diff) in buffers[index].1.drain(..) {
((key.into_owned(), val), time, diff).push_into(&mut buffer);
buffer.push_into(((key.into_owned(), val), time, diff));
builders[index].push(&mut buffer);
buffer.clear();
}
Expand Down
8 changes: 4 additions & 4 deletions src/trace/implementations/huffman_container.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! A slice container that Huffman encodes its contents.

use std::collections::BTreeMap;
use timely::container::PushInto;

use crate::trace::implementations::{BatchContainer, OffsetList};

Expand Down Expand Up @@ -32,9 +33,8 @@ where
}
}

use crate::trace::implementations::containers::Push;
impl<B: Ord + Clone + 'static> Push<Vec<B>> for HuffmanContainer<B> {
fn push(&mut self, item: Vec<B>) {
impl<B: Ord + Clone + 'static> PushInto<Vec<B>> for HuffmanContainer<B> {
fn push_into(&mut self, item: Vec<B>) {
for x in item.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; }
match &mut self.inner {
Ok((huffman, bytes)) => {
Expand Down Expand Up @@ -537,4 +537,4 @@ mod huffman {
}
}

}
}
60 changes: 19 additions & 41 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,20 @@ where
type Diff = R;
}

use crate::trace::implementations::containers::Push;

/// A type with opinions on how updates should be laid out.
pub trait Layout {
/// The represented update.
type Target: Update + ?Sized;
/// Container for update keys.
type KeyContainer: BatchContainer<OwnedItem = <Self::Target as Update>::Key> + Push<<Self::Target as Update>::Key>;
type KeyContainer: BatchContainer<OwnedItem = <Self::Target as Update>::Key> + PushInto<<Self::Target as Update>::Key>;
/// Container for update vals.
type ValContainer: BatchContainer<OwnedItem = <Self::Target as Update>::Val> + Push<<Self::Target as Update>::Val>;
type ValContainer: BatchContainer<OwnedItem = <Self::Target as Update>::Val> + PushInto<<Self::Target as Update>::Val>;
/// Container for update vals.
type UpdContainer:
Push<(<Self::Target as Update>::Time, <Self::Target as Update>::Diff)> +
PushInto<(<Self::Target as Update>::Time, <Self::Target as Update>::Diff)> +
for<'a> BatchContainer<ReadItem<'a> = &'a (<Self::Target as Update>::Time, <Self::Target as Update>::Diff), OwnedItem = (<Self::Target as Update>::Time, <Self::Target as Update>::Diff)>;
/// Container for offsets.
type OffsetContainer: BatchContainer<OwnedItem = usize> + Push<usize>;
type OffsetContainer: BatchContainer<OwnedItem = usize> + PushInto<usize>;
}

/// A layout that uses vectors
Expand Down Expand Up @@ -144,7 +142,7 @@ where
/// Examples include types that implement `Clone` who prefer
pub trait PreferredContainer : ToOwned {
/// The preferred container for the type.
type Container: BatchContainer<OwnedItem = Self::Owned> + Push<Self::Owned>;
type Container: BatchContainer<OwnedItem = Self::Owned> + PushInto<Self::Owned>;
}

impl<T: Ord + Clone + 'static> PreferredContainer for T {
Expand Down Expand Up @@ -287,12 +285,6 @@ impl<'a> Iterator for OffsetListIter<'a> {
}
}

impl PushInto<OffsetList> for usize {
fn push_into(self, target: &mut OffsetList) {
target.push(self);
}
}

/// Helper struct to provide `IntoOwned` for `Copy` types.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
pub struct Wrapper<T: Copy>(T);
Expand Down Expand Up @@ -321,8 +313,8 @@ impl<'a, T: Copy + Ord> IntoOwned<'a> for Wrapper<T> {
}
}

impl Push<usize> for OffsetList {
fn push(&mut self, item: usize) {
impl PushInto<usize> for OffsetList {
fn push_into(&mut self, item: usize) {
self.push(item);
}
}
Expand Down Expand Up @@ -467,25 +459,17 @@ pub mod containers {
use std::borrow::ToOwned;
use crate::trace::IntoOwned;

/// Supports the ability to receive an item of type `T`.
pub trait Push<T> {
/// Pushes the item into `self`.
fn push(&mut self, item: T);
}

impl<T: Columnation> Push<T> for TimelyStack<T> {
fn push(&mut self, item: T) {
self.copy(&item);
}
}

/// A general-purpose container resembling `Vec<T>`.
pub trait BatchContainer: 'static {
/// An type that all `Self::ReadItem<'_>` can be converted into.
type OwnedItem;
/// The type that can be read back out of the container.
type ReadItem<'a>: Copy + IntoOwned<'a, Owned = Self::OwnedItem> + Ord + for<'b> PartialOrd<Self::ReadItem<'b>>;

/// Push an item into this container
fn push<D>(&mut self, item: D) where Self: PushInto<D> {
self.push_into(item);
}
/// Inserts a borrowed item.
fn copy(&mut self, item: Self::ReadItem<'_>);
/// Extends from a range of items in another`Self`.
Expand Down Expand Up @@ -560,12 +544,6 @@ pub mod containers {
}
}

impl<T> Push<T> for Vec<T> {
fn push(&mut self, item: T) {
self.push(item);
}
}

// All `T: Clone` also implement `ToOwned<Owned = T>`, but without the constraint Rust
// struggles to understand why the owned type must be `T` (i.e. the one blanket impl).
impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
Expand Down Expand Up @@ -635,20 +613,20 @@ pub mod containers {
inner: Vec<B>,
}

impl<B: Ord + Clone + 'static> PushInto<SliceContainer<B>> for &[B] {
fn push_into(self, target: &mut SliceContainer<B>) {
target.copy(self)
impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
fn push_into(&mut self, item: &[B]) {
self.copy(item);
}
}

impl<B: Ord + Clone + 'static> PushInto<SliceContainer<B>> for &Vec<B> {
fn push_into(self, target: &mut SliceContainer<B>) {
target.copy(self)
impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
fn push_into(&mut self, item: &Vec<B>) {
self.copy(item);
}
}

impl<B> Push<Vec<B>> for SliceContainer<B> {
fn push(&mut self, item: Vec<B>) {
impl<B> PushInto<Vec<B>> for SliceContainer<B> {
fn push_into(&mut self, item: Vec<B>) {
for x in item.into_iter() {
self.inner.push(x);
}
Expand Down
11 changes: 6 additions & 5 deletions src/trace/implementations/option_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ pub struct OptionContainer<C> {
container: C,
}

use crate::trace::implementations::containers::Push;
impl<C: BatchContainer> Push<C::OwnedItem> for OptionContainer<C>
use timely::container::PushInto;
impl<C: BatchContainer> PushInto<C::OwnedItem> for OptionContainer<C>
where
C: BatchContainer + Push<C::OwnedItem>,
C: BatchContainer + PushInto<C::OwnedItem>,
C::OwnedItem: Default + Ord,
{
fn push(&mut self, item: C::OwnedItem) {
fn push_into(&mut self, item: C::OwnedItem) {
if item == Default::default() && self.container.is_empty() {
self.defaults += 1;
}
Expand Down Expand Up @@ -88,7 +88,8 @@ impl<'a, C: BatchContainer> Clone for OptionWrapper<'a, C> {


use std::cmp::Ordering;
impl<'a, 'b, C: BatchContainer> PartialEq<OptionWrapper<'a, C>> for OptionWrapper<'b, C>

impl<'a, 'b, C: BatchContainer> PartialEq<OptionWrapper<'a, C>> for OptionWrapper<'b, C>
where
C::OwnedItem: Default + Ord,
{
Expand Down
17 changes: 7 additions & 10 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ mod val_batch {
use timely::container::PushInto;
use timely::progress::{Antichain, frontier::AntichainRef};

use crate::trace::implementations::containers::Push;

use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use crate::trace::implementations::{BatchContainer, BuilderInput};
use crate::trace::cursor::IntoOwned;
Expand Down Expand Up @@ -546,8 +544,8 @@ mod val_batch {
where
L: Layout,
CI: for<'a> BuilderInput<L, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
for<'a> CI::Key<'a>: PushInto<L::KeyContainer>,
for<'a> CI::Val<'a>: PushInto<L::ValContainer>,
for<'a> L::KeyContainer: PushInto<CI::Key<'a>>,
for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
{

type Input = CI;
Expand Down Expand Up @@ -584,16 +582,16 @@ mod val_batch {
self.result.vals_offs.push(self.result.updates.len());
if self.singleton.take().is_some() { self.singletons += 1; }
self.push_update(time, diff);
val.push_into(&mut self.result.vals);
self.result.vals.push(val);
}
} else {
// New key; complete representation of prior key.
self.result.vals_offs.push(self.result.updates.len());
if self.singleton.take().is_some() { self.singletons += 1; }
self.result.keys_offs.push(self.result.vals.len());
self.push_update(time, diff);
val.push_into(&mut self.result.vals);
key.push_into(&mut self.result.keys);
self.result.vals.push(val);
self.result.keys.push(key);
}
}
}
Expand Down Expand Up @@ -622,7 +620,6 @@ mod key_batch {
use timely::container::PushInto;
use timely::progress::{Antichain, frontier::AntichainRef};

use crate::trace::implementations::containers::Push;
use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use crate::trace::implementations::{BatchContainer, BuilderInput};
use crate::trace::cursor::IntoOwned;
Expand Down Expand Up @@ -992,7 +989,7 @@ mod key_batch {
where
L: Layout,
CI: for<'a> BuilderInput<L, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
for<'a> CI::Key<'a>: PushInto<L::KeyContainer>,
for<'a> L::KeyContainer: PushInto<CI::Key<'a>>,
{

type Input = CI;
Expand Down Expand Up @@ -1026,7 +1023,7 @@ mod key_batch {
// Remove any pending singleton, and if it was set increment our count.
if self.singleton.take().is_some() { self.singletons += 1; }
self.push_update(time, diff);
key.push_into(&mut self.result.keys);
self.result.keys.push(key);
}
}
}
Expand Down
Loading