Skip to content

Commit

Permalink
Poll the main queue and message queue separately
Browse files Browse the repository at this point in the history
This makes it possible to receive more targeted notifications when there
is activity on just one queue.
  • Loading branch information
benesch committed Nov 21, 2019
1 parent 35cffb7 commit 24a28bb
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 13 deletions.
32 changes: 22 additions & 10 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::metadata::Metadata;
use crate::topic_partition_list::{Offset, TopicPartitionList};
use crate::util::{cstr_to_owned, Timeout};

use std::cmp;
use std::mem;
use std::os::raw::c_void;
use std::ptr;
Expand Down Expand Up @@ -58,6 +59,7 @@ unsafe extern "C" fn native_rebalance_cb<C: ConsumerContext>(
/// to make progress on rebalance, callbacks and to receive messages.
pub struct BaseConsumer<C: ConsumerContext = DefaultConsumerContext> {
client: Client<C>,
main_queue_min_poll_interval: Timeout,
}

impl FromClientConfig for BaseConsumer {
Expand All @@ -80,27 +82,37 @@ impl<C: ConsumerContext> FromClientConfigAndContext<C> for BaseConsumer<C> {
Some(native_commit_cb::<C>),
);
}
let main_queue_min_poll_interval = context.main_queue_min_poll_interval();
let client = Client::new(
config,
native_config,
RDKafkaType::RD_KAFKA_CONSUMER,
context,
)?;
unsafe { rdsys::rd_kafka_poll_set_consumer(client.native_ptr()) };
Ok(BaseConsumer { client })
Ok(BaseConsumer {
client,
main_queue_min_poll_interval,
})
}
}

impl<C: ConsumerContext> BaseConsumer<C> {
/// Polls the consumer for messages and returns a pointer to the native rdkafka-sys struct.
/// This method is for internal use only. Use poll instead.
pub(crate) fn poll_raw(&self, timeout_ms: i32) -> Option<*mut RDKafkaMessage> {
let message_ptr =
unsafe { rdsys::rd_kafka_consumer_poll(self.client.native_ptr(), timeout_ms) };
if message_ptr.is_null() {
None
} else {
Some(message_ptr)
pub(crate) fn poll_raw(&self, mut timeout: Timeout) -> Option<*mut RDKafkaMessage> {
loop {
unsafe { rdsys::rd_kafka_poll(self.client.native_ptr(), 0) };
let op_timeout = cmp::min(timeout, self.main_queue_min_poll_interval);
let message_ptr = unsafe {
rdsys::rd_kafka_consumer_poll(self.client.native_ptr(), op_timeout.as_millis())
};
if !message_ptr.is_null() {
break Some(message_ptr);
}
if op_timeout >= timeout {
break None;
}
timeout -= op_timeout;
}
}

Expand All @@ -118,7 +130,7 @@ impl<C: ConsumerContext> BaseConsumer<C> {
///
/// The returned message lives in the memory of the consumer and cannot outlive it.
pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> Option<KafkaResult<BorrowedMessage>> {
self.poll_raw(timeout.into().as_millis())
self.poll_raw(timeout.into())
.map(|ptr| unsafe { BorrowedMessage::from_consumer(ptr, self) })
}

Expand Down
19 changes: 19 additions & 0 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::metadata::Metadata;
use crate::util::{cstr_to_owned, Timeout};

use std::ptr;
use std::time::Duration;

use crate::topic_partition_list::{Offset, TopicPartitionList};

Expand Down Expand Up @@ -88,6 +89,24 @@ pub trait ConsumerContext: ClientContext {
/// offset store.
#[allow(unused_variables)]
fn commit_callback(&self, result: KafkaResult<()>, offsets: *mut RDKafkaTopicPartitionList) {}

/// Returns the minimum interval at which to poll the main queue, which
/// services the logging, stats, and error callbacks.
///
/// The main queue is polled once whenever [`Consumer.poll`] is called. If
/// `Consumer.poll` is called with a timeout that is larger than this
/// interval, then the main queue will be polled at that interval while the
/// consumer queue is blocked.
///
/// For example, if the main queue's minimum poll interval is 200ms and
/// `Consumer.poll` is called with a timeout of 1s, then `Consumer.poll` may
/// block for up to 1s waiting for a message, but it will poll the main
/// queue every 200ms while it is waiting.
///
/// By default, the minimum poll interval for the main queue is 1s.
fn main_queue_min_poll_interval(&self) -> Timeout {
Timeout::After(Duration::from_secs(1))
}
}

/// An empty consumer context that can be user when no context is needed.
Expand Down
5 changes: 2 additions & 3 deletions src/consumer/stream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::consumer::base_consumer::BaseConsumer;
use crate::consumer::{Consumer, ConsumerContext, DefaultConsumerContext};
use crate::error::{KafkaError, KafkaResult};
use crate::message::BorrowedMessage;
use crate::util::duration_to_millis;
use crate::util::Timeout;

use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -116,10 +116,9 @@ fn poll_loop<C: ConsumerContext>(
) {
trace!("Polling thread loop started");
let mut curr_sender = sender;
let poll_interval_ms = duration_to_millis(poll_interval) as i32;
while !should_stop.load(Ordering::Relaxed) {
trace!("Polling base consumer");
let future_sender = match consumer.poll_raw(poll_interval_ms) {
let future_sender = match consumer.poll_raw(Timeout::After(poll_interval)) {
None => {
if send_none {
curr_sender.send(None)
Expand Down
10 changes: 10 additions & 0 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ impl Timeout {
}
}

impl std::ops::SubAssign for Timeout {
fn sub_assign(&mut self, other: Self) {
match (self, other) {
(Timeout::After(lhs), Timeout::After(rhs)) => *lhs -= rhs,
(Timeout::Never, Timeout::After(_)) => (),
_ => panic!("subtraction of Timeout::Never is ill-defined"),
}
}
}

impl From<Duration> for Timeout {
fn from(d: Duration) -> Timeout {
Timeout::After(d)
Expand Down

0 comments on commit 24a28bb

Please sign in to comment.