Skip to content

Commit

Permalink
Plumb message_queue_nonempty callback
Browse files Browse the repository at this point in the history
This makes it possible to receive a notification via a Rust callback
when there are new messages available.
  • Loading branch information
benesch committed Nov 21, 2019
1 parent 24a28bb commit b8ef316
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 3 deletions.
11 changes: 11 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,17 @@ impl<C: ClientContext> Client<C> {
pub(crate) fn new_native_queue(&self) -> NativeQueue {
unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_new(self.native_ptr())) }
}

pub(crate) fn consumer_queue(&self) -> Option<NativeQueue> {
unsafe {
let ptr = rdsys::rd_kafka_queue_get_consumer(self.native_ptr());
if ptr.is_null() {
None
} else {
Some(NativeQueue::from_ptr(ptr))
}
}
}
}

pub(crate) struct NativeTopic {
Expand Down
33 changes: 32 additions & 1 deletion src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crate::rdsys;
use crate::rdsys::types::*;

use crate::client::{Client, NativeClient};
use crate::client::{Client, NativeClient, NativeQueue};
use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
use crate::consumer::{CommitMode, Consumer, ConsumerContext, DefaultConsumerContext};
use crate::error::{IsError, KafkaError, KafkaResult};
Expand Down Expand Up @@ -55,11 +55,25 @@ unsafe extern "C" fn native_rebalance_cb<C: ConsumerContext>(
tpl.leak() // Do not free native topic partition list
}

/// Native message queue nonempty callback. This callback will run whenever the
/// consumer's message queue switches from empty to nonempty.
unsafe extern "C" fn native_message_queue_nonempty_cb<C: ConsumerContext>(
_: *mut RDKafka,
opaque_ptr: *mut c_void,
) {
let context = Box::from_raw(opaque_ptr as *mut C);

(*context).message_queue_nonempty_callback();

mem::forget(context); // Do not free the context
}

/// Low level wrapper around the librdkafka consumer. This consumer requires to be periodically polled
/// to make progress on rebalance, callbacks and to receive messages.
pub struct BaseConsumer<C: ConsumerContext = DefaultConsumerContext> {
client: Client<C>,
main_queue_min_poll_interval: Timeout,
_queue: Option<NativeQueue>,
}

impl FromClientConfig for BaseConsumer {
Expand Down Expand Up @@ -89,14 +103,31 @@ impl<C: ConsumerContext> FromClientConfigAndContext<C> for BaseConsumer<C> {
RDKafkaType::RD_KAFKA_CONSUMER,
context,
)?;
let queue = client.consumer_queue();
unsafe {
if let Some(queue) = &queue {
let context_ptr = client.context() as *const C as *mut c_void;
rdsys::rd_kafka_queue_cb_event_enable(
queue.ptr(),
Some(native_message_queue_nonempty_cb::<C>),
context_ptr,
);
}
}
Ok(BaseConsumer {
client,
main_queue_min_poll_interval,
_queue: queue,
})
}
}

impl<C: ConsumerContext> BaseConsumer<C> {
/// Returns the context used to create this consumer.
pub fn context(&self) -> &C {
self.client.context()
}

/// Polls the consumer for messages and returns a pointer to the native rdkafka-sys struct.
/// This method is for internal use only. Use poll instead.
pub(crate) fn poll_raw(&self, mut timeout: Timeout) -> Option<*mut RDKafkaMessage> {
Expand Down
4 changes: 4 additions & 0 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ pub trait ConsumerContext: ClientContext {
fn main_queue_min_poll_interval(&self) -> Timeout {
Timeout::After(Duration::from_secs(1))
}

/// Message queue nonempty callback. This method will run when the
/// consumer's message queue switches from empty to nonempty.
fn message_queue_nonempty_callback(&self) {}
}

/// An empty consumer context that can be user when no context is needed.
Expand Down
86 changes: 84 additions & 2 deletions tests/test_consumers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ mod utils;
use crate::utils::*;

use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

struct TestContext {
_n: i64, // Add data for memory access validation
wakeups: Arc<AtomicUsize>,
}

impl ClientContext for TestContext {
Expand All @@ -39,6 +43,10 @@ impl ConsumerContext for TestContext {
) {
println!("Committing offsets: {:?}", result);
}

fn message_queue_nonempty_callback(&self) {
self.wakeups.fetch_add(1, Ordering::SeqCst);
}
}

fn consumer_config(group_id: &str, config_overrides: Option<HashMap<&str, &str>>) -> ClientConfig {
Expand Down Expand Up @@ -69,7 +77,10 @@ fn create_stream_consumer(
group_id: &str,
config_overrides: Option<HashMap<&str, &str>>,
) -> StreamConsumer<TestContext> {
let cons_context = TestContext { _n: 64 };
let cons_context = TestContext {
_n: 64,
wakeups: Arc::new(AtomicUsize::new(0)),
};
create_stream_consumer_with_context(group_id, config_overrides, cons_context)
}

Expand All @@ -88,7 +99,10 @@ fn create_base_consumer(
config_overrides: Option<HashMap<&str, &str>>,
) -> BaseConsumer<TestContext> {
consumer_config(group_id, config_overrides)
.create_with_context(TestContext { _n: 64 })
.create_with_context(TestContext {
_n: 64,
wakeups: Arc::new(AtomicUsize::new(0)),
})
.expect("Consumer creation failed")
}

Expand Down Expand Up @@ -472,3 +486,71 @@ fn test_pause_resume_consumer_iter() {

ensure_empty(&consumer, "There should be no messages left");
}

// All produced messages should be consumed.
#[test]
fn test_produce_consume_message_queue_nonempty_callback() {
let _r = env_logger::try_init();

let topic_name = rand_test_topic();

let consumer: BaseConsumer<_> = consumer_config(&rand_test_group(), None)
.create_with_context(TestContext {
_n: 64,
wakeups: Arc::new(AtomicUsize::new(0)),
})
.expect("Consumer creation failed");
consumer.subscribe(&[topic_name.as_str()]).unwrap();

let wakeups = consumer.context().wakeups.clone();
let wait_for_wakeups = |target| {
let start = Instant::now();
let timeout = Duration::from_secs(5);
loop {
let w = wakeups.load(Ordering::SeqCst);
if w == target {
break;
} else if w > target {
panic!("wakeups {} exceeds target {}", w, target);
}
thread::sleep(Duration::from_millis(100));
if start.elapsed() > timeout {
panic!("timeout exceeded while waiting for wakeup");
}
}
};

// Expect one initial rebalance callback.
wait_for_wakeups(1);

// Expect no additional wakeups for 1s.
std::thread::sleep(Duration::from_secs(1));
assert_eq!(wakeups.load(Ordering::SeqCst), 1);

// Verify there are no messages waiting.
assert!(consumer.poll(Duration::from_secs(0)).is_none());

// Populate the topic, and expect a wakeup notifying us of the new messages.
populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None);
wait_for_wakeups(2);

// Read one of the messages.
assert!(consumer.poll(Duration::from_secs(0)).is_some());

// Add more messages to the topic. Expect no additional wakeups, as the
// queue is not fully drained, for 1s.
populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None);
std::thread::sleep(Duration::from_secs(1));
assert_eq!(wakeups.load(Ordering::SeqCst), 2);

// Drain the consumer.
assert_eq!(consumer.iter().take(3).count(), 3);

// Expect no additional wakeups for 1s.
std::thread::sleep(Duration::from_secs(1));
assert_eq!(wakeups.load(Ordering::SeqCst), 2);

// Add another message, and expect a wakeup.
populate_topic(&topic_name, 1, &value_fn, &key_fn, None, None);
wait_for_wakeups(3);
}

0 comments on commit b8ef316

Please sign in to comment.