Skip to content

Commit

Permalink
Add split_partition_queue to StreamConsumer
Browse files Browse the repository at this point in the history
Co-authored-by: Duarte Nunes <[email protected]>
Co-authored-by: Nikhil Benesch <[email protected]>
  • Loading branch information
3 people committed Nov 27, 2021
1 parent baddfe2 commit 164e22a
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 43 deletions.
5 changes: 5 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
essentially implementation details of `StreamConsumer` that had leaked into
the public API. The vast majority of users should be unaffected.

* **Breaking change.** The `MessageStream` type is no longer generic.

* Add the `StreamConsumer::split_partition_queue` method to mirror
`BaseConsumer::split_partition_queue`.

[#398]: https://github.com/fede1024/rust-rdkafka/issues/398

<a name="0.27.0"></a>
Expand Down
6 changes: 3 additions & 3 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,15 +593,15 @@ pub struct PartitionQueue<C>
where
C: ConsumerContext,
{
consumer: Arc<BaseConsumer<C>>,
queue: NativeQueue,
pub(crate) consumer: Arc<BaseConsumer<C>>,
pub(crate) queue: NativeQueue,
}

impl<C> PartitionQueue<C>
where
C: ConsumerContext,
{
fn new(consumer: Arc<BaseConsumer<C>>, queue: NativeQueue) -> Self {
pub(crate) fn new(consumer: Arc<BaseConsumer<C>>, queue: NativeQueue) -> Self {
PartitionQueue { consumer, queue }
}

Expand Down
185 changes: 147 additions & 38 deletions src/consumer/stream_consumer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
//! High-level consumers with a [`Stream`](futures::Stream) interface.
use std::ffi::CString;
use std::marker::PhantomData;
use std::os::raw::c_void;
use std::pin::Pin;
use std::ptr;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::time::Duration;
Expand Down Expand Up @@ -83,36 +85,31 @@ impl WakerSlab {
/// A stream of messages from a [`StreamConsumer`].
///
/// See the documentation of [`StreamConsumer::stream`] for details.
pub struct MessageStream<'a, C, R = DefaultRuntime>
where
C: ConsumerContext + 'static,
{
consumer: &'a StreamConsumer<C, R>,
pub struct MessageStream<'a> {
wakers: &'a WakerSlab,
queue: &'a NativeQueue,
slot: usize,
}

impl<'a, C, R> MessageStream<'a, C, R>
where
C: ConsumerContext + 'static,
{
fn new(consumer: &'a StreamConsumer<C, R>) -> MessageStream<'a, C, R> {
let slot = consumer.wakers.register();
MessageStream { consumer, slot }
impl<'a> MessageStream<'a> {
fn new(wakers: &'a WakerSlab, queue: &'a NativeQueue) -> MessageStream<'a> {
let slot = wakers.register();
MessageStream {
wakers,
queue,
slot,
}
}

fn poll(&self) -> Option<KafkaResult<BorrowedMessage<'a>>> {
let client_ptr = self.consumer.client().native_ptr();
unsafe {
NativePtr::from_ptr(rdsys::rd_kafka_consumer_poll(client_ptr, 0))
.map(|p| BorrowedMessage::from_consumer(p, self.consumer))
NativePtr::from_ptr(rdsys::rd_kafka_consume_queue(self.queue.ptr(), 0))
.map(|p| BorrowedMessage::from_consumer(p, self.queue))
}
}
}

impl<'a, C, R> Stream for MessageStream<'a, C, R>
where
C: ConsumerContext + 'a,
{
impl<'a> Stream for MessageStream<'a> {
type Item = KafkaResult<BorrowedMessage<'a>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand All @@ -126,9 +123,7 @@ where
// the waker so that we are woken up if the queue flips from non-empty
// to empty. We have to store the waker repeatedly in case this future
// migrates between tasks.
self.consumer
.wakers
.set_waker(self.slot, cx.waker().clone());
self.wakers.set_waker(self.slot, cx.waker().clone());

// Check whether a new message became available after we installed the
// waker. This avoids a race where `poll` returns None to indicate that
Expand All @@ -141,12 +136,9 @@ where
}
}

impl<'a, C, R> Drop for MessageStream<'a, C, R>
where
C: ConsumerContext + 'static,
{
impl<'a> Drop for MessageStream<'a> {
fn drop(&mut self) {
self.consumer.wakers.unregister(self.slot);
self.wakers.unregister(self.slot);
}
}

Expand All @@ -171,7 +163,7 @@ where
{
base: BaseConsumer<C>,
wakers: Arc<WakerSlab>,
_queue: NativeQueue,
queue: NativeQueue,
_shutdown_trigger: oneshot::Sender<()>,
_runtime: PhantomData<R>,
}
Expand Down Expand Up @@ -209,12 +201,9 @@ where
// rebalancings and stats.
unsafe { rdsys::rd_kafka_poll_set_consumer(base.client().native_ptr()) };

let queue = base
.client()
.consumer_queue()
.ok_or_else(|| KafkaError::ClientCreation(
"librdkafka failed to create consumer queue".into(),
))?;
let queue = base.client().consumer_queue().ok_or_else(|| {
KafkaError::ClientCreation("librdkafka failed to create consumer queue".into())
})?;
let wakers = Arc::new(WakerSlab::new());
unsafe { enable_nonempty_callback(&queue, &wakers) }

Expand Down Expand Up @@ -246,7 +235,7 @@ where
Ok(StreamConsumer {
base,
wakers,
_queue: queue,
queue,
_shutdown_trigger: shutdown_trigger,
_runtime: PhantomData,
})
Expand All @@ -269,13 +258,13 @@ where
///
/// If you want multiple independent views of a Kafka topic, create multiple
/// consumers, not multiple message streams.
pub fn stream(&self) -> MessageStream<'_, C, R> {
MessageStream::new(self)
pub fn stream(&self) -> MessageStream<'_> {
MessageStream::new(&self.wakers, &self.queue)
}

/// Constructs a stream that yields messages from this consumer.
#[deprecated = "use the more clearly named \"StreamConsumer::stream\" method instead"]
pub fn start(&self) -> MessageStream<'_, C, R> {
pub fn start(&self) -> MessageStream<'_> {
self.stream()
}

Expand All @@ -301,6 +290,69 @@ where
.await
.expect("kafka streams never terminate")
}

/// Splits messages for the specified partition into their own stream.
///
/// If the `topic` or `partition` is invalid, returns `None`.
///
/// After calling this method, newly-fetched messages for the specified
/// partition will be returned via [`StreamPartitionQueue::recv`] rather
/// than [`StreamConsumer::recv`]. Note that there may be buffered messages
/// for the specified partition that will continue to be returned by
/// `StreamConsumer::recv`. For best results, call `split_partition_queue`
/// before the first call to `StreamConsumer::recv`.
///
/// You must periodically await `StreamConsumer::recv`, even if no messages
/// are expected, to serve callbacks. Consider using a background task like:
///
/// ```
/// # use rdkafka::consumer::StreamConsumer;
/// # async fn example(stream_consumer: StreamConsumer) {
/// tokio::spawn(async move {
///     let message = stream_consumer.recv().await;
///     panic!("main stream consumer queue unexpectedly received message: {:?}", message);
/// });
/// # }
/// ```
///
/// Note that calling [`Consumer::assign`] will deactivate any existing
/// partition queues. You will need to call this method for every partition
/// that should be split after every call to `assign`.
///
/// Beware that this method is implemented for `&Arc<Self>`, not `&self`.
/// You will need to wrap your consumer in an `Arc` in order to call this
/// method. This design permits moving the partition queue to another thread
/// while ensuring the partition queue does not outlive the consumer.
pub fn split_partition_queue<'a>(
    self: &'a Arc<Self>,
    topic: &str,
    partition: i32,
) -> Option<StreamPartitionQueue<C, R>> {
    // Topic names containing interior NUL bytes cannot be represented as C
    // strings, so treat them as invalid and return `None`.
    let topic = match CString::new(topic) {
        Ok(topic) => topic,
        Err(_) => return None,
    };
    // SAFETY: `topic` is a valid NUL-terminated string and the client
    // pointer is valid for the lifetime of `self`. librdkafka returns NULL
    // for an unknown topic/partition, which `from_ptr` maps to `None`.
    let queue = unsafe {
        NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_partition(
            self.base.client().native_ptr(),
            topic.as_ptr(),
            partition,
        ))
    };
    queue.map(|queue| {
        // Each partition queue gets its own waker slab, independent of the
        // main consumer queue's wakers.
        let wakers = Arc::new(WakerSlab::new());
        unsafe {
            // Stop forwarding this partition's messages to the main
            // consumer queue, and install a callback that wakes pending
            // stream tasks when the queue becomes non-empty.
            rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut());
            enable_nonempty_callback(&queue, &wakers);
        }
        StreamPartitionQueue {
            queue,
            wakers,
            // Hold a strong reference to the consumer so the partition
            // queue cannot outlive it.
            _consumer: self.clone(),
        }
    })
}
}

impl<C, R> Consumer<C> for StreamConsumer<C, R>
Expand Down Expand Up @@ -461,3 +513,60 @@ where
self.base.rebalance_protocol()
}
}

/// A message queue for a single partition of a [`StreamConsumer`].
///
/// See the documentation of [`StreamConsumer::split_partition_queue`] for
/// details.
pub struct StreamPartitionQueue<C, R = DefaultRuntime>
where
    C: ConsumerContext,
{
    // The native librdkafka queue holding this partition's messages.
    queue: NativeQueue,
    // Wakers for tasks polling a `MessageStream` backed by this queue.
    wakers: Arc<WakerSlab>,
    // Keeps the owning consumer alive for as long as this partition queue
    // exists; never read directly.
    _consumer: Arc<StreamConsumer<C, R>>,
}

impl<C, R> StreamPartitionQueue<C, R>
where
    C: ConsumerContext,
{
    /// Constructs a stream that yields messages from this partition.
    ///
    /// It is legal to have multiple live message streams for the same
    /// partition, and to move those message streams across threads. Note,
    /// however, that the message streams share the same underlying state. A
    /// message received by the partition will be delivered to only one of the
    /// live message streams. If you seek the underlying partition, all message
    /// streams created from the partition will begin to draw messages from the
    /// new position of the partition.
    ///
    /// If you want multiple independent views of a Kafka partition, create
    /// multiple consumers, not multiple partition streams.
    pub fn stream(&self) -> MessageStream<'_> {
        MessageStream::new(&self.wakers, &self.queue)
    }

    /// Receives the next message from the stream.
    ///
    /// This method will block until the next message is available or an error
    /// occurs. It is legal to call `recv` from multiple threads simultaneously.
    ///
    /// Note that this method is exactly as efficient as constructing a
    /// single-use message stream and extracting one message from it:
    ///
    /// ```
    /// use futures::stream::StreamExt;
    /// # use rdkafka::consumer::ConsumerContext;
    /// # use rdkafka::consumer::stream_consumer::StreamPartitionQueue;
    ///
    /// # async fn example<C: ConsumerContext>(partition_queue: StreamPartitionQueue<C>) {
    /// partition_queue.stream().next().await.expect("MessageStream never returns None");
    /// # }
    /// ```
    pub async fn recv(&self) -> Result<BorrowedMessage<'_>, KafkaError> {
        // A `MessageStream` never yields `None` (Kafka streams are
        // unbounded), so extracting a single item cannot panic here.
        self.stream()
            .next()
            .await
            .expect("kafka streams never terminate")
    }
}
Loading

0 comments on commit 164e22a

Please sign in to comment.