diff --git a/.travis.yml b/.travis.yml index 7b95b04c2..40b682024 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,9 +8,6 @@ jobs: - os: linux arch: amd64 rust: stable - - os: linux - arch: amd64 - rust: nightly-2019-10-17 env: RDKAFKA_RUN_TESTS=1 - os: linux arch: arm64 diff --git a/Cargo.toml b/Cargo.toml index cd5da2e82..450ded80e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,21 +12,23 @@ edition = "2018" [dependencies] rdkafka-sys = { path = "rdkafka-sys", version = "1.2.1" } -futures = "0.1.21" +futures = "0.3" libc = "0.2.0" log = "0.4.8" -serde = "1.0.0" -serde_derive = "1.0.0" +pin-project = "0.4" +serde = { version = "1.0.0", features = ["derive"] } serde_json = "1.0.0" +tokio-executor = "=0.2.0-alpha.6" [dev-dependencies] backoff = "0.1.5" chrono = "0.4.0" clap = "2.18.0" env_logger = "0.7.1" +lazy_static = "1.4.0" rand = "0.3.15" regex = "1.1.6" -tokio = "0.1.7" +tokio = "=0.2.0-alpha.6" [features] default = [] diff --git a/examples/asynchronous_processing.rs b/examples/asynchronous_processing.rs index 65aedac9a..6d779c963 100644 --- a/examples/asynchronous_processing.rs +++ b/examples/asynchronous_processing.rs @@ -1,36 +1,28 @@ -#[macro_use] -extern crate log; -extern crate clap; -extern crate futures; -extern crate rand; -extern crate rdkafka; -extern crate tokio; - use clap::{App, Arg}; -use futures::{lazy, Future, Stream}; -use tokio::runtime::current_thread; +use futures::{FutureExt, StreamExt}; +use futures::future::ready; +use log::*; use rdkafka::config::ClientConfig; -use rdkafka::consumer::stream_consumer::StreamConsumer; +use rdkafka::consumer::StreamConsumer; use rdkafka::consumer::Consumer; -use rdkafka::message::OwnedMessage; -use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::Message; +use rdkafka::async_support::*; use std::thread; use std::time::Duration; +use tokio_executor::blocking::{run as block_on}; + mod example_utils; -use crate::example_utils::setup_logger; +use example_utils::setup_logger; + // Emulates an expensive, synchronous computation. fn expensive_computation<'a>(msg: OwnedMessage) -> String { info!("Starting expensive computation on message {}", msg.offset()); thread::sleep(Duration::from_millis(rand::random::() % 5000)); - info!( - "Expensive computation completed on message {}", - msg.offset() - ); + info!("Expensive computation completed on message {}", msg.offset()); match msg.payload_view::() { Some(Ok(payload)) => format!("Payload len for {} is {}", payload, payload.len()), Some(Err(_)) => "Message payload is not a string".to_owned(), @@ -46,7 +38,7 @@ fn expensive_computation<'a>(msg: OwnedMessage) -> String { // Moving each message from one stage of the pipeline to next one is handled by the event loop, // that runs on a single thread. The expensive CPU-bound computation is handled by the `ThreadPool`, // without blocking the event loop. -fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_topic: &str) { +async fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_topic: &str) { // Create the `StreamConsumer`, to receive the messages from the topic in form of a `Stream`. 
let consumer: StreamConsumer = ClientConfig::new() .set("group.id", group_id) @@ -57,9 +49,7 @@ fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_ .create() .expect("Consumer creation failed"); - consumer - .subscribe(&[input_topic]) - .expect("Can't subscribe to specified topic"); + consumer.subscribe(&[input_topic]).expect("Can't subscribe to specified topic"); // Create the `FutureProducer` to produce asynchronously. let producer: FutureProducer = ClientConfig::new() @@ -69,109 +59,79 @@ fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_ .create() .expect("Producer creation error"); - // Create the runtime where the expensive computation will be performed. - let mut thread_pool = tokio::runtime::Builder::new() - .name_prefix("pool-") - .core_threads(4) - .build() - .unwrap(); - - // Use the current thread as IO thread to drive consumer and producer. - let mut io_thread = current_thread::Runtime::new().unwrap(); - let io_thread_handle = io_thread.handle(); - // Create the outer pipeline on the message stream. - let stream_processor = consumer - .start() - .filter_map(|result| { - // Filter out errors - match result { + let stream_processor = consumer.start() + .filter_map(|result| { // Filter out errors + ready(match result { Ok(msg) => Some(msg), Err(kafka_error) => { warn!("Error while receiving from Kafka: {:?}", kafka_error); None } - } - }) - .for_each(move |borrowed_message| { - // Process each message + }) + }).for_each(move |borrowed_message| { // Process each message info!("Message received: {}", borrowed_message.offset()); // Borrowed messages can't outlive the consumer they are received from, so they need to // be owned in order to be sent to a separate thread. let owned_message = borrowed_message.detach(); let output_topic = output_topic.to_string(); let producer = producer.clone(); - let io_thread_handle = io_thread_handle.clone(); - let message_future = lazy(move || { + let message_future = async move { // The body of this closure will be executed in the thread pool. 
- let computation_result = expensive_computation(owned_message); - let producer_future = producer - .send( - FutureRecord::to(&output_topic) - .key("some key") - .payload(&computation_result), - 0, - ) - .then(|result| { + let computation_result = block_on(move || expensive_computation(owned_message)).await; + let producer_future = producer.send( + FutureRecord::to(&output_topic) + .key("some key") + .payload(&computation_result), + 0).then(|result| { match result { - Ok(Ok(delivery)) => println!("Sent: {:?}", delivery), - Ok(Err((e, _))) => println!("Error: {:?}", e), - Err(_) => println!("Future cancelled"), + Ok(delivery) => println!("Sent: {:?}", delivery), + Err(e) => println!("Error: {:?}", e), } - Ok(()) + ready(()) }); - let _ = io_thread_handle.spawn(producer_future); - Ok(()) - }); - thread_pool.spawn(message_future); - Ok(()) + producer_future.await + }; + tokio::spawn(message_future); + ready(()) }); info!("Starting event loop"); - let _ = io_thread.block_on(stream_processor); + stream_processor.await; info!("Stream processing terminated"); } -fn main() { +#[tokio::main] +async fn main() { let matches = App::new("Async example") .version(option_env!("CARGO_PKG_VERSION").unwrap_or("")) .about("Asynchronous computation example") - .arg( - Arg::with_name("brokers") - .short("b") - .long("brokers") - .help("Broker list in kafka format") - .takes_value(true) - .default_value("localhost:9092"), - ) - .arg( - Arg::with_name("group-id") - .short("g") - .long("group-id") - .help("Consumer group id") - .takes_value(true) - .default_value("example_consumer_group_id"), - ) - .arg( - Arg::with_name("log-conf") - .long("log-conf") - .help("Configure the logging format (example: 'rdkafka=trace')") - .takes_value(true), - ) - .arg( - Arg::with_name("input-topic") - .long("input-topic") - .help("Input topic") - .takes_value(true) - .required(true), - ) - .arg( - Arg::with_name("output-topic") - .long("output-topic") - .help("Output topic") - .takes_value(true) - .required(true), - ) + .arg(Arg::with_name("brokers") + .short("b") + .long("brokers") + .help("Broker list in kafka format") + .takes_value(true) + .default_value("localhost:9092")) + .arg(Arg::with_name("group-id") + .short("g") + .long("group-id") + .help("Consumer group id") + .takes_value(true) + .default_value("example_consumer_group_id")) + .arg(Arg::with_name("log-conf") + .long("log-conf") + .help("Configure the logging format (example: 'rdkafka=trace')") + .takes_value(true)) + .arg(Arg::with_name("input-topic") + .long("input-topic") + .help("Input topic") + .takes_value(true) + .required(true)) + .arg(Arg::with_name("output-topic") + .long("output-topic") + .help("Output topic") + .takes_value(true) + .required(true)) .get_matches(); setup_logger(true, matches.value_of("log-conf")); @@ -181,5 +141,5 @@ fn main() { let input_topic = matches.value_of("input-topic").unwrap(); let output_topic = matches.value_of("output-topic").unwrap(); - run_async_processor(brokers, group_id, input_topic, output_topic); + run_async_processor(brokers, group_id, input_topic, output_topic).await } diff --git a/examples/at_least_once.rs b/examples/at_least_once.rs index 7b942d027..f02c7a6f3 100644 --- a/examples/at_least_once.rs +++ b/examples/at_least_once.rs @@ -11,29 +11,22 @@ /// For a simpler example of consumers and producers, check the `simple_consumer` and /// `simple_producer` files in the example folder. 
/// -#[macro_use] -extern crate log; -extern crate clap; -extern crate futures; -extern crate rdkafka; -extern crate rdkafka_sys; - use clap::{App, Arg}; use futures::future::join_all; -use futures::stream::Stream; -use futures::Future; +use log::*; +use rdkafka::Message; use rdkafka::client::ClientContext; use rdkafka::config::{ClientConfig, RDKafkaLogLevel}; -use rdkafka::consumer::stream_consumer::StreamConsumer; use rdkafka::consumer::{Consumer, ConsumerContext}; use rdkafka::error::KafkaResult; -use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::util::get_rdkafka_version; -use rdkafka::Message; +use rdkafka::async_support::*; mod example_utils; -use crate::example_utils::setup_logger; +use example_utils::setup_logger; +use futures::StreamExt; + // A simple context to customize the consumer behavior and print a log line every time // offsets are committed @@ -42,11 +35,7 @@ struct LoggingConsumerContext; impl ClientContext for LoggingConsumerContext {} impl ConsumerContext for LoggingConsumerContext { - fn commit_callback( - &self, - result: KafkaResult<()>, - _offsets: *mut rdkafka_sys::RDKafkaTopicPartitionList, - ) { + fn commit_callback(&self, result: KafkaResult<()>, _offsets: *mut rdkafka_sys::RDKafkaTopicPartitionList) { match result { Ok(_) => info!("Offsets committed successfully"), Err(e) => warn!("Error while committing offsets: {}", e), @@ -74,9 +63,7 @@ fn create_consumer(brokers: &str, group_id: &str, topic: &str) -> LoggingConsume .create_with_context(context) .expect("Consumer creation failed"); - consumer - .subscribe(&[topic]) - .expect("Can't subscribe to specified topic"); + consumer.subscribe(&[topic]).expect("Can't subscribe to specified topic"); consumer } @@ -84,52 +71,43 @@ fn create_consumer(brokers: &str, group_id: &str, topic: &str) -> LoggingConsume fn create_producer(brokers: &str) -> FutureProducer { ClientConfig::new() .set("bootstrap.servers", brokers) - .set("queue.buffering.max.ms", "0") // Do not buffer + .set("queue.buffering.max.ms", "0") // Do not buffer .create() .expect("Producer creation failed") } -fn main() { +#[tokio::main] +async fn main() { let matches = App::new("at-least-once") .version(option_env!("CARGO_PKG_VERSION").unwrap_or("")) .about("At-least-once delivery example") - .arg( - Arg::with_name("brokers") - .short("b") - .long("brokers") - .help("Broker list in kafka format") - .takes_value(true) - .default_value("localhost:9092"), - ) - .arg( - Arg::with_name("group-id") - .short("g") - .long("group-id") - .help("Consumer group id") - .takes_value(true) - .default_value("example_consumer_group_id"), - ) - .arg( - Arg::with_name("log-conf") - .long("log-conf") - .help("Configure the logging format (example: 'rdkafka=trace')") - .takes_value(true), - ) - .arg( - Arg::with_name("input-topic") - .long("input-topic") - .help("Input topic name") - .takes_value(true) - .required(true), - ) - .arg( - Arg::with_name("output-topics") - .long("output-topics") - .help("Output topics names") - .takes_value(true) - .multiple(true) - .required(true), - ) + .arg(Arg::with_name("brokers") + .short("b") + .long("brokers") + .help("Broker list in kafka format") + .takes_value(true) + .default_value("localhost:9092")) + .arg(Arg::with_name("group-id") + .short("g") + .long("group-id") + .help("Consumer group id") + .takes_value(true) + .default_value("example_consumer_group_id")) + .arg(Arg::with_name("log-conf") + .long("log-conf") + .help("Configure the logging format (example: 'rdkafka=trace')") + .takes_value(true)) + 
.arg(Arg::with_name("input-topic") + .long("input-topic") + .help("Input topic name") + .takes_value(true) + .required(true)) + .arg(Arg::with_name("output-topics") + .long("output-topics") + .help("Output topics names") + .takes_value(true) + .multiple(true) + .required(true)) .get_matches(); setup_logger(true, matches.value_of("log-conf")); @@ -138,39 +116,36 @@ fn main() { info!("rd_kafka_version: {}", version); let input_topic = matches.value_of("input-topic").unwrap(); - let output_topics = matches - .values_of("output-topics") - .unwrap() - .collect::>(); + let output_topics = matches.values_of("output-topics").unwrap().collect::>(); let brokers = matches.value_of("brokers").unwrap(); let group_id = matches.value_of("group-id").unwrap(); let consumer = create_consumer(brokers, group_id, input_topic); let producer = create_producer(brokers); - for message in consumer.start().wait() { + for message in consumer.start().next().await { match message { - Err(()) => { - warn!("Error while reading from stream"); - } - Ok(Err(e)) => { + Err(e) => { warn!("Kafka error: {}", e); } - Ok(Ok(m)) => { + Ok(m) => { // Send a copy to the message to every output topic in parallel, and wait for the // delivery report to be received. - join_all(output_topics.iter().map(|output_topic| { - let mut record = FutureRecord::to(output_topic); - if let Some(p) = m.payload() { - record = record.payload(p); - } - if let Some(k) = m.key() { - record = record.key(k); - } - producer.send(record, 1000) - })) - .wait() - .expect("Message delivery failed for some topic"); + join_all( + output_topics.iter() + .map(|output_topic| { + let mut record = FutureRecord::to(output_topic); + if let Some(p) = m.payload() { + record = record.payload(p); + } + if let Some(k) = m.key() { + record = record.key(k); + } + producer.send(record, 1000) + })) + .await + .into_iter().collect::, _>>() + .expect("Message delivery failed for some topic"); // Now that the message is completely processed, add it's position to the offset // store. The actual offset will be committed every 5 seconds. 
if let Err(e) = consumer.store_offset(&m) { diff --git a/examples/example_utils.rs b/examples/example_utils.rs index cd52e5e4c..dceff4b52 100644 --- a/examples/example_utils.rs +++ b/examples/example_utils.rs @@ -1,12 +1,8 @@ -extern crate chrono; -extern crate env_logger; -extern crate log; +use chrono::prelude::*; -use self::chrono::prelude::*; - -use self::env_logger::fmt::Formatter; -use self::env_logger::Builder; -use self::log::{LevelFilter, Record}; +use env_logger::fmt::Formatter; +use env_logger::Builder; +use log::{LevelFilter, Record}; use std::io::Write; use std::thread; diff --git a/examples/metadata.rs b/examples/metadata.rs index cc9612cfa..f316903f9 100644 --- a/examples/metadata.rs +++ b/examples/metadata.rs @@ -1,18 +1,14 @@ -#[macro_use] -extern crate log; -#[macro_use] -extern crate clap; -extern crate rdkafka; - -use clap::{App, Arg}; +use clap::{App, Arg, value_t}; +use log::*; use rdkafka::config::ClientConfig; use rdkafka::consumer::{BaseConsumer, Consumer}; use std::time::Duration; +#[macro_use] mod example_utils; -use crate::example_utils::setup_logger; +use example_utils::setup_logger; fn print_metadata(brokers: &str, topic: Option<&str>, timeout: Duration, fetch_offsets: bool) { let consumer: BaseConsumer = ClientConfig::new() diff --git a/examples/simple_consumer.rs b/examples/simple_consumer.rs index d2b78820b..8a86b77b2 100644 --- a/examples/simple_consumer.rs +++ b/examples/simple_consumer.rs @@ -1,23 +1,18 @@ -#[macro_use] -extern crate log; -extern crate clap; -extern crate futures; -extern crate rdkafka; -extern crate rdkafka_sys; - use clap::{App, Arg}; -use futures::stream::Stream; + +use log::*; use rdkafka::client::ClientContext; use rdkafka::config::{ClientConfig, RDKafkaLogLevel}; -use rdkafka::consumer::stream_consumer::StreamConsumer; use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance}; use rdkafka::error::KafkaResult; use rdkafka::message::{Headers, Message}; use rdkafka::util::get_rdkafka_version; +use rdkafka::async_support::*; mod example_utils; -use crate::example_utils::setup_logger; +use example_utils::setup_logger; +use futures::StreamExt; // A context can be used to change the behavior of producers and consumers by adding callbacks // that will be executed by librdkafka. @@ -47,7 +42,7 @@ impl ConsumerContext for CustomContext { // A type alias with your custom consumer can be created for convenience. type LoggingConsumer = StreamConsumer; -fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) { +async fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) { let context = CustomContext; let consumer: LoggingConsumer = ClientConfig::new() @@ -68,13 +63,12 @@ fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) { // consumer.start() returns a stream. The stream can be used ot chain together expensive steps, // such as complex computations on a thread pool or asynchronous IO. 
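The hunk that follows replaces the blocking futures 0.1 iterator (`for message in message_stream.wait()`) with `StreamExt::next` awaited in a `while let` loop, flattening the nested `Ok(Ok(m))` / `Ok(Err(e))` results into plain `Ok` / `Err`. As a standalone illustration of that consumption pattern, not part of the patch and using only the futures 0.3 crate pinned in Cargo.toml above (the fake stream and `FakeError` type are invented for the sketch):

    use futures::stream::{self, StreamExt};

    #[derive(Debug)]
    struct FakeError;

    async fn consume() {
        // Stand-in for `consumer.start()`: a finite stream of Result items.
        let mut message_stream = stream::iter(vec![Ok("a"), Err(FakeError), Ok("b")]);
        while let Some(message) = message_stream.next().await {
            match message {
                Ok(m) => println!("received: {}", m),
                Err(e) => println!("error: {:?}", e),
            }
        }
    }

    fn main() {
        futures::executor::block_on(consume());
    }
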
- let message_stream = consumer.start(); + let mut message_stream = consumer.start(); - for message in message_stream.wait() { + while let Some(message) = message_stream.next().await { match message { - Err(_) => warn!("Error while reading from stream."), - Ok(Err(e)) => warn!("Kafka error: {}", e), - Ok(Ok(m)) => { + Err(e) => warn!("Kafka error: {}", e), + Ok(m) => { let payload = match m.payload_view::() { None => "", Some(Ok(s)) => s, @@ -97,7 +91,8 @@ fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) { } } -fn main() { +#[tokio::main] +async fn main() { let matches = App::new("consumer example") .version(option_env!("CARGO_PKG_VERSION").unwrap_or("")) .about("Simple command line consumer") @@ -143,5 +138,5 @@ fn main() { let brokers = matches.value_of("brokers").unwrap(); let group_id = matches.value_of("group-id").unwrap(); - consume_and_print(brokers, group_id, &topics); + consume_and_print(brokers, group_id, &topics).await; } diff --git a/examples/simple_producer.rs b/examples/simple_producer.rs index 8856569d8..45d45a36f 100644 --- a/examples/simple_producer.rs +++ b/examples/simple_producer.rs @@ -1,21 +1,32 @@ -#[macro_use] -extern crate log; -extern crate clap; -extern crate futures; -extern crate rdkafka; - use clap::{App, Arg}; use futures::*; +use log::*; use rdkafka::config::ClientConfig; -use rdkafka::producer::{FutureProducer, FutureRecord}; +use rdkafka::async_support::*; use rdkafka::util::get_rdkafka_version; mod example_utils; -use crate::example_utils::setup_logger; +use example_utils::setup_logger; use rdkafka::message::OwnedHeaders; -fn produce(brokers: &str, topic_name: &str) { +async fn send(producer: FutureProducer, topic_name: &str, i: &i32, payload: &String, key: &String) -> KafkaResult{ + producer + .send( + FutureRecord::to(topic_name) + .payload(&payload) + .key(&key) + .headers(OwnedHeaders::new().add("header_key", "header_value")), + 0, + ) + .map(move |delivery_status| { + // This will be executed onw the result is received + info!("Delivery status for message {} received", i); + delivery_status + }).await +} + +async fn produce(brokers: &str, topic_name: &str) { let producer: FutureProducer = ClientConfig::new() .set("bootstrap.servers", brokers) .set("produce.offset.report", "true") @@ -25,33 +36,32 @@ fn produce(brokers: &str, topic_name: &str) { // This loop is non blocking: all messages will be sent one after the other, without waiting // for the results. - let futures = (0..5) + let payloads: Vec<_> = (0..5) .map(|i| { + let payload = format!("Message {}", i); + let key = format!("Key {}", i); + (i, payload, key) + }) + .collect(); + let futures = payloads + .iter() + .map(|t| { + let (i, payload, key) = t; // The send operation on the topic returns a future, that will be completed once the // result or failure from Kafka will be received. - producer - .send( - FutureRecord::to(topic_name) - .payload(&format!("Message {}", i)) - .key(&format!("Key {}", i)) - .headers(OwnedHeaders::new().add("header_key", "header_value")), - 0, - ) - .map(move |delivery_status| { - // This will be executed onw the result is received - info!("Delivery status for message {} received", i); - delivery_status - }) + + send(producer.clone(), topic_name, i, payload, key) }) .collect::>(); // This loop will wait until all delivery statuses have been received received. for future in futures { - info!("Future completed. Result: {:?}", future.wait()); + info!("Future completed. 
Result: {:?}", future.await); } } -fn main() { +#[tokio::main] +async fn main() { let matches = App::new("producer example") .version(option_env!("CARGO_PKG_VERSION").unwrap_or("")) .about("Simple command line producer") @@ -87,5 +97,5 @@ fn main() { let topic = matches.value_of("topic").unwrap(); let brokers = matches.value_of("brokers").unwrap(); - produce(brokers, topic); + produce(brokers, topic).await; } diff --git a/rdkafka-sys/Cargo.toml b/rdkafka-sys/Cargo.toml index f7715642c..4e4077cf3 100644 --- a/rdkafka-sys/Cargo.toml +++ b/rdkafka-sys/Cargo.toml @@ -18,6 +18,7 @@ openssl-sys = { version = "~ 0.9.0", optional = true } lz4-sys = { version = "1.8.3", optional = true } [build-dependencies] +bindgen = "0.51.1" num_cpus = "0.2.0" pkg-config = "0.3.9" cmake = { version = "^0.1", optional = true } diff --git a/src/admin.rs b/src/admin.rs index cdb5b64e5..86c1774b3 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -12,14 +12,15 @@ use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; use crate::error::{IsError, KafkaError, KafkaResult}; use crate::util::{cstr_to_owned, timeout_to_ms, AsCArray, ErrBuf, IntoOpaque, WrappedCPointer}; -use futures::future::{self, Either}; -use futures::{Async, Canceled, Complete, Future, Oneshot, Poll}; - +use futures::channel::oneshot::{Canceled, Receiver, Sender, channel}; +use futures::future::{self, Either, Future, FutureExt}; +use log::*; use std::collections::HashMap; use std::ffi::{CStr, CString}; -use std::mem; +use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::task::{Context, Poll}; use std::thread::{self, JoinHandle}; use std::time::Duration; @@ -48,13 +49,13 @@ impl AdminClient { &self, topics: I, opts: &AdminOptions, - ) -> impl Future, Error = KafkaError> + ) -> impl Future>> where I: IntoIterator>, { match self.create_topics_inner(topics, opts) { - Ok(rx) => Either::A(CreateTopicsFuture { rx }), - Err(err) => Either::B(future::err(err)), + Ok(rx) => Either::Left(CreateTopicsFuture { rx }), + Err(err) => Either::Right(future::err(err)), } } @@ -62,7 +63,7 @@ impl AdminClient { &self, topics: I, opts: &AdminOptions, - ) -> KafkaResult> + ) -> KafkaResult> where I: IntoIterator>, { @@ -93,10 +94,10 @@ impl AdminClient { &self, topic_names: &[&str], opts: &AdminOptions, - ) -> impl Future, Error = KafkaError> { + ) -> impl Future>> { match self.delete_topics_inner(topic_names, opts) { - Ok(rx) => Either::A(DeleteTopicsFuture { rx }), - Err(err) => Either::B(future::err(err)), + Ok(rx) => Either::Left(DeleteTopicsFuture { rx }), + Err(err) => Either::Right(future::err(err)), } } @@ -104,7 +105,7 @@ impl AdminClient { &self, topic_names: &[&str], opts: &AdminOptions, - ) -> KafkaResult> { + ) -> KafkaResult> { let mut native_topics = Vec::new(); let mut err_buf = ErrBuf::new(); for tn in topic_names { @@ -138,13 +139,13 @@ impl AdminClient { &self, partitions: I, opts: &AdminOptions, - ) -> impl Future, Error = KafkaError> + ) -> impl Future>> where I: IntoIterator>, { match self.create_partitions_inner(partitions, opts) { - Ok(rx) => Either::A(CreatePartitionsFuture { rx }), - Err(err) => Either::B(future::err(err)), + Ok(rx) => Either::Left(CreatePartitionsFuture { rx }), + Err(err) => Either::Right(future::err(err)), } } @@ -152,7 +153,7 @@ impl AdminClient { &self, partitions: I, opts: &AdminOptions, - ) -> KafkaResult> + ) -> KafkaResult> where I: IntoIterator>, { @@ -183,13 +184,13 @@ impl AdminClient { &self, configs: I, opts: &AdminOptions, - ) -> impl Future, Error = 
KafkaError> + ) -> impl Future>> where I: IntoIterator>, { match self.describe_configs_inner(configs, opts) { - Ok(rx) => Either::A(DescribeConfigsFuture { rx }), - Err(err) => Either::B(future::err(err)), + Ok(rx) => Either::Left(DescribeConfigsFuture { rx }), + Err(err) => Either::Right(future::err(err)), } } @@ -197,7 +198,7 @@ impl AdminClient { &self, configs: I, opts: &AdminOptions, - ) -> KafkaResult> + ) -> KafkaResult> where I: IntoIterator>, { @@ -247,13 +248,13 @@ impl AdminClient { &self, configs: I, opts: &AdminOptions, - ) -> impl Future, Error = KafkaError> + ) -> impl Future>> where I: IntoIterator>, { match self.alter_configs_inner(configs, opts) { - Ok(rx) => Either::A(AlterConfigsFuture { rx }), - Err(err) => Either::B(future::err(err)), + Ok(rx) => Either::Left(AlterConfigsFuture { rx }), + Err(err) => Either::Right(future::err(err)), } } @@ -261,7 +262,7 @@ impl AdminClient { &self, configs: I, opts: &AdminOptions, - ) -> KafkaResult> + ) -> KafkaResult> where I: IntoIterator>, { @@ -344,7 +345,7 @@ fn start_poll_thread(queue: Arc, should_stop: Arc) -> J continue; } let event = unsafe { NativeEvent::from_ptr(event) }; - let tx: Box> = + let tx: Box> = unsafe { IntoOpaque::from_ptr(rdsys::rd_kafka_event_opaque(event.ptr())) }; let _ = tx.send(event); } @@ -457,7 +458,7 @@ impl AdminOptions { &self, client: *mut RDKafka, err_buf: &mut ErrBuf, - ) -> KafkaResult<(NativeAdminOptions, Oneshot)> { + ) -> KafkaResult<(NativeAdminOptions, Receiver)> { let native_opts = unsafe { NativeAdminOptions::from_ptr(rdsys::rd_kafka_AdminOptions_new( client, @@ -513,12 +514,12 @@ impl AdminOptions { check_rdkafka_invalid_arg(res, err_buf)?; } - let (tx, rx) = futures::oneshot(); + let (tx, rx) = channel(); let tx = Box::new(tx); unsafe { - rdsys::rd_kafka_AdminOptions_set_opaque(native_opts.ptr, IntoOpaque::as_ptr(&tx)) + rdsys::rd_kafka_AdminOptions_set_opaque(native_opts.ptr, IntoOpaque::as_ptr(&tx)); + std::mem::forget(tx); }; - mem::forget(tx); Ok((native_opts, rx)) } @@ -734,31 +735,32 @@ impl Drop for NativeNewTopic { } struct CreateTopicsFuture { - rx: Oneshot, + rx: Receiver, } impl Future for CreateTopicsFuture { - type Item = Vec; - type Error = KafkaError; + type Output = KafkaResult>; - fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(event)) => { - event.check_error()?; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.rx.poll_unpin(cx) { + Poll::Ready(Ok(event)) => { + if let Err(e) = event.check_error() { + return Poll::Ready(Err(e)); + } let res = unsafe { rdsys::rd_kafka_event_CreateTopics_result(event.ptr()) }; if res.is_null() { let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; - return Err(KafkaError::AdminOpCreation(format!( + return Poll::Ready(Err(KafkaError::AdminOpCreation(format!( "create topics request received response of incorrect type ({})", typ - ))); + )))); } let mut n = 0; let topics = unsafe { rdsys::rd_kafka_CreateTopics_result_topics(res, &mut n) }; - Ok(Async::Ready(build_topic_results(topics, n))) + Poll::Ready(Ok(build_topic_results(topics, n))) } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(Canceled) => Err(KafkaError::Canceled), + Poll::Ready(Err(Canceled)) => Poll::Ready(Err(KafkaError::Canceled)), + Poll::Pending => Poll::Pending, } } } @@ -797,31 +799,32 @@ impl Drop for NativeDeleteTopic { } struct DeleteTopicsFuture { - rx: Oneshot, + rx: Receiver, } impl Future for DeleteTopicsFuture { - type Item = Vec; - type Error = KafkaError; + type Output = KafkaResult>; - fn 
poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(event)) => { - event.check_error()?; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.rx.poll_unpin(cx) { + Poll::Ready(Ok(event)) => { + if let Err(e) = event.check_error() { + return Poll::Ready(Err(e)) + }; let res = unsafe { rdsys::rd_kafka_event_DeleteTopics_result(event.ptr()) }; if res.is_null() { let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; - return Err(KafkaError::AdminOpCreation(format!( + return Poll::Ready(Err(KafkaError::AdminOpCreation(format!( "delete topics request received response of incorrect type ({})", typ - ))); + )))); } let mut n = 0; let topics = unsafe { rdsys::rd_kafka_DeleteTopics_result_topics(res, &mut n) }; - Ok(Async::Ready(build_topic_results(topics, n))) + Poll::Ready(Ok(build_topic_results(topics, n))) } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(Canceled) => Err(KafkaError::Canceled), + Poll::Ready(Err(Canceled)) => Poll::Ready(Err(KafkaError::Canceled)), + Poll::Pending => Poll::Pending, } } } @@ -941,31 +944,32 @@ impl Drop for NativeNewPartitions { } struct CreatePartitionsFuture { - rx: Oneshot, + rx: Receiver, } impl Future for CreatePartitionsFuture { - type Item = Vec; - type Error = KafkaError; + type Output = KafkaResult>; - fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(event)) => { - event.check_error()?; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.rx.poll_unpin(cx) { + Poll::Ready(Ok(event)) => { + if let Err(e) = event.check_error() { + return Poll::Ready(Err(e)); + } let res = unsafe { rdsys::rd_kafka_event_CreatePartitions_result(event.ptr()) }; if res.is_null() { let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; - return Err(KafkaError::AdminOpCreation(format!( + return Poll::Ready(Err(KafkaError::AdminOpCreation(format!( "create partitions request received response of incorrect type ({})", typ - ))); + )))); } let mut n = 0; let topics = unsafe { rdsys::rd_kafka_CreatePartitions_result_topics(res, &mut n) }; - Ok(Async::Ready(build_topic_results(topics, n))) + Poll::Ready(Ok(build_topic_results(topics, n))) } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(Canceled) => Err(KafkaError::Canceled), + Poll::Ready(Err(Canceled)) => Poll::Ready(Err(KafkaError::Canceled)), + Poll::Pending => Poll::Pending, } } } @@ -1143,24 +1147,25 @@ fn extract_config_source(config_source: RDKafkaConfigSource) -> KafkaResult, + rx: Receiver, } impl Future for DescribeConfigsFuture { - type Item = Vec; - type Error = KafkaError; + type Output = KafkaResult>; - fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(event)) => { - event.check_error()?; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.rx.poll_unpin(cx) { + Poll::Ready(Ok(event)) => { + if let Err(e) = event.check_error() { + return Poll::Ready(Err(e)) + } let res = unsafe { rdsys::rd_kafka_event_DescribeConfigs_result(event.ptr()) }; if res.is_null() { let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; - return Err(KafkaError::AdminOpCreation(format!( + return Poll::Ready(Err(KafkaError::AdminOpCreation(format!( "describe configs request received response of incorrect type ({})", typ - ))); + )))); } let mut n = 0; let resources = @@ -1206,10 +1211,10 @@ impl Future for DescribeConfigsFuture { entries: entries_out, })) } - Ok(Async::Ready(out)) + Poll::Ready(Ok(out)) } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(Canceled) => 
Err(KafkaError::Canceled), + Poll::Ready(Err(Canceled)) => Poll::Ready(Err(KafkaError::Canceled)), + Poll::Pending => Poll::Pending, } } } @@ -1282,24 +1287,25 @@ impl<'a> AlterConfig<'a> { } struct AlterConfigsFuture { - rx: Oneshot, + rx: Receiver, } impl Future for AlterConfigsFuture { - type Item = Vec; - type Error = KafkaError; + type Output = KafkaResult>; - fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(event)) => { - event.check_error()?; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.rx.poll_unpin(cx) { + Poll::Ready(Ok(event)) => { + if let Err(e) = event.check_error() { + return Poll::Ready(Err(e)) + }; let res = unsafe { rdsys::rd_kafka_event_AlterConfigs_result(event.ptr()) }; if res.is_null() { let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; - return Err(KafkaError::AdminOpCreation(format!( + return Poll::Ready(Err(KafkaError::AdminOpCreation(format!( "alter configs request received response of incorrect type ({})", typ - ))); + )))); } let mut n = 0; let resources = @@ -1310,10 +1316,10 @@ impl Future for AlterConfigsFuture { let specifier = extract_config_specifier(resource)?; out.push(Ok(specifier)); } - Ok(Async::Ready(out)) + Poll::Ready(Ok(out)) } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(Canceled) => Err(KafkaError::Canceled), + Poll::Ready(Err(Canceled)) => Poll::Ready(Err(KafkaError::Canceled)), + Poll::Pending => Poll::Pending, } } } diff --git a/src/client.rs b/src/client.rs index daf7b8e7c..392d88b01 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,6 +2,7 @@ use crate::rdsys; use crate::rdsys::types::*; +use log::*; use std::ffi::{CStr, CString}; use std::mem; use std::os::raw::c_char; @@ -71,6 +72,7 @@ impl ClientContext for DefaultClientContext {} /// A native rdkafka-sys client. This struct shouldn't be used directly. Use higher level `Client` /// or producers and consumers. +#[derive(Clone)] pub struct NativeClient { ptr: *mut RDKafka, } @@ -104,6 +106,7 @@ impl Drop for NativeClient { /// A low level rdkafka client. This client shouldn't be used directly. The producer and consumer modules /// provide different producer and consumer implementations based on top of `Client` that can be /// used instead. 
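The admin futures rewritten above all follow the same mechanical translation from futures 0.1 to the std `Future` trait: `type Item`/`type Error` merge into a single `type Output`, `poll(&mut self)` becomes `poll(self: Pin<&mut Self>, cx: &mut Context)`, and the oneshot `Receiver` is driven with `poll_unpin`. A self-contained sketch of that translation on a trivial future, not part of the patch (`MyFuture` and the `u32` payload are illustrative only):

    use futures::channel::oneshot::{channel, Canceled, Receiver};
    use futures::future::FutureExt; // provides poll_unpin
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    struct MyFuture {
        rx: Receiver<u32>,
    }

    impl Future for MyFuture {
        // The 0.1 Item/Error pair collapses into a single Output type.
        type Output = Result<u32, Canceled>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            match self.rx.poll_unpin(cx) {
                Poll::Ready(Ok(value)) => Poll::Ready(Ok(value)),
                Poll::Ready(Err(Canceled)) => Poll::Ready(Err(Canceled)),
                Poll::Pending => Poll::Pending,
            }
        }
    }

    fn main() {
        let (tx, rx) = channel();
        tx.send(42).unwrap();
        let result = futures::executor::block_on(MyFuture { rx });
        assert_eq!(result.unwrap(), 42);
    }
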
+#[derive(Clone)] pub struct Client { native: NativeClient, context: Box, diff --git a/src/config.rs b/src/config.rs index 2dd8dc91d..1f9e5f896 100644 --- a/src/config.rs +++ b/src/config.rs @@ -20,12 +20,12 @@ use crate::rdsys; use crate::rdsys::types::*; -use log::Level; use crate::client::ClientContext; use crate::error::{IsError, KafkaError, KafkaResult}; use crate::util::ErrBuf; +use log::*; use std::collections::HashMap; use std::ffi::CString; use std::mem; @@ -144,6 +144,7 @@ impl ClientConfig { let conf = unsafe { rdsys::rd_kafka_conf_new() }; let mut err_buf = ErrBuf::new(); for (key, value) in &self.conf_map { + trace!("Adding config value {}={}", key, value); let key_c = CString::new(key.to_string())?; let value_c = CString::new(value.to_string())?; let ret = unsafe { diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index 0150008fa..925bc3e64 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -13,6 +13,7 @@ use crate::topic_partition_list::Offset::Offset; use crate::topic_partition_list::TopicPartitionList; use crate::util::{cstr_to_owned, timeout_to_ms}; +use log::*; use std::mem; use std::os::raw::c_void; use std::ptr; @@ -59,6 +60,7 @@ unsafe extern "C" fn native_rebalance_cb( /// Low level wrapper around the librdkafka consumer. This consumer requires to be periodically polled /// to make progress on rebalance, callbacks and to receive messages. +#[derive(Clone)] pub struct BaseConsumer { client: Client, } @@ -125,7 +127,7 @@ impl BaseConsumer { timeout: T, ) -> Option> { self.poll_raw(timeout_to_ms(timeout)) - .map(|ptr| unsafe { BorrowedMessage::from_consumer(ptr, self) }) + .map(|ptr| unsafe { BorrowedMessage::from_consumer(ptr) }) } /// Returns an iterator over the available messages. diff --git a/src/consumer/message_stream.rs b/src/consumer/message_stream.rs new file mode 100644 index 000000000..606d1ec2f --- /dev/null +++ b/src/consumer/message_stream.rs @@ -0,0 +1,162 @@ +use crate::rdsys; +use crate::rdsys::types::*; +use futures::Stream; + +use crate::consumer::{ConsumerContext, BaseConsumer}; +use crate::error::{KafkaError, KafkaResult}; +use crate::message::BorrowedMessage; +use crate::util::duration_to_millis; + +use log::*; +use pin_project::pin_project; +use std::future::Future; +use std::pin::Pin; +use std::ptr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Duration; +use tokio_executor::blocking::{run as block_on, Blocking}; + +/// A small wrapper for a message pointer. This wrapper is only used to +/// pass a message between the polling thread and the thread consuming the stream, +/// and to transform it from pointer to `BorrowedMessage` with a lifetime that derives from the +/// lifetime of the stream consumer. In general is not safe to pass a struct with an internal +/// reference across threads. However the `StreamConsumer` guarantees that the polling thread +/// is terminated before the consumer is actually dropped, ensuring that the messages +/// are safe to be used for their entire lifetime. +struct PolledMessagePtr { + message_ptr: *mut RDKafkaMessage, +} + +impl PolledMessagePtr { + /// Creates a new PolledPtr from a message pointer. It takes the ownership of the message. 
+ fn new(message_ptr: *mut RDKafkaMessage) -> PolledMessagePtr { + trace!("New polled ptr {:?}", message_ptr); + PolledMessagePtr { message_ptr } + } + + /// Transforms the `PolledMessagePtr` into a message whose lifetime will be bound to the + /// lifetime of the provided consumer. If the librdkafka message represents an error, the error + /// will be returned instead. + fn into_message<'a>( + mut self, + ) -> KafkaResult> { + let msg = unsafe { BorrowedMessage::from_consumer(self.message_ptr) }; + self.message_ptr = ptr::null_mut(); + msg + } +} + +impl Drop for PolledMessagePtr { + /// If the `PolledMessagePtr` is hasn't been transformed into a message and the pointer is + /// still available, it will free the underlying resources. + fn drop(&mut self) { + if !self.message_ptr.is_null() { + trace!("Destroy PolledPtr {:?}", self.message_ptr); + unsafe { rdsys::rd_kafka_message_destroy(self.message_ptr) }; + } + } +} + +/// Allow message pointer to be moved across threads. +unsafe impl Send for PolledMessagePtr {} + +// A Kafka consumer implementing Stream. +/// +/// It can be used to receive messages as they are consumed from Kafka. Note: there might be +/// buffering between the actual Kafka consumer and the receiving end of this stream, so it is not +/// advised to use automatic commit, as some messages might have been consumed by the internal Kafka +/// consumer but not processed. Manual offset storing should be used, see the `store_offset` +/// function on `Consumer`. +#[pin_project] +pub struct MessageStream<'a, C: ConsumerContext + 'static> { + consumer: Arc>, + should_stop: Arc, + poll_interval_ms: i32, + send_none: bool, + #[pin] + pending: Option>, + phantom: &'a std::marker::PhantomData, +} + +enum PollConsumerResult { + Continue, + Ready(Option), +} + +impl<'a, C: ConsumerContext + 'static> MessageStream<'a, C> { + /// Create a new message stream from a base consumer + pub fn new( + consumer: Arc>, + should_stop: Arc, + poll_interval: Duration, + send_none: bool, + ) -> Self { + let poll_interval_ms = duration_to_millis(poll_interval) as i32; + Self { + consumer: consumer, + should_stop: should_stop, + poll_interval_ms: poll_interval_ms, + send_none: send_none, + pending: None, + phantom: &std::marker::PhantomData, + } + } + + /// Close the message stream + pub async fn close(&self) { + self.should_stop.store(true, Ordering::Relaxed); + } +} + +impl<'a, C: ConsumerContext + 'a> Stream for MessageStream<'a, C> { + type Item = KafkaResult>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut this = self.project(); + let mut pending: Pin<&mut Option>> = this.pending.as_mut(); + debug!("Polling stream for next message"); + loop { + if this.should_stop.load(Ordering::Relaxed) { + info!("Stopping"); + return Poll::Ready(None); + } else { + if let Some(to_poll) = pending.as_mut().as_pin_mut() { + debug!("Seeing if poll result is ready"); + if let PollConsumerResult::Ready(res) = futures::ready!(to_poll.poll(cx)) { + debug!("Poll result ready"); + pending.set(None); + let ret = match res { + None => { + debug!("No message polled, but forwarding none as requested"); + Poll::Ready(Some(Err(KafkaError::NoMessageReceived))) + }, + Some(polled_ptr) => Poll::Ready(Some(polled_ptr.into_message())), + }; + return ret; + } + } + debug!("Requesting next poll result"); + let consumer = Arc::clone(&this.consumer); + let poll_interval_ms = *this.poll_interval_ms; + let send_none = *this.send_none; + let f = block_on(move || { + match consumer.poll_raw(poll_interval_ms) { 
+ None => { + if send_none { + PollConsumerResult::Ready(None) + } else { + PollConsumerResult::Continue + } + } + Some(m_ptr) => { + PollConsumerResult::Ready(Some(PolledMessagePtr::new(m_ptr))) + }, + } + }); + pending.replace(f); + } + } + } +} \ No newline at end of file diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 9ac512ad5..77792fb20 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -1,10 +1,12 @@ //! Base trait and common functionality for all consumers. -pub mod base_consumer; -pub mod stream_consumer; +mod base_consumer; +mod message_stream; +mod stream_consumer; // Re-export -pub use self::base_consumer::BaseConsumer; -pub use self::stream_consumer::{MessageStream, StreamConsumer}; +pub use base_consumer::BaseConsumer; +pub use message_stream::MessageStream; +pub use stream_consumer::StreamConsumer; use crate::rdsys; use crate::rdsys::types::*; @@ -16,6 +18,7 @@ use crate::message::BorrowedMessage; use crate::metadata::Metadata; use crate::util::cstr_to_owned; +use log::*; use std::ptr; use std::time::Duration; diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index f72fe1978..eda56dc2f 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -1,145 +1,14 @@ //! Stream-based consumer implementation. -use crate::rdsys; -use crate::rdsys::types::*; -use futures::sync::mpsc; -use futures::{Future, Poll, Sink, Stream}; - use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; use crate::consumer::base_consumer::BaseConsumer; -use crate::consumer::{Consumer, ConsumerContext, DefaultConsumerContext}; -use crate::error::{KafkaError, KafkaResult}; -use crate::message::BorrowedMessage; -use crate::util::duration_to_millis; +use crate::consumer::{Consumer, ConsumerContext, DefaultConsumerContext, MessageStream}; +use crate::error::KafkaResult; -use std::ptr; +use log::*; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; -use std::thread::{self, JoinHandle}; +use std::sync::Arc; use std::time::Duration; -/// Default channel size for the stream consumer. The number of context switches -/// seems to decrease exponentially as the channel size is increased, and it stabilizes when -/// the channel size reaches 10 or so. -const CONSUMER_CHANNEL_SIZE: usize = 10; - -/// A small wrapper for a message pointer. This wrapper is only used to -/// pass a message between the polling thread and the thread consuming the stream, -/// and to transform it from pointer to `BorrowedMessage` with a lifetime that derives from the -/// lifetime of the stream consumer. In general is not safe to pass a struct with an internal -/// reference across threads. However the `StreamConsumer` guarantees that the polling thread -/// is terminated before the consumer is actually dropped, ensuring that the messages -/// are safe to be used for their entire lifetime. -struct PolledMessagePtr { - message_ptr: *mut RDKafkaMessage, -} - -impl PolledMessagePtr { - /// Creates a new PolledPtr from a message pointer. It takes the ownership of the message. - fn new(message_ptr: *mut RDKafkaMessage) -> PolledMessagePtr { - trace!("New polled ptr {:?}", message_ptr); - PolledMessagePtr { message_ptr } - } - - /// Transforms the `PolledMessagePtr` into a message whose lifetime will be bound to the - /// lifetime of the provided consumer. If the librdkafka message represents an error, the error - /// will be returned instead. 
- fn into_message_of( - mut self, - consumer: &StreamConsumer, - ) -> KafkaResult { - let msg = unsafe { BorrowedMessage::from_consumer(self.message_ptr, consumer) }; - self.message_ptr = ptr::null_mut(); - msg - } -} - -impl Drop for PolledMessagePtr { - /// If the `PolledMessagePtr` is hasn't been transformed into a message and the pointer is - /// still available, it will free the underlying resources. - fn drop(&mut self) { - if !self.message_ptr.is_null() { - trace!("Destroy PolledPtr {:?}", self.message_ptr); - unsafe { rdsys::rd_kafka_message_destroy(self.message_ptr) }; - } - } -} - -/// Allow message pointer to be moved across threads. -unsafe impl Send for PolledMessagePtr {} - -/// A Kafka consumer implementing Stream. -/// -/// It can be used to receive messages as they are consumed from Kafka. Note: there might be -/// buffering between the actual Kafka consumer and the receiving end of this stream, so it is not -/// advised to use automatic commit, as some messages might have been consumed by the internal Kafka -/// consumer but not processed. Manual offset storing should be used, see the `store_offset` -/// function on `Consumer`. -pub struct MessageStream<'a, C: ConsumerContext + 'static> { - consumer: &'a StreamConsumer, - receiver: mpsc::Receiver>, -} - -impl<'a, C: ConsumerContext + 'static> MessageStream<'a, C> { - fn new( - consumer: &'a StreamConsumer, - receiver: mpsc::Receiver>, - ) -> MessageStream<'a, C> { - MessageStream { consumer, receiver } - } -} - -impl<'a, C: ConsumerContext + 'a> Stream for MessageStream<'a, C> { - type Item = KafkaResult>; - type Error = (); - - fn poll(&mut self) -> Poll, Self::Error> { - self.receiver.poll().map(|ready| { - ready.map(|option| { - option.map(|polled_ptr_opt| { - polled_ptr_opt.map_or(Err(KafkaError::NoMessageReceived), |polled_ptr| { - polled_ptr.into_message_of(self.consumer) - }) - }) - }) - }) - } -} - -/// Internal consumer loop. This is the main body of the thread that will drive the stream consumer. -/// If `send_none` is true, the loop will send a None into the sender every time the poll times out. -fn poll_loop( - consumer: &BaseConsumer, - sender: mpsc::Sender>, - should_stop: &AtomicBool, - poll_interval: Duration, - send_none: bool, -) { - trace!("Polling thread loop started"); - let mut curr_sender = sender; - let poll_interval_ms = duration_to_millis(poll_interval) as i32; - while !should_stop.load(Ordering::Relaxed) { - trace!("Polling base consumer"); - let future_sender = match consumer.poll_raw(poll_interval_ms) { - None => { - if send_none { - curr_sender.send(None) - } else { - continue; // TODO: check stream closed - } - } - Some(m_ptr) => curr_sender.send(Some(PolledMessagePtr::new(m_ptr))), - }; - match future_sender.wait() { - Ok(new_sender) => curr_sender = new_sender, - Err(e) => { - debug!("Sender not available: {:?}", e); - break; - } - }; - } - trace!("Polling thread loop terminated"); -} - /// A Kafka Consumer providing a `futures::Stream` interface. /// /// This consumer doesn't need to be polled since it has a separate polling thread. Due to the @@ -149,9 +18,15 @@ fn poll_loop( /// `Consumer`. 
#[must_use = "Consumer polling thread will stop immediately if unused"] pub struct StreamConsumer { - consumer: Arc>, - should_stop: Arc, - handle: Mutex>>, + pub (crate) consumer: Arc>, + pub (crate) should_stop: Arc, +} + +impl StreamConsumer { + /// Expose the underlying consumer + pub fn consumer(&self) -> Arc> { + self.consumer.clone() + } } impl Consumer for StreamConsumer { @@ -175,7 +50,6 @@ impl FromClientConfigAndContext for StreamConsumer { let stream_consumer = StreamConsumer { consumer: Arc::new(BaseConsumer::from_config_and_context(config, context)?), should_stop: Arc::new(AtomicBool::new(false)), - handle: Mutex::new(None), }; Ok(stream_consumer) } @@ -193,37 +67,12 @@ impl StreamConsumer { /// `KafkaError::NoMessageReceived` every time the poll interval is reached and no message has /// been received. pub fn start_with(&self, poll_interval: Duration, no_message_error: bool) -> MessageStream { - // TODO: verify called once - let (sender, receiver) = mpsc::channel(CONSUMER_CHANNEL_SIZE); - let consumer = self.consumer.clone(); - let should_stop = self.should_stop.clone(); - let handle = thread::Builder::new() - .name("poll".to_string()) - .spawn(move || { - poll_loop( - consumer.as_ref(), - sender, - should_stop.as_ref(), - poll_interval, - no_message_error, - ); - }) - .expect("Failed to start polling thread"); - *self.handle.lock().unwrap() = Some(handle); - MessageStream::new(self, receiver) + MessageStream::new(Arc::clone(&self.consumer), Arc::clone(&self.should_stop), poll_interval, no_message_error) } - /// Stops the StreamConsumer, blocking the caller until the internal consumer has been stopped. + /// Stops the StreamConsumer pub fn stop(&self) { - let mut handle = self.handle.lock().unwrap(); - if let Some(handle) = handle.take() { - trace!("Stopping polling"); - self.should_stop.store(true, Ordering::Relaxed); - match handle.join() { - Ok(()) => trace!("Polling stopped"), - Err(e) => warn!("Failure while terminating thread: {:?}", e), - }; - } + self.should_stop.store(true, Ordering::Relaxed); } } diff --git a/src/error.rs b/src/error.rs index 49dd4ea8c..91525de09 100644 --- a/src/error.rs +++ b/src/error.rs @@ -74,47 +74,25 @@ impl fmt::Debug for KafkaError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { KafkaError::AdminOp(err) => write!(f, "KafkaError (Admin operation error: {})", err), - KafkaError::AdminOpCreation(ref err) => { - write!(f, "KafkaError (Admin operation creation error: {})", err) - } + KafkaError::AdminOpCreation(ref err) => write!(f, "KafkaError (Admin operation creation error: {})", err), KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"), - KafkaError::ClientConfig(_, ref desc, ref key, ref value) => write!( - f, - "KafkaError (Client config error: {} {} {})", - desc, key, value - ), - KafkaError::ClientCreation(ref err) => { - write!(f, "KafkaError (Client creation error: {})", err) - } - KafkaError::ConsumerCommit(err) => { - write!(f, "KafkaError (Consumer commit error: {})", err) + KafkaError::ClientConfig(_, ref desc, ref key, ref value) => { + write!(f, "KafkaError (Client config error: {} {} {})", desc, key, value) } + KafkaError::ClientCreation(ref err) => write!(f, "KafkaError (Client creation error: {})", err), + KafkaError::ConsumerCommit(err) => write!(f, "KafkaError (Consumer commit error: {})", err), KafkaError::Global(err) => write!(f, "KafkaError (Global error: {})", err), - KafkaError::GroupListFetch(err) => { - write!(f, "KafkaError (Group list fetch error: {})", err) - } - 
KafkaError::MessageConsumption(err) => { - write!(f, "KafkaError (Message consumption error: {})", err) - } - KafkaError::MessageProduction(err) => { - write!(f, "KafkaError (Message production error: {})", err) - } - KafkaError::MetadataFetch(err) => { - write!(f, "KafkaError (Metadata fetch error: {})", err) - } - KafkaError::NoMessageReceived => { - write!(f, "No message received within the given poll interval") - } + KafkaError::GroupListFetch(err) => write!(f, "KafkaError (Group list fetch error: {})", err), + KafkaError::MessageConsumption(err) => write!(f, "KafkaError (Message consumption error: {})", err), + KafkaError::MessageProduction(err) => write!(f, "KafkaError (Message production error: {})", err), + KafkaError::MetadataFetch(err) => write!(f, "KafkaError (Metadata fetch error: {})", err), + KafkaError::NoMessageReceived => write!(f, "No message received within the given poll interval"), KafkaError::Nul(_) => write!(f, "FFI null error"), KafkaError::OffsetFetch(err) => write!(f, "KafkaError (Offset fetch error: {})", err), KafkaError::PartitionEOF(part_n) => write!(f, "KafkaError (Partition EOF: {})", part_n), - KafkaError::SetPartitionOffset(err) => { - write!(f, "KafkaError (Set partition offset error: {})", err) - } + KafkaError::SetPartitionOffset(err) => write!(f, "KafkaError (Set partition offset error: {})", err), KafkaError::StoreOffset(err) => write!(f, "KafkaError (Store offset error: {})", err), - KafkaError::Subscription(ref err) => { - write!(f, "KafkaError (Subscription error: {})", err) - } + KafkaError::Subscription(ref err) => write!(f, "KafkaError (Subscription error: {})", err), } } } @@ -123,9 +101,7 @@ impl fmt::Display for KafkaError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { KafkaError::AdminOp(err) => write!(f, "Admin operation error: {}", err), - KafkaError::AdminOpCreation(ref err) => { - write!(f, "Admin operation creation error: {}", err) - } + KafkaError::AdminOpCreation(ref err) => write!(f, "Admin operation creation error: {}", err), KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"), KafkaError::ClientConfig(_, ref desc, ref key, ref value) => { write!(f, "Client config error: {} {} {}", desc, key, value) @@ -137,9 +113,7 @@ impl fmt::Display for KafkaError { KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", err), KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err), KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err), - KafkaError::NoMessageReceived => { - write!(f, "No message received within the given poll interval") - } + KafkaError::NoMessageReceived => write!(f, "No message received within the given poll interval"), KafkaError::Nul(_) => write!(f, "FFI nul error"), KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err), KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n), diff --git a/src/lib.rs b/src/lib.rs index b7c67a41b..fdd272cb1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -214,15 +214,7 @@ //use std::alloc::System; //#[global_allocator] //static A: System = System; - -#[macro_use] -extern crate log; -#[macro_use] -extern crate serde_derive; -extern crate futures; -extern crate serde_json; - -extern crate rdkafka_sys as rdsys; +use rdkafka_sys as rdsys; pub use crate::rdsys::types; @@ -246,3 +238,12 @@ pub use crate::message::{Message, Timestamp}; pub use crate::statistics::Statistics; pub use crate::topic_partition_list::{Offset, TopicPartitionList}; pub use 
crate::util::IntoOpaque; + +/// Re-export of types useful when using rdkafka with async +pub mod async_support { + pub use crate::error::KafkaResult; + pub use crate::consumer::StreamConsumer; + pub use crate::message::OwnedMessage; + pub use crate::producer::{FutureProducer, FutureRecord}; + pub use crate::producer::future_producer::OwnedDeliveryResult; +} diff --git a/src/message.rs b/src/message.rs index f0c87c202..13872fec0 100644 --- a/src/message.rs +++ b/src/message.rs @@ -2,6 +2,7 @@ use crate::rdsys; use crate::rdsys::types::*; +use log::*; use std::ffi::{CStr, CString}; use std::fmt; use std::marker::PhantomData; @@ -214,10 +215,9 @@ impl<'a> BorrowedMessage<'a> { /// consumer. The lifetime of the message will be bound to the lifetime of the consumer passed /// as parameter. This method should only be used with messages coming from consumers. If the /// message contains an error, only the error is returned and the message structure is freed. - pub(crate) unsafe fn from_consumer( + pub(crate) unsafe fn from_consumer<'b>( ptr: *mut RDKafkaMessage, - _consumer: &'a C, - ) -> KafkaResult> { + ) -> KafkaResult> { if (*ptr).err.is_error() { let err = match (*ptr).err { rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => { diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index eeaa10c6e..95c12769a 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -44,6 +44,7 @@ use crate::error::{IsError, KafkaError, KafkaResult}; use crate::message::{BorrowedMessage, OwnedHeaders, ToBytes}; use crate::util::{timeout_to_ms, IntoOpaque}; +use log::*; use std::ffi::CString; use std::mem; use std::os::raw::c_void; diff --git a/src/producer/future_producer.rs b/src/producer/future_producer.rs index e3a4218ea..fa3090a7b 100644 --- a/src/producer/future_producer.rs +++ b/src/producer/future_producer.rs @@ -11,10 +11,12 @@ use crate::producer::{BaseRecord, DeliveryResult, ProducerContext, ThreadedProdu use crate::statistics::Statistics; use crate::util::IntoOpaque; -use futures::{self, Canceled, Complete, Future, Oneshot, Poll}; +use futures::channel::oneshot::{channel, Sender}; +use log::*; use std::sync::Arc; use std::time::{Duration, Instant}; +use tokio_executor::blocking::{run as block_on}; // // ********** FUTURE PRODUCER ********** @@ -51,19 +53,6 @@ impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> FutureRecord<'a, K, P> { } } - fn from_base_record( - base_record: BaseRecord<'a, K, P, D>, - ) -> FutureRecord<'a, K, P> { - FutureRecord { - topic: base_record.topic, - partition: base_record.partition, - key: base_record.key, - payload: base_record.payload, - timestamp: base_record.timestamp, - headers: base_record.headers, - } - } - /// Set the destination partition of the record. pub fn partition(mut self, partition: i32) -> FutureRecord<'a, K, P> { self.partition = Some(partition); @@ -119,7 +108,7 @@ struct FutureProducerContext { /// If message delivery was successful, `OwnedDeliveryResult` will return the partition and offset /// of the message. If the message failed to be delivered an error will be returned, together with /// an owned copy of the original message. -type OwnedDeliveryResult = Result<(i32, i64), (KafkaError, OwnedMessage)>; +pub type OwnedDeliveryResult = Result<(i32, i64), (KafkaError, OwnedMessage)>; // Delegates all the methods calls to the wrapped context. 
impl ClientContext for FutureProducerContext { @@ -137,9 +126,9 @@ impl ClientContext for FutureProducerContext { } impl ProducerContext for FutureProducerContext { - type DeliveryOpaque = Box>; + type DeliveryOpaque = Box>; - fn delivery(&self, delivery_result: &DeliveryResult, tx: Box>) { + fn delivery(&self, delivery_result: &DeliveryResult, tx: Box>) { let owned_delivery_result = match *delivery_result { Ok(ref message) => Ok((message.partition(), message.offset())), Err((ref error, ref message)) => Err((error.clone(), message.detach())), @@ -190,21 +179,13 @@ impl FromClientConfigAndContext for FutureProduce } } -/// A [Future] wrapping the result of the message production. -/// -/// Once completed, the future will contain an `OwnedDeliveryResult` with information on the -/// delivery status of the message. -pub struct DeliveryFuture { - rx: Oneshot, -} - -impl Future for DeliveryFuture { - type Item = OwnedDeliveryResult; - type Error = Canceled; - - fn poll(&mut self) -> Poll { - self.rx.poll() - } +enum ProducerPollResult<'a, K, P> + where + K: ToBytes + ?Sized, + P: ToBytes + ?Sized, +{ + Produce(BaseRecord<'a, K, P, Box>>), + Delivered, } impl FutureProducer { @@ -213,28 +194,43 @@ impl FutureProducer { /// is allowed to block if the queue is full. Set it to -1 to block forever, or 0 to never block. /// If `block_ms` is reached and the queue is still full, a [RDKafkaError::QueueFull] will be /// reported in the [DeliveryFuture]. - pub fn send(&self, record: FutureRecord, block_ms: i64) -> DeliveryFuture + pub async fn send<'a, K, P>(&self, record: FutureRecord<'a, K, P>, block_ms: i64) -> KafkaResult where K: ToBytes + ?Sized, P: ToBytes + ?Sized, { let start_time = Instant::now(); + let timeout_dur = if block_ms > 0 { + Some(Duration::from_millis(block_ms as u64)) + } else { + None + }; - let (tx, rx) = futures::oneshot(); - let mut base_record = record.into_base_record(Box::new(tx)); + let (tx, rx) = channel(); + let record = record.into_base_record(Box::new(tx)); - loop { - match self.producer.send(base_record) { - Ok(_) => break DeliveryFuture { rx }, + let mut poll_result = ProducerPollResult::Produce(record); + + while let ProducerPollResult::Produce(to_produce) = std::mem::replace(&mut poll_result, ProducerPollResult::Delivered) { + poll_result = match self.producer.send(to_produce) { + Ok(_) => { + trace!("Record successfully delivered"); + ProducerPollResult::Delivered + }, Err((KafkaError::MessageProduction(RDKafkaError::QueueFull), record)) => { - base_record = record; - if block_ms == -1 { - continue; - } else if block_ms > 0 - && start_time.elapsed() < Duration::from_millis(block_ms as u64) - { - self.poll(Duration::from_millis(100)); - continue; + if let Some(to) = timeout_dur { + let timed_out = start_time.elapsed() >= to; + if timed_out { + debug!("Queue full and timeout reached"); + return Err(KafkaError::MessageProduction(RDKafkaError::QueueFull)); + } else { + debug!("Queue full, polling for 100ms before trying again"); + let producer = Arc::clone(&self.producer); + block_on(move || producer.poll(Duration::from_millis(100))).await; + ProducerPollResult::Produce(record) + } + } else { + ProducerPollResult::Produce(record) } } Err((e, record)) => { @@ -250,39 +246,49 @@ impl FutureProducer { record.headers, ); let _ = record.delivery_opaque.send(Err((e, owned_message))); - break DeliveryFuture { rx }; + ProducerPollResult::Delivered } - } + }; } + + rx.await.map_err(|_| KafkaError::Canceled) } /// Same as [FutureProducer::send], with the only difference that if 
enqueuing fails, an /// error will be returned immediately, alongside the [FutureRecord] provided. - pub fn send_result<'a, K, P>( + pub async fn send_result<'a, K, P>( &self, record: FutureRecord<'a, K, P>, - ) -> Result)> + ) -> KafkaResult where K: ToBytes + ?Sized, P: ToBytes + ?Sized, { - let (tx, rx) = futures::oneshot(); - let base_record = record.into_base_record(Box::new(tx)); - self.producer - .send(base_record) - .map(|()| DeliveryFuture { rx }) - .map_err(|(e, record)| (e, FutureRecord::from_base_record(record))) - } + let (tx, rx) = channel(); + + let record = record.into_base_record(Box::new(tx)); + if let Err((e, record)) = self.producer.send(record) { + let owned_message = OwnedMessage::new( + record.payload.map(|p| p.to_bytes().to_vec()), + record.key.map(|k| k.to_bytes().to_vec()), + record.topic.to_owned(), + record + .timestamp + .map_or(Timestamp::NotAvailable, Timestamp::CreateTime), + record.partition.unwrap_or(-1), + 0, + record.headers, + ); + let _ = record.delivery_opaque.send(Err((e, owned_message))); + } - /// Polls the internal producer. This is not normally required since the `ThreadedProducer` had - /// a thread dedicated to calling `poll` regularly. - pub fn poll>>(&self, timeout: T) { - self.producer.poll(timeout); + rx.await.map_err(|_| KafkaError::Canceled) } /// Flushes the producer. Should be called before termination. - pub fn flush>>(&self, timeout: T) { - self.producer.flush(timeout); + pub async fn flush> + Send + 'static>(&self, timeout: T) { + let producer = Arc::clone(&self.producer); + block_on(move || producer.flush(timeout)).await } /// Returns the number of messages waiting to be sent, or send but not acknowledged yet. diff --git a/src/producer/mod.rs b/src/producer/mod.rs index 509581a2c..a558d1c20 100644 --- a/src/producer/mod.rs +++ b/src/producer/mod.rs @@ -78,8 +78,8 @@ pub mod base_producer; pub mod future_producer; -pub use self::base_producer::{ +pub use base_producer::{ BaseProducer, BaseRecord, DefaultProducerContext, DeliveryResult, ProducerContext, ThreadedProducer, }; -pub use self::future_producer::{DeliveryFuture, FutureProducer, FutureRecord}; +pub use future_producer::{FutureProducer, FutureRecord}; diff --git a/src/statistics.rs b/src/statistics.rs index 9b101d148..5af863bc9 100644 --- a/src/statistics.rs +++ b/src/statistics.rs @@ -2,6 +2,7 @@ #![allow(missing_docs)] use std::collections::HashMap; +use serde::Deserialize; /// Statistics from librdkafka. Refer to the [librdkafka documentation](https://github.com/edenhill/librdkafka/wiki/Statistics) /// for details. diff --git a/src/topic_partition_list.rs b/src/topic_partition_list.rs index 398fc1b60..98fcd974d 100644 --- a/src/topic_partition_list.rs +++ b/src/topic_partition_list.rs @@ -5,6 +5,7 @@ use crate::rdsys::types::*; use crate::error::{IsError, KafkaError, KafkaResult}; +use log::*; use std::collections::HashMap; use std::ffi::{CStr, CString}; use std::fmt; diff --git a/tests/test_admin.rs b/tests/test_admin.rs index af02d7122..1bcc42fdc 100644 --- a/tests/test_admin.rs +++ b/tests/test_admin.rs @@ -1,14 +1,11 @@ //! Test administrative commands using the admin API. 
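With `send` now an `async fn`, callers await the delivery outcome directly instead of holding a separate `DeliveryFuture`. A hedged sketch of a caller, assuming a tokio runtime is driving the future; the topic name, key, and payload are placeholders:

```rust
// Hypothetical caller of the reworked async `send`. "example-topic", the key,
// and the payload are placeholders; a tokio runtime is assumed to poll this.
use rdkafka::async_support::*;

async fn produce_one(producer: &FutureProducer) {
    let record = FutureRecord::to("example-topic")
        .key("key-1")
        .payload("hello");

    // Wait up to ~5s for queue space. The outer Result reports cancellation or
    // a persistently full queue; the inner one carries the delivery outcome.
    match producer.send(record, 5000).await {
        Ok(Ok((partition, offset))) => println!("delivered to {}@{}", partition, offset),
        Ok(Err((err, _msg))) => eprintln!("delivery failed: {}", err),
        Err(err) => eprintln!("could not enqueue record: {}", err),
    }
}
```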
- use backoff::{ExponentialBackoff, Operation}; -use futures::Future; - use std::time::Duration; use rdkafka::admin::{ - AdminClient, AdminOptions, AlterConfig, ConfigEntry, ConfigSource, NewPartitions, NewTopic, - OwnedResourceSpecifier, ResourceSpecifier, TopicReplication, + AdminClient, AdminOptions, AlterConfig, ConfigEntry, ConfigSource, NewPartitions, NewTopic, OwnedResourceSpecifier, + ResourceSpecifier, TopicReplication, }; use rdkafka::client::DefaultClientContext; use rdkafka::consumer::{BaseConsumer, Consumer, DefaultConsumerContext}; @@ -17,7 +14,8 @@ use rdkafka::metadata::Metadata; use rdkafka::ClientConfig; mod utils; -use crate::utils::*; +use utils::*; +use backoff::backoff::Backoff; fn create_config() -> ClientConfig { let mut config = ClientConfig::new(); @@ -26,14 +24,11 @@ fn create_config() -> ClientConfig { } fn create_admin_client() -> AdminClient { - create_config() - .create() - .expect("admin client creation failed") + create_config().create().expect("admin client creation failed") } fn fetch_metadata(topic: &str) -> Metadata { - let consumer: BaseConsumer = - create_config().create().expect("consumer creation failed"); + let consumer: BaseConsumer = create_config().create().expect("consumer creation failed"); let timeout = Some(Duration::from_secs(1)); let mut backoff = ExponentialBackoff::default(); @@ -56,8 +51,7 @@ fn fetch_metadata(topic: &str) -> Metadata { } fn verify_delete(topic: &str) { - let consumer: BaseConsumer = - create_config().create().expect("consumer creation failed"); + let consumer: BaseConsumer = create_config().create().expect("consumer creation failed"); let timeout = Some(Duration::from_secs(1)); let mut backoff = ExponentialBackoff::default(); @@ -66,9 +60,7 @@ fn verify_delete(topic: &str) { // Asking about the topic specifically will recreate it (under the // default Kafka configuration, at least) so we have to ask for the list // of all topics and search through it. - let metadata = consumer - .fetch_metadata(None, timeout) - .map_err(|e| e.to_string())?; + let metadata = consumer.fetch_metadata(None, timeout).map_err(|e| e.to_string())?; if let Some(_) = metadata.topics().iter().find(|t| t.name() == topic) { Err(format!("topic {} still exists", topic))? } @@ -78,8 +70,10 @@ fn verify_delete(topic: &str) { .unwrap() } -#[test] -fn test_topics() { +#[tokio::test] +async fn test_topics() { + let _ = env_logger::try_init(); + let admin_client = create_admin_client(); let opts = AdminOptions::new().operation_timeout(Duration::from_secs(1)); @@ -90,8 +84,8 @@ fn test_topics() { let name2 = rand_test_topic(); // Test both the builder API and the literal construction. 
- let topic1 = - NewTopic::new(&name1, 1, TopicReplication::Fixed(1)).set("max.message.bytes", "1234"); + let topic1 = NewTopic::new(&name1, 1, TopicReplication::Fixed(1)) + .set("max.message.bytes", "1234"); let topic2 = NewTopic { name: &name2, num_partitions: 3, @@ -101,7 +95,7 @@ fn test_topics() { let res = admin_client .create_topics(&[topic1, topic2], &opts) - .wait() + .await .expect("topic creation failed"); assert_eq!(res, &[Ok(name1.clone()), Ok(name2.clone())]); @@ -118,13 +112,9 @@ fn test_topics() { let res = admin_client .describe_configs( - &[ - ResourceSpecifier::Topic(&name1), - ResourceSpecifier::Topic(&name2), - ], + &[ResourceSpecifier::Topic(&name1), ResourceSpecifier::Topic(&name2)], &opts, - ) - .wait() + ).await .expect("describe configs failed"); let config1 = &res[0].as_ref().expect("describe configs failed on topic 1"); let config2 = &res[1].as_ref().expect("describe configs failed on topic 2"); @@ -153,19 +143,13 @@ fn test_topics() { let config_entries2 = config2.entry_map(); assert_eq!(config1.entries.len(), config_entries1.len()); assert_eq!(config2.entries.len(), config_entries2.len()); - assert_eq!( - Some(&&expected_entry1), - config_entries1.get("max.message.bytes") - ); - assert_eq!( - Some(&&expected_entry2), - config_entries2.get("max.message.bytes") - ); + assert_eq!(Some(&&expected_entry1), config_entries1.get("max.message.bytes")); + assert_eq!(Some(&&expected_entry2), config_entries2.get("max.message.bytes")); let partitions1 = NewPartitions::new(&name1, 5); let res = admin_client .create_partitions(&[partitions1], &opts) - .wait() + .await .expect("partition creation failed"); assert_eq!(res, &[Ok(name1.clone())]); @@ -185,7 +169,7 @@ fn test_topics() { let res = admin_client .delete_topics(&[&name1, &name2], &opts) - .wait() + .await .expect("topic deletion failed"); assert_eq!(res, &[Ok(name1.clone()), Ok(name2.clone())]); verify_delete(&name1); @@ -196,7 +180,7 @@ fn test_topics() { // creating topics. { let topic = NewTopic::new("ignored", 1, TopicReplication::Variable(&[&[0], &[0]])); - let res = admin_client.create_topics(&[topic], &opts).wait(); + let res = admin_client.create_topics(&[topic], &opts).await; assert_eq!( Err(KafkaError::AdminOpCreation( "replication configuration for topic 'ignored' assigns 2 partition(s), \ @@ -215,7 +199,7 @@ fn test_topics() { let res = admin_client .create_topics(vec![&topic], &opts) - .wait() + .await .expect("topic creation failed"); assert_eq!(res, &[Ok(name.clone())]); let _ = fetch_metadata(&name); @@ -223,7 +207,7 @@ fn test_topics() { // This partition specification is obviously garbage, and so trips // a client-side error. 
let partitions = NewPartitions::new(&name, 2).assign(&[&[0], &[0], &[0]]); - let res = admin_client.create_partitions(&[partitions], &opts).wait(); + let res = admin_client.create_partitions(&[partitions], &opts).await; assert_eq!( res, Err(KafkaError::AdminOpCreation(format!( @@ -237,7 +221,7 @@ fn test_topics() { let partitions = NewPartitions::new(&name, 2).assign(&[&[0], &[0]]); let res = admin_client .create_partitions(&[partitions], &opts) - .wait() + .await .expect("partition creation failed"); assert_eq!(res, &[Err((name, RDKafkaError::InvalidReplicaAssignment))],); } @@ -247,7 +231,7 @@ fn test_topics() { let name = rand_test_topic(); let res = admin_client .delete_topics(&[&name], &opts) - .wait() + .await .expect("delete topics failed"); assert_eq!(res, &[Err((name, RDKafkaError::UnknownTopicOrPartition))]); } @@ -263,14 +247,14 @@ fn test_topics() { let res = admin_client .create_topics(vec![&topic1], &opts) - .wait() + .await .expect("topic creation failed"); assert_eq!(res, &[Ok(name1.clone())]); let _ = fetch_metadata(&name1); let res = admin_client .create_topics(vec![&topic1, &topic2], &opts) - .wait() + .await .expect("topic creation failed"); assert_eq!( res, @@ -283,14 +267,14 @@ fn test_topics() { let res = admin_client .delete_topics(&[&name1], &opts) - .wait() + .await .expect("topic deletion failed"); assert_eq!(res, &[Ok(name1.clone())]); verify_delete(&name1); let res = admin_client .delete_topics(&[&name2, &name1], &opts) - .wait() + .await .expect("topic deletion failed"); assert_eq!( res, @@ -302,15 +286,17 @@ fn test_topics() { } } -#[test] -fn test_configs() { +#[tokio::test] +async fn test_configs() { + let _ = env_logger::try_init(); + let admin_client = create_admin_client(); let opts = AdminOptions::new(); let broker = ResourceSpecifier::Broker(0); let res = admin_client .describe_configs(&[broker], &opts) - .wait() + .await .expect("describe configs failed"); let config = &res[0].as_ref().expect("describe configs failed"); let orig_val = config @@ -323,16 +309,16 @@ fn test_configs() { let config = AlterConfig::new(broker).set("log.flush.interval.messages", "1234"); let res = admin_client .alter_configs(&[config], &opts) - .wait() + .await .expect("alter configs failed"); assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(0))]); let mut backoff = ExponentialBackoff::default(); backoff.max_elapsed_time = Some(Duration::from_secs(5)); - (|| { + loop { let res = admin_client .describe_configs(&[broker], &opts) - .wait() + .await .expect("describe configs failed"); let config = &res[0].as_ref().expect("describe configs failed"); let entry = config.get("log.flush.interval.messages"); @@ -358,17 +344,16 @@ fn test_configs() { } }; if entry != Some(&expected_entry) { - Err(format!("{:?} != {:?}", entry, Some(&expected_entry)))? + let next_backoff = backoff.next_backoff().expect(&format!("{:?} != {:?}", entry, Some(&expected_entry))); + tokio::timer::delay_for(next_backoff).await; } - Ok(()) - }) - .retry(&mut backoff) - .unwrap(); + break + } let config = AlterConfig::new(broker).set("log.flush.interval.ms", &orig_val); let res = admin_client .alter_configs(&[config], &opts) - .wait() + .await .expect("alter configs failed"); assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(0))]); } @@ -376,8 +361,10 @@ fn test_configs() { // Tests whether each admin operation properly reports an error if the entire // request fails. The original implementations failed to check this, resulting // in confusing situations where a failed admin request would return Ok([]). 
-#[test] -fn test_event_errors() { +#[tokio::test] +async fn test_event_errors() { + let _ = env_logger::try_init(); + // Configure an admin client to target a Kafka server that doesn't exist, // then set an impossible timeout. This will ensure that every request fails // with an OperationTimedOut error, assuming, of course, that the request @@ -388,33 +375,18 @@ fn test_event_errors() { .expect("admin client creation failed"); let opts = AdminOptions::new().request_timeout(Duration::from_nanos(1)); - let res = admin_client.create_topics(&[], &opts).wait(); - assert_eq!( - res, - Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)) - ); - - let res = admin_client.create_partitions(&[], &opts).wait(); - assert_eq!( - res, - Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)) - ); - - let res = admin_client.delete_topics(&[], &opts).wait(); - assert_eq!( - res, - Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)) - ); - - let res = admin_client.describe_configs(&[], &opts).wait(); - assert_eq!( - res.err(), - Some(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)) - ); - - let res = admin_client.alter_configs(&[], &opts).wait(); - assert_eq!( - res, - Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)) - ); + let res = admin_client.create_topics(&[], &opts).await; + assert_eq!(res, Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut))); + + let res = admin_client.create_partitions(&[], &opts).await; + assert_eq!(res, Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut))); + + let res = admin_client.delete_topics(&[], &opts).await; + assert_eq!(res, Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut))); + + let res = admin_client.describe_configs(&[], &opts).await; + assert_eq!(res.err(), Some(KafkaError::AdminOp(RDKafkaError::OperationTimedOut))); + + let res = admin_client.alter_configs(&[], &opts).await; + assert_eq!(res, Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut))); } diff --git a/tests/test_consumers.rs b/tests/test_consumers.rs index d923b7f41..19f92c64e 100644 --- a/tests/test_consumers.rs +++ b/tests/test_consumers.rs @@ -1,23 +1,18 @@ //! Test data consumption using low level and high level consumers. 
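The admin tests above all follow the same conversion: `#[test]` plus `.wait()` becomes `#[tokio::test]` plus `.await`. A minimal sketch of the pattern, assuming the `create_admin_client` helper from tests/test_admin.rs and a reachable test broker; the topic name is a placeholder:

```rust
// Minimal sketch of the test-conversion pattern, assuming the create_admin_client
// helper from tests/test_admin.rs and a running broker. "example-topic" is a
// placeholder name.
use std::time::Duration;
use rdkafka::admin::{AdminOptions, NewTopic, TopicReplication};

#[tokio::test]
async fn create_and_delete_example_topic() {
    let admin_client = create_admin_client();
    let opts = AdminOptions::new().operation_timeout(Duration::from_secs(1));

    // `.await` replaces the futures 0.1 `.wait()` on each admin-operation future.
    let topic = NewTopic::new("example-topic", 1, TopicReplication::Fixed(1));
    let res = admin_client
        .create_topics(&[topic], &opts)
        .await
        .expect("topic creation failed");
    assert_eq!(res, &[Ok("example-topic".to_string())]);

    let res = admin_client
        .delete_topics(&["example-topic"], &opts)
        .await
        .expect("topic deletion failed");
    assert_eq!(res, &[Ok("example-topic".to_string())]);
}
```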
-extern crate env_logger; -extern crate futures; -extern crate rand; -extern crate rdkafka; -extern crate rdkafka_sys; +use futures::StreamExt; +use log::*; -use futures::*; - -use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, ConsumerContext, StreamConsumer}; +use rdkafka::{ClientConfig, ClientContext, Message, Statistics, Timestamp}; +use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext, CommitMode, StreamConsumer}; use rdkafka::error::{KafkaError, KafkaResult}; use rdkafka::topic_partition_list::{Offset, TopicPartitionList}; use rdkafka::util::current_time_millis; -use rdkafka::{ClientConfig, ClientContext, Message, Statistics, Timestamp}; mod utils; -use crate::utils::*; +use utils::*; -use std::collections::HashMap; use std::time::{Duration, Instant}; +use std::collections::HashMap; struct TestContext { _n: i64, // Add data for memory access validation @@ -32,11 +27,7 @@ impl ClientContext for TestContext { } impl ConsumerContext for TestContext { - fn commit_callback( - &self, - result: KafkaResult<()>, - _offsets: *mut rdkafka_sys::RDKafkaTopicPartitionList, - ) { + fn commit_callback(&self, result: KafkaResult<()>, _offsets: *mut rdkafka_sys::RDKafkaTopicPartitionList) { println!("Committing offsets: {:?}", result); } } @@ -85,7 +76,7 @@ fn create_stream_consumer_with_context( fn create_base_consumer( group_id: &str, - config_overrides: Option>, + config_overrides: Option> ) -> BaseConsumer { consumer_config(group_id, config_overrides) .create_with_context(TestContext { _n: 64 }) @@ -93,13 +84,13 @@ fn create_base_consumer( } // All produced messages should be consumed. -#[test] -fn test_produce_consume_iter() { +#[tokio::test] +async fn test_produce_consume_iter() { let _r = env_logger::try_init(); let start_time = current_time_millis(); let topic_name = rand_test_topic(); - let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None); + let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await; let consumer = create_base_consumer(&rand_test_group(), None); consumer.subscribe(&[topic_name.as_str()]).unwrap(); @@ -109,30 +100,29 @@ fn test_produce_consume_iter() { let id = message_map[&(m.partition(), m.offset())]; match m.timestamp() { Timestamp::CreateTime(timestamp) => assert!(timestamp >= start_time), - _ => panic!("Expected createtime for message timestamp"), + _ => panic!("Expected createtime for message timestamp") }; assert_eq!(m.payload_view::().unwrap().unwrap(), value_fn(id)); assert_eq!(m.key_view::().unwrap().unwrap(), key_fn(id)); assert_eq!(m.topic(), topic_name.as_str()); - } - Err(e) => panic!("Error receiving message: {:?}", e), + }, + Err(e) => panic!("Error receiving message: {:?}", e) } } } // All produced messages should be consumed. 
-#[test] -fn test_produce_consume_base() { +#[tokio::test] +async fn test_produce_consume_base() { let _r = env_logger::try_init(); let start_time = current_time_millis(); let topic_name = rand_test_topic(); - let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None); + let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await; let consumer = create_stream_consumer(&rand_test_group(), None); consumer.subscribe(&[topic_name.as_str()]).unwrap(); - let _consumer_future = consumer - .start() + let _consumer_future = consumer.start() .take(100) .for_each(|message| { match message { @@ -140,28 +130,29 @@ fn test_produce_consume_base() { let id = message_map[&(m.partition(), m.offset())]; match m.timestamp() { Timestamp::CreateTime(timestamp) => assert!(timestamp >= start_time), - _ => panic!("Expected createtime for message timestamp"), + _ => panic!("Expected createtime for message timestamp") }; assert_eq!(m.payload_view::().unwrap().unwrap(), value_fn(id)); assert_eq!(m.key_view::().unwrap().unwrap(), key_fn(id)); assert_eq!(m.topic(), topic_name.as_str()); - } - Err(e) => panic!("Error receiving message: {:?}", e), + }, + Err(e) => panic!("Error receiving message: {:?}", e) }; - Ok(()) - }) - .wait(); + futures::future::ready(()) + }).await; + + consumer.stop(); } // All produced messages should be consumed. -#[test] -fn test_produce_consume_base_assign() { +#[tokio::test] +async fn test_produce_consume_base_assign() { let _r = env_logger::try_init(); let topic_name = rand_test_topic(); - populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None); - populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None); - populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None); + populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await; + populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await; + populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await; let consumer = create_stream_consumer(&rand_test_group(), None); let mut tpl = TopicPartitionList::new(); tpl.add_partition_offset(&topic_name, 0, Offset::Beginning); @@ -169,35 +160,34 @@ fn test_produce_consume_base_assign() { tpl.add_partition_offset(&topic_name, 2, Offset::Offset(9)); consumer.assign(&tpl).unwrap(); - let mut partition_count = vec![0, 0, 0]; + let mut partition_count: Vec = vec![0, 0, 0]; - let _consumer_future = consumer - .start() + let _consumer_future = consumer.start() .take(19) .for_each(|message| { match message { Ok(m) => partition_count[m.partition() as usize] += 1, - Err(e) => panic!("Error receiving message: {:?}", e), + Err(e) => panic!("Error receiving message: {:?}", e) }; - Ok(()) - }) - .wait(); + futures::future::ready(()) + }).await; + + consumer.stop(); assert_eq!(partition_count, vec![10, 8, 1]); } // All produced messages should be consumed. 
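The consumer tests now lean on the futures 0.3 combinators, where `for_each` expects its closure to return a future; synchronous bodies therefore end in `futures::future::ready(())`. A condensed sketch of the pattern, assuming a `StreamConsumer` that is already subscribed:

```rust
// Condensed sketch of the futures 0.3 stream pattern used in these tests.
// Assumes an already-subscribed consumer; the message handling is illustrative.
use futures::StreamExt;
use rdkafka::consumer::StreamConsumer;
use rdkafka::Message;

async fn drain_ten(consumer: &StreamConsumer) {
    consumer
        .start()
        .take(10)
        .for_each(|message| {
            match message {
                Ok(m) => println!("offset {} on partition {}", m.offset(), m.partition()),
                Err(e) => panic!("error receiving message: {:?}", e),
            };
            // `for_each` wants a future back, so wrap the unit value.
            futures::future::ready(())
        })
        .await;
    consumer.stop();
}
```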
-#[test] -fn test_produce_consume_with_timestamp() { +#[tokio::test] +async fn test_produce_consume_with_timestamp() { let _r = env_logger::try_init(); let topic_name = rand_test_topic(); - let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), Some(1111)); + let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), Some(1111)).await; let consumer = create_stream_consumer(&rand_test_group(), None); consumer.subscribe(&[topic_name.as_str()]).unwrap(); - let _consumer_future = consumer - .start() + let _consumer_future = consumer.start() .take(100) .for_each(|message| { match message { @@ -206,39 +196,39 @@ fn test_produce_consume_with_timestamp() { assert_eq!(m.timestamp(), Timestamp::CreateTime(1111)); assert_eq!(m.payload_view::().unwrap().unwrap(), value_fn(id)); assert_eq!(m.key_view::().unwrap().unwrap(), key_fn(id)); - } - Err(e) => panic!("Error receiving message: {:?}", e), + }, + Err(e) => panic!("Error receiving message: {:?}", e) }; - Ok(()) - }) - .wait(); + futures::future::ready(()) + }).await; - populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), Some(999_999)); + populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), Some(999_999)).await; // Lookup the offsets - let tpl = consumer - .offsets_for_timestamp(999_999, Duration::from_secs(10)) - .unwrap(); + let tpl = consumer.offsets_for_timestamp(999_999, Duration::from_secs(10)).unwrap(); let tp = tpl.find_partition(&topic_name, 0).unwrap(); assert_eq!(tp.topic(), topic_name); assert_eq!(tp.offset(), Offset::Offset(100)); assert_eq!(tp.partition(), 0); assert_eq!(tp.error(), Ok(())); + + consumer.stop(); } -#[test] -fn test_consume_with_no_message_error() { +#[tokio::test] +async fn test_consume_with_no_message_error() { let _r = env_logger::try_init(); let consumer = create_stream_consumer(&rand_test_group(), None); - let message_stream = consumer.start_with(Duration::from_millis(200), true); + let mut message_stream = consumer.start_with(Duration::from_millis(200), true); let mut first_poll_time = None; let mut timeouts_count = 0; - for message in message_stream.wait() { + while let Some(message) = message_stream.next().await { match message { - Ok(Err(KafkaError::NoMessageReceived)) => { + Err(KafkaError::NoMessageReceived) => { + debug!("No message received"); // TODO: use entry interface for Options once available if first_poll_time.is_none() { first_poll_time = Some(Instant::now()); @@ -249,10 +239,12 @@ fn test_consume_with_no_message_error() { } } Ok(m) => panic!("A message was actually received: {:?}", m), - Err(e) => panic!("Unexpected error while receiving message: {:?}", e), + Err(e) => panic!("Unexpected error while receiving message: {:?}", e) }; } + consumer.stop(); + assert_eq!(timeouts_count, 26); // It should take 5000ms println!("Duration: {:?}", first_poll_time.unwrap().elapsed()); @@ -260,20 +252,21 @@ fn test_consume_with_no_message_error() { assert!(first_poll_time.unwrap().elapsed() > Duration::from_millis(4500)); } + + // TODO: add check that commit cb gets called correctly -#[test] -fn test_consumer_commit_message() { +#[tokio::test] +async fn test_consumer_commit_message() { let _r = env_logger::try_init(); let topic_name = rand_test_topic(); - populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None); - populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None); - populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None); + populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await; + 
populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None).await; + populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None).await; let consumer = create_stream_consumer(&rand_test_group(), None); consumer.subscribe(&[topic_name.as_str()]).unwrap(); - let _consumer_future = consumer - .start() + let _consumer_future = consumer.start() .take(33) .for_each(|message| { match message { @@ -281,26 +274,16 @@ fn test_consumer_commit_message() { if m.partition() == 1 { consumer.commit_message(&m, CommitMode::Async).unwrap(); } - } - Err(e) => panic!("error receiving message: {:?}", e), + }, + Err(e) => panic!("error receiving message: {:?}", e) }; - Ok(()) - }) - .wait(); + futures::future::ready(()) + }); let timeout = Duration::from_secs(5); - assert_eq!( - consumer.fetch_watermarks(&topic_name, 0, timeout).unwrap(), - (0, 10) - ); - assert_eq!( - consumer.fetch_watermarks(&topic_name, 1, timeout).unwrap(), - (0, 11) - ); - assert_eq!( - consumer.fetch_watermarks(&topic_name, 2, timeout).unwrap(), - (0, 12) - ); + assert_eq!(consumer.fetch_watermarks(&topic_name, 0, timeout).unwrap(), (0, 10)); + assert_eq!(consumer.fetch_watermarks(&topic_name, 1, timeout).unwrap(), (0, 11)); + assert_eq!(consumer.fetch_watermarks(&topic_name, 2, timeout).unwrap(), (0, 12)); let mut assignment = TopicPartitionList::new(); assignment.add_partition_offset(&topic_name, 0, Offset::Invalid); @@ -319,24 +302,25 @@ fn test_consumer_commit_message() { position.add_partition_offset(&topic_name, 1, Offset::Offset(11)); position.add_partition_offset(&topic_name, 2, Offset::Offset(12)); assert_eq!(position, consumer.position().unwrap()); + + consumer.stop(); } -#[test] -fn test_consumer_store_offset_commit() { +#[tokio::test] +async fn test_consumer_store_offset_commit() { let _r = env_logger::try_init(); let topic_name = rand_test_topic(); - populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None); - populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None); - populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None); + populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await; + populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None).await; + populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None).await; let mut config = HashMap::new(); config.insert("enable.auto.offset.store", "false"); config.insert("enable.partition.eof", "true"); let consumer = create_stream_consumer(&rand_test_group(), Some(config)); consumer.subscribe(&[topic_name.as_str()]).unwrap(); - let _consumer_future = consumer - .start() + let _consumer_future = consumer.start() .take(36) .for_each(|message| { match message { @@ -344,30 +328,20 @@ fn test_consumer_store_offset_commit() { if m.partition() == 1 { consumer.store_offset(&m).unwrap(); } - } - Err(KafkaError::PartitionEOF(_)) => {} - Err(e) => panic!("Error receiving message: {:?}", e), + }, + Err(KafkaError::PartitionEOF(_)) => {}, + Err(e) => panic!("Error receiving message: {:?}", e) }; - Ok(()) - }) - .wait(); + futures::future::ready(()) + }).await; // Commit the whole current state consumer.commit_consumer_state(CommitMode::Sync).unwrap(); let timeout = Duration::from_secs(5); - assert_eq!( - consumer.fetch_watermarks(&topic_name, 0, timeout).unwrap(), - (0, 10) - ); - assert_eq!( - consumer.fetch_watermarks(&topic_name, 1, timeout).unwrap(), - (0, 11) - ); - assert_eq!( - consumer.fetch_watermarks(&topic_name, 2, timeout).unwrap(), - (0, 12) - ); + assert_eq!(consumer.fetch_watermarks(&topic_name, 0, 
timeout).unwrap(), (0, 10)); + assert_eq!(consumer.fetch_watermarks(&topic_name, 1, timeout).unwrap(), (0, 11)); + assert_eq!(consumer.fetch_watermarks(&topic_name, 2, timeout).unwrap(), (0, 12)); let mut assignment = TopicPartitionList::new(); assignment.add_partition_offset(&topic_name, 0, Offset::Invalid); @@ -386,4 +360,6 @@ fn test_consumer_store_offset_commit() { position.add_partition_offset(&topic_name, 1, Offset::Offset(11)); position.add_partition_offset(&topic_name, 2, Offset::Offset(12)); assert_eq!(position, consumer.position().unwrap()); + + consumer.stop(); } diff --git a/tests/test_high_producers.rs b/tests/test_high_producers.rs index 197cf765e..c0496653b 100644 --- a/tests/test_high_producers.rs +++ b/tests/test_high_producers.rs @@ -1,10 +1,4 @@ //! Test data production using high level producers. -extern crate futures; -extern crate rand; -extern crate rdkafka; - -use futures::Future; - use rdkafka::config::ClientConfig; use rdkafka::message::{Headers, Message, OwnedHeaders}; use rdkafka::producer::future_producer::FutureRecord; @@ -12,8 +6,10 @@ use rdkafka::producer::FutureProducer; use std::error::Error; -#[test] -fn test_future_producer_send_fail() { +#[tokio::test] +async fn test_future_producer_send_fail() { + let _ = env_logger::try_init(); + let producer = ClientConfig::new() .set("bootstrap.servers", "localhost") .set("produce.offset.report", "true") @@ -35,7 +31,7 @@ fn test_future_producer_send_fail() { 10000, ); - match future.wait() { + match future.await { Ok(Err((kafka_error, owned_message))) => { assert_eq!(kafka_error.description(), "Message production error"); assert_eq!(owned_message.topic(), "topic"); diff --git a/tests/test_low_producers.rs b/tests/test_low_producers.rs index b73f10381..33a3ce05b 100644 --- a/tests/test_low_producers.rs +++ b/tests/test_low_producers.rs @@ -1,8 +1,4 @@ //! Test data production using low level producers. -extern crate futures; -extern crate rand; -extern crate rdkafka; - use rdkafka::config::ClientConfig; use rdkafka::error::{KafkaError, RDKafkaError}; use rdkafka::message::{Headers, Message, OwnedMessage}; @@ -14,7 +10,7 @@ use rdkafka::{ClientContext, Statistics}; #[macro_use] mod utils; -use crate::utils::*; +use utils::*; use rdkafka::message::OwnedHeaders; use std::collections::{HashMap, HashSet}; @@ -125,6 +121,8 @@ fn threaded_producer_with_context( #[test] fn test_base_producer_queue_full() { + let _ = env_logger::try_init(); + let producer = base_producer(map!("queue.buffering.max.messages" => "10")); let topic_name = rand_test_topic(); @@ -162,6 +160,8 @@ fn test_base_producer_queue_full() { #[test] fn test_base_producer_timeout() { + let _ = env_logger::try_init(); + let context = CollectingContext::new(); let producer = base_producer_with_context( context.clone(), @@ -225,6 +225,8 @@ impl ProducerContext for HeaderCheckContext { #[test] fn test_base_producer_headers() { + let _ = env_logger::try_init(); + let ids_set = Arc::new(Mutex::new(HashSet::new())); let context = HeaderCheckContext { ids: ids_set.clone(), @@ -256,6 +258,8 @@ fn test_base_producer_headers() { #[test] fn test_threaded_producer_send() { + let _ = env_logger::try_init(); + let context = CollectingContext::new(); let producer = threaded_producer_with_context(context.clone(), HashMap::new()); let topic_name = rand_test_topic(); diff --git a/tests/test_metadata.rs b/tests/test_metadata.rs index 2a5cee53a..1b77b248d 100644 --- a/tests/test_metadata.rs +++ b/tests/test_metadata.rs @@ -1,19 +1,15 @@ //! 
Test metadata fetch, group membership, consumer metadata. -extern crate env_logger; -extern crate futures; -extern crate rand; -extern crate rdkafka; +use futures::StreamExt; -use futures::*; - -use rdkafka::config::ClientConfig; use rdkafka::consumer::{Consumer, StreamConsumer}; use rdkafka::topic_partition_list::TopicPartitionList; +use rdkafka::config::ClientConfig; use std::time::Duration; mod utils; -use crate::utils::*; +use utils::*; + fn create_consumer(group_id: &str) -> StreamConsumer { ClientConfig::new() @@ -28,27 +24,22 @@ fn create_consumer(group_id: &str) -> StreamConsumer { .expect("Failed to create StreamConsumer") } -#[test] -fn test_metadata() { +#[tokio::test] +async fn test_metadata() { let _r = env_logger::try_init(); let topic_name = rand_test_topic(); - populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None); - populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None); - populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None); + populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None).await; + populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None).await; + populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None).await; let consumer = create_consumer(&rand_test_group()); - let metadata = consumer - .fetch_metadata(None, Duration::from_secs(5)) - .unwrap(); + let metadata = consumer.fetch_metadata(None, Duration::from_secs(5)).unwrap(); let orig_broker_id = metadata.orig_broker_id(); // The orig_broker_id may be -1 if librdkafka's bootstrap "broker" handles // the request. if orig_broker_id != -1 && orig_broker_id != 0 { - panic!( - "metadata.orig_broker_id = {}, not 0 or 1 as expected", - orig_broker_id - ) + panic!("metadata.orig_broker_id = {}, not 0 or 1 as expected", orig_broker_id) } assert!(!metadata.orig_broker_name().is_empty()); @@ -58,15 +49,10 @@ fn test_metadata() { assert!(!broker_metadata[0].host().is_empty()); assert_eq!(broker_metadata[0].port(), 9092); - let topic_metadata = metadata - .topics() - .iter() - .find(|m| m.name() == topic_name) - .unwrap(); + let topic_metadata = metadata.topics().iter() + .find(|m| m.name() == topic_name).unwrap(); - let mut ids = topic_metadata - .partitions() - .iter() + let mut ids = topic_metadata.partitions().iter() .map(|p| { assert_eq!(p.error(), None); p.id() @@ -83,81 +69,59 @@ fn test_metadata() { assert_eq!(topic_metadata.partitions()[0].replicas(), &[0]); assert_eq!(topic_metadata.partitions()[0].isr(), &[0]); - let metadata_one_topic = consumer - .fetch_metadata(Some(&topic_name), Duration::from_secs(5)) + let metadata_one_topic = consumer.fetch_metadata(Some(&topic_name), Duration::from_secs(5)) .unwrap(); assert_eq!(metadata_one_topic.topics().len(), 1); } -#[test] -fn test_subscription() { +#[tokio::test] +async fn test_subscription() { let _r = env_logger::try_init(); let topic_name = rand_test_topic(); - populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None); + populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await; let consumer = create_consumer(&rand_test_group()); consumer.subscribe(&[topic_name.as_str()]).unwrap(); - let _consumer_future = consumer.start().take(10).wait(); + let _consumer_future = consumer.start().take(10).collect::>().await; let mut tpl = TopicPartitionList::new(); tpl.add_topic_unassigned(&topic_name); assert_eq!(tpl, consumer.subscription().unwrap()); } -#[test] -fn test_group_membership() { +#[tokio::test] +async fn test_group_membership() { let _r = env_logger::try_init(); let topic_name = 
rand_test_topic(); let group_name = rand_test_group(); - populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None); - populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None); - populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None); + populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None).await; + populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None).await; + populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None).await; let consumer = create_consumer(&group_name); consumer.subscribe(&[topic_name.as_str()]).unwrap(); // Make sure the consumer joins the group - let _consumer_future = consumer.start().take(1).for_each(|_| Ok(())).wait(); + let _ = consumer.start().take(1).collect::>().await; - let group_list = consumer - .fetch_group_list(None, Duration::from_secs(5)) - .unwrap(); + let group_list = consumer.fetch_group_list(None, Duration::from_secs(5)).unwrap(); // Print all the data, valgrind will check memory access for group in group_list.groups().iter() { - println!( - "{} {} {} {}", - group.name(), - group.state(), - group.protocol(), - group.protocol_type() - ); + println!("{} {} {} {}", group.name(), group.state(), group.protocol(), group.protocol_type()); for member in group.members() { - println!( - " {} {} {}", - member.id(), - member.client_id(), - member.client_host() - ); + println!(" {} {} {}", member.id(), member.client_id(), member.client_host()); } } - let group_list2 = consumer - .fetch_group_list(Some(&group_name), Duration::from_secs(5)) + let group_list2 = consumer.fetch_group_list(Some(&group_name), Duration::from_secs(5)) .unwrap(); assert_eq!(group_list2.groups().len(), 1); - let consumer_group = group_list2 - .groups() - .iter() - .find(|&g| g.name() == group_name) - .unwrap(); + let consumer_group = group_list2.groups().iter().find(|&g| g.name() == group_name).unwrap(); assert_eq!(consumer_group.members().len(), 1); let consumer_member = &consumer_group.members()[0]; - assert_eq!( - consumer_member.client_id(), - "rdkafka_integration_test_client" - ); + assert_eq!(consumer_member.client_id(), "rdkafka_integration_test_client"); } diff --git a/tests/utils.rs b/tests/utils.rs index 4d6270228..52b2a9cd8 100644 --- a/tests/utils.rs +++ b/tests/utils.rs @@ -1,10 +1,6 @@ #![allow(dead_code)] -extern crate futures; -extern crate rand; -extern crate rdkafka; -extern crate regex; - -use futures::*; +use lazy_static::lazy_static; +use log::*; use rand::Rng; use regex::Regex; @@ -90,7 +86,7 @@ impl ClientContext for TestContext { /// Produce the specified count of messages to the topic and partition specified. A map /// of (partition, offset) -> message id will be returned. It panics if any error is encountered /// while populating the topic. 
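Where a test only needs to drive the consumer (for example, to complete the group join in the metadata tests above) rather than inspect each message, the converted code simply collects a fixed number of stream items. A small sketch of that idiom, assuming an already-subscribed consumer:

```rust
// Sketch of the "just drive the stream" idiom: collecting a fixed number of
// items replaces the old `.take(n).wait()` call. Assumes a consumer that is
// already subscribed to some topic.
use futures::StreamExt;
use rdkafka::consumer::StreamConsumer;

async fn join_group(consumer: &StreamConsumer) {
    // Consume a single message so the consumer finishes joining its group.
    let _ = consumer.start().take(1).collect::<Vec<_>>().await;
}
```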
-pub fn populate_topic( +pub async fn populate_topic<'a, P, K, J, Q>( topic_name: &str, count: i32, value_fn: &P, @@ -99,16 +95,18 @@ pub fn populate_topic( timestamp: Option, ) -> HashMap<(i32, i64), i32> where - P: Fn(i32) -> J, - K: Fn(i32) -> Q, - J: ToBytes, - Q: ToBytes, + P: Fn(i32) -> &'a J, + K: Fn(i32) -> &'a Q, + J: ToBytes + 'a, + Q: ToBytes + 'a, { let prod_context = TestContext { _some_data: 1234 }; // Produce some messages + let bootstrap_servers = get_bootstrap_server(); + info!("Connecting to kafka at {}", bootstrap_servers); let producer = ClientConfig::new() - .set("bootstrap.servers", get_bootstrap_server().as_str()) + .set("bootstrap.servers", bootstrap_servers.as_str()) .set("statistics.interval.ms", "500") .set("api.version.request", "true") .set("debug", "all") @@ -117,51 +115,60 @@ where .create_with_context::>(prod_context) .expect("Producer creation error"); - let futures = (0..count) + let results = (0..count) .map(|id| { - let future = producer.send( + (id, producer.send( FutureRecord { topic: topic_name, - payload: Some(&value_fn(id)), - key: Some(&key_fn(id)), + payload: Some(value_fn(id)), + key: Some(key_fn(id)), partition, timestamp, headers: None, }, 1000, - ); - (id, future) + )) }) .collect::>(); let mut message_map = HashMap::new(); - for (id, future) in futures { - match future.wait() { - Ok(Ok((partition, offset))) => message_map.insert((partition, offset), id), - Ok(Err((kafka_error, _message))) => panic!("Delivery failed: {}", kafka_error), - Err(e) => panic!("Waiting for future failed: {}", e), + for (id, f) in results { + match f.await { + Ok(Ok((partition, offset))) => { + debug!("Successfully produced record {} to partition {} with offset {}", id, partition, offset); + message_map.insert((partition, offset), id) + }, + Ok(Err((kafka_error, _message))) => panic!("Delivery failed for record {}: {}", id, kafka_error), + Err(e) => panic!("Waiting for future failed for record {}: {}", id, e), }; } message_map } -pub fn value_fn(id: i32) -> String { - format!("Message {}", id) +lazy_static! { + static ref VALUES: Vec = (0..100).map(|i| format!("Message {}", i)).collect(); + static ref KEYS: Vec = (0..100).map(|i| format!("Key {}", i)).collect(); +} + +pub fn value_fn(id: i32) -> &'static String { + VALUES.get(id as usize).unwrap() } -pub fn key_fn(id: i32) -> String { - format!("Key {}", id) +pub fn key_fn(id: i32) -> &'static String { + KEYS.get(id as usize).unwrap() } #[cfg(test)] mod tests { use super::*; - #[test] - fn test_populate_topic() { + #[tokio::test] + async fn test_populate_topic() { + let _ = env_logger::try_init(); + let topic_name = rand_test_topic(); - let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), None); + let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), None).await; let total_messages = message_map .iter()