From ed2a45d83befc465ef46459d7181457aef831603 Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Fri, 2 Feb 2024 01:38:09 -0500 Subject: [PATCH] client: avoid dropping context too early The change in c729f89 was not sound. Calling `rd_kafka_destroy` can result in callbacks executing against `context`, but the change in that commit allowed the context to be dropped before `rd_kafka_destroy` completed. This commit reworks the drop flow so that the context is kept alive until *after* `rd_kafka_destroy` is called, but preserves the key semantics of performing the drops on a background thread. --- src/admin.rs | 2 +- src/client.rs | 51 ++++++++++++++++++--------------- src/consumer/base_consumer.rs | 6 ++-- src/consumer/mod.rs | 2 +- src/consumer/stream_consumer.rs | 4 +-- src/producer/base_producer.rs | 2 +- src/producer/mod.rs | 2 +- 7 files changed, 37 insertions(+), 32 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index da4101ab2..a1a6d3bcb 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -35,7 +35,7 @@ use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, Native /// /// `AdminClient` provides programmatic access to managing a Kafka cluster, /// notably manipulating topics, partitions, and configuration paramaters. -pub struct AdminClient { +pub struct AdminClient { client: Client, queue: Arc, should_stop: Arc, diff --git a/src/client.rs b/src/client.rs index da078d9cd..d88bbe3e3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -16,7 +16,7 @@ use std::error::Error; use std::ffi::{CStr, CString}; use std::mem::ManuallyDrop; use std::os::raw::{c_char, c_void}; -use std::ptr; +use std::ptr::{self, NonNull}; use std::slice; use std::string::ToString; use std::sync::Arc; @@ -170,23 +170,7 @@ impl ClientContext for DefaultClientContext {} /// higher level `Client` or producers and consumers. // TODO(benesch): this should be `pub(crate)`. pub struct NativeClient { - ptr: NativePtr, -} - -unsafe impl KafkaDrop for RDKafka { - const TYPE: &'static str = "client"; - const DROP: unsafe extern "C" fn(*mut Self) = drop_rdkafka; -} - -unsafe extern "C" fn drop_rdkafka(ptr: *mut RDKafka) { - // We don't want the semantics of blocking the thread until the client shuts - // down (this involves waiting for offset commits, message production, - // rebalance callbacks), as this can cause deadlocks if the client is - // dropped from an async task that's scheduled on the same thread as an - // async task handling a librdkafka callback. So we destroy on a new thread - // that we know can't be handling any librdkafka callbacks. - let ptr = ptr as usize; - std::thread::spawn(move || rdsys::rd_kafka_destroy(ptr as *mut RDKafka)); + ptr: NonNull, } // The library is completely thread safe, according to the documentation. @@ -197,13 +181,13 @@ impl NativeClient { /// Wraps a pointer to an RDKafka object and returns a new NativeClient. pub(crate) unsafe fn from_ptr(ptr: *mut RDKafka) -> NativeClient { NativeClient { - ptr: NativePtr::from_ptr(ptr).unwrap(), + ptr: NonNull::new(ptr).unwrap(), } } /// Returns the wrapped pointer to RDKafka. pub fn ptr(&self) -> *mut RDKafka { - self.ptr.ptr() + self.ptr.as_ptr() } pub(crate) fn rebalance_protocol(&self) -> RebalanceProtocol { @@ -233,12 +217,12 @@ impl NativeClient { /// /// [`consumer`]: crate::consumer /// [`producer`]: crate::producer -pub struct Client { +pub struct Client { native: NativeClient, context: Arc, } -impl Client { +impl Client { /// Creates a new `Client` given a configuration, a client type and a context. pub fn new( config: &ClientConfig, @@ -299,7 +283,7 @@ impl Client { /// Returns a pointer to the native rdkafka-sys client. pub fn native_ptr(&self) -> *mut RDKafka { - self.native.ptr.ptr() + self.native.ptr() } /// Returns a reference to the context. @@ -449,6 +433,27 @@ impl Client { } } +impl Drop for Client { + fn drop(&mut self) { + // We don't want the semantics of blocking the thread until the client + // shuts down (this involves waiting for offset commits, message + // production, rebalance callbacks), as this can cause deadlocks if the + // client is dropped from an async task that's scheduled on the same + // thread as an async task handling a librdkafka callback. So we destroy + // on a new thread that we know can't be handling any librdkafka + // callbacks. + let context = Arc::clone(&self.context); + let ptr = self.native_ptr() as usize; + std::thread::spawn(move || { + unsafe { rdsys::rd_kafka_destroy(ptr as *mut RDKafka) } + // Ensure `context` is only dropped after `rd_kafka_destroy` + // returns, as the process of destruction may invoke callbacks on + // `context``. + drop(context); + }); + } +} + pub(crate) type NativeTopic = NativePtr; unsafe impl KafkaDrop for RDKafkaTopic { diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index d145a1743..c22eeaa29 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -66,7 +66,7 @@ unsafe extern "C" fn native_rebalance_cb( /// callbacks and to receive messages. pub struct BaseConsumer where - C: ConsumerContext, + C: ConsumerContext + 'static, { client: Client, main_queue_min_poll_interval: Timeout, @@ -546,7 +546,7 @@ where /// infinite timeout. pub struct Iter<'a, C>(&'a BaseConsumer) where - C: ConsumerContext; + C: ConsumerContext + 'static; impl<'a, C> Iterator for Iter<'a, C> where @@ -578,7 +578,7 @@ where /// A message queue for a single partition. pub struct PartitionQueue where - C: ConsumerContext, + C: ConsumerContext + 'static, { consumer: Arc>, queue: NativeQueue, diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 037cb82b4..b7ce64109 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -207,7 +207,7 @@ pub enum RebalanceProtocol { /// objects, since they are generic. pub trait Consumer where - C: ConsumerContext, + C: ConsumerContext + 'static, { /// Returns the [`Client`] underlying this consumer. fn client(&self) -> &Client; diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index 325276611..113099b31 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -163,7 +163,7 @@ impl<'a> Drop for MessageStream<'a> { #[must_use = "Consumer polling thread will stop immediately if unused"] pub struct StreamConsumer where - C: ConsumerContext, + C: ConsumerContext + 'static, { base: BaseConsumer, wakers: Arc, @@ -525,7 +525,7 @@ where /// details. pub struct StreamPartitionQueue where - C: ConsumerContext, + C: ConsumerContext + 'static, { queue: NativeQueue, wakers: Arc, diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index ec9150d43..f877cb46a 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -273,7 +273,7 @@ where /// [`examples`]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/ pub struct BaseProducer where - C: ProducerContext, + C: ProducerContext + 'static, { client_arc: Arc>, } diff --git a/src/producer/mod.rs b/src/producer/mod.rs index 7bd0c3e01..42be48418 100644 --- a/src/producer/mod.rs +++ b/src/producer/mod.rs @@ -216,7 +216,7 @@ impl ProducerContext for DefaultProducerContext { /// Common trait for all producers. pub trait Producer where - C: ProducerContext, + C: ProducerContext + 'static, { /// Returns the [`Client`] underlying this producer. fn client(&self) -> &Client;