forked from TimelyDataflow/timely-dataflow
-
Notifications
You must be signed in to change notification settings - Fork 10
/
capture_send.rs
31 lines (24 loc) · 950 Bytes
/
capture_send.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
use timely::dataflow::operators::ToStream;
use timely::dataflow::operators::capture::Capture;
use rdkafka::config::ClientConfig;
use kafkaesque::EventProducer;
/// Captures a stream of integers into Kafka, using one topic per timely worker.
///
/// Usage: `capture_send <topic> <count>`
///
/// * `<topic>` - base topic name; each worker writes to `<topic>-<worker index>`.
/// * `<count>` - each worker feeds the values `0 .. count` into its dataflow.
///
/// The broker address is fixed to `localhost:9092`.
///
/// # Panics
///
/// Panics with a usage hint if either positional argument is missing or if
/// `<count>` does not parse as a `u64`. Also panics if
/// `timely::execute_from_args` returns an error.
fn main() {
    // Read and validate the positional arguments once, before any worker is
    // started, so that bad input is reported up front with a readable message
    // instead of surfacing as a bare `unwrap` panic inside each worker.
    let base_topic = std::env::args()
        .nth(1)
        .expect("missing <topic> argument (usage: capture_send <topic> <count>)");
    let count = std::env::args()
        .nth(2)
        .expect("missing <count> argument (usage: capture_send <topic> <count>)")
        .parse::<u64>()
        .expect("<count> must be an unsigned integer (usage: capture_send <topic> <count>)");

    // `move` hands the validated values to the worker closure; the closure
    // only borrows them, so it can be invoked once per worker.
    timely::execute_from_args(std::env::args(), move |worker| {
        let brokers = "localhost:9092";

        // Create Kafka stuff.
        let mut producer_config = ClientConfig::new();
        producer_config
            .set("produce.offset.report", "true")
            .set("bootstrap.servers", brokers);

        // Give every worker its own topic, suffixed with the worker index.
        let topic = format!("{}-{:?}", base_topic, worker.index());
        let producer = EventProducer::new(producer_config, topic);

        // Build the dataflow: turn `0 .. count` into a stream and capture it
        // into the Kafka-backed event producer.
        worker.dataflow::<u64, _, _>(|scope| {
            (0..count)
                .to_stream(scope)
                .capture_into(producer)
        });
    })
    .expect("timely::execute_from_args returned an error");
}