From 38b440db70d382eb1d80f1c24ae0dc380087c569 Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Thu, 5 Dec 2019 13:39:38 -0500 Subject: [PATCH] Upgrade to async/await ecosystem Upgrade to the new async/await ecosystem, which includes std::future::Future, futures 0.3, tokio 0.2, and the new async/await keywords. This will bump the MSRV to Rust 1.39. Co-authored-by: Ben Sully Co-authored-by: Danny Browning --- Cargo.toml | 4 +- changelog.md | 13 ++- examples/asynchronous_processing.rs | 100 +++++++----------- examples/at_least_once.rs | 23 ++-- examples/simple_consumer.rs | 18 ++-- examples/simple_producer.rs | 9 +- src/admin.rs | 157 ++++++++++++++-------------- src/client.rs | 20 +++- src/consumer/stream_consumer.rs | 35 ++++--- src/producer/future_producer.rs | 27 +++-- tests/test_admin.rs | 96 ++++++++--------- tests/test_consumers.rs | 104 +++++++++--------- tests/test_high_producers.rs | 8 +- tests/test_metadata.rs | 33 +++--- tests/utils.rs | 11 +- 15 files changed, 330 insertions(+), 328 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d04cda20a..e55cad8e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ edition = "2018" [dependencies] rdkafka-sys = { path = "rdkafka-sys", version = "1.2.1", default-features = false } -futures = "0.1.21" +futures = "0.3.0" libc = "0.2.0" log = "0.4.8" serde = "1.0.0" @@ -26,7 +26,7 @@ clap = "2.18.0" env_logger = "0.7.1" rand = "0.3.15" regex = "1.1.6" -tokio = "0.1.7" +tokio = { version = "0.2", features = ["blocking", "macros", "rt-core", "time"] } # These features are re-exports of the features that the rdkafka-sys crate # provides. See the rdkafka-sys documentation for details. diff --git a/changelog.md b/changelog.md index 28821c000..0754b49a7 100644 --- a/changelog.md +++ b/changelog.md @@ -1,6 +1,17 @@ # Changelog - +## Unreleased + +* Upgrade to the async/await ecosystem, including `std::future::Future`, v0.3 + of the futures crate, and v0.2 of Tokio. The minimum supported Rust version + is now Rust 1.39. Special thanks to [@sd2k] and [@dbcfd]. ([#186]) + +[#186]: https://github.com/fede1024/rust-rdkafka/pull/183 + +[@sd2k]: https://github.com/sd2k +[@dbcfd]: https://github.com/dbcfd + + ## 0.22.0 (2019-12-01) * Add a client for Kafka's Admin API, which allows actions like creating and diff --git a/examples/asynchronous_processing.rs b/examples/asynchronous_processing.rs index 0e29d791b..f524c490b 100644 --- a/examples/asynchronous_processing.rs +++ b/examples/asynchronous_processing.rs @@ -7,8 +7,7 @@ extern crate rdkafka; extern crate tokio; use clap::{App, Arg}; -use futures::{lazy, Future, Stream}; -use tokio::runtime::current_thread; +use futures::{future, TryStreamExt}; use rdkafka::config::ClientConfig; use rdkafka::consumer::stream_consumer::StreamConsumer; @@ -43,10 +42,10 @@ fn expensive_computation<'a>(msg: OwnedMessage) -> String { // 2) filter out eventual Kafka errors. // 3) send the message to a thread pool for processing. // 4) produce the result to the output topic. -// Moving each message from one stage of the pipeline to next one is handled by the event loop, -// that runs on a single thread. The expensive CPU-bound computation is handled by the `ThreadPool`, -// without blocking the event loop. -fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_topic: &str) { +// `tokio::spawn` is used to handle IO-bound tasks in parallel (e.g., producing +// the messages), while `tokio::task::spawn_blocking` is used to handle the +// simulated CPU-bound task. 
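// A minimal sketch of the `spawn_blocking` hand-off this example relies on
// (the `cpu_heavy` function and `input` value are illustrative, not part of
// this example):
//
//     let answer = tokio::task::spawn_blocking(move || cpu_heavy(input))
//         .await
//         .expect("blocking task panicked");
//
// On Tokio 0.2, `spawn_blocking` runs the closure on a dedicated blocking
// thread pool (enabled by the "blocking" feature added in Cargo.toml above)
// and returns a handle that can be awaited without stalling the executor
// threads that drive the async tasks.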
+async fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_topic: &str) { // Create the `StreamConsumer`, to receive the messages from the topic in form of a `Stream`. let consumer: StreamConsumer = ClientConfig::new() .set("group.id", group_id) @@ -68,70 +67,45 @@ fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_ .create() .expect("Producer creation error"); - // Create the runtime where the expensive computation will be performed. - let mut thread_pool = tokio::runtime::Builder::new() - .name_prefix("pool-") - .core_threads(4) - .build() - .unwrap(); - - // Use the current thread as IO thread to drive consumer and producer. - let mut io_thread = current_thread::Runtime::new().unwrap(); - let io_thread_handle = io_thread.handle(); - // Create the outer pipeline on the message stream. - let stream_processor = consumer - .start() - .filter_map(|result| { - // Filter out errors - match result { - Ok(msg) => Some(msg), - Err(kafka_error) => { - warn!("Error while receiving from Kafka: {:?}", kafka_error); - None - } + let stream_processor = consumer.start().try_for_each(|borrowed_message| { + // Process each message + info!("Message received: {}", borrowed_message.offset()); + // Borrowed messages can't outlive the consumer they are received from, so they need to + // be owned in order to be sent to a separate thread. + let owned_message = borrowed_message.detach(); + let output_topic = output_topic.to_string(); + let producer = producer.clone(); + tokio::spawn(async move { + // The body of this block will be executed on the main thread pool, + // but we perform `expensive_computation` on a separate thread pool + // for CPU-intensive tasks via `tokio::task::spawn_blocking`. + let computation_result = + tokio::task::spawn_blocking(|| expensive_computation(owned_message)) + .await + .expect("failed to wait for expensive computation"); + let produce_future = producer.send( + FutureRecord::to(&output_topic) + .key("some key") + .payload(&computation_result), + 0, + ); + match produce_future.await { + Ok(Ok(delivery)) => println!("Sent: {:?}", delivery), + Ok(Err((e, _))) => println!("Error: {:?}", e), + Err(_) => println!("Future cancelled"), } - }) - .for_each(move |borrowed_message| { - // Process each message - info!("Message received: {}", borrowed_message.offset()); - // Borrowed messages can't outlive the consumer they are received from, so they need to - // be owned in order to be sent to a separate thread. - let owned_message = borrowed_message.detach(); - let output_topic = output_topic.to_string(); - let producer = producer.clone(); - let io_thread_handle = io_thread_handle.clone(); - let message_future = lazy(move || { - // The body of this closure will be executed in the thread pool. 
- let computation_result = expensive_computation(owned_message); - let producer_future = producer - .send( - FutureRecord::to(&output_topic) - .key("some key") - .payload(&computation_result), - 0, - ) - .then(|result| { - match result { - Ok(Ok(delivery)) => println!("Sent: {:?}", delivery), - Ok(Err((e, _))) => println!("Error: {:?}", e), - Err(_) => println!("Future cancelled"), - } - Ok(()) - }); - let _ = io_thread_handle.spawn(producer_future); - Ok(()) - }); - thread_pool.spawn(message_future); - Ok(()) }); + future::ready(Ok(())) + }); info!("Starting event loop"); - let _ = io_thread.block_on(stream_processor); + stream_processor.await.expect("stream processing failed"); info!("Stream processing terminated"); } -fn main() { +#[tokio::main] +async fn main() { let matches = App::new("Async example") .version(option_env!("CARGO_PKG_VERSION").unwrap_or("")) .about("Asynchronous computation example") @@ -180,5 +154,5 @@ fn main() { let input_topic = matches.value_of("input-topic").unwrap(); let output_topic = matches.value_of("output-topic").unwrap(); - run_async_processor(brokers, group_id, input_topic, output_topic); + run_async_processor(brokers, group_id, input_topic, output_topic).await } diff --git a/examples/at_least_once.rs b/examples/at_least_once.rs index 7b942d027..fed5c3232 100644 --- a/examples/at_least_once.rs +++ b/examples/at_least_once.rs @@ -19,9 +19,8 @@ extern crate rdkafka; extern crate rdkafka_sys; use clap::{App, Arg}; -use futures::future::join_all; -use futures::stream::Stream; -use futures::Future; +use futures::future; +use futures::stream::StreamExt; use rdkafka::client::ClientContext; use rdkafka::config::{ClientConfig, RDKafkaLogLevel}; @@ -89,7 +88,8 @@ fn create_producer(brokers: &str) -> FutureProducer { .expect("Producer creation failed") } -fn main() { +#[tokio::main] +async fn main() { let matches = App::new("at-least-once") .version(option_env!("CARGO_PKG_VERSION").unwrap_or("")) .about("At-least-once delivery example") @@ -148,18 +148,17 @@ fn main() { let consumer = create_consumer(brokers, group_id, input_topic); let producer = create_producer(brokers); - for message in consumer.start().wait() { + let mut stream = consumer.start(); + + while let Some(message) = stream.next().await { match message { - Err(()) => { - warn!("Error while reading from stream"); - } - Ok(Err(e)) => { + Err(e) => { warn!("Kafka error: {}", e); } - Ok(Ok(m)) => { + Ok(m) => { // Send a copy to the message to every output topic in parallel, and wait for the // delivery report to be received. - join_all(output_topics.iter().map(|output_topic| { + future::try_join_all(output_topics.iter().map(|output_topic| { let mut record = FutureRecord::to(output_topic); if let Some(p) = m.payload() { record = record.payload(p); @@ -169,7 +168,7 @@ fn main() { } producer.send(record, 1000) })) - .wait() + .await .expect("Message delivery failed for some topic"); // Now that the message is completely processed, add it's position to the offset // store. The actual offset will be committed every 5 seconds. 
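Both examples above converge on the same futures 0.3 consumption shape: drive the stream returned by `consumer.start()` with `StreamExt::next` in a `while let` loop, and fan writes out with `future::try_join_all`. A condensed sketch of that shape follows; the `forward` helper and its parameters are illustrative and not part of this patch.

    use futures::future;
    use futures::stream::StreamExt;
    use rdkafka::consumer::stream_consumer::StreamConsumer;
    use rdkafka::message::Message;
    use rdkafka::producer::{FutureProducer, FutureRecord};

    // Forward every consumed message to each output topic, waiting for all
    // delivery reports before handling the next message.
    async fn forward(consumer: StreamConsumer, producer: FutureProducer, output_topics: &[&str]) {
        let mut stream = consumer.start();
        while let Some(message) = stream.next().await {
            let m = message.expect("Kafka error");
            future::try_join_all(output_topics.iter().map(|topic| {
                let mut record = FutureRecord::to(topic);
                if let Some(p) = m.payload() {
                    record = record.payload(p);
                }
                if let Some(k) = m.key() {
                    record = record.key(k);
                }
                producer.send(record, 1000)
            }))
            .await
            .expect("message delivery failed");
        }
    }

Awaiting `try_join_all` before moving on is what preserves the at-least-once guarantee: a message's offset is only stored once every output topic has acknowledged delivery.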
diff --git a/examples/simple_consumer.rs b/examples/simple_consumer.rs index d2b78820b..782ef389f 100644 --- a/examples/simple_consumer.rs +++ b/examples/simple_consumer.rs @@ -6,7 +6,7 @@ extern crate rdkafka; extern crate rdkafka_sys; use clap::{App, Arg}; -use futures::stream::Stream; +use futures::StreamExt; use rdkafka::client::ClientContext; use rdkafka::config::{ClientConfig, RDKafkaLogLevel}; @@ -47,7 +47,7 @@ impl ConsumerContext for CustomContext { // A type alias with your custom consumer can be created for convenience. type LoggingConsumer = StreamConsumer; -fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) { +async fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) { let context = CustomContext; let consumer: LoggingConsumer = ClientConfig::new() @@ -68,13 +68,12 @@ fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) { // consumer.start() returns a stream. The stream can be used ot chain together expensive steps, // such as complex computations on a thread pool or asynchronous IO. - let message_stream = consumer.start(); + let mut message_stream = consumer.start(); - for message in message_stream.wait() { + while let Some(message) = message_stream.next().await { match message { - Err(_) => warn!("Error while reading from stream."), - Ok(Err(e)) => warn!("Kafka error: {}", e), - Ok(Ok(m)) => { + Err(e) => warn!("Kafka error: {}", e), + Ok(m) => { let payload = match m.payload_view::() { None => "", Some(Ok(s)) => s, @@ -97,7 +96,8 @@ fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) { } } -fn main() { +#[tokio::main] +async fn main() { let matches = App::new("consumer example") .version(option_env!("CARGO_PKG_VERSION").unwrap_or("")) .about("Simple command line consumer") @@ -143,5 +143,5 @@ fn main() { let brokers = matches.value_of("brokers").unwrap(); let group_id = matches.value_of("group-id").unwrap(); - consume_and_print(brokers, group_id, &topics); + consume_and_print(brokers, group_id, &topics).await } diff --git a/examples/simple_producer.rs b/examples/simple_producer.rs index 60e53adc2..90814034c 100644 --- a/examples/simple_producer.rs +++ b/examples/simple_producer.rs @@ -15,7 +15,7 @@ mod example_utils; use crate::example_utils::setup_logger; use rdkafka::message::OwnedHeaders; -fn produce(brokers: &str, topic_name: &str) { +async fn produce(brokers: &str, topic_name: &str) { let producer: FutureProducer = ClientConfig::new() .set("bootstrap.servers", brokers) .set("message.timeout.ms", "5000") @@ -46,11 +46,12 @@ fn produce(brokers: &str, topic_name: &str) { // This loop will wait until all delivery statuses have been received received. for future in futures { - info!("Future completed. Result: {:?}", future.wait()); + info!("Future completed. 
Result: {:?}", future.await); } } -fn main() { +#[tokio::main] +async fn main() { let matches = App::new("producer example") .version(option_env!("CARGO_PKG_VERSION").unwrap_or("")) .about("Simple command line producer") @@ -86,5 +87,5 @@ fn main() { let topic = matches.value_of("topic").unwrap(); let brokers = matches.value_of("brokers").unwrap(); - produce(brokers, topic); + produce(brokers, topic).await; } diff --git a/src/admin.rs b/src/admin.rs index 8065678e7..aa43000c6 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -12,14 +12,18 @@ use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; use crate::error::{IsError, KafkaError, KafkaResult}; use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, Timeout, WrappedCPointer}; +use futures::channel::oneshot; use futures::future::{self, Either}; -use futures::{Async, Canceled, Complete, Future, Oneshot, Poll}; +use futures::FutureExt; use std::collections::HashMap; use std::ffi::{CStr, CString}; +use std::future::Future; use std::mem; +use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::task::{Context, Poll}; use std::thread::{self, JoinHandle}; use std::time::Duration; @@ -48,13 +52,13 @@ impl AdminClient { &self, topics: I, opts: &AdminOptions, - ) -> impl Future, Error = KafkaError> + ) -> impl Future>> where I: IntoIterator>, { match self.create_topics_inner(topics, opts) { - Ok(rx) => Either::A(CreateTopicsFuture { rx }), - Err(err) => Either::B(future::err(err)), + Ok(rx) => Either::Left(CreateTopicsFuture { rx }), + Err(err) => Either::Right(future::err(err)), } } @@ -62,7 +66,7 @@ impl AdminClient { &self, topics: I, opts: &AdminOptions, - ) -> KafkaResult> + ) -> KafkaResult> where I: IntoIterator>, { @@ -93,10 +97,10 @@ impl AdminClient { &self, topic_names: &[&str], opts: &AdminOptions, - ) -> impl Future, Error = KafkaError> { + ) -> impl Future>> { match self.delete_topics_inner(topic_names, opts) { - Ok(rx) => Either::A(DeleteTopicsFuture { rx }), - Err(err) => Either::B(future::err(err)), + Ok(rx) => Either::Left(DeleteTopicsFuture { rx }), + Err(err) => Either::Right(future::err(err)), } } @@ -104,7 +108,7 @@ impl AdminClient { &self, topic_names: &[&str], opts: &AdminOptions, - ) -> KafkaResult> { + ) -> KafkaResult> { let mut native_topics = Vec::new(); let mut err_buf = ErrBuf::new(); for tn in topic_names { @@ -138,13 +142,13 @@ impl AdminClient { &self, partitions: I, opts: &AdminOptions, - ) -> impl Future, Error = KafkaError> + ) -> impl Future>> where I: IntoIterator>, { match self.create_partitions_inner(partitions, opts) { - Ok(rx) => Either::A(CreatePartitionsFuture { rx }), - Err(err) => Either::B(future::err(err)), + Ok(rx) => Either::Left(CreatePartitionsFuture { rx }), + Err(err) => Either::Right(future::err(err)), } } @@ -152,7 +156,7 @@ impl AdminClient { &self, partitions: I, opts: &AdminOptions, - ) -> KafkaResult> + ) -> KafkaResult> where I: IntoIterator>, { @@ -183,13 +187,13 @@ impl AdminClient { &self, configs: I, opts: &AdminOptions, - ) -> impl Future, Error = KafkaError> + ) -> impl Future>> where I: IntoIterator>, { match self.describe_configs_inner(configs, opts) { - Ok(rx) => Either::A(DescribeConfigsFuture { rx }), - Err(err) => Either::B(future::err(err)), + Ok(rx) => Either::Left(DescribeConfigsFuture { rx }), + Err(err) => Either::Right(future::err(err)), } } @@ -197,7 +201,7 @@ impl AdminClient { &self, configs: I, opts: &AdminOptions, - ) -> KafkaResult> + ) -> KafkaResult> where I: IntoIterator>, { @@ -247,13 
+251,13 @@ impl AdminClient { &self, configs: I, opts: &AdminOptions, - ) -> impl Future, Error = KafkaError> + ) -> impl Future>> where I: IntoIterator>, { match self.alter_configs_inner(configs, opts) { - Ok(rx) => Either::A(AlterConfigsFuture { rx }), - Err(err) => Either::B(future::err(err)), + Ok(rx) => Either::Left(AlterConfigsFuture { rx }), + Err(err) => Either::Right(future::err(err)), } } @@ -261,7 +265,7 @@ impl AdminClient { &self, configs: I, opts: &AdminOptions, - ) -> KafkaResult> + ) -> KafkaResult> where I: IntoIterator>, { @@ -344,7 +348,7 @@ fn start_poll_thread(queue: Arc, should_stop: Arc) -> J continue; } let event = unsafe { NativeEvent::from_ptr(event) }; - let tx: Box> = + let tx: Box> = unsafe { IntoOpaque::from_ptr(rdsys::rd_kafka_event_opaque(event.ptr())) }; let _ = tx.send(event); } @@ -457,7 +461,7 @@ impl AdminOptions { &self, client: *mut RDKafka, err_buf: &mut ErrBuf, - ) -> KafkaResult<(NativeAdminOptions, Oneshot)> { + ) -> KafkaResult<(NativeAdminOptions, oneshot::Receiver)> { let native_opts = unsafe { NativeAdminOptions::from_ptr(rdsys::rd_kafka_AdminOptions_new( client, @@ -513,7 +517,7 @@ impl AdminOptions { check_rdkafka_invalid_arg(res, err_buf)?; } - let (tx, rx) = futures::oneshot(); + let (tx, rx) = oneshot::channel(); let tx = Box::new(tx); unsafe { rdsys::rd_kafka_AdminOptions_set_opaque(native_opts.ptr, IntoOpaque::as_ptr(&tx)) @@ -734,31 +738,30 @@ impl Drop for NativeNewTopic { } struct CreateTopicsFuture { - rx: Oneshot, + rx: oneshot::Receiver, } impl Future for CreateTopicsFuture { - type Item = Vec; - type Error = KafkaError; + type Output = KafkaResult>; - fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(event)) => { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.rx.poll_unpin(cx) { + Poll::Ready(Ok(event)) => { event.check_error()?; let res = unsafe { rdsys::rd_kafka_event_CreateTopics_result(event.ptr()) }; if res.is_null() { let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; - return Err(KafkaError::AdminOpCreation(format!( + return Poll::Ready(Err(KafkaError::AdminOpCreation(format!( "create topics request received response of incorrect type ({})", typ - ))); + )))); } let mut n = 0; let topics = unsafe { rdsys::rd_kafka_CreateTopics_result_topics(res, &mut n) }; - Ok(Async::Ready(build_topic_results(topics, n))) + Poll::Ready(Ok(build_topic_results(topics, n))) } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(Canceled) => Err(KafkaError::Canceled), + Poll::Pending => Poll::Pending, + Poll::Ready(Err(oneshot::Canceled)) => Poll::Ready(Err(KafkaError::Canceled)), } } } @@ -797,31 +800,30 @@ impl Drop for NativeDeleteTopic { } struct DeleteTopicsFuture { - rx: Oneshot, + rx: oneshot::Receiver, } impl Future for DeleteTopicsFuture { - type Item = Vec; - type Error = KafkaError; + type Output = KafkaResult>; - fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(event)) => { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.rx.poll_unpin(cx) { + Poll::Ready(Ok(event)) => { event.check_error()?; let res = unsafe { rdsys::rd_kafka_event_DeleteTopics_result(event.ptr()) }; if res.is_null() { let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; - return Err(KafkaError::AdminOpCreation(format!( + return Poll::Ready(Err(KafkaError::AdminOpCreation(format!( "delete topics request received response of incorrect type ({})", typ - ))); + )))); } let mut n = 0; let topics = unsafe { 
rdsys::rd_kafka_DeleteTopics_result_topics(res, &mut n) }; - Ok(Async::Ready(build_topic_results(topics, n))) + Poll::Ready(Ok(build_topic_results(topics, n))) } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(Canceled) => Err(KafkaError::Canceled), + Poll::Pending => Poll::Pending, + Poll::Ready(Err(oneshot::Canceled)) => Poll::Ready(Err(KafkaError::Canceled)), } } } @@ -941,31 +943,30 @@ impl Drop for NativeNewPartitions { } struct CreatePartitionsFuture { - rx: Oneshot, + rx: oneshot::Receiver, } impl Future for CreatePartitionsFuture { - type Item = Vec; - type Error = KafkaError; + type Output = KafkaResult>; - fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(event)) => { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.rx.poll_unpin(cx) { + Poll::Ready(Ok(event)) => { event.check_error()?; let res = unsafe { rdsys::rd_kafka_event_CreatePartitions_result(event.ptr()) }; if res.is_null() { let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; - return Err(KafkaError::AdminOpCreation(format!( + return Poll::Ready(Err(KafkaError::AdminOpCreation(format!( "create partitions request received response of incorrect type ({})", typ - ))); + )))); } let mut n = 0; let topics = unsafe { rdsys::rd_kafka_CreatePartitions_result_topics(res, &mut n) }; - Ok(Async::Ready(build_topic_results(topics, n))) + Poll::Ready(Ok(build_topic_results(topics, n))) } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(Canceled) => Err(KafkaError::Canceled), + Poll::Pending => Poll::Pending, + Poll::Ready(Err(oneshot::Canceled)) => Poll::Ready(Err(KafkaError::Canceled)), } } } @@ -1143,24 +1144,23 @@ fn extract_config_source(config_source: RDKafkaConfigSource) -> KafkaResult, + rx: oneshot::Receiver, } impl Future for DescribeConfigsFuture { - type Item = Vec; - type Error = KafkaError; + type Output = KafkaResult>; - fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(event)) => { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.rx.poll_unpin(cx) { + Poll::Ready(Ok(event)) => { event.check_error()?; let res = unsafe { rdsys::rd_kafka_event_DescribeConfigs_result(event.ptr()) }; if res.is_null() { let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; - return Err(KafkaError::AdminOpCreation(format!( + return Poll::Ready(Err(KafkaError::AdminOpCreation(format!( "describe configs request received response of incorrect type ({})", typ - ))); + )))); } let mut n = 0; let resources = @@ -1206,10 +1206,10 @@ impl Future for DescribeConfigsFuture { entries: entries_out, })) } - Ok(Async::Ready(out)) + Poll::Ready(Ok(out)) } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(Canceled) => Err(KafkaError::Canceled), + Poll::Pending => Poll::Pending, + Poll::Ready(Err(oneshot::Canceled)) => Poll::Ready(Err(KafkaError::Canceled)), } } } @@ -1282,24 +1282,23 @@ impl<'a> AlterConfig<'a> { } struct AlterConfigsFuture { - rx: Oneshot, + rx: oneshot::Receiver, } impl Future for AlterConfigsFuture { - type Item = Vec; - type Error = KafkaError; + type Output = KafkaResult>; - fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(event)) => { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.rx.poll_unpin(cx) { + Poll::Ready(Ok(event)) => { event.check_error()?; let res = unsafe { rdsys::rd_kafka_event_AlterConfigs_result(event.ptr()) }; if res.is_null() { let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; - return Err(KafkaError::AdminOpCreation(format!( + 
return Poll::Ready(Err(KafkaError::AdminOpCreation(format!( "alter configs request received response of incorrect type ({})", typ - ))); + )))); } let mut n = 0; let resources = @@ -1310,10 +1309,10 @@ impl Future for AlterConfigsFuture { let specifier = extract_config_specifier(resource)?; out.push(Ok(specifier)); } - Ok(Async::Ready(out)) + Poll::Ready(Ok(out)) } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(Canceled) => Err(KafkaError::Canceled), + Poll::Pending => Poll::Pending, + Poll::Ready(Err(oneshot::Canceled)) => Poll::Ready(Err(KafkaError::Canceled)), } } } diff --git a/src/client.rs b/src/client.rs index 337680724..f46ac5158 100644 --- a/src/client.rs +++ b/src/client.rs @@ -34,11 +34,21 @@ pub trait ClientContext: Send + Sync { RDKafkaLogLevel::Emerg | RDKafkaLogLevel::Alert | RDKafkaLogLevel::Critical - | RDKafkaLogLevel::Error => error!(target: "librdkafka", "librdkafka: {} {}", fac, log_message), - RDKafkaLogLevel::Warning => warn!(target: "librdkafka", "librdkafka: {} {}", fac, log_message), - RDKafkaLogLevel::Notice => info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message), - RDKafkaLogLevel::Info => info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message), - RDKafkaLogLevel::Debug => debug!(target: "librdkafka", "librdkafka: {} {}", fac, log_message), + | RDKafkaLogLevel::Error => { + error!(target: "librdkafka", "librdkafka: {} {}", fac, log_message) + } + RDKafkaLogLevel::Warning => { + warn!(target: "librdkafka", "librdkafka: {} {}", fac, log_message) + } + RDKafkaLogLevel::Notice => { + info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message) + } + RDKafkaLogLevel::Info => { + info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message) + } + RDKafkaLogLevel::Debug => { + debug!(target: "librdkafka", "librdkafka: {} {}", fac, log_message) + } } } diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index b8f2ec101..f01a3b991 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -1,8 +1,7 @@ //! Stream-based consumer implementation. 
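// The same futures 0.3 polling shape recurs throughout this patch: a future or
// stream wraps a channel receiver, polls it with `poll_unpin`, and reports
// readiness through `std::task::Poll`. A stripped-down sketch of that shape
// (the `EventFuture` type below is illustrative, not an item in this crate):
//
//     use futures::channel::oneshot;
//     use futures::FutureExt;
//     use std::future::Future;
//     use std::pin::Pin;
//     use std::task::{Context, Poll};
//
//     struct EventFuture {
//         rx: oneshot::Receiver<u32>,
//     }
//
//     impl Future for EventFuture {
//         type Output = Result<u32, oneshot::Canceled>;
//
//         fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
//             // `Receiver` is `Unpin`, so it can be polled directly via `poll_unpin`.
//             self.rx.poll_unpin(cx)
//         }
//     }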
+ use crate::rdsys; use crate::rdsys::types::*; -use futures::sync::mpsc; -use futures::{Future, Poll, Sink, Stream}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; use crate::consumer::base_consumer::BaseConsumer; @@ -11,9 +10,15 @@ use crate::error::{KafkaError, KafkaResult}; use crate::message::BorrowedMessage; use crate::util::Timeout; +use futures::channel::mpsc; +use futures::{SinkExt, Stream, StreamExt}; +// use futures::{Future, Poll, Sink, Stream}; + +use std::pin::Pin; use std::ptr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; use std::thread::{self, JoinHandle}; use std::time::Duration; @@ -90,15 +95,12 @@ impl<'a, C: ConsumerContext + 'static> MessageStream<'a, C> { impl<'a, C: ConsumerContext + 'a> Stream for MessageStream<'a, C> { type Item = KafkaResult>; - type Error = (); - - fn poll(&mut self) -> Poll, Self::Error> { - self.receiver.poll().map(|ready| { - ready.map(|option| { - option.map(|polled_ptr_opt| { - polled_ptr_opt.map_or(Err(KafkaError::NoMessageReceived), |polled_ptr| { - polled_ptr.into_message_of(self.consumer) - }) + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.receiver.poll_next_unpin(cx).map(|ready| { + ready.map(|polled_ptr_opt| { + polled_ptr_opt.map_or(Err(KafkaError::NoMessageReceived), |polled_ptr| { + polled_ptr.into_message_of(self.consumer) }) }) }) @@ -109,27 +111,26 @@ impl<'a, C: ConsumerContext + 'a> Stream for MessageStream<'a, C> { /// If `send_none` is true, the loop will send a None into the sender every time the poll times out. fn poll_loop( consumer: &BaseConsumer, - sender: mpsc::Sender>, + mut sender: mpsc::Sender>, should_stop: &AtomicBool, poll_interval: Duration, send_none: bool, ) { trace!("Polling thread loop started"); - let mut curr_sender = sender; while !should_stop.load(Ordering::Relaxed) { trace!("Polling base consumer"); let future_sender = match consumer.poll_raw(Timeout::After(poll_interval)) { None => { if send_none { - curr_sender.send(None) + sender.send(None) } else { continue; // TODO: check stream closed } } - Some(m_ptr) => curr_sender.send(Some(PolledMessagePtr::new(m_ptr))), + Some(m_ptr) => sender.send(Some(PolledMessagePtr::new(m_ptr))), }; - match future_sender.wait() { - Ok(new_sender) => curr_sender = new_sender, + match futures::executor::block_on(future_sender) { + Ok(()) => (), Err(e) => { debug!("Sender not available: {:?}", e); break; diff --git a/src/producer/future_producer.rs b/src/producer/future_producer.rs index e3d2ef274..0b0d046a1 100644 --- a/src/producer/future_producer.rs +++ b/src/producer/future_producer.rs @@ -11,9 +11,13 @@ use crate::producer::{BaseRecord, DeliveryResult, ProducerContext, ThreadedProdu use crate::statistics::Statistics; use crate::util::{IntoOpaque, Timeout}; -use futures::{self, Canceled, Complete, Future, Oneshot, Poll}; +use futures::channel::oneshot; +use futures::FutureExt; +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use std::time::{Duration, Instant}; // @@ -137,9 +141,13 @@ impl ClientContext for FutureProducerContext { } impl ProducerContext for FutureProducerContext { - type DeliveryOpaque = Box>; + type DeliveryOpaque = Box>; - fn delivery(&self, delivery_result: &DeliveryResult, tx: Box>) { + fn delivery( + &self, + delivery_result: &DeliveryResult, + tx: Box>, + ) { let owned_delivery_result = match *delivery_result { Ok(ref message) => Ok((message.partition(), 
message.offset())), Err((ref error, ref message)) => Err((error.clone(), message.detach())), @@ -195,15 +203,14 @@ impl FromClientConfigAndContext for FutureProduce /// Once completed, the future will contain an `OwnedDeliveryResult` with information on the /// delivery status of the message. pub struct DeliveryFuture { - rx: Oneshot, + rx: oneshot::Receiver, } impl Future for DeliveryFuture { - type Item = OwnedDeliveryResult; - type Error = Canceled; + type Output = Result; - fn poll(&mut self) -> Poll { - self.rx.poll() + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.rx.poll_unpin(cx) } } @@ -220,7 +227,7 @@ impl FutureProducer { { let start_time = Instant::now(); - let (tx, rx) = futures::oneshot(); + let (tx, rx) = oneshot::channel(); let mut base_record = record.into_base_record(Box::new(tx)); loop { @@ -266,7 +273,7 @@ impl FutureProducer { K: ToBytes + ?Sized, P: ToBytes + ?Sized, { - let (tx, rx) = futures::oneshot(); + let (tx, rx) = oneshot::channel(); let base_record = record.into_base_record(Box::new(tx)); self.producer .send(base_record) diff --git a/tests/test_admin.rs b/tests/test_admin.rs index a64e6a198..eb3550aa4 100644 --- a/tests/test_admin.rs +++ b/tests/test_admin.rs @@ -2,8 +2,6 @@ use backoff::{ExponentialBackoff, Operation}; -use futures::Future; - use std::time::Duration; use rdkafka::admin::{ @@ -78,8 +76,8 @@ fn verify_delete(topic: &str) { .unwrap() } -#[test] -fn test_topics() { +#[tokio::test] +async fn test_topics() { let admin_client = create_admin_client(); let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(1))); @@ -101,7 +99,7 @@ fn test_topics() { let res = admin_client .create_topics(&[topic1, topic2], &opts) - .wait() + .await .expect("topic creation failed"); assert_eq!(res, &[Ok(name1.clone()), Ok(name2.clone())]); @@ -124,7 +122,7 @@ fn test_topics() { ], &opts, ) - .wait() + .await .expect("describe configs failed"); let config1 = &res[0].as_ref().expect("describe configs failed on topic 1"); let config2 = &res[1].as_ref().expect("describe configs failed on topic 2"); @@ -165,27 +163,28 @@ fn test_topics() { let partitions1 = NewPartitions::new(&name1, 5); let res = admin_client .create_partitions(&[partitions1], &opts) - .wait() + .await .expect("partition creation failed"); assert_eq!(res, &[Ok(name1.clone())]); - let mut backoff = ExponentialBackoff::default(); - backoff.max_elapsed_time = Some(Duration::from_secs(5)); - (|| { + let mut tries = 0; + loop { let metadata = fetch_metadata(&name1); let topic = &metadata.topics()[0]; let n = topic.partitions().len(); - if n != 5 { - Err(format!("topic has {} partitions, but expected {}", n, 5))?; + if n == 5 { + break; + } else if tries >= 5 { + panic!("topic has {} partitions, but expected {}", n, 5); + } else { + tries += 1; + tokio::time::delay_for(Duration::from_secs(1)).await; } - Ok(()) - }) - .retry(&mut backoff) - .unwrap(); + } let res = admin_client .delete_topics(&[&name1, &name2], &opts) - .wait() + .await .expect("topic deletion failed"); assert_eq!(res, &[Ok(name1.clone()), Ok(name2.clone())]); verify_delete(&name1); @@ -196,7 +195,7 @@ fn test_topics() { // creating topics. 
{ let topic = NewTopic::new("ignored", 1, TopicReplication::Variable(&[&[0], &[0]])); - let res = admin_client.create_topics(&[topic], &opts).wait(); + let res = admin_client.create_topics(&[topic], &opts).await; assert_eq!( Err(KafkaError::AdminOpCreation( "replication configuration for topic 'ignored' assigns 2 partition(s), \ @@ -215,7 +214,7 @@ fn test_topics() { let res = admin_client .create_topics(vec![&topic], &opts) - .wait() + .await .expect("topic creation failed"); assert_eq!(res, &[Ok(name.clone())]); let _ = fetch_metadata(&name); @@ -223,7 +222,7 @@ fn test_topics() { // This partition specification is obviously garbage, and so trips // a client-side error. let partitions = NewPartitions::new(&name, 2).assign(&[&[0], &[0], &[0]]); - let res = admin_client.create_partitions(&[partitions], &opts).wait(); + let res = admin_client.create_partitions(&[partitions], &opts).await; assert_eq!( res, Err(KafkaError::AdminOpCreation(format!( @@ -237,7 +236,7 @@ fn test_topics() { let partitions = NewPartitions::new(&name, 2).assign(&[&[0], &[0]]); let res = admin_client .create_partitions(&[partitions], &opts) - .wait() + .await .expect("partition creation failed"); assert_eq!(res, &[Err((name, RDKafkaError::InvalidReplicaAssignment))],); } @@ -247,7 +246,7 @@ fn test_topics() { let name = rand_test_topic(); let res = admin_client .delete_topics(&[&name], &opts) - .wait() + .await .expect("delete topics failed"); assert_eq!(res, &[Err((name, RDKafkaError::UnknownTopicOrPartition))]); } @@ -263,14 +262,14 @@ fn test_topics() { let res = admin_client .create_topics(vec![&topic1], &opts) - .wait() + .await .expect("topic creation failed"); assert_eq!(res, &[Ok(name1.clone())]); let _ = fetch_metadata(&name1); let res = admin_client .create_topics(vec![&topic1, &topic2], &opts) - .wait() + .await .expect("topic creation failed"); assert_eq!( res, @@ -283,14 +282,14 @@ fn test_topics() { let res = admin_client .delete_topics(&[&name1], &opts) - .wait() + .await .expect("topic deletion failed"); assert_eq!(res, &[Ok(name1.clone())]); verify_delete(&name1); let res = admin_client .delete_topics(&[&name2, &name1], &opts) - .wait() + .await .expect("topic deletion failed"); assert_eq!( res, @@ -302,15 +301,15 @@ fn test_topics() { } } -#[test] -fn test_configs() { +#[tokio::test] +async fn test_configs() { let admin_client = create_admin_client(); let opts = AdminOptions::new(); let broker = ResourceSpecifier::Broker(0); let res = admin_client .describe_configs(&[broker], &opts) - .wait() + .await .expect("describe configs failed"); let config = &res[0].as_ref().expect("describe configs failed"); let orig_val = config @@ -323,16 +322,15 @@ fn test_configs() { let config = AlterConfig::new(broker).set("log.flush.interval.messages", "1234"); let res = admin_client .alter_configs(&[config], &opts) - .wait() + .await .expect("alter configs failed"); assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(0))]); - let mut backoff = ExponentialBackoff::default(); - backoff.max_elapsed_time = Some(Duration::from_secs(5)); - (|| { + let mut tries = 0; + loop { let res = admin_client .describe_configs(&[broker], &opts) - .wait() + .await .expect("describe configs failed"); let config = &res[0].as_ref().expect("describe configs failed"); let entry = config.get("log.flush.interval.messages"); @@ -357,18 +355,20 @@ fn test_configs() { is_sensitive: false, } }; - if entry != Some(&expected_entry) { - Err(format!("{:?} != {:?}", entry, Some(&expected_entry)))? 
+ if entry == Some(&expected_entry) { + break; + } else if tries >= 5 { + panic!("{:?} != {:?}", entry, Some(&expected_entry)); + } else { + tries += 1; + tokio::time::delay_for(Duration::from_secs(1)).await; } - Ok(()) - }) - .retry(&mut backoff) - .unwrap(); + } let config = AlterConfig::new(broker).set("log.flush.interval.ms", &orig_val); let res = admin_client .alter_configs(&[config], &opts) - .wait() + .await .expect("alter configs failed"); assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(0))]); } @@ -376,8 +376,8 @@ fn test_configs() { // Tests whether each admin operation properly reports an error if the entire // request fails. The original implementations failed to check this, resulting // in confusing situations where a failed admin request would return Ok([]). -#[test] -fn test_event_errors() { +#[tokio::test] +async fn test_event_errors() { // Configure an admin client to target a Kafka server that doesn't exist, // then set an impossible timeout. This will ensure that every request fails // with an OperationTimedOut error, assuming, of course, that the request @@ -388,31 +388,31 @@ fn test_event_errors() { .expect("admin client creation failed"); let opts = AdminOptions::new().request_timeout(Some(Duration::from_nanos(1))); - let res = admin_client.create_topics(&[], &opts).wait(); + let res = admin_client.create_topics(&[], &opts).await; assert_eq!( res, Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)) ); - let res = admin_client.create_partitions(&[], &opts).wait(); + let res = admin_client.create_partitions(&[], &opts).await; assert_eq!( res, Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)) ); - let res = admin_client.delete_topics(&[], &opts).wait(); + let res = admin_client.delete_topics(&[], &opts).await; assert_eq!( res, Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)) ); - let res = admin_client.describe_configs(&[], &opts).wait(); + let res = admin_client.describe_configs(&[], &opts).await; assert_eq!( res.err(), Some(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)) ); - let res = admin_client.alter_configs(&[], &opts).wait(); + let res = admin_client.alter_configs(&[], &opts).await; assert_eq!( res, Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)) diff --git a/tests/test_consumers.rs b/tests/test_consumers.rs index de42a59b9..7df35d21f 100644 --- a/tests/test_consumers.rs +++ b/tests/test_consumers.rs @@ -107,13 +107,13 @@ fn create_base_consumer( } // All produced messages should be consumed. -#[test] -fn test_produce_consume_iter() { +#[tokio::test] +async fn test_produce_consume_iter() { let _r = env_logger::try_init(); let start_time = current_time_millis(); let topic_name = rand_test_topic(); - let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None); + let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await; let consumer = create_base_consumer(&rand_test_group(), None); consumer.subscribe(&[topic_name.as_str()]).unwrap(); @@ -135,13 +135,13 @@ fn test_produce_consume_iter() { } // All produced messages should be consumed. 
-#[test] -fn test_produce_consume_base() { +#[tokio::test] +async fn test_produce_consume_base() { let _r = env_logger::try_init(); let start_time = current_time_millis(); let topic_name = rand_test_topic(); - let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None); + let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await; let consumer = create_stream_consumer(&rand_test_group(), None); consumer.subscribe(&[topic_name.as_str()]).unwrap(); @@ -162,18 +162,18 @@ fn test_produce_consume_base() { } Err(e) => panic!("Error receiving message: {:?}", e), }; - Ok(()) + future::ready(()) }) - .wait(); + .await; } // Seeking should allow replaying messages and skipping messages. -#[test] -fn test_produce_consume_seek() { +#[tokio::test] +async fn test_produce_consume_seek() { let _r = env_logger::try_init(); let topic_name = rand_test_topic(); - populate_topic(&topic_name, 5, &value_fn, &key_fn, Some(0), None); + populate_topic(&topic_name, 5, &value_fn, &key_fn, Some(0), None).await; let consumer = create_base_consumer(&rand_test_group(), None); consumer.subscribe(&[topic_name.as_str()]).unwrap(); @@ -201,14 +201,14 @@ fn test_produce_consume_seek() { } // All produced messages should be consumed. -#[test] -fn test_produce_consume_base_assign() { +#[tokio::test] +async fn test_produce_consume_base_assign() { let _r = env_logger::try_init(); let topic_name = rand_test_topic(); - populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None); - populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None); - populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None); + populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await; + populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await; + populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await; let consumer = create_stream_consumer(&rand_test_group(), None); let mut tpl = TopicPartitionList::new(); tpl.add_partition_offset(&topic_name, 0, Offset::Beginning); @@ -226,20 +226,21 @@ fn test_produce_consume_base_assign() { Ok(m) => partition_count[m.partition() as usize] += 1, Err(e) => panic!("Error receiving message: {:?}", e), }; - Ok(()) + future::ready(()) }) - .wait(); + .await; assert_eq!(partition_count, vec![10, 8, 1]); } // All produced messages should be consumed. 
-#[test] -fn test_produce_consume_with_timestamp() { +#[tokio::test] +async fn test_produce_consume_with_timestamp() { let _r = env_logger::try_init(); let topic_name = rand_test_topic(); - let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), Some(1111)); + let message_map = + populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), Some(1111)).await; let consumer = create_stream_consumer(&rand_test_group(), None); consumer.subscribe(&[topic_name.as_str()]).unwrap(); @@ -256,11 +257,11 @@ fn test_produce_consume_with_timestamp() { } Err(e) => panic!("Error receiving message: {:?}", e), }; - Ok(()) + future::ready(()) }) - .wait(); + .await; - populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), Some(999_999)); + populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), Some(999_999)).await; // Lookup the offsets let tpl = consumer @@ -273,19 +274,19 @@ fn test_produce_consume_with_timestamp() { assert_eq!(tp.error(), Ok(())); } -#[test] -fn test_consume_with_no_message_error() { +#[tokio::test] +async fn test_consume_with_no_message_error() { let _r = env_logger::try_init(); let consumer = create_stream_consumer(&rand_test_group(), None); - let message_stream = consumer.start_with(Duration::from_millis(200), true); + let mut message_stream = consumer.start_with(Duration::from_millis(200), true); let mut first_poll_time = None; let mut timeouts_count = 0; - for message in message_stream.wait() { + while let Some(message) = message_stream.next().await { match message { - Ok(Err(KafkaError::NoMessageReceived)) => { + Err(KafkaError::NoMessageReceived) => { // TODO: use entry interface for Options once available if first_poll_time.is_none() { first_poll_time = Some(Instant::now()); @@ -308,14 +309,14 @@ fn test_consume_with_no_message_error() { } // TODO: add check that commit cb gets called correctly -#[test] -fn test_consumer_commit_message() { +#[tokio::test] +async fn test_consumer_commit_message() { let _r = env_logger::try_init(); let topic_name = rand_test_topic(); - populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None); - populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None); - populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None); + populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await; + populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None).await; + populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None).await; let consumer = create_stream_consumer(&rand_test_group(), None); consumer.subscribe(&[topic_name.as_str()]).unwrap(); @@ -331,9 +332,9 @@ fn test_consumer_commit_message() { } Err(e) => panic!("error receiving message: {:?}", e), }; - Ok(()) + future::ready(()) }) - .wait(); + .await; let timeout = Duration::from_secs(5); assert_eq!( @@ -368,14 +369,14 @@ fn test_consumer_commit_message() { assert_eq!(position, consumer.position().unwrap()); } -#[test] -fn test_consumer_store_offset_commit() { +#[tokio::test] +async fn test_consumer_store_offset_commit() { let _r = env_logger::try_init(); let topic_name = rand_test_topic(); - populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None); - populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None); - populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None); + populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await; + populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None).await; + populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None).await; 
let mut config = HashMap::new(); config.insert("enable.auto.offset.store", "false"); config.insert("enable.partition.eof", "true"); @@ -395,9 +396,9 @@ fn test_consumer_store_offset_commit() { Err(KafkaError::PartitionEOF(_)) => {} Err(e) => panic!("Error receiving message: {:?}", e), }; - Ok(()) + future::ready(()) }) - .wait(); + .await; // Commit the whole current state consumer.commit_consumer_state(CommitMode::Sync).unwrap(); @@ -443,8 +444,8 @@ fn ensure_empty(consumer: &BaseConsumer, err_msg: &str) { } } -#[test] -fn test_pause_resume_consumer_iter() { +#[tokio::test] +async fn test_pause_resume_consumer_iter() { const PAUSE_COUNT: i32 = 3; const MESSAGE_COUNT: i32 = 300; const MESSAGES_PER_PAUSE: i32 = MESSAGE_COUNT / PAUSE_COUNT; @@ -459,7 +460,8 @@ fn test_pause_resume_consumer_iter() { &key_fn, Some(0), None, - ); + ) + .await; let group_id = rand_test_group(); let consumer = create_base_consumer(&group_id, None); consumer.subscribe(&[topic_name.as_str()]).unwrap(); @@ -488,8 +490,8 @@ fn test_pause_resume_consumer_iter() { } // All produced messages should be consumed. -#[test] -fn test_produce_consume_message_queue_nonempty_callback() { +#[tokio::test] +async fn test_produce_consume_message_queue_nonempty_callback() { let _r = env_logger::try_init(); let topic_name = rand_test_topic(); @@ -534,7 +536,7 @@ fn test_produce_consume_message_queue_nonempty_callback() { assert!(consumer.poll(Duration::from_secs(0)).is_none()); // Populate the topic, and expect a wakeup notifying us of the new messages. - populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None); + populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None).await; wait_for_wakeups(2); // Read one of the messages. @@ -542,7 +544,7 @@ fn test_produce_consume_message_queue_nonempty_callback() { // Add more messages to the topic. Expect no additional wakeups, as the // queue is not fully drained, for 1s. - populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None); + populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None).await; std::thread::sleep(Duration::from_secs(1)); assert_eq!(wakeups.load(Ordering::SeqCst), 2); @@ -554,6 +556,6 @@ fn test_produce_consume_message_queue_nonempty_callback() { assert_eq!(wakeups.load(Ordering::SeqCst), 2); // Add another message, and expect a wakeup. 
- populate_topic(&topic_name, 1, &value_fn, &key_fn, None, None); + populate_topic(&topic_name, 1, &value_fn, &key_fn, None, None).await; wait_for_wakeups(3); } diff --git a/tests/test_high_producers.rs b/tests/test_high_producers.rs index adb0937cc..eb53e89bd 100644 --- a/tests/test_high_producers.rs +++ b/tests/test_high_producers.rs @@ -3,8 +3,6 @@ extern crate futures; extern crate rand; extern crate rdkafka; -use futures::Future; - use rdkafka::config::ClientConfig; use rdkafka::message::{Headers, Message, OwnedHeaders}; use rdkafka::producer::future_producer::FutureRecord; @@ -12,8 +10,8 @@ use rdkafka::producer::FutureProducer; use std::error::Error; -#[test] -fn test_future_producer_send_fail() { +#[tokio::test] +async fn test_future_producer_send_fail() { let producer = ClientConfig::new() .set("bootstrap.servers", "localhost") .set("message.timeout.ms", "5000") @@ -34,7 +32,7 @@ fn test_future_producer_send_fail() { 10000, ); - match future.wait() { + match future.await { Ok(Err((kafka_error, owned_message))) => { assert_eq!(kafka_error.description(), "Message production error"); assert_eq!(owned_message.topic(), "topic"); diff --git a/tests/test_metadata.rs b/tests/test_metadata.rs index 2a5cee53a..3c90ddd92 100644 --- a/tests/test_metadata.rs +++ b/tests/test_metadata.rs @@ -28,14 +28,14 @@ fn create_consumer(group_id: &str) -> StreamConsumer { .expect("Failed to create StreamConsumer") } -#[test] -fn test_metadata() { +#[tokio::test] +async fn test_metadata() { let _r = env_logger::try_init(); let topic_name = rand_test_topic(); - populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None); - populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None); - populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None); + populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None).await; + populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None).await; + populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None).await; let consumer = create_consumer(&rand_test_group()); let metadata = consumer @@ -89,36 +89,37 @@ fn test_metadata() { assert_eq!(metadata_one_topic.topics().len(), 1); } -#[test] -fn test_subscription() { +#[tokio::test] +async fn test_subscription() { let _r = env_logger::try_init(); let topic_name = rand_test_topic(); - populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None); + populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await; let consumer = create_consumer(&rand_test_group()); consumer.subscribe(&[topic_name.as_str()]).unwrap(); - let _consumer_future = consumer.start().take(10).wait(); + // Make sure the consumer joins the group. 
+ let _consumer_future = consumer.start().next().await; let mut tpl = TopicPartitionList::new(); tpl.add_topic_unassigned(&topic_name); assert_eq!(tpl, consumer.subscription().unwrap()); } -#[test] -fn test_group_membership() { +#[tokio::test] +async fn test_group_membership() { let _r = env_logger::try_init(); let topic_name = rand_test_topic(); let group_name = rand_test_group(); - populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None); - populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None); - populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None); + populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None).await; + populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None).await; + populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None).await; let consumer = create_consumer(&group_name); consumer.subscribe(&[topic_name.as_str()]).unwrap(); - // Make sure the consumer joins the group - let _consumer_future = consumer.start().take(1).for_each(|_| Ok(())).wait(); + // Make sure the consumer joins the group. + let _consumer_future = consumer.start().next().await; let group_list = consumer .fetch_group_list(None, Duration::from_secs(5)) diff --git a/tests/utils.rs b/tests/utils.rs index d7868f4bc..34ae45d2a 100644 --- a/tests/utils.rs +++ b/tests/utils.rs @@ -4,7 +4,6 @@ extern crate rand; extern crate rdkafka; extern crate regex; -use futures::*; use rand::Rng; use regex::Regex; @@ -90,7 +89,7 @@ impl ClientContext for TestContext { /// Produce the specified count of messages to the topic and partition specified. A map /// of (partition, offset) -> message id will be returned. It panics if any error is encountered /// while populating the topic. -pub fn populate_topic( +pub async fn populate_topic( topic_name: &str, count: i32, value_fn: &P, @@ -135,7 +134,7 @@ where let mut message_map = HashMap::new(); for (id, future) in futures { - match future.wait() { + match future.await { Ok(Ok((partition, offset))) => message_map.insert((partition, offset), id), Ok(Err((kafka_error, _message))) => panic!("Delivery failed: {}", kafka_error), Err(e) => panic!("Waiting for future failed: {}", e), @@ -157,10 +156,10 @@ pub fn key_fn(id: i32) -> String { mod tests { use super::*; - #[test] - fn test_populate_topic() { + #[tokio::test] + async fn test_populate_topic() { let topic_name = rand_test_topic(); - let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), None); + let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), None).await; let total_messages = message_map .iter()