Skip to content

Commit

Permalink
Merge pull request #411 from fede1024/davidblewett/partitionqueue-stream
Browse files Browse the repository at this point in the history
Add split_partition_queue to StreamConsumer
  • Loading branch information
benesch authored Nov 27, 2021
2 parents 6454372 + a0f6b6a commit 928f6f1
Show file tree
Hide file tree
Showing 8 changed files with 365 additions and 279 deletions.
45 changes: 42 additions & 3 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,28 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).

## Unreleased

* Add the `StreamConsumer::split_partition_queue` method to mirror
`BaseConsumer::split_partition_queue` ([#411]).

Thanks to [@davidblewett], [@duarten], and [@nemosupremo] for contributing to
the implementation.

* **Breaking change.** Remove the `StreamConsumerContext` type and the
`ConsumerContext::message_queue_nonempty_callback` method. These were
essentially implementation details of `StreamConsumer` that had leaked into
the public API. The vast majority of users should be unaffected.

* **Breaking change.** Remove the type parameters from the `MessageStream` type.

* **Breaking change.** Add the received `TopicPartitionList` to the
`Rebalance::Revoke` variant, which is useful when using incremental
cooperative rebalancing ([#398]).

* Avoid crashing if librdkafka invokes the commit callback with a null
topic partition list ([#406]).

Thanks, [@thijsc].

* Add the new statistics fields in librdkafka v1.7.0 to the various statistics
types. The new fields are:

Expand Down Expand Up @@ -46,11 +68,28 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
statistics from librdkafka. The default implementation calls
`ClientContext::stats` with the decoded statistics.

* **Breaking change.** Add the received `TopicPartitionList` to the
`Rebalance::Revoke` variant, which is useful when using incremental
cooperative rebalancing ([#398]).
* Add the `Default` trait to the statistics types: `Statistics`, `Broker`,
`Window`, `TopicPartition`, `Topic`, `Partition`, `ConsumerGroup`, and
`ExactlyOnceSemantics` ([#410]).

Thanks, [@scanterog].

* Add the `Debug` trait to `DefaultClientContext` and `DefaultConsumerContext`
([#401]).

Thanks, [@DXist].

[#398]: https://github.com/fede1024/rust-rdkafka/issues/398
[#401]: https://github.com/fede1024/rust-rdkafka/issues/401
[#406]: https://github.com/fede1024/rust-rdkafka/issues/406
[#410]: https://github.com/fede1024/rust-rdkafka/issues/410
[#411]: https://github.com/fede1024/rust-rdkafka/issues/411
[@davidblewett]: https://github.com/davidblewett
[@duarten]: https://github.com/duarten
[@DXist]: https://github.com/DXist
[@nemosupremo]: https://github.com/nemosupremo
[@scanterog]: https://github.com/scanterog
[@thijsc]: https://github.com/thijsc

<a name="0.27.0"></a>
## 0.27.0 (2021-10-17)
Expand Down
5 changes: 3 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ pub trait ClientContext: Send + Sync {
}

// NOTE: when adding a new method, remember to add it to the
// StreamConsumerContext and FutureProducerContext as well.
// https://github.com/rust-lang/rfcs/pull/1406 will maybe help in the future.
// FutureProducerContext as well.
// https://github.com/rust-lang/rfcs/pull/1406 will maybe help in the
// future.
}

/// An empty [`ClientContext`] that can be used when no customizations are
Expand Down
38 changes: 4 additions & 34 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,6 @@ unsafe extern "C" fn native_rebalance_cb<C: ConsumerContext>(
context.rebalance(&native_client, err, &mut tpl);
}

/// Native message queue nonempty callback. This callback will run whenever the
/// consumer's message queue switches from empty to nonempty.
unsafe extern "C" fn native_message_queue_nonempty_cb<C: ConsumerContext>(
_: *mut RDKafka,
opaque_ptr: *mut c_void,
) {
let context = &mut *(opaque_ptr as *mut C);

(*context).message_queue_nonempty_callback();
}

unsafe fn enable_nonempty_callback<C: ConsumerContext>(queue: &NativeQueue, context: &Arc<C>) {
rdsys::rd_kafka_queue_cb_event_enable(
queue.ptr(),
Some(native_message_queue_nonempty_cb::<C>),
Arc::as_ptr(context) as *mut c_void,
)
}

/// A low-level consumer that requires manual polling.
///
/// This consumer must be periodically polled to make progress on rebalancing,
Expand All @@ -91,7 +72,6 @@ where
{
client: Client<C>,
main_queue_min_poll_interval: Timeout,
_queue: Option<NativeQueue>,
}

impl FromClientConfig for BaseConsumer {
Expand Down Expand Up @@ -133,16 +113,9 @@ where
RDKafkaType::RD_KAFKA_CONSUMER,
context,
)?;
let queue = client.consumer_queue();
if let Some(queue) = &queue {
unsafe {
enable_nonempty_callback(queue, client.context());
}
}
Ok(BaseConsumer {
client,
main_queue_min_poll_interval,
_queue: queue,
})
}

Expand Down Expand Up @@ -269,10 +242,7 @@ where
))
};
queue.map(|queue| {
unsafe {
enable_nonempty_callback(&queue, self.client.context());
rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut());
}
unsafe { rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut()) }
PartitionQueue::new(self.clone(), queue)
})
}
Expand Down Expand Up @@ -623,15 +593,15 @@ pub struct PartitionQueue<C>
where
C: ConsumerContext,
{
consumer: Arc<BaseConsumer<C>>,
queue: NativeQueue,
pub(crate) consumer: Arc<BaseConsumer<C>>,
pub(crate) queue: NativeQueue,
}

impl<C> PartitionQueue<C>
where
C: ConsumerContext,
{
fn new(consumer: Arc<BaseConsumer<C>>, queue: NativeQueue) -> Self {
pub(crate) fn new(consumer: Arc<BaseConsumer<C>>, queue: NativeQueue) -> Self {
PartitionQueue { consumer, queue }
}

Expand Down
7 changes: 0 additions & 7 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,6 @@ pub trait ConsumerContext: ClientContext {
fn main_queue_min_poll_interval(&self) -> Timeout {
Timeout::After(Duration::from_secs(1))
}

/// Message queue nonempty callback. This method will run when the
/// consumer's message queue switches from empty to nonempty.
fn message_queue_nonempty_callback(&self) {}

// NOTE: when adding a new method, remember to add it to the
// StreamConsumerContext as well.
}

/// An inert [`ConsumerContext`] that can be used when no customizations are
Expand Down
Loading

0 comments on commit 928f6f1

Please sign in to comment.