Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #8 from TimelyDataflow/master
Browse files Browse the repository at this point in the history
Update
  • Loading branch information
antiguru authored Apr 18, 2024
2 parents 448e158 + c564e8f commit 023857a
Show file tree
Hide file tree
Showing 22 changed files with 259 additions and 250 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
deploy:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- run: cargo install mdbook --version 0.4.31
- run: cd mdbook && mdbook build
- uses: JamesIves/github-pages-deploy-action@v4
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ jobs:
toolchain:
- stable
- 1.72
name: cargo test on ${{ matrix.os }}
name: cargo test on ${{ matrix.os }}, rust ${{ matrix.toolchain }}
runs-on: ${{ matrix.os }}-latest
steps:
- uses: actions/checkout@v4
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: ${{ matrix.toolchain }}
- name: Cargo test
run: cargo test
run: cargo test --workspace --all-targets

# Check formatting with rustfmt
mdbook:
Expand Down
8 changes: 4 additions & 4 deletions doop/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![allow(non_snake_case)]
#![allow(non_snake_case, dead_code)]

use std::collections::HashMap;
use std::rc::Rc;
Expand Down Expand Up @@ -145,7 +145,7 @@ fn load<'a>(filename: &str, interner: Rc<RefCell<StringInterner>>) -> impl Itera
})
}

fn load1<'a>(index: usize, prefix: &str, filename: &str, interner: Rc<RefCell<StringInterner>>) -> impl Iterator<Item=((Symbol), Time, Diff)>+'a {
fn load1<'a>(index: usize, prefix: &str, filename: &str, interner: Rc<RefCell<StringInterner>>) -> impl Iterator<Item=(Symbol, Time, Diff)>+'a {
read_file(&format!("{}{}", prefix, filename))
.filter(move |_| index == 0)
.map(move |line| {
Expand Down Expand Up @@ -791,7 +791,7 @@ fn main() {
let SupertypeOf = SupertypeOf.enter(scope);

// Required by all
let mut Reachable = Relation::<_,(Method)>::new(scope);
let mut Reachable = Relation::<_,Method>::new(scope);

// NOTE: Common subexpression.
let Reachable_Invocation =
Expand All @@ -805,7 +805,7 @@ fn main() {
// let Reachable = ReachableFinal.clone();

// Class initialization
let mut InitializedClass = Relation::<_,(Type)>::new(scope);
let mut InitializedClass = Relation::<_,Type>::new(scope);

// ClassInitializer(?type, ?method) :- basic.MethodImplemented("<clinit>", "void()", ?type, ?method).
let temp1 = interner.borrow_mut().intern("<clinit>");
Expand Down
9 changes: 8 additions & 1 deletion examples/accumulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,14 @@ fn main() {

let mut input = worker.dataflow::<(), _, _>(|scope| {
let (input, data) = scope.new_collection::<_, isize>();
data.consolidate();

use timely::dataflow::Scope;
scope.iterative::<u32,_,_>(|inner| {
data.enter_at(inner, |_| 0)
.consolidate()
.leave()
});

input
});

Expand Down
4 changes: 3 additions & 1 deletion experiments/src/bin/graphs-interactive-alt.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(dead_code)]

use rand::{Rng, SeedableRng, StdRng};

use timely::dataflow::*;
Expand Down Expand Up @@ -389,4 +391,4 @@ where G::Timestamp: Lattice {
.concat(&prop)
.reduce(|_, s, t| { t.push((*s[0].0, 1)); })
})
}
}
4 changes: 3 additions & 1 deletion experiments/src/bin/graphs-interactive-neu.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(dead_code)]

use rand::{Rng, SeedableRng, StdRng};

use timely::dataflow::*;
Expand Down Expand Up @@ -392,4 +394,4 @@ where G::Timestamp: Lattice+Ord {

reached.leave()
})
}
}
6 changes: 3 additions & 3 deletions interactive/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ where
println!("\tTimely logging connection {} of {}", index, number);
let socket = listener.incoming().next().unwrap().unwrap();
socket.set_nonblocking(true).expect("failed to set nonblocking");
streams.push(EventReader::<Duration, (Duration, usize, TimelyEvent),_>::new(socket));
streams.push(EventReader::<Duration, Vec<(Duration, usize, TimelyEvent)>,_>::new(socket));
}

println!("\tAll logging connections established");
Expand All @@ -174,7 +174,7 @@ where
for _ in 0 .. number {
let socket = listener.incoming().next().unwrap().unwrap();
socket.set_nonblocking(true).expect("failed to set nonblocking");
streams.push(EventReader::<Duration, (Duration, usize, DifferentialEvent),_>::new(socket));
streams.push(EventReader::<Duration, Vec<(Duration, usize, DifferentialEvent)>,_>::new(socket));
}
}
crate::logging::publish_differential_logging(manager, worker, granularity, &name_as, streams);
Expand All @@ -195,4 +195,4 @@ where
pub fn serialize_into<W: Write>(&self, writer: W) {
bincode::serialize_into(writer, self).expect("bincode: serialization failed");
}
}
}
6 changes: 3 additions & 3 deletions interactive/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ where
V: ExchangeData+Hash+LoggingValue+Datum,
A: Allocate,
I : IntoIterator,
<I as IntoIterator>::Item: EventIterator<Duration, (Duration, usize, TimelyEvent)>+'static
<I as IntoIterator>::Item: EventIterator<Duration, Vec<(Duration, usize, TimelyEvent)>>+'static
{
let (operates, channels, schedule, messages, shutdown, park, text) =
worker.dataflow(move |scope| {
Expand Down Expand Up @@ -217,7 +217,7 @@ where
V: ExchangeData+Hash+LoggingValue+Datum,
A: Allocate,
I : IntoIterator,
<I as IntoIterator>::Item: EventIterator<Duration, (Duration, usize, DifferentialEvent)>+'static
<I as IntoIterator>::Item: EventIterator<Duration, Vec<(Duration, usize, DifferentialEvent)>>+'static
{
let (merge,batch) =
worker.dataflow(move |scope| {
Expand Down Expand Up @@ -280,4 +280,4 @@ where

manager.traces.set_unkeyed(&Plan::Source(format!("logs/{}/differential/arrange/batch", name)), &batch);
manager.traces.set_unkeyed(&Plan::Source(format!("logs/{}/differential/arrange/merge", name)), &merge);
}
}
10 changes: 3 additions & 7 deletions src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,20 +322,16 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
/// data.assert_eq(&result);
/// });
/// ```
pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, initial: F) -> Collection<Iterative<'a, G, T>, D, R>
pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection<Iterative<'a, G, T>, D, R>
where
T: Timestamp+Hash,
F: FnMut(&D) -> T + Clone + 'static,
G::Timestamp: Hash,
{

let mut initial1 = initial.clone();
let mut initial2 = initial.clone();

self.inner
.enter_at(child, move |x| initial1(&x.0))
.enter(child)
.map(move |(data, time, diff)| {
let new_time = Product::new(time, initial2(&data));
let new_time = Product::new(time, initial(&data));
(data, new_time, diff)
})
.as_collection()
Expand Down
Loading

0 comments on commit 023857a

Please sign in to comment.