Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add split_partition_queue to StreamConsumer #411

Merged
merged 3 commits into from
Nov 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,28 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).

## Unreleased

* Add the `StreamConsumer::split_partition_queue` method to mirror
`BaseConsumer::split_partition_queue` ([#411]).

Thanks to [@davidblewett], [@duarten], and [@nemosupremo] for contributing to
the implementation.

* **Breaking change.** Remove the `StreamConsumerContext` type and the
`ConsumerContext::message_queue_nonempty_callback` method. These were
essentially implementation details of `StreamConsumer` that had leaked into
the public API. The vast majority of users should be unaffected.

* **Breaking change.** Remove the type parameters from the `MessageStream` type.

* **Breaking change.** Add the received `TopicPartitionList` to the
`Rebalance::Revoke` variant, which is useful when using incremental
cooperative rebalancing ([#398]).

* Avoid crashing if librdkafka invokes the commit callback with a null
topic partition list ([#406]).

Thanks, [@thijsc].

* Add the new statistics fields in librdkafka v1.7.0 to the various statistics
types. The new fields are:

Expand Down Expand Up @@ -46,11 +68,28 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
statistics from librdkafka. The default implementation calls
`ClientContext::stats` with the decoded statistics.

* **Breaking change.** Add the received `TopicPartitionList` to the
`Rebalance::Revoke` variant, which is useful when using incremental
cooperative rebalancing ([#398]).
* Add the `Default` trait to the statistics types: `Statistics`, `Broker`,
`Window`, `TopicPartition`, `Topic`, `Partition`, `ConsumerGroup`, and
`ExactlyOnceSemantics` ([#410]).

Thanks, [@scanterog].

* Add the `Debug` trait to `DefaultClientContext` and `DefaultConsumerContext`
([#401]).

Thanks, [@DXist].

[#398]: https://github.com/fede1024/rust-rdkafka/issues/398
[#401]: https://github.com/fede1024/rust-rdkafka/issues/401
[#406]: https://github.com/fede1024/rust-rdkafka/issues/406
[#410]: https://github.com/fede1024/rust-rdkafka/issues/410
[#411]: https://github.com/fede1024/rust-rdkafka/issues/411
[@davidblewett]: https://github.com/davidblewett
[@duarten]: https://github.com/duarten
[@DXist]: https://github.com/DXist
[@nemosupremo]: https://github.com/nemosupremo
[@scanterog]: https://github.com/scanterog
[@thijsc]: https://github.com/thijsc

<a name="0.27.0"></a>
## 0.27.0 (2021-10-17)
Expand Down
5 changes: 3 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ pub trait ClientContext: Send + Sync {
}

// NOTE: when adding a new method, remember to add it to the
// StreamConsumerContext and FutureProducerContext as well.
// https://github.com/rust-lang/rfcs/pull/1406 will maybe help in the future.
// FutureProducerContext as well.
// https://github.com/rust-lang/rfcs/pull/1406 will maybe help in the
// future.
}

/// An empty [`ClientContext`] that can be used when no customizations are
Expand Down
38 changes: 4 additions & 34 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,6 @@ unsafe extern "C" fn native_rebalance_cb<C: ConsumerContext>(
context.rebalance(&native_client, err, &mut tpl);
}

/// Native message queue nonempty callback. This callback will run whenever the
/// consumer's message queue switches from empty to nonempty.
unsafe extern "C" fn native_message_queue_nonempty_cb<C: ConsumerContext>(
_: *mut RDKafka,
opaque_ptr: *mut c_void,
) {
let context = &mut *(opaque_ptr as *mut C);

(*context).message_queue_nonempty_callback();
}

unsafe fn enable_nonempty_callback<C: ConsumerContext>(queue: &NativeQueue, context: &Arc<C>) {
rdsys::rd_kafka_queue_cb_event_enable(
queue.ptr(),
Some(native_message_queue_nonempty_cb::<C>),
Arc::as_ptr(context) as *mut c_void,
)
}

/// A low-level consumer that requires manual polling.
///
/// This consumer must be periodically polled to make progress on rebalancing,
Expand All @@ -91,7 +72,6 @@ where
{
client: Client<C>,
main_queue_min_poll_interval: Timeout,
_queue: Option<NativeQueue>,
}

impl FromClientConfig for BaseConsumer {
Expand Down Expand Up @@ -133,16 +113,9 @@ where
RDKafkaType::RD_KAFKA_CONSUMER,
context,
)?;
let queue = client.consumer_queue();
if let Some(queue) = &queue {
unsafe {
enable_nonempty_callback(queue, client.context());
}
}
Ok(BaseConsumer {
client,
main_queue_min_poll_interval,
_queue: queue,
})
}

Expand Down Expand Up @@ -269,10 +242,7 @@ where
))
};
queue.map(|queue| {
unsafe {
enable_nonempty_callback(&queue, self.client.context());
rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut());
}
unsafe { rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut()) }
PartitionQueue::new(self.clone(), queue)
})
}
Expand Down Expand Up @@ -623,15 +593,15 @@ pub struct PartitionQueue<C>
where
C: ConsumerContext,
{
consumer: Arc<BaseConsumer<C>>,
queue: NativeQueue,
pub(crate) consumer: Arc<BaseConsumer<C>>,
pub(crate) queue: NativeQueue,
}

impl<C> PartitionQueue<C>
where
C: ConsumerContext,
{
fn new(consumer: Arc<BaseConsumer<C>>, queue: NativeQueue) -> Self {
pub(crate) fn new(consumer: Arc<BaseConsumer<C>>, queue: NativeQueue) -> Self {
PartitionQueue { consumer, queue }
}

Expand Down
7 changes: 0 additions & 7 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,6 @@ pub trait ConsumerContext: ClientContext {
fn main_queue_min_poll_interval(&self) -> Timeout {
Timeout::After(Duration::from_secs(1))
}

/// Message queue nonempty callback. This method will run when the
/// consumer's message queue switches from empty to nonempty.
fn message_queue_nonempty_callback(&self) {}

// NOTE: when adding a new method, remember to add it to the
// StreamConsumerContext as well.
}

/// An inert [`ConsumerContext`] that can be used when no customizations are
Expand Down
Loading