From 2779f77ba92c32619993e1638f59b65ab6adeff1 Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Mon, 9 Dec 2019 00:44:04 -0500 Subject: [PATCH] Implement Send + Sync on BorrowedMessage librdkafka is entirely thread safe, and guarantees that the memory referenced by a BorrowedMessage will remain valid as long as the consumer remains valid. Since BorrowedMessage has a reference to the consumer inside, Rust will enforce this invariant, and it is therefore safe to mark BorrowedMessage as Send + Sync. Also update the asynchronous_processing example to make use of this new feature. The example now allows spawning multiple workers, which requires tokio::spawn, which in turn requires that the generated future implement Send, which finally requires the adjustment made in this patch. Fix #85. Fix #189. --- examples/asynchronous_processing.rs | 107 ++++++++++++++++++---------- src/message.rs | 3 + 2 files changed, 73 insertions(+), 37 deletions(-) diff --git a/examples/asynchronous_processing.rs b/examples/asynchronous_processing.rs index 32a529430..40c785b41 100644 --- a/examples/asynchronous_processing.rs +++ b/examples/asynchronous_processing.rs @@ -1,14 +1,15 @@ use std::thread; use std::time::Duration; -use clap::{App, Arg}; -use futures::{future, TryStreamExt}; -use log::{info, warn}; +use clap::{App, Arg, value_t}; +use futures::{StreamExt, TryStreamExt}; +use futures::stream::FuturesUnordered; +use log::info; use rdkafka::config::ClientConfig; use rdkafka::consumer::stream_consumer::StreamConsumer; use rdkafka::consumer::Consumer; -use rdkafka::message::OwnedMessage; +use rdkafka::message::{BorrowedMessage, OwnedMessage}; use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::Message; @@ -16,6 +17,12 @@ use crate::example_utils::setup_logger; mod example_utils; +async fn record_message_receipt(msg: &BorrowedMessage<'_>) { + // Simulate some work that must be done in the same order as messages are + // received; i.e., before truly parallel processing can
begin. + info!("Message received: {}", msg.offset()); +} + // Emulates an expensive, synchronous computation. fn expensive_computation<'a>(msg: OwnedMessage) -> String { info!("Starting expensive computation on message {}", msg.offset()); @@ -39,11 +46,16 @@ fn expensive_computation<'a>(msg: OwnedMessage) -> String { // `tokio::spawn` is used to handle IO-bound tasks in parallel (e.g., producing // the messages), while `tokio::task::spawn_blocking` is used to handle the // simulated CPU-bound task. -async fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_topic: &str) { +async fn run_async_processor( + brokers: String, + group_id: String, + input_topic: String, + output_topic: String, +) { // Create the `StreamConsumer`, to receive the messages from the topic in form of a `Stream`. let consumer: StreamConsumer = ClientConfig::new() - .set("group.id", group_id) - .set("bootstrap.servers", brokers) + .set("group.id", &group_id) + .set("bootstrap.servers", &brokers) .set("enable.partition.eof", "false") .set("session.timeout.ms", "6000") .set("enable.auto.commit", "false") @@ -51,46 +63,48 @@ async fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, o .expect("Consumer creation failed"); consumer - .subscribe(&[input_topic]) + .subscribe(&[&input_topic]) .expect("Can't subscribe to specified topic"); // Create the `FutureProducer` to produce asynchronously. let producer: FutureProducer = ClientConfig::new() - .set("bootstrap.servers", brokers) + .set("bootstrap.servers", &brokers) .set("message.timeout.ms", "5000") .create() .expect("Producer creation error"); // Create the outer pipeline on the message stream. let stream_processor = consumer.start().try_for_each(|borrowed_message| { - // Process each message - info!("Message received: {}", borrowed_message.offset()); - // Borrowed messages can't outlive the consumer they are received from, so they need to - // be owned in order to be sent to a separate thread. 
- let owned_message = borrowed_message.detach(); - let output_topic = output_topic.to_string(); let producer = producer.clone(); - tokio::spawn(async move { - // The body of this block will be executed on the main thread pool, - // but we perform `expensive_computation` on a separate thread pool - // for CPU-intensive tasks via `tokio::task::spawn_blocking`. - let computation_result = - tokio::task::spawn_blocking(|| expensive_computation(owned_message)) - .await - .expect("failed to wait for expensive computation"); - let produce_future = producer.send( - FutureRecord::to(&output_topic) - .key("some key") - .payload(&computation_result), - 0, - ); - match produce_future.await { - Ok(Ok(delivery)) => println!("Sent: {:?}", delivery), - Ok(Err((e, _))) => println!("Error: {:?}", e), - Err(_) => println!("Future cancelled"), - } - }); - future::ready(Ok(())) + let output_topic = output_topic.to_string(); + async move { + // Process each message + record_message_receipt(&borrowed_message).await; + // Borrowed messages can't outlive the consumer they are received from, so they need to + // be owned in order to be sent to a separate thread. + let owned_message = borrowed_message.detach(); + tokio::spawn(async move { + // The body of this block will be executed on the main thread pool, + // but we perform `expensive_computation` on a separate thread pool + // for CPU-intensive tasks via `tokio::task::spawn_blocking`. 
+ let computation_result = + tokio::task::spawn_blocking(|| expensive_computation(owned_message)) + .await + .expect("failed to wait for expensive computation"); + let produce_future = producer.send( + FutureRecord::to(&output_topic) + .key("some key") + .payload(&computation_result), + 0, + ); + match produce_future.await { + Ok(Ok(delivery)) => println!("Sent: {:?}", delivery), + Ok(Err((e, _))) => println!("Error: {:?}", e), + Err(_) => println!("Future cancelled"), + } + }); + Ok(()) + } }); info!("Starting event loop"); @@ -139,6 +153,13 @@ async fn main() { .takes_value(true) .required(true), ) + .arg( + Arg::with_name("num-workers") + .long("num-workers") + .help("Number of workers") + .takes_value(true) + .default_value("1"), + ) .get_matches(); setup_logger(true, matches.value_of("log-conf")); @@ -147,6 +168,18 @@ async fn main() { let group_id = matches.value_of("group-id").unwrap(); let input_topic = matches.value_of("input-topic").unwrap(); let output_topic = matches.value_of("output-topic").unwrap(); + let num_workers = value_t!(matches, "num-workers", usize).unwrap(); - run_async_processor(brokers, group_id, input_topic, output_topic).await + (0..num_workers) + .map(|_| { + tokio::spawn(run_async_processor( + brokers.to_owned(), + group_id.to_owned(), + input_topic.to_owned(), + output_topic.to_owned(), + )) + }) + .collect::<FuturesUnordered<_>>() + .for_each(|_| async { () }) + .await } diff --git a/src/message.rs b/src/message.rs index e02bd076b..778258d6c 100644 --- a/src/message.rs +++ b/src/message.rs @@ -362,6 +362,9 @@ impl<'a> Drop for BorrowedMessage<'a> { } } +unsafe impl<'a> Send for BorrowedMessage<'a> {} +unsafe impl<'a> Sync for BorrowedMessage<'a> {} + // // ********** OWNED MESSAGE ********** //