Skip to content

Commit

Permalink
Promote feedback core-variants
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Mar 17, 2023
1 parent 60ae900 commit 5987d4c
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 38 deletions.
2 changes: 1 addition & 1 deletion timely/examples/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fn main() {
timely::execute_from_args(std::env::args().skip(2), move |worker| {

worker.dataflow(move |scope| {
let (handle, stream) = scope.feedback::<usize>(1);
let (handle, stream) = scope.feedback::<Vec<usize>>(1);
stream.unary_notify(
Pipeline,
"Barrier",
Expand Down
6 changes: 3 additions & 3 deletions timely/examples/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ fn main() {
.to_stream(scope);

// define a loop variable, for the (node, worker) pairs.
let (handle, stream) = scope.feedback(1usize);
let (handle, stream) = scope.feedback::<Vec<_>>(1usize);

// use the stream of edges
graph.binary_notify(
Expand All @@ -58,7 +58,7 @@ fn main() {
// receive edges, start to sort them
input1.for_each(|time, data| {
notify.notify_at(time.retain());
edge_list.push(data.replace(Vec::new()));
edge_list.push(data.take());
});

// receive (node, worker) pairs, note any new ones.
Expand All @@ -68,7 +68,7 @@ fn main() {
notify.notify_at(time.retain());
Vec::new()
})
.push(data.replace(Vec::new()));
.push(data.take());
});

notify.for_each(|time, _num, _notify| {
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/pagerank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn main() {
let edge_stream = input.to_stream(scope);

// create a new feedback stream, which will be changes to ranks.
let (handle, rank_stream) = scope.feedback(1);
let (handle, rank_stream) = scope.feedback::<Vec<_>>(1);

// bring edges and ranks together!
let changes = edge_stream.binary_frontier(
Expand Down
37 changes: 5 additions & 32 deletions timely/src/dataflow/operators/feedback.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Create cycles in a timely dataflow graph.
use crate::{Container, Data};
use crate::Container;

use crate::progress::{Timestamp, PathSummary};
use crate::progress::frontier::Antichain;
Expand All @@ -15,29 +15,6 @@ use crate::dataflow::operators::generic::OutputWrapper;

/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
pub trait Feedback<G: Scope> {
/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
///
/// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
/// its `Handle` passed as an argument. Data passed through the stream will have their
/// timestamps advanced by `summary`.
///
/// # Examples
/// ```
/// use timely::dataflow::Scope;
/// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
///
/// timely::example(|scope| {
/// // circulate 0..10 for 100 iterations.
/// let (handle, cycle) = scope.feedback(1);
/// (0..10).to_stream(scope)
/// .concat(&cycle)
/// .inspect(|x| println!("seen: {:?}", x))
/// .branch_when(|t| t < &100).1
/// .connect_loop(handle);
/// });
/// ```
fn feedback<D: Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, Vec<D>>, Stream<G, Vec<D>>);

/// Creates a [Stream] and a [Handle] to later bind the source of that `Stream`.
///
/// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
Expand All @@ -51,15 +28,15 @@ pub trait Feedback<G: Scope> {
///
/// timely::example(|scope| {
/// // circulate 0..10 for 100 iterations.
/// let (handle, cycle) = scope.feedback_core::<Vec<_>>(1);
/// let (handle, cycle) = scope.feedback::<Vec<_>>(1);
/// (0..10).to_stream(scope)
/// .concat(&cycle)
/// .inspect(|x| println!("seen: {:?}", x))
/// .branch_when(|t| t < &100).1
/// .connect_loop(handle);
/// });
/// ```
fn feedback_core<D: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, D>, Stream<G, D>);
fn feedback<D: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, D>, Stream<G, D>);
}

/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
Expand Down Expand Up @@ -91,11 +68,7 @@ pub trait LoopVariable<'a, G: Scope, T: Timestamp> {
}

impl<G: Scope> Feedback<G> for G {
fn feedback<D: Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, Vec<D>>, Stream<G, Vec<D>>) {
self.feedback_core(summary)
}

fn feedback_core<D: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, D>, Stream<G, D>) {
fn feedback<D: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, D>, Stream<G, D>) {

let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
let (output, stream) = builder.new_output();
Expand All @@ -106,7 +79,7 @@ impl<G: Scope> Feedback<G> for G {

impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> {
fn loop_variable<D: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, D>, Stream<Iterative<'a, G, T>, D>) {
self.feedback_core(Product::new(Default::default(), summary))
self.feedback(Product::new(Default::default(), summary))
}
}

Expand Down
2 changes: 1 addition & 1 deletion timely/tests/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn barrier_sync_helper(comm_config: ::timely::CommunicationConfig) {
};
timely::execute(config, move |worker| {
worker.dataflow(move |scope| {
let (handle, stream) = scope.feedback::<u64>(1);
let (handle, stream) = scope.feedback::<Vec<u64>>(1);
stream.unary_notify(
Pipeline,
"Barrier",
Expand Down

0 comments on commit 5987d4c

Please sign in to comment.