From b8ef31692fa9095c00a6f0f78eda188e36f4603f Mon Sep 17 00:00:00 2001
From: Nikhil Benesch
Date: Wed, 21 Aug 2019 12:02:05 -0400
Subject: [PATCH] Plumb message_queue_nonempty callback

This makes it possible to receive a notification via a Rust callback
when there are new messages available.
---
 src/client.rs                 | 11 +++++
 src/consumer/base_consumer.rs | 33 +++++++++++++-
 src/consumer/mod.rs           |  4 ++
 tests/test_consumers.rs       | 86 ++++++++++++++++++++++++++++++++++-
 4 files changed, 131 insertions(+), 3 deletions(-)

diff --git a/src/client.rs b/src/client.rs
index 1597e118d..337680724 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -278,6 +278,17 @@ impl<C: ClientContext> Client<C> {
     pub(crate) fn new_native_queue(&self) -> NativeQueue {
         unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_new(self.native_ptr())) }
     }
+
+    pub(crate) fn consumer_queue(&self) -> Option<NativeQueue> {
+        unsafe {
+            let ptr = rdsys::rd_kafka_queue_get_consumer(self.native_ptr());
+            if ptr.is_null() {
+                None
+            } else {
+                Some(NativeQueue::from_ptr(ptr))
+            }
+        }
+    }
 }
 
 pub(crate) struct NativeTopic {
diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs
index 6663a385e..a6a2f78c1 100644
--- a/src/consumer/base_consumer.rs
+++ b/src/consumer/base_consumer.rs
@@ -2,7 +2,7 @@
 
 use crate::rdsys;
 use crate::rdsys::types::*;
-use crate::client::{Client, NativeClient};
+use crate::client::{Client, NativeClient, NativeQueue};
 use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
 use crate::consumer::{CommitMode, Consumer, ConsumerContext, DefaultConsumerContext};
 use crate::error::{IsError, KafkaError, KafkaResult};
@@ -55,11 +55,25 @@ unsafe extern "C" fn native_rebalance_cb<C: ConsumerContext>(
     tpl.leak() // Do not free native topic partition list
 }
 
+/// Native message queue nonempty callback. This callback will run whenever the
+/// consumer's message queue switches from empty to nonempty.
+unsafe extern "C" fn native_message_queue_nonempty_cb<C: ConsumerContext>(
+    _: *mut RDKafka,
+    opaque_ptr: *mut c_void,
+) {
+    let context = Box::from_raw(opaque_ptr as *mut C);
+
+    (*context).message_queue_nonempty_callback();
+
+    mem::forget(context); // Do not free the context
+}
+
 /// Low-level wrapper around the librdkafka consumer. This consumer must be polled
 /// periodically to make progress on rebalances and callbacks and to receive messages.
 pub struct BaseConsumer<C: ConsumerContext = DefaultConsumerContext> {
     client: Client<C>,
     main_queue_min_poll_interval: Timeout,
+    _queue: Option<NativeQueue>,
 }
 
 impl FromClientConfig for BaseConsumer {
@@ -89,14 +89,31 @@ impl<C: ConsumerContext> FromClientConfigAndContext<C> for BaseConsumer<C> {
             RDKafkaType::RD_KAFKA_CONSUMER,
             context,
         )?;
+        let queue = client.consumer_queue();
+        unsafe {
+            if let Some(queue) = &queue {
+                let context_ptr = client.context() as *const C as *mut c_void;
+                rdsys::rd_kafka_queue_cb_event_enable(
+                    queue.ptr(),
+                    Some(native_message_queue_nonempty_cb::<C>),
+                    context_ptr,
+                );
+            }
+        }
         Ok(BaseConsumer {
             client,
             main_queue_min_poll_interval,
+            _queue: queue,
         })
     }
 }
 
 impl<C: ConsumerContext> BaseConsumer<C> {
+    /// Returns the context used to create this consumer.
+    pub fn context(&self) -> &C {
+        self.client.context()
+    }
+
     /// Polls the consumer for messages and returns a pointer to the native rdkafka-sys struct.
     /// This method is for internal use only. Use poll instead.
     pub(crate) fn poll_raw(&self, mut timeout: Timeout) -> Option<*mut RDKafkaMessage> {
diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs
index fc857ee36..5a12d0a4b 100644
--- a/src/consumer/mod.rs
+++ b/src/consumer/mod.rs
@@ -107,6 +107,10 @@ pub trait ConsumerContext: ClientContext {
     fn main_queue_min_poll_interval(&self) -> Timeout {
         Timeout::After(Duration::from_secs(1))
     }
+
+    /// Message queue nonempty callback. This method will run when the
+    /// consumer's message queue switches from empty to nonempty.
+    fn message_queue_nonempty_callback(&self) {}
 }
 
 /// An empty consumer context that can be used when no context is needed.
diff --git a/tests/test_consumers.rs b/tests/test_consumers.rs
index 2f11720bf..b1d08b2fb 100644
--- a/tests/test_consumers.rs
+++ b/tests/test_consumers.rs
@@ -17,10 +17,14 @@ mod utils;
 use crate::utils::*;
 
 use std::collections::HashMap;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::thread;
 use std::time::{Duration, Instant};
 
 struct TestContext {
     _n: i64, // Add data for memory access validation
+    wakeups: Arc<AtomicUsize>,
 }
 
 impl ClientContext for TestContext {
@@ -39,6 +43,10 @@ impl ConsumerContext for TestContext {
     ) {
         println!("Committing offsets: {:?}", result);
     }
+
+    fn message_queue_nonempty_callback(&self) {
+        self.wakeups.fetch_add(1, Ordering::SeqCst);
+    }
 }
 
 fn consumer_config(group_id: &str, config_overrides: Option<HashMap<String, String>>) -> ClientConfig {
@@ -69,7 +77,10 @@ fn create_stream_consumer(
     group_id: &str,
     config_overrides: Option<HashMap<String, String>>,
 ) -> StreamConsumer<TestContext> {
-    let cons_context = TestContext { _n: 64 };
+    let cons_context = TestContext {
+        _n: 64,
+        wakeups: Arc::new(AtomicUsize::new(0)),
+    };
 
     create_stream_consumer_with_context(group_id, config_overrides, cons_context)
 }
@@ -88,7 +99,10 @@ fn create_base_consumer(
     group_id: &str,
     config_overrides: Option<HashMap<String, String>>,
 ) -> BaseConsumer<TestContext> {
     consumer_config(group_id, config_overrides)
-        .create_with_context(TestContext { _n: 64 })
+        .create_with_context(TestContext {
+            _n: 64,
+            wakeups: Arc::new(AtomicUsize::new(0)),
+        })
         .expect("Consumer creation failed")
 }
@@ -472,3 +486,71 @@ fn test_pause_resume_consumer_iter() {
 
     ensure_empty(&consumer, "There should be no messages left");
 }
+
+// All produced messages should be consumed.
+#[test]
+fn test_produce_consume_message_queue_nonempty_callback() {
+    let _r = env_logger::try_init();
+
+    let topic_name = rand_test_topic();
+
+    let consumer: BaseConsumer<_> = consumer_config(&rand_test_group(), None)
+        .create_with_context(TestContext {
+            _n: 64,
+            wakeups: Arc::new(AtomicUsize::new(0)),
+        })
+        .expect("Consumer creation failed");
+    consumer.subscribe(&[topic_name.as_str()]).unwrap();
+
+    let wakeups = consumer.context().wakeups.clone();
+    let wait_for_wakeups = |target| {
+        let start = Instant::now();
+        let timeout = Duration::from_secs(5);
+        loop {
+            let w = wakeups.load(Ordering::SeqCst);
+            if w == target {
+                break;
+            } else if w > target {
+                panic!("wakeups {} exceeds target {}", w, target);
+            }
+            thread::sleep(Duration::from_millis(100));
+            if start.elapsed() > timeout {
+                panic!("timeout exceeded while waiting for wakeup");
+            }
+        }
+    };
+
+    // Expect one initial rebalance callback.
+    wait_for_wakeups(1);
+
+    // Expect no additional wakeups for 1s.
+    std::thread::sleep(Duration::from_secs(1));
+    assert_eq!(wakeups.load(Ordering::SeqCst), 1);
+
+    // Verify there are no messages waiting.
+    assert!(consumer.poll(Duration::from_secs(0)).is_none());
+
+    // Populate the topic, and expect a wakeup notifying us of the new messages.
+    populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None);
+    wait_for_wakeups(2);
+
+    // Read one of the messages.
+    assert!(consumer.poll(Duration::from_secs(0)).is_some());
+
+    // Add more messages to the topic. Expect no additional wakeups, as the
+    // queue is not fully drained, for 1s.
+    populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None);
+    std::thread::sleep(Duration::from_secs(1));
+    assert_eq!(wakeups.load(Ordering::SeqCst), 2);
+
+    // Drain the consumer.
+    assert_eq!(consumer.iter().take(3).count(), 3);
+
+    // Expect no additional wakeups for 1s.
+    std::thread::sleep(Duration::from_secs(1));
+    assert_eq!(wakeups.load(Ordering::SeqCst), 2);
+
+    // Add another message, and expect a wakeup.
+    populate_topic(&topic_name, 1, &value_fn, &key_fn, None, None);
+    wait_for_wakeups(3);
+}
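For reference, here is how a downstream crate might use the new hook: the sketch below wires ConsumerContext::message_queue_nonempty_callback to a channel so a polling thread can block until librdkafka signals that the consumer queue has gone from empty to nonempty, instead of spinning on poll with short timeouts. This is a minimal illustration under assumed names, not code from this patch: WakeupContext, the broker address, and the topic/group ids are placeholders, and error handling is pared down.

use std::sync::mpsc;
use std::time::Duration;

use rdkafka::client::ClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
use rdkafka::message::Message;

// Hypothetical context that forwards queue-nonempty notifications over a
// channel so the polling thread can sleep until something is ready.
struct WakeupContext {
    notify: mpsc::Sender<()>,
}

impl ClientContext for WakeupContext {}

impl ConsumerContext for WakeupContext {
    fn message_queue_nonempty_callback(&self) {
        // Runs on a librdkafka thread; keep it cheap and non-blocking.
        let _ = self.notify.send(());
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, rx) = mpsc::channel();

    // Broker address, group id, and topic name are placeholders.
    let consumer: BaseConsumer<WakeupContext> = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "wakeup-example")
        .create_with_context(WakeupContext { notify: tx })?;
    consumer.subscribe(&["wakeup-example-topic"])?;

    loop {
        // Block until the callback reports an empty-to-nonempty transition.
        // Note that it also fires for non-message events such as the initial
        // rebalance, as the new test demonstrates.
        rx.recv()?;
        // Drain whatever is ready without waiting.
        while let Some(message) = consumer.poll(Duration::from_secs(0)) {
            println!("payload: {:?}", message?.payload());
        }
    }
}

Draining with a zero timeout after each wakeup matters because the callback fires only on the empty-to-nonempty transition; messages that arrive while the queue is already nonempty produce no further notifications, which is exactly the behavior the "queue is not fully drained" section of the new test exercises.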