Well, not on Kafka. That wouldn't work. But Timely Dataflow can now happily round-trip data through Kafka queues using a fairly light amount of code. I thought I would show you how to do it!
As a note: this is happening now because some bugs just got fixed in librdkafka that were holding up the performance show. It's been mostly ready for a few months, just waiting for snazzy numbers out of the box. Also, I want to give a shout out to the rust-rdkafka crate, which made things relatively painless once I understood things (actually: once I pestered the maintainer, who was very helpful).
To actually use timely with Kafka you'll need to grab a few things.
- You'll need to grab Kafka, which just went 1.0 recently. You can go here to download your favorite flavor of Kafka. You can choose between a few different versions, because different versions of Scala build different artifacts (???). I took the 2.11 version, as they recommended.
- You'll want to check out the timely repo so that you can get at the `kafkaesque` project, which is where this code lives. It isn't part of core timely at the moment, because the build-time imposition is significant.
There are some start-up actions you need to take to make sure that Kafka is up and running. In my experience, you'll want to open a few shells, typing first in one of them:
```
Echidnatron% bin/zookeeper-server-start.sh config/zookeeper.properties
```
This starts up ZooKeeper, on which Kafka relies. Now in another shell enter:
```
Echidnatron% bin/kafka-server-start.sh config/server.properties
```
This starts up Kafka, which we will use. Both of these commands produce walls of text that make it look like they are currently crashing, but they seem to actually do things.
Actually, if you want to check out the fault tolerance, `^C` the ZooKeeper instance, then try to shut down the Kafka instance with `^C`; hah! joke's on you, it is fault tolerant! It will keep trying to reconnect forever. Or, and I shit you not this actually happens, until you restart the ZooKeeper instance, at which point it will cleanly shut down.
We are going to start with a simple example, and then talk through what we had to do to make it work.
As it will turn out, we are mostly using timely's capture and replay infrastructure, which allow you to serialize streams to bytestreams and play them back, all with the timely dataflow progress tracking magic. If you want to read about that, there is a whole section on it in the timely dataflow mdbook.
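If you haven't seen capture and replay before, here is a minimal sketch of the same round trip done entirely in memory, using the `EventLink` type from timely's capture module instead of Kafka (this is my paraphrase of the examples in timely's docs; names and details may drift between versions):

```rust
extern crate timely;

use std::rc::Rc;
use timely::dataflow::operators::{ToStream, Inspect};
use timely::dataflow::operators::capture::{Capture, Replay, EventLink};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {

        // a shared, in-memory event queue standing in for Kafka.
        let handle = Rc::new(EventLink::new());
        let (producer, consumer) = (handle.clone(), handle);

        // capture a stream of events into the queue ...
        worker.dataflow::<u64,_,_>(|scope|
            (0 .. 10u64)
                .to_stream(scope)
                .capture_into(producer)
        );

        // ... and replay it into a second dataflow.
        worker.dataflow::<u64,_,_>(|scope|
            Some(consumer)
                .replay_into(scope)
                .inspect(|x| println!("replayed: {:?}", x))
        );

    }).unwrap();
}
```

The Kafka version is the same picture, except the producing and consuming halves can live in different processes, and the events take a detour through a Kafka topic.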
Let's open up two more shells, because those Apache ones are probably busy printing things. In the first one, I recommend doing a clean release build:
```
Echidnatron% cargo build --release
Compiling futures v0.1.15
Compiling libc v0.2.30
...
Compiling rdkafka v0.13.0
Compiling kafkaesque v0.1.0 (file:///Users/mcsherry/Projects/timely-dataflow/kafkaesque)
Finished release [optimized] target(s) in 146.58 secs
Echidnatron%
```
So that takes a while. Ideally most of this is the sort of thing that doesn't need to get rebuilt too often.
What this built for us is two binaries: `capture_send` and `capture_recv`, which will respectively send a bunch of data to Kafka and read a bunch of data back out of Kafka. These are simple examples, so we will just send `0u64 .. limit` for a value of `limit` specified at the command line. Let's do that now!
```
Echidnatron% time cargo run --release --bin capture_send -- numbat 100000000 -w5
Finished release [optimized] target(s) in 0.0 secs
Running `target/release/capture_send numbat 100000000 -w5`
allocating producer for topic "numbat-2"
allocating producer for topic "numbat-0"
allocating producer for topic "numbat-1"
allocating producer for topic "numbat-4"
allocating producer for topic "numbat-3"
cargo run --release --bin capture_send -- numbat 100000000 -w5 7.22s user 11.24s system 126% cpu 14.567 total
Echidnatron%
```
Ok wow, what did we do?
We ran the `capture_send` binary with arguments `numbat` and `100000000`, which is one hundred million. We also have a `-w5` on the end, which tells timely dataflow to start up five workers. Each worker will create a topic and write `0u64 .. 100000000` to it (one hundred million 8-byte records). So we write about 4GB of data in 15 seconds, which is about 266.66, repeating of course, megabytes per second.
What if we want to read it back out? We use the `capture_recv` example, where instead of specifying the number of records, we specify the number of streams to look for (five, in the example above). The number of streams doesn't have to match the number of workers; we can play back five streams on three workers, or on seven. Let's do three:
```
Echidnatron% time cargo run --release --bin capture_recv -- numbat 5 -w3
Finished release [optimized] target(s) in 0.0 secs
Running `target/release/capture_recv numbat 5 -w3`
allocating consumer for topic "numbat-0"
allocating consumer for topic "numbat-1"
allocating consumer for topic "numbat-2"
allocating consumer for topic "numbat-3"
allocating consumer for topic "numbat-4"
replayed: 100000000
replayed: 200000000
replayed: 200000000
cargo run --release --bin capture_recv -- numbat 5 -w3 6.92s user 7.68s system 182% cpu 7.996 total
Echidnatron%
```
Here we see five consumers allocated, for the topics created up above (this is good; we want to read back all the data we wrote), but with only three workers we get three reports of data replayed, counting either one hundred million or two hundred million, depending on whether the worker got one stream or two to play back. We read back the 4GB of data in 8 seconds, which is a very reasonable 500MB/s.
Let's see how complicated it was to write this.
The sending side, in `capture_send.rs`, is the easiest, because there is the least amount of code to worry about. It actually looks quite a bit like the `examples/capture_send.rs` example from the timely repository, which uses a TCP connection as the transport instead of Kafka. That is because I just stole that code.
I've hardwired a few things here, like the Kafka broker, and all the Kafka configuration stuff that I don't actually understand yet. But if you imagine that Confluent.io will rent you that expertise, you can blur your eyes and just check out the timely dataflow parts:
```rust
// [...] some other includes
use kafkaesque::EventProducer;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {

        let topic = std::env::args().nth(1).unwrap();
        let count = std::env::args().nth(2).unwrap().parse::<u64>().unwrap();
        let brokers = "localhost:9092";

        // Kafka stuff [...] defines `producer_config`.

        let topic = format!("{}-{:?}", topic, worker.index());
        let producer = EventProducer::new(producer_config, topic);

        worker.dataflow::<u64,_,_>(|scope|
            (0 .. count)
                .to_stream(scope)
                .capture_into(producer)
        );

    }).unwrap(); // asserts error-free execution
}
```
All that we are doing here is creating a topic string for each worker, using the topic and the worker identifier, wiring up a `kafkaesque::EventProducer`, which is something I wrote with help, and then running a normal timely dataflow computation that ends with `.capture_into(producer)`.

That does it. The `capture_into` method writes a sequence of timely "events" into anything that will listen. The timely "events" are both data messages and progress updates, a faithful pickling of everything an operator would see about the stream at that point. The `kafkaesque::EventProducer` just happens to be something that serializes these events to binary and ships them to the Kafka topic.
Tada!
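For a sense of what actually travels over the wire, a timely `Event<T, D>` is roughly the following enum (my paraphrase of timely's capture module; the exact definition may differ between versions):

```rust
// Roughly the shape of timely's Event type (paraphrased; see timely's
// capture module for the real definition).
pub enum Event<T, D> {
    /// Progress updates: changes to the counts of outstanding timestamps.
    Progress(Vec<(T, i64)>),
    /// A batch of data records, all bearing the timestamp `T`.
    Messages(T, Vec<D>),
}
```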
The receive side, in `capture_recv.rs`, is a lot like timely's `examples/capture_recv.rs`, again because I stole it. The receive sides are more complicated than their `capture_send` counterparts because each needs to distribute responsibility for a varying number of source streams across the workers. Also, there are a lot more Kafka options on this side.
Again, I'm just going to show you the blurry-eyed view where we see the timely dataflow part:
```rust
// [...] some other includes
use kafkaesque::EventConsumer;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {

        let topic = std::env::args().nth(1).unwrap();
        let source_peers = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
        let brokers = "localhost:9092";

        // Kafka stuff [...] defines `consumer_config`.

        // create replayers from disjoint partition of source worker identifiers.
        let replayers =
            (0 .. source_peers)
                .filter(|i| i % worker.peers() == worker.index())
                .map(|i| {
                    let topic = format!("{}-{:?}", topic, i);
                    EventConsumer::<_,u64>::new(consumer_config.clone(), topic)
                })
                .collect::<Vec<_>>();

        worker.dataflow::<u64,_,_>(|scope| {
            replayers
                .replay_into(scope)
                .count()
                .inspect(|x| println!("replayed: {:?}", x));
        })

    }).unwrap(); // asserts error-free execution
}
```
This is a bit more grotty, but it is mostly the definition of `replayers`, which again is complicated by the need to distribute `0 .. source_peers` among the workers in this computation, which are `worker.peers()` in number. Once you do that, you just need to call `replayers.replay_into(scope)`.

This creates a source of data that is the union of the streams this worker chose to replay, which means that across all the workers the collected stream is the union of all the source streams, just as if you were part of the original computation. The remaining `.count()` and `.inspect()` calls are just as if you had `(0 .. count)` as your stream source.
There isn't a great deal going on in the definitions of `EventProducer` and `EventConsumer`, which might mean that I am not properly using the full awesome power of Kafka's interfaces. Or not correctly using them. Whatever. I'm ok with this at the moment, at least up until Kafka turns into the bottleneck, or some helpful person points out what I'm doing wrong.
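For orientation, the two traits in play have tiny surfaces. Reading them off the implementations below, they look roughly like this (paraphrased from timely's capture module):

```rust
// Paraphrased: the traits a type implements to work with capture_into
// and replay_into, respectively.
pub trait EventPusher<T, D> {
    /// Accept one event (data or progress) from the captured stream.
    fn push(&mut self, event: Event<T, D>);
}

pub trait EventIterator<T, D> {
    /// Present the next available event, if any, by reference.
    fn next(&mut self) -> Option<&Event<T, D>>;
}
```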
Here is the implementation of the `EventPusher` trait for `EventProducer`, which is the trait one needs to implement to be `capture_into`-compatible. You just need to do a thing with a timely `Event<T, D>`:
```rust
impl<T: Abomonation, D: Abomonation> EventPusher<T, D> for EventProducer<T, D> {
    fn push(&mut self, event: Event<T, D>) {
        unsafe { ::abomonation::encode(&event, &mut self.buffer); }
        self.producer.send_copy::<[u8],()>(self.topic.as_str(), None, Some(&self.buffer[..]), None, None, None).unwrap();
        self.counter.fetch_add(1, Ordering::SeqCst);
        self.producer.poll(0);
        self.buffer.clear();
    }
}
```
We just serialize the event into a local buffer using Abomonation, send the buffer along, and increment a counter of things we expect to be acknowledged before shutting down. There is some inelegant code in the `Drop` implementation that makes sure to hang around until all the writes are acknowledged (and we probably block the system while doing it, too).
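I won't reproduce that `Drop` code here, but the idea is simple enough to sketch. Assuming the delivery callback decrements the same counter that `push` increments (an assumption; the field names below are illustrative rather than the crate's actual ones), it amounts to:

```rust
// Hypothetical sketch of the shutdown logic, not the crate's actual code:
// spin on poll() until every send has been acknowledged.
impl<T, D> Drop for EventProducer<T, D> {
    fn drop(&mut self) {
        // `counter` goes up on each send and, by assumption, back down on delivery.
        while self.counter.load(Ordering::SeqCst) > 0 {
            self.producer.poll(0); // give librdkafka a chance to run delivery callbacks
        }
    }
}
```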
Here is the implementation of the `EventIterator` trait for `EventConsumer`, which is the trait one needs to implement to be `replay_into`-compatible. You just need to produce a reference to a timely `Event<T, D>`:
```rust
impl<T: Abomonation, D: Abomonation> EventIterator<T, D> for EventConsumer<T, D> {
    fn next(&mut self) -> Option<&Event<T, D>> {
        if let Some(result) = self.consumer.poll(0) {
            match result {
                Ok(message) => {
                    self.buffer.clear();
                    self.buffer.extend_from_slice(message.payload().unwrap());
                    Some(unsafe { ::abomonation::decode::<Event<T,D>>(&mut self.buffer[..]).unwrap().0 })
                },
                Err(err) => {
                    println!("KafkaConsumer error: {:?}", err);
                    None
                },
            }
        }
        else { None }
    }
}
```
This is similarly simple: we poll the Kafka consumer, which returns an `Option<KafkaResult>`, and once we confirm that we actually have valid data, we grab the bytes and re-interpret them as typed data, one of the things that Abomonation does that it probably shouldn't. This could probably be more succinct if I understood all of the Rust error idioms, but I hope you get the point!
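If the encode/decode pair looks mysterious, here is the same round trip on a plain tuple with no Kafka in the way (a minimal sketch; abomonation's exact signatures have shifted a little across versions):

```rust
extern crate abomonation;

fn main() {
    let record = (0u64, vec![1u64, 2, 3]);

    // serialize the record into a byte buffer ...
    let mut bytes = Vec::new();
    unsafe { ::abomonation::encode(&record, &mut bytes); }

    // ... and reinterpret those bytes as a typed reference, plus any leftovers.
    if let Some((typed, rest)) = unsafe { ::abomonation::decode::<(u64, Vec<u64>)>(&mut bytes[..]) } {
        assert_eq!(typed, &record);
        assert!(rest.is_empty());
    }
}
```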
In principle this should make it easier to get data into and out of Kafka. I'm guessing that no one else uses timely's `Event` type for exchanging watermarked, timestamped streams, but there are probably connectors to write. I might check in with the Flink folks and see what they do about in-stream watermarking, and maybe do a little demo where we bounce data back and forth between the two.
Now that Kafka has gone 1.0, this might be a good time to learn about it! Like, for me to learn about it! I don't know anything about Kafka, except that it apparently does "exactly once delivery", which timely doesn't do because I once took a distributed computing theory class. What I think they actually mean is that you can acknowledge messages once you've processed them, rather than when you receive them, which is how most grown-up streaming systems work (again, not timely).
Maybe I'm being unfair and I should learn more!
Probably.