
No Futures Executor
Switch from the futures executor (which is intended only for testing) to
either pure implementations or direct use of tokio.
dbcfd committed Oct 24, 2019
1 parent 0c8ab14 commit 86fe575
Showing 27 changed files with 497 additions and 544 deletions.
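
For context, a minimal sketch of the pattern this commit applies across the examples: replacing the futures crate's test-only executor with tokio's runtime. This assumes the tokio 0.2 alpha APIs pinned in Cargo.toml; run is a hypothetical application future, not a function from this repo.

// Before: the futures crate's executor, intended for tests only.
// fn main() {
//     futures::executor::block_on(run());
// }

// After: the #[tokio::main] attribute builds a tokio runtime that
// drives the same future to completion.
#[tokio::main]
async fn main() {
    run().await;
}

async fn run() { /* application logic */ }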
8 changes: 5 additions & 3 deletions Cargo.toml
@@ -15,18 +15,20 @@ rdkafka-sys = { path = "rdkafka-sys", version = "1.2.1" }
futures-preview = "0.3.0-alpha.18"
libc = "0.2.0"
log = "0.4.8"
serde = "1.0.0"
serde_derive = "1.0.0"
pin-project = "0.4"
serde = { version = "1.0.0", features = ["derive"] }
serde_json = "1.0.0"
tokio-executor = "0.2.0-alpha.6"

[dev-dependencies]
backoff = "0.1.5"
chrono = "0.4.0"
clap = "2.18.0"
env_logger = "0.7.1"
lazy_static = "1.4.0"
rand = "0.3.15"
regex = "1.1.6"
tokio = "0.2.0-alpha.4"
tokio = "0.2.0-alpha.6"

[features]
default = []
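The manifest change above folds the separate serde_derive crate into serde's own derive feature. A brief sketch of what that enables; Event is a hypothetical type, not from this repo.

// With serde = { version = "1.0.0", features = ["derive"] }, the derive
// macros are re-exported by serde itself; no serde_derive import is needed.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct Event {
    topic: String,
    payload: String,
}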
57 changes: 19 additions & 38 deletions examples/asynchronous_processing.rs
@@ -1,27 +1,21 @@
#[macro_use] extern crate log;
extern crate clap;
extern crate futures;
extern crate rand;
extern crate rdkafka;
extern crate tokio;

use clap::{App, Arg};
use futures::{FutureExt, StreamExt};
use futures::future::{lazy, ready};
use tokio::runtime::current_thread;
use futures::future::ready;
use log::*;

use rdkafka::Message;
use rdkafka::consumer::Consumer;
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::config::ClientConfig;
use rdkafka::message::OwnedMessage;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::consumer::StreamConsumer;
use rdkafka::consumer::Consumer;
use rdkafka::Message;
use rdkafka::async_support::*;

use std::thread;
use std::time::Duration;

use tokio_executor::blocking::{run as block_on};

mod example_utils;
use crate::example_utils::setup_logger;
use example_utils::setup_logger;


// Emulates an expensive, synchronous computation.
@@ -44,8 +38,7 @@ fn expensive_computation<'a>(msg: OwnedMessage) -> String {
// Moving each message from one stage of the pipeline to the next one is handled by the event loop,
// which runs on a single thread. The expensive CPU-bound computation is handled by the `ThreadPool`,
// without blocking the event loop.
fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_topic: &str) {

async fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_topic: &str) {
// Create the `StreamConsumer`, to receive the messages from the topic in form of a `Stream`.
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", group_id)
@@ -66,17 +59,6 @@ fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_
.create()
.expect("Producer creation error");

// Create the runtime where the expensive computation will be performed.
let thread_pool = tokio::runtime::Builder::new()
.name_prefix("pool-")
.core_threads(4)
.build()
.unwrap();

// Use the current thread as IO thread to drive consumer and producer.
let mut io_thread = current_thread::Runtime::new().unwrap();
let io_thread_handle = io_thread.handle();

// Create the outer pipeline on the message stream.
let stream_processor = consumer.start()
.filter_map(|result| { // Filter out errors
@@ -94,10 +76,9 @@ fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_
let owned_message = borrowed_message.detach();
let output_topic = output_topic.to_string();
let producer = producer.clone();
let io_thread_handle = io_thread_handle.clone();
let message_future = lazy(move |_| {
let message_future = async move {
// The body of this closure will be executed in the thread pool.
let computation_result = expensive_computation(owned_message);
let computation_result = block_on(move || expensive_computation(owned_message)).await;
let producer_future = producer.send(
FutureRecord::to(&output_topic)
.key("some key")
@@ -109,19 +90,19 @@ fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_
}
ready(())
});
let _ = io_thread_handle.spawn(producer_future);
()
});
thread_pool.spawn(message_future);
producer_future.await
};
tokio::spawn(message_future);
ready(())
});

info!("Starting event loop");
let _ = io_thread.block_on(stream_processor);
stream_processor.await;
info!("Stream processing terminated");
}

fn main() {
#[tokio::main]
async fn main() {
let matches = App::new("Async example")
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
.about("Asynchronous computation example")
@@ -160,5 +141,5 @@ fn main() {
let input_topic = matches.value_of("input-topic").unwrap();
let output_topic = matches.value_of("output-topic").unwrap();

run_async_processor(brokers, group_id, input_topic, output_topic);
run_async_processor(brokers, group_id, input_topic, output_topic).await
}
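
The key move in this file is offloading the CPU-bound step to tokio's blocking pool via tokio_executor::blocking::run (imported as block_on above), replacing the hand-built thread pool. A minimal sketch of the same pattern, assuming tokio-executor 0.2.0-alpha.6; hash_payload and process are hypothetical stand-ins for expensive_computation and the pipeline closure.

use tokio_executor::blocking::run as block_on;

// Stand-in for the expensive, synchronous computation.
fn hash_payload(payload: Vec<u8>) -> u64 {
    payload.iter().fold(0u64, |acc, b| acc.wrapping_mul(31).wrapping_add(u64::from(*b)))
}

async fn process(payload: Vec<u8>) -> u64 {
    // run() executes the closure on the blocking pool and returns a future,
    // so the async task yields instead of stalling the event loop.
    block_on(move || hash_payload(payload)).await
}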
25 changes: 11 additions & 14 deletions examples/at_least_once.rs
@@ -11,27 +11,21 @@
/// For a simpler example of consumers and producers, check the `simple_consumer` and
/// `simple_producer` files in the example folder.
///
#[macro_use] extern crate log;
extern crate clap;
extern crate futures;
extern crate rdkafka;
extern crate rdkafka_sys;

use clap::{App, Arg};
use futures::executor::{block_on, block_on_stream};
use futures::future::join_all;
use log::*;

use rdkafka::Message;
use rdkafka::client::ClientContext;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::{Consumer, ConsumerContext};
use rdkafka::error::KafkaResult;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::get_rdkafka_version;
use rdkafka::async_support::*;

mod example_utils;
use crate::example_utils::setup_logger;
use example_utils::setup_logger;
use futures::StreamExt;


// A simple context to customize the consumer behavior and print a log line every time
@@ -82,7 +76,8 @@ fn create_producer(brokers: &str) -> FutureProducer {
.expect("Producer creation failed")
}

fn main() {
#[tokio::main]
async fn main() {
let matches = App::new("at-least-once")
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
.about("At-least-once delivery example")
@@ -128,15 +123,15 @@ fn main() {
let consumer = create_consumer(brokers, group_id, input_topic);
let producer = create_producer(brokers);

for message in block_on_stream(consumer.start()) {
for message in consumer.start().next().await {
match message {
Err(e) => {
warn!("Kafka error: {}", e);
}
Ok(m) => {
// Send a copy to the message to every output topic in parallel, and wait for the
// delivery report to be received.
block_on(join_all(
join_all(
output_topics.iter()
.map(|output_topic| {
let mut record = FutureRecord::to(output_topic);
@@ -147,7 +142,9 @@ fn main() {
record = record.key(k);
}
producer.send(record, 1000)
}))).into_iter().collect::<Result<Vec<_>, _>>()
}))
.await
.into_iter().collect::<Result<Vec<_>, _>>()
.expect("Message delivery failed for some topic");
// Now that the message is completely processed, add its position to the offset
// store. The actual offset will be committed every 5 seconds.
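The send loop above now awaits futures::future::join_all directly instead of wrapping it in the futures executor's block_on. The same fan-out-and-collect shape in isolation; send and fan_out are hypothetical stand-ins for producer.send(record, 1000) and the loop body.

use futures::future::join_all;

// Hypothetical async send, standing in for producer.send(record, 1000).
async fn send(_topic: &str) -> Result<(), String> {
    Ok(())
}

async fn fan_out(topics: &[&str]) {
    // Run all sends concurrently, then fail if any single delivery failed.
    join_all(topics.iter().map(|t| send(t)))
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()
        .expect("Message delivery failed for some topic");
}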
12 changes: 4 additions & 8 deletions examples/example_utils.rs
@@ -1,12 +1,8 @@
extern crate chrono;
extern crate env_logger;
extern crate log;
use chrono::prelude::*;

use self::chrono::prelude::*;

use self::env_logger::fmt::Formatter;
use self::env_logger::Builder;
use self::log::{LevelFilter, Record};
use env_logger::fmt::Formatter;
use env_logger::Builder;
use log::{LevelFilter, Record};
use std::io::Write;
use std::thread;

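This cleanup is the Rust 2018 idiom: extern crate declarations and self::-prefixed paths are no longer needed, since external crates resolve directly in use paths. A tiny before/after sketch; timestamp is a hypothetical helper.

// 2015 edition style, as removed above:
// extern crate chrono;
// use self::chrono::prelude::*;

// 2018 edition style:
use chrono::prelude::*;

fn timestamp() -> String {
    Local::now().format("%H:%M:%S").to_string()
}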
12 changes: 4 additions & 8 deletions examples/metadata.rs
@@ -1,18 +1,14 @@
#[macro_use]
extern crate log;
#[macro_use]
extern crate clap;
extern crate rdkafka;

use clap::{App, Arg};
use clap::{App, Arg, value_t};
use log::*;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};

use std::time::Duration;

#[macro_use]
mod example_utils;
use crate::example_utils::setup_logger;
use example_utils::setup_logger;

fn print_metadata(brokers: &str, topic: Option<&str>, timeout: Duration, fetch_offsets: bool) {
let consumer: BaseConsumer = ClientConfig::new()
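Likewise, the value_t macro is now brought in with an ordinary use instead of #[macro_use], which Rust 2018 permits for macro_rules macros. A sketch assuming clap 2.x; the "timeout" argument and the default of 5 are illustrative only.

use clap::{App, Arg, value_t};

fn main() {
    let matches = App::new("example")
        .arg(Arg::with_name("timeout").long("timeout").takes_value(true))
        .get_matches();
    // value_t! parses the string argument into the requested type.
    let timeout = value_t!(matches, "timeout", u64).unwrap_or(5);
    println!("timeout: {}s", timeout);
}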
26 changes: 11 additions & 15 deletions examples/simple_consumer.rs
@@ -1,23 +1,18 @@
#[macro_use]
extern crate log;
extern crate clap;
extern crate futures;
extern crate rdkafka;
extern crate rdkafka_sys;

use clap::{App, Arg};
use futures::executor::block_on_stream;

use log::*;

use rdkafka::client::ClientContext;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance};
use rdkafka::error::KafkaResult;
use rdkafka::message::{Headers, Message};
use rdkafka::util::get_rdkafka_version;
use rdkafka::async_support::*;

mod example_utils;
use crate::example_utils::setup_logger;
use example_utils::setup_logger;
use futures::StreamExt;

// A context can be used to change the behavior of producers and consumers by adding callbacks
// that will be executed by librdkafka.
@@ -47,7 +42,7 @@ impl ConsumerContext for CustomContext {
// A type alias with your custom consumer can be created for convenience.
type LoggingConsumer = StreamConsumer<CustomContext>;

fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {
async fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {
let context = CustomContext;

let consumer: LoggingConsumer = ClientConfig::new()
@@ -68,9 +63,9 @@ fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {

// consumer.start() returns a stream. The stream can be used to chain together expensive steps,
// such as complex computations on a thread pool or asynchronous IO.
let message_stream = consumer.start();
let mut message_stream = consumer.start();

for message in block_on_stream(message_stream) {
while let Some(message) = message_stream.next().await {
match message {
Err(e) => warn!("Kafka error: {}", e),
Ok(m) => {
@@ -96,7 +91,8 @@ fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {
}
}

fn main() {
#[tokio::main]
async fn main() {
let matches = App::new("consumer example")
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
.about("Simple command line consumer")
@@ -142,5 +138,5 @@ fn main() {
let brokers = matches.value_of("brokers").unwrap();
let group_id = matches.value_of("group-id").unwrap();

consume_and_print(brokers, group_id, &topics);
consume_and_print(brokers, group_id, &topics).await;
}
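
The consumer loop above swaps block_on_stream for async iteration: hold the stream mutably and pull items with next().await. The same shape in isolation, assuming futures-preview 0.3 alpha; drain and the String item type are hypothetical.

use futures::{Stream, StreamExt};

async fn drain(mut messages: impl Stream<Item = Result<String, String>> + Unpin) {
    // next().await yields one item at a time without blocking a thread.
    while let Some(message) = messages.next().await {
        match message {
            Ok(m) => println!("payload: {}", m),
            Err(e) => eprintln!("error: {}", e),
        }
    }
}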
(21 more changed files not shown)
