diff --git a/timely/src/dataflow/operators/core/mod.rs b/timely/src/dataflow/operators/core/mod.rs index 7a54e918e..d2ae145d9 100644 --- a/timely/src/dataflow/operators/core/mod.rs +++ b/timely/src/dataflow/operators/core/mod.rs @@ -13,6 +13,7 @@ pub mod ok_err; pub mod probe; pub mod rc; pub mod reclock; +pub mod to_stream; pub mod unordered_input; pub use concat::{Concat, Concatenate}; @@ -25,5 +26,6 @@ pub use inspect::{Inspect, InspectCore}; pub use map::Map; pub use ok_err::OkErr; pub use probe::Probe; +pub use to_stream::ToStream; pub use reclock::Reclock; pub use unordered_input::{UnorderedInput, UnorderedHandle}; diff --git a/timely/src/dataflow/operators/core/to_stream.rs b/timely/src/dataflow/operators/core/to_stream.rs new file mode 100644 index 000000000..3775e0267 --- /dev/null +++ b/timely/src/dataflow/operators/core/to_stream.rs @@ -0,0 +1,58 @@ +//! Conversion to the `StreamCore` type from iterators. + +use crate::container::{PushContainer, PushInto}; +use crate::Container; +use crate::dataflow::operators::generic::operator::source; +use crate::dataflow::{StreamCore, Scope}; + +/// Converts to a timely [StreamCore]. +pub trait ToStream { + /// Converts to a timely [StreamCore]. + /// + /// # Examples + /// + /// ``` + /// use timely::dataflow::operators::core::ToStream; + /// use timely::dataflow::operators::Capture; + /// use timely::dataflow::operators::capture::Extract; + /// + /// let (data1, data2) = timely::example(|scope| { + /// let data1 = (0..3).to_stream(scope).capture(); + /// let data2 = vec![0,1,2].to_stream(scope).capture(); + /// (data1, data2) + /// }); + /// + /// assert_eq!(data1.extract(), data2.extract()); + /// ``` + fn to_stream(self, scope: &mut S) -> StreamCore; +} + +impl ToStream for I where I::Item: PushInto { + fn to_stream(self, scope: &mut S) -> StreamCore { + + source(scope, "ToStream", |capability, info| { + + // Acquire an activator, so that the operator can rescheduled itself. + let activator = scope.activator_for(&info.address[..]); + + let mut iterator = self.into_iter().fuse(); + let mut capability = Some(capability); + + move |output| { + + if let Some(element) = iterator.next() { + let mut session = output.session(capability.as_ref().unwrap()); + session.give(element); + let n = 256 * crate::container::buffer::default_capacity::(); + for element in iterator.by_ref().take(n - 1) { + session.give(element); + } + activator.activate(); + } + else { + capability = None; + } + } + }) + } +} diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 7a63dc96a..453e8838f 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -17,7 +17,6 @@ pub use self::filter::Filter; pub use self::delay::Delay; pub use self::exchange::Exchange; pub use self::broadcast::Broadcast; -pub use self::to_stream::{ToStream, ToStreamCore}; pub use self::capture::Capture; pub use self::branch::{Branch, BranchWhen}; pub use self::result::ResultStream; @@ -44,7 +43,7 @@ pub mod delay; pub use self::core::exchange; pub mod broadcast; pub use self::core::probe::{self, Probe}; -pub mod to_stream; +pub use self::core::to_stream::ToStream; pub mod capture; pub mod branch; pub use self::core::ok_err::{self, OkErr}; diff --git a/timely/src/dataflow/operators/to_stream.rs b/timely/src/dataflow/operators/to_stream.rs deleted file mode 100644 index 555a540b0..000000000 --- a/timely/src/dataflow/operators/to_stream.rs +++ /dev/null @@ -1,109 +0,0 @@ -//! Conversion to the `Stream` type from iterators. - -use crate::Container; -use crate::progress::Timestamp; -use crate::Data; -use crate::dataflow::operators::generic::operator::source; -use crate::dataflow::{StreamCore, Stream, Scope}; - -/// Converts to a timely `Stream`. -pub trait ToStream { - /// Converts to a timely `Stream`. - /// - /// # Examples - /// - /// ``` - /// use timely::dataflow::operators::{ToStream, Capture}; - /// use timely::dataflow::operators::capture::Extract; - /// - /// let (data1, data2) = timely::example(|scope| { - /// let data1 = (0..3).to_stream(scope).capture(); - /// let data2 = vec![0,1,2].to_stream(scope).capture(); - /// (data1, data2) - /// }); - /// - /// assert_eq!(data1.extract(), data2.extract()); - /// ``` - fn to_stream>(self, scope: &mut S) -> Stream; -} - -impl ToStream for I where I::Item: Data { - fn to_stream>(self, scope: &mut S) -> Stream { - - source(scope, "ToStream", |capability, info| { - - // Acquire an activator, so that the operator can rescheduled itself. - let activator = scope.activator_for(&info.address[..]); - - let mut iterator = self.into_iter().fuse(); - let mut capability = Some(capability); - - move |output| { - - if let Some(element) = iterator.next() { - let mut session = output.session(capability.as_ref().unwrap()); - session.give(element); - let n = 256 * crate::container::buffer::default_capacity::(); - for element in iterator.by_ref().take(n - 1) { - session.give(element); - } - activator.activate(); - } - else { - capability = None; - } - } - }) - } -} - -/// Converts to a timely [StreamCore]. -pub trait ToStreamCore { - /// Converts to a timely [StreamCore]. - /// - /// # Examples - /// - /// ``` - /// use timely::dataflow::operators::{ToStreamCore, Capture}; - /// use timely::dataflow::operators::capture::Extract; - /// - /// let (data1, data2) = timely::example(|scope| { - /// let data1 = Some((0..3).collect::>()).to_stream_core(scope).capture(); - /// let data2 = Some(vec![0,1,2]).to_stream_core(scope).capture(); - /// (data1, data2) - /// }); - /// - /// assert_eq!(data1.extract(), data2.extract()); - /// ``` - fn to_stream_core>(self, scope: &mut S) -> StreamCore; -} - -impl ToStreamCore for I where I::Item: Container { - fn to_stream_core>(self, scope: &mut S) -> StreamCore { - - source(scope, "ToStreamCore", |capability, info| { - - // Acquire an activator, so that the operator can rescheduled itself. - let activator = scope.activator_for(&info.address[..]); - - let mut iterator = self.into_iter().fuse(); - let mut capability = Some(capability); - - move |output| { - - if let Some(mut element) = iterator.next() { - let mut session = output.session(capability.as_ref().unwrap()); - session.give_container(&mut element); - let n = 256; - for mut element in iterator.by_ref().take(n - 1) { - session.give_container(&mut element); - } - activator.activate(); - } - else { - capability = None; - } - } - }) - } -}