-
Notifications
You must be signed in to change notification settings - Fork 276
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce unordered timestamps do a dataflow #532
Comments
You should be able to send data with timestamp 9 after sending 10, because input.session(cap.delayed(&(round + rand() % 5))).give(round); which would use some random time within 5 of the current |
Thanks for the answer, Frank. I was not aware that delayed would not downgrade I find this implicit downgrade confusing. For example, my initial capability is 5: Data is sent with timestamp 8 and 6 from capability 5 input
.session(cap.delayed(&RootTimestamp::new(8)))
.give("data2");
input
.session(cap.delayed(&RootTimestamp::new(6)))
.give("data1"); Next, we replace capability 5 with capability 7: cap = cap.delayed(&RootTimestamp::new(7));
worker.step(); This cause the call of drop for cap with timestamp 5, correct? Will this capability replacement also allow the processing of data1 with timestamp 6 at some frontiered operator downstream after worker step? Or do I need to also downgrade capability 7 to cause data1 with timestamp 6 to process? cap = cap.delayed(&RootTimestamp::new(8)); |
It will. Strictly speaking, it only informs the system that this capability is no longer able to produce timestamp 6, but if this is the only input capability that you have, when you advance from 5 to 7, you should see the output for 6. Let me know if that does not happen! |
It happens! I managed to create an example of sending data out of order and advancing the frontier gradually: extern crate timely;
extern crate timely_communication;
use std::collections::HashMap;
use std::println;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::*;
use timely::progress::timestamp::RootTimestamp;
use timely_communication::Configuration;
use timely::dataflow::operators::{Operator, Probe};
use timely::dataflow::ProbeHandle;
fn main() {
timely::execute(Configuration::Thread, |worker| {
let mut probe = ProbeHandle::new();
let (mut input, mut cap) = worker.dataflow(|scope| {
let (input, stream) = scope.new_unordered_input();
stream
.inspect_batch(move |t, xs| {
for x in xs.iter() {
println!("streamed {} @ {:?}", x, t)
}
})
.unary_frontier(Pipeline, "batcher", |_capability, _info| {
let mut buffer = HashMap::new();
move |input, output| {
while let Some((time, data)) = input.next() {
buffer
.entry(time.retain())
.or_insert(Vec::new())
.push(data.take());
}
for (key, val) in buffer.iter_mut() {
if !input.frontier().less_equal(key.time()) {
let mut session = output.session(key);
for mut batch in val.drain(..) {
for value in batch.drain(..) {
session.give(value);
}
}
}
}
buffer.retain(|_key, val| !val.is_empty());
}
})
.inspect_batch(move |t, xs| {
for x in xs.iter() {
println!("batched {} @ {:?}", x, t)
}
})
.probe_with(&mut probe);
input
});
cap = cap.delayed(&RootTimestamp::new(0));
input.session(cap.delayed(&RootTimestamp::new(2))).give(3);
input.session(cap.delayed(&RootTimestamp::new(0))).give(0);
input.session(cap.delayed(&RootTimestamp::new(5))).give(1);
input.session(cap.delayed(&RootTimestamp::new(5))).give(2);
worker.step();
println!("Replaces initial cap by 4");
cap = cap.delayed(&RootTimestamp::new(4));
while probe.less_than(&RootTimestamp::new(4)) {
worker.step();
}
println!("Replaces cap 4 by 5");
cap = cap.delayed(&RootTimestamp::new(5));
while probe.less_than(&RootTimestamp::new(5)) {
worker.step();
}
println!("Replaces cap 5 by 7");
cap = cap.delayed(&RootTimestamp::new(7));
while probe.less_than(&RootTimestamp::new(7)) {
worker.step();
}
println!("Finish");
})
.unwrap();
} output:
I should dive more into Timely's implementation to see how this works. |
How one can introduce data with unordered timestamps?
The example from
examples/unordered_input.rs
sends data with timestamps ordered.I cannot use
delayed
to send data with timestamp 9 after sending 10, for example.I am asking this because I would like to validate that an algorithm still works even if data is not in order.
The text was updated successfully, but these errors were encountered: