forked from TimelyDataflow/timely-dataflow
-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy path: capture_recv.rs
46 lines (39 loc) · 1.59 KB
/
capture_recv.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
use timely::dataflow::operators::Inspect;
use timely::dataflow::operators::capture::Replay;
use timely::dataflow::operators::Accumulate;
use rdkafka::config::ClientConfig;
use kafkaesque::EventConsumer;
/// Replays timely-dataflow streams captured into Kafka topics and counts
/// the replayed records per epoch.
///
/// CLI: `capture_recv <topic-prefix> <source-peers>`, where each source
/// worker `i` is expected to have written to topic `"<topic-prefix>-<i>"`.
fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        let topic = std::env::args().nth(1)
            .expect("usage: capture_recv <topic> <source_peers>");
        let source_peers = std::env::args().nth(2)
            .expect("usage: capture_recv <topic> <source_peers>")
            .parse::<usize>()
            .expect("source_peers must be a non-negative integer");
        let brokers = "localhost:9092";

        // Consumer configuration.
        // NOTE(review): the original set the producer-only key
        // "produce.offset.report" on this consumer config (ignored by a
        // consumer) and set "auto.offset.reset" twice ("smallest", then
        // "earliest"); the values are synonyms and the last call wins, so
        // only a single "earliest" is kept here.
        let mut consumer_config = ClientConfig::new();
        consumer_config
            .set("group.id", "example")
            .set("enable.auto.commit", "false")
            .set("enable.partition.eof", "false")
            .set("auto.offset.reset", "earliest")
            .set("session.timeout.ms", "6000")
            .set("bootstrap.servers", brokers);

        // Create replayers from a disjoint partition of the source worker
        // identifiers, so each captured topic is replayed by exactly one
        // local worker.
        let replayers =
        (0 .. source_peers)
            .filter(|i| i % worker.peers() == worker.index())
            .map(|i| {
                // Topic name must match the producer's naming scheme.
                let topic = format!("{}-{}", topic, i);
                EventConsumer::<_, u64>::new(consumer_config.clone(), topic)
            })
            .collect::<Vec<_>>();

        // Replay the captured streams into a dataflow, count records per
        // epoch, and print each count as it is produced.
        worker.dataflow::<u64, _, _>(|scope| {
            replayers
                .replay_into(scope)
                .count()
                .inspect(|x| println!("replayed: {:?}", x));
        })
    }).unwrap(); // asserts error-free execution
}