Skip to content

Commit

Permalink
Merge pull request #187 from benesch/async-await
Browse files Browse the repository at this point in the history
Upgrade to async/await ecosystem
  • Loading branch information
benesch authored Dec 5, 2019
2 parents d78a9fb + f195f98 commit f898f92
Show file tree
Hide file tree
Showing 15 changed files with 345 additions and 328 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ edition = "2018"

[dependencies]
rdkafka-sys = { path = "rdkafka-sys", version = "1.2.1", default-features = false }
futures = "0.1.21"
futures = "0.3.0"
libc = "0.2.0"
log = "0.4.8"
serde = "1.0.0"
Expand All @@ -26,7 +26,7 @@ clap = "2.18.0"
env_logger = "0.7.1"
rand = "0.3.15"
regex = "1.1.6"
tokio = "0.1.7"
tokio = { version = "0.2", features = ["blocking", "macros", "rt-core", "time"] }

# These features are re-exports of the features that the rdkafka-sys crate
# provides. See the rdkafka-sys documentation for details.
Expand Down
29 changes: 28 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,33 @@
# Changelog

<a name="0.2120"></a>
## Unreleased

* Upgrade to the async/await ecosystem, including `std::future::Future`, v0.3
of the futures crate, and v0.2 of Tokio. The minimum supported Rust version
is now Rust 1.39. Special thanks to [@sd2k] and [@dbcfd]. ([#186])

The main difference is that functions that previously returned
```rust
futures01::Future<Item = T, Error = E>
```
now return:
```rust
std::future::Future<Output = Result<T, E>>
```
In the special case when the error was `()`, the new signature is further
simplified to:
```rust
std::future::Future<Output = T>
```
Functions that return `future::Stream`s have had the analogous transformation
applied.

[#186]: https://github.com/fede1024/rust-rdkafka/pull/183

[@sd2k]: https://github.com/sd2k
[@dbcfd]: https://github.com/dbcfd

<a name="0.22.0"></a>
## 0.22.0 (2019-12-01)

* Add a client for Kafka's Admin API, which allows actions like creating and
Expand Down
100 changes: 37 additions & 63 deletions examples/asynchronous_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ extern crate rdkafka;
extern crate tokio;

use clap::{App, Arg};
use futures::{lazy, Future, Stream};
use tokio::runtime::current_thread;
use futures::{future, TryStreamExt};

use rdkafka::config::ClientConfig;
use rdkafka::consumer::stream_consumer::StreamConsumer;
Expand Down Expand Up @@ -43,10 +42,10 @@ fn expensive_computation<'a>(msg: OwnedMessage) -> String {
// 2) filter out eventual Kafka errors.
// 3) send the message to a thread pool for processing.
// 4) produce the result to the output topic.
// Moving each message from one stage of the pipeline to next one is handled by the event loop,
// that runs on a single thread. The expensive CPU-bound computation is handled by the `ThreadPool`,
// without blocking the event loop.
fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_topic: &str) {
// `tokio::spawn` is used to handle IO-bound tasks in parallel (e.g., producing
// the messages), while `tokio::task::spawn_blocking` is used to handle the
// simulated CPU-bound task.
async fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_topic: &str) {
// Create the `StreamConsumer`, to receive the messages from the topic in form of a `Stream`.
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", group_id)
Expand All @@ -68,70 +67,45 @@ fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_
.create()
.expect("Producer creation error");

// Create the runtime where the expensive computation will be performed.
let mut thread_pool = tokio::runtime::Builder::new()
.name_prefix("pool-")
.core_threads(4)
.build()
.unwrap();

// Use the current thread as IO thread to drive consumer and producer.
let mut io_thread = current_thread::Runtime::new().unwrap();
let io_thread_handle = io_thread.handle();

// Create the outer pipeline on the message stream.
let stream_processor = consumer
.start()
.filter_map(|result| {
// Filter out errors
match result {
Ok(msg) => Some(msg),
Err(kafka_error) => {
warn!("Error while receiving from Kafka: {:?}", kafka_error);
None
}
let stream_processor = consumer.start().try_for_each(|borrowed_message| {
// Process each message
info!("Message received: {}", borrowed_message.offset());
// Borrowed messages can't outlive the consumer they are received from, so they need to
// be owned in order to be sent to a separate thread.
let owned_message = borrowed_message.detach();
let output_topic = output_topic.to_string();
let producer = producer.clone();
tokio::spawn(async move {
// The body of this block will be executed on the main thread pool,
// but we perform `expensive_computation` on a separate thread pool
// for CPU-intensive tasks via `tokio::task::spawn_blocking`.
let computation_result =
tokio::task::spawn_blocking(|| expensive_computation(owned_message))
.await
.expect("failed to wait for expensive computation");
let produce_future = producer.send(
FutureRecord::to(&output_topic)
.key("some key")
.payload(&computation_result),
0,
);
match produce_future.await {
Ok(Ok(delivery)) => println!("Sent: {:?}", delivery),
Ok(Err((e, _))) => println!("Error: {:?}", e),
Err(_) => println!("Future cancelled"),
}
})
.for_each(move |borrowed_message| {
// Process each message
info!("Message received: {}", borrowed_message.offset());
// Borrowed messages can't outlive the consumer they are received from, so they need to
// be owned in order to be sent to a separate thread.
let owned_message = borrowed_message.detach();
let output_topic = output_topic.to_string();
let producer = producer.clone();
let io_thread_handle = io_thread_handle.clone();
let message_future = lazy(move || {
// The body of this closure will be executed in the thread pool.
let computation_result = expensive_computation(owned_message);
let producer_future = producer
.send(
FutureRecord::to(&output_topic)
.key("some key")
.payload(&computation_result),
0,
)
.then(|result| {
match result {
Ok(Ok(delivery)) => println!("Sent: {:?}", delivery),
Ok(Err((e, _))) => println!("Error: {:?}", e),
Err(_) => println!("Future cancelled"),
}
Ok(())
});
let _ = io_thread_handle.spawn(producer_future);
Ok(())
});
thread_pool.spawn(message_future);
Ok(())
});
future::ready(Ok(()))
});

info!("Starting event loop");
let _ = io_thread.block_on(stream_processor);
stream_processor.await.expect("stream processing failed");
info!("Stream processing terminated");
}

fn main() {
#[tokio::main]
async fn main() {
let matches = App::new("Async example")
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
.about("Asynchronous computation example")
Expand Down Expand Up @@ -180,5 +154,5 @@ fn main() {
let input_topic = matches.value_of("input-topic").unwrap();
let output_topic = matches.value_of("output-topic").unwrap();

run_async_processor(brokers, group_id, input_topic, output_topic);
run_async_processor(brokers, group_id, input_topic, output_topic).await
}
23 changes: 11 additions & 12 deletions examples/at_least_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ extern crate rdkafka;
extern crate rdkafka_sys;

use clap::{App, Arg};
use futures::future::join_all;
use futures::stream::Stream;
use futures::Future;
use futures::future;
use futures::stream::StreamExt;

use rdkafka::client::ClientContext;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
Expand Down Expand Up @@ -89,7 +88,8 @@ fn create_producer(brokers: &str) -> FutureProducer {
.expect("Producer creation failed")
}

fn main() {
#[tokio::main]
async fn main() {
let matches = App::new("at-least-once")
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
.about("At-least-once delivery example")
Expand Down Expand Up @@ -148,18 +148,17 @@ fn main() {
let consumer = create_consumer(brokers, group_id, input_topic);
let producer = create_producer(brokers);

for message in consumer.start().wait() {
let mut stream = consumer.start();

while let Some(message) = stream.next().await {
match message {
Err(()) => {
warn!("Error while reading from stream");
}
Ok(Err(e)) => {
Err(e) => {
warn!("Kafka error: {}", e);
}
Ok(Ok(m)) => {
Ok(m) => {
// Send a copy to the message to every output topic in parallel, and wait for the
// delivery report to be received.
join_all(output_topics.iter().map(|output_topic| {
future::try_join_all(output_topics.iter().map(|output_topic| {
let mut record = FutureRecord::to(output_topic);
if let Some(p) = m.payload() {
record = record.payload(p);
Expand All @@ -169,7 +168,7 @@ fn main() {
}
producer.send(record, 1000)
}))
.wait()
.await
.expect("Message delivery failed for some topic");
// Now that the message is completely processed, add it's position to the offset
// store. The actual offset will be committed every 5 seconds.
Expand Down
18 changes: 9 additions & 9 deletions examples/simple_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ extern crate rdkafka;
extern crate rdkafka_sys;

use clap::{App, Arg};
use futures::stream::Stream;
use futures::StreamExt;

use rdkafka::client::ClientContext;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
Expand Down Expand Up @@ -47,7 +47,7 @@ impl ConsumerContext for CustomContext {
// A type alias with your custom consumer can be created for convenience.
type LoggingConsumer = StreamConsumer<CustomContext>;

fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {
async fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {
let context = CustomContext;

let consumer: LoggingConsumer = ClientConfig::new()
Expand All @@ -68,13 +68,12 @@ fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {

// consumer.start() returns a stream. The stream can be used ot chain together expensive steps,
// such as complex computations on a thread pool or asynchronous IO.
let message_stream = consumer.start();
let mut message_stream = consumer.start();

for message in message_stream.wait() {
while let Some(message) = message_stream.next().await {
match message {
Err(_) => warn!("Error while reading from stream."),
Ok(Err(e)) => warn!("Kafka error: {}", e),
Ok(Ok(m)) => {
Err(e) => warn!("Kafka error: {}", e),
Ok(m) => {
let payload = match m.payload_view::<str>() {
None => "",
Some(Ok(s)) => s,
Expand All @@ -97,7 +96,8 @@ fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {
}
}

fn main() {
#[tokio::main]
async fn main() {
let matches = App::new("consumer example")
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
.about("Simple command line consumer")
Expand Down Expand Up @@ -143,5 +143,5 @@ fn main() {
let brokers = matches.value_of("brokers").unwrap();
let group_id = matches.value_of("group-id").unwrap();

consume_and_print(brokers, group_id, &topics);
consume_and_print(brokers, group_id, &topics).await
}
9 changes: 5 additions & 4 deletions examples/simple_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod example_utils;
use crate::example_utils::setup_logger;
use rdkafka::message::OwnedHeaders;

fn produce(brokers: &str, topic_name: &str) {
async fn produce(brokers: &str, topic_name: &str) {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
Expand Down Expand Up @@ -46,11 +46,12 @@ fn produce(brokers: &str, topic_name: &str) {

// This loop will wait until all delivery statuses have been received received.
for future in futures {
info!("Future completed. Result: {:?}", future.wait());
info!("Future completed. Result: {:?}", future.await);
}
}

fn main() {
#[tokio::main]
async fn main() {
let matches = App::new("producer example")
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
.about("Simple command line producer")
Expand Down Expand Up @@ -86,5 +87,5 @@ fn main() {
let topic = matches.value_of("topic").unwrap();
let brokers = matches.value_of("brokers").unwrap();

produce(brokers, topic);
produce(brokers, topic).await;
}
Loading

0 comments on commit f898f92

Please sign in to comment.