, err_msg: &str) {
}
}
-#[test]
-fn test_pause_resume_consumer_iter() {
+#[tokio::test]
+async fn test_pause_resume_consumer_iter() {
const PAUSE_COUNT: i32 = 3;
const MESSAGE_COUNT: i32 = 300;
const MESSAGES_PER_PAUSE: i32 = MESSAGE_COUNT / PAUSE_COUNT;
@@ -459,7 +460,8 @@ fn test_pause_resume_consumer_iter() {
&key_fn,
Some(0),
None,
- );
+ )
+ .await;
let group_id = rand_test_group();
let consumer = create_base_consumer(&group_id, None);
consumer.subscribe(&[topic_name.as_str()]).unwrap();
@@ -488,8 +490,8 @@ fn test_pause_resume_consumer_iter() {
}
// All produced messages should be consumed.
-#[test]
-fn test_produce_consume_message_queue_nonempty_callback() {
+#[tokio::test]
+async fn test_produce_consume_message_queue_nonempty_callback() {
let _r = env_logger::try_init();
let topic_name = rand_test_topic();
@@ -534,7 +536,7 @@ fn test_produce_consume_message_queue_nonempty_callback() {
assert!(consumer.poll(Duration::from_secs(0)).is_none());
// Populate the topic, and expect a wakeup notifying us of the new messages.
- populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None);
+ populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None).await;
wait_for_wakeups(2);
// Read one of the messages.
@@ -542,7 +544,7 @@ fn test_produce_consume_message_queue_nonempty_callback() {
// Add more messages to the topic. Expect no additional wakeups, as the
// queue is not fully drained, for 1s.
- populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None);
+ populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None).await;
std::thread::sleep(Duration::from_secs(1));
assert_eq!(wakeups.load(Ordering::SeqCst), 2);
@@ -554,6 +556,6 @@ fn test_produce_consume_message_queue_nonempty_callback() {
assert_eq!(wakeups.load(Ordering::SeqCst), 2);
// Add another message, and expect a wakeup.
- populate_topic(&topic_name, 1, &value_fn, &key_fn, None, None);
+ populate_topic(&topic_name, 1, &value_fn, &key_fn, None, None).await;
wait_for_wakeups(3);
}
diff --git a/tests/test_high_producers.rs b/tests/test_high_producers.rs
index adb0937cc..eb53e89bd 100644
--- a/tests/test_high_producers.rs
+++ b/tests/test_high_producers.rs
@@ -3,8 +3,6 @@ extern crate futures;
extern crate rand;
extern crate rdkafka;
-use futures::Future;
-
use rdkafka::config::ClientConfig;
use rdkafka::message::{Headers, Message, OwnedHeaders};
use rdkafka::producer::future_producer::FutureRecord;
@@ -12,8 +10,8 @@ use rdkafka::producer::FutureProducer;
use std::error::Error;
-#[test]
-fn test_future_producer_send_fail() {
+#[tokio::test]
+async fn test_future_producer_send_fail() {
let producer = ClientConfig::new()
.set("bootstrap.servers", "localhost")
.set("message.timeout.ms", "5000")
@@ -34,7 +32,7 @@ fn test_future_producer_send_fail() {
10000,
);
- match future.wait() {
+ match future.await {
Ok(Err((kafka_error, owned_message))) => {
assert_eq!(kafka_error.description(), "Message production error");
assert_eq!(owned_message.topic(), "topic");
diff --git a/tests/test_metadata.rs b/tests/test_metadata.rs
index 2a5cee53a..3c90ddd92 100644
--- a/tests/test_metadata.rs
+++ b/tests/test_metadata.rs
@@ -28,14 +28,14 @@ fn create_consumer(group_id: &str) -> StreamConsumer {
.expect("Failed to create StreamConsumer")
}
-#[test]
-fn test_metadata() {
+#[tokio::test]
+async fn test_metadata() {
let _r = env_logger::try_init();
let topic_name = rand_test_topic();
- populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None);
- populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None);
- populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None);
+ populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None).await;
+ populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None).await;
+ populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None).await;
let consumer = create_consumer(&rand_test_group());
let metadata = consumer
@@ -89,36 +89,37 @@ fn test_metadata() {
assert_eq!(metadata_one_topic.topics().len(), 1);
}
-#[test]
-fn test_subscription() {
+#[tokio::test]
+async fn test_subscription() {
let _r = env_logger::try_init();
let topic_name = rand_test_topic();
- populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None);
+ populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;
let consumer = create_consumer(&rand_test_group());
consumer.subscribe(&[topic_name.as_str()]).unwrap();
- let _consumer_future = consumer.start().take(10).wait();
+ // Make sure the consumer joins the group.
+ let _consumer_future = consumer.start().next().await;
let mut tpl = TopicPartitionList::new();
tpl.add_topic_unassigned(&topic_name);
assert_eq!(tpl, consumer.subscription().unwrap());
}
-#[test]
-fn test_group_membership() {
+#[tokio::test]
+async fn test_group_membership() {
let _r = env_logger::try_init();
let topic_name = rand_test_topic();
let group_name = rand_test_group();
- populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None);
- populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None);
- populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None);
+ populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None).await;
+ populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None).await;
+ populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None).await;
let consumer = create_consumer(&group_name);
consumer.subscribe(&[topic_name.as_str()]).unwrap();
- // Make sure the consumer joins the group
- let _consumer_future = consumer.start().take(1).for_each(|_| Ok(())).wait();
+ // Make sure the consumer joins the group.
+ let _consumer_future = consumer.start().next().await;
let group_list = consumer
.fetch_group_list(None, Duration::from_secs(5))
diff --git a/tests/utils.rs b/tests/utils.rs
index d7868f4bc..34ae45d2a 100644
--- a/tests/utils.rs
+++ b/tests/utils.rs
@@ -4,7 +4,6 @@ extern crate rand;
extern crate rdkafka;
extern crate regex;
-use futures::*;
use rand::Rng;
use regex::Regex;
@@ -90,7 +89,7 @@ impl ClientContext for TestContext {
/// Produce the specified count of messages to the topic and partition specified. A map
/// of (partition, offset) -> message id will be returned. It panics if any error is encountered
/// while populating the topic.
-pub fn populate_topic(
+pub async fn populate_topic(
topic_name: &str,
count: i32,
value_fn: &P,
@@ -135,7 +134,7 @@ where
let mut message_map = HashMap::new();
for (id, future) in futures {
- match future.wait() {
+ match future.await {
Ok(Ok((partition, offset))) => message_map.insert((partition, offset), id),
Ok(Err((kafka_error, _message))) => panic!("Delivery failed: {}", kafka_error),
Err(e) => panic!("Waiting for future failed: {}", e),
@@ -157,10 +156,10 @@ pub fn key_fn(id: i32) -> String {
mod tests {
use super::*;
- #[test]
- fn test_populate_topic() {
+ #[tokio::test]
+ async fn test_populate_topic() {
let topic_name = rand_test_topic();
- let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), None);
+ let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), None).await;
let total_messages = message_map
.iter()