Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Async/Await: No futures executor #166

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ jobs:
- os: linux
arch: amd64
rust: stable
- os: linux
arch: amd64
rust: nightly-2019-10-17
env: RDKAFKA_RUN_TESTS=1
- os: linux
arch: arm64
Expand Down
10 changes: 6 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,23 @@ edition = "2018"

[dependencies]
rdkafka-sys = { path = "rdkafka-sys", version = "1.2.1" }
futures = "0.1.21"
futures = "0.3"
libc = "0.2.0"
log = "0.4.8"
serde = "1.0.0"
serde_derive = "1.0.0"
pin-project = "0.4"
serde = { version = "1.0.0", features = ["derive"] }
serde_json = "1.0.0"
tokio-executor = "=0.2.0-alpha.6"

[dev-dependencies]
backoff = "0.1.5"
chrono = "0.4.0"
clap = "2.18.0"
env_logger = "0.7.1"
lazy_static = "1.4.0"
rand = "0.3.15"
regex = "1.1.6"
tokio = "0.1.7"
tokio = "=0.2.0-alpha.6"

[features]
default = []
Expand Down
162 changes: 61 additions & 101 deletions examples/asynchronous_processing.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,28 @@
#[macro_use]
extern crate log;
extern crate clap;
extern crate futures;
extern crate rand;
extern crate rdkafka;
extern crate tokio;

use clap::{App, Arg};
use futures::{lazy, Future, Stream};
use tokio::runtime::current_thread;
use futures::{FutureExt, StreamExt};
use futures::future::ready;
use log::*;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::StreamConsumer;
use rdkafka::consumer::Consumer;
use rdkafka::message::OwnedMessage;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::Message;
use rdkafka::async_support::*;

use std::thread;
use std::time::Duration;

use tokio_executor::blocking::{run as block_on};

mod example_utils;
use crate::example_utils::setup_logger;
use example_utils::setup_logger;


// Emulates an expensive, synchronous computation.
fn expensive_computation<'a>(msg: OwnedMessage) -> String {
info!("Starting expensive computation on message {}", msg.offset());
thread::sleep(Duration::from_millis(rand::random::<u64>() % 5000));
info!(
"Expensive computation completed on message {}",
msg.offset()
);
info!("Expensive computation completed on message {}", msg.offset());
match msg.payload_view::<str>() {
Some(Ok(payload)) => format!("Payload len for {} is {}", payload, payload.len()),
Some(Err(_)) => "Message payload is not a string".to_owned(),
Expand All @@ -46,7 +38,7 @@ fn expensive_computation<'a>(msg: OwnedMessage) -> String {
// Moving each message from one stage of the pipeline to next one is handled by the event loop,
// that runs on a single thread. The expensive CPU-bound computation is handled by the `ThreadPool`,
// without blocking the event loop.
fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_topic: &str) {
async fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_topic: &str) {
// Create the `StreamConsumer`, to receive the messages from the topic in form of a `Stream`.
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", group_id)
Expand All @@ -57,9 +49,7 @@ fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_
.create()
.expect("Consumer creation failed");

consumer
.subscribe(&[input_topic])
.expect("Can't subscribe to specified topic");
consumer.subscribe(&[input_topic]).expect("Can't subscribe to specified topic");

// Create the `FutureProducer` to produce asynchronously.
let producer: FutureProducer = ClientConfig::new()
Expand All @@ -69,109 +59,79 @@ fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_
.create()
.expect("Producer creation error");

// Create the runtime where the expensive computation will be performed.
let mut thread_pool = tokio::runtime::Builder::new()
.name_prefix("pool-")
.core_threads(4)
.build()
.unwrap();

// Use the current thread as IO thread to drive consumer and producer.
let mut io_thread = current_thread::Runtime::new().unwrap();
let io_thread_handle = io_thread.handle();

// Create the outer pipeline on the message stream.
let stream_processor = consumer
.start()
.filter_map(|result| {
// Filter out errors
match result {
let stream_processor = consumer.start()
.filter_map(|result| { // Filter out errors
ready(match result {
Ok(msg) => Some(msg),
Err(kafka_error) => {
warn!("Error while receiving from Kafka: {:?}", kafka_error);
None
}
}
})
.for_each(move |borrowed_message| {
// Process each message
})
}).for_each(move |borrowed_message| { // Process each message
info!("Message received: {}", borrowed_message.offset());
// Borrowed messages can't outlive the consumer they are received from, so they need to
// be owned in order to be sent to a separate thread.
let owned_message = borrowed_message.detach();
let output_topic = output_topic.to_string();
let producer = producer.clone();
let io_thread_handle = io_thread_handle.clone();
let message_future = lazy(move || {
let message_future = async move {
// The body of this closure will be executed in the thread pool.
let computation_result = expensive_computation(owned_message);
let producer_future = producer
.send(
FutureRecord::to(&output_topic)
.key("some key")
.payload(&computation_result),
0,
)
.then(|result| {
let computation_result = block_on(move || expensive_computation(owned_message)).await;
let producer_future = producer.send(
FutureRecord::to(&output_topic)
.key("some key")
.payload(&computation_result),
0).then(|result| {
match result {
Ok(Ok(delivery)) => println!("Sent: {:?}", delivery),
Ok(Err((e, _))) => println!("Error: {:?}", e),
Err(_) => println!("Future cancelled"),
Ok(delivery) => println!("Sent: {:?}", delivery),
Err(e) => println!("Error: {:?}", e),
}
Ok(())
ready(())
});
let _ = io_thread_handle.spawn(producer_future);
Ok(())
});
thread_pool.spawn(message_future);
Ok(())
producer_future.await
};
tokio::spawn(message_future);
ready(())
});

info!("Starting event loop");
let _ = io_thread.block_on(stream_processor);
stream_processor.await;
info!("Stream processing terminated");
}

fn main() {
#[tokio::main]
async fn main() {
let matches = App::new("Async example")
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
.about("Asynchronous computation example")
.arg(
Arg::with_name("brokers")
.short("b")
.long("brokers")
.help("Broker list in kafka format")
.takes_value(true)
.default_value("localhost:9092"),
)
.arg(
Arg::with_name("group-id")
.short("g")
.long("group-id")
.help("Consumer group id")
.takes_value(true)
.default_value("example_consumer_group_id"),
)
.arg(
Arg::with_name("log-conf")
.long("log-conf")
.help("Configure the logging format (example: 'rdkafka=trace')")
.takes_value(true),
)
.arg(
Arg::with_name("input-topic")
.long("input-topic")
.help("Input topic")
.takes_value(true)
.required(true),
)
.arg(
Arg::with_name("output-topic")
.long("output-topic")
.help("Output topic")
.takes_value(true)
.required(true),
)
.arg(Arg::with_name("brokers")
.short("b")
.long("brokers")
.help("Broker list in kafka format")
.takes_value(true)
.default_value("localhost:9092"))
.arg(Arg::with_name("group-id")
.short("g")
.long("group-id")
.help("Consumer group id")
.takes_value(true)
.default_value("example_consumer_group_id"))
.arg(Arg::with_name("log-conf")
.long("log-conf")
.help("Configure the logging format (example: 'rdkafka=trace')")
.takes_value(true))
.arg(Arg::with_name("input-topic")
.long("input-topic")
.help("Input topic")
.takes_value(true)
.required(true))
.arg(Arg::with_name("output-topic")
.long("output-topic")
.help("Output topic")
.takes_value(true)
.required(true))
.get_matches();

setup_logger(true, matches.value_of("log-conf"));
Expand All @@ -181,5 +141,5 @@ fn main() {
let input_topic = matches.value_of("input-topic").unwrap();
let output_topic = matches.value_of("output-topic").unwrap();

run_async_processor(brokers, group_id, input_topic, output_topic);
run_async_processor(brokers, group_id, input_topic, output_topic).await
}
Loading