Skip to content

Commit

Permalink
client: avoid dropping context too early
Browse files Browse the repository at this point in the history
The change in c729f89 was not sound. Calling `rd_kafka_destroy` can
result in callbacks executing against `context`, but the change in that
commit allowed the context to be dropped before `rd_kafka_destroy`
completed.

This commit reworks the drop flow so that the context is kept alive
until *after* `rd_kafka_destroy` is called, but preserves the key
semantics of performing the drops on a background thread.
  • Loading branch information
benesch committed Feb 2, 2024
1 parent 357983e commit ed2a45d
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, Native
///
/// `AdminClient` provides programmatic access to managing a Kafka cluster,
/// notably manipulating topics, partitions, and configuration parameters.
pub struct AdminClient<C: ClientContext> {
pub struct AdminClient<C: ClientContext + 'static> {
client: Client<C>,
queue: Arc<NativeQueue>,
should_stop: Arc<AtomicBool>,
Expand Down
51 changes: 28 additions & 23 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::error::Error;
use std::ffi::{CStr, CString};
use std::mem::ManuallyDrop;
use std::os::raw::{c_char, c_void};
use std::ptr;
use std::ptr::{self, NonNull};
use std::slice;
use std::string::ToString;
use std::sync::Arc;
Expand Down Expand Up @@ -170,23 +170,7 @@ impl ClientContext for DefaultClientContext {}
/// higher level `Client` or producers and consumers.
// TODO(benesch): this should be `pub(crate)`.
pub struct NativeClient {
ptr: NativePtr<RDKafka>,
}

unsafe impl KafkaDrop for RDKafka {
const TYPE: &'static str = "client";
const DROP: unsafe extern "C" fn(*mut Self) = drop_rdkafka;
}

unsafe extern "C" fn drop_rdkafka(ptr: *mut RDKafka) {
// We don't want the semantics of blocking the thread until the client shuts
// down (this involves waiting for offset commits, message production,
// rebalance callbacks), as this can cause deadlocks if the client is
// dropped from an async task that's scheduled on the same thread as an
// async task handling a librdkafka callback. So we destroy on a new thread
// that we know can't be handling any librdkafka callbacks.
let ptr = ptr as usize;
std::thread::spawn(move || rdsys::rd_kafka_destroy(ptr as *mut RDKafka));
ptr: NonNull<RDKafka>,
}

// The library is completely thread safe, according to the documentation.
Expand All @@ -197,13 +181,13 @@ impl NativeClient {
/// Wraps a pointer to an RDKafka object and returns a new NativeClient.
pub(crate) unsafe fn from_ptr(ptr: *mut RDKafka) -> NativeClient {
NativeClient {
ptr: NativePtr::from_ptr(ptr).unwrap(),
ptr: NonNull::new(ptr).unwrap(),
}
}

/// Returns the wrapped pointer to RDKafka.
pub fn ptr(&self) -> *mut RDKafka {
self.ptr.ptr()
self.ptr.as_ptr()
}

pub(crate) fn rebalance_protocol(&self) -> RebalanceProtocol {
Expand Down Expand Up @@ -233,12 +217,12 @@ impl NativeClient {
///
/// [`consumer`]: crate::consumer
/// [`producer`]: crate::producer
pub struct Client<C: ClientContext = DefaultClientContext> {
pub struct Client<C: ClientContext + 'static = DefaultClientContext> {
native: NativeClient,
context: Arc<C>,
}

impl<C: ClientContext> Client<C> {
impl<C: ClientContext + 'static> Client<C> {
/// Creates a new `Client` given a configuration, a client type and a context.
pub fn new(
config: &ClientConfig,
Expand Down Expand Up @@ -299,7 +283,7 @@ impl<C: ClientContext> Client<C> {

/// Returns a pointer to the native rdkafka-sys client.
pub fn native_ptr(&self) -> *mut RDKafka {
self.native.ptr.ptr()
self.native.ptr()
}

/// Returns a reference to the context.
Expand Down Expand Up @@ -449,6 +433,27 @@ impl<C: ClientContext> Client<C> {
}
}

// Destroys the underlying native client when a `Client` is dropped.
//
// The call to `rd_kafka_destroy` is made on a freshly spawned thread rather
// than on the dropping thread, and a clone of the context `Arc` is moved into
// that thread so the context outlives the destroy call.
impl<C: ClientContext + 'static> Drop for Client<C> {
fn drop(&mut self) {
// We don't want the semantics of blocking the thread until the client
// shuts down (this involves waiting for offset commits, message
// production, rebalance callbacks), as this can cause deadlocks if the
// client is dropped from an async task that's scheduled on the same
// thread as an async task handling a librdkafka callback. So we destroy
// on a new thread that we know can't be handling any librdkafka
// callbacks.
//
// Take an extra reference to the context; it is moved into the closure
// below so that the context is not freed when `self.context` is dropped.
let context = Arc::clone(&self.context);
// Raw pointers are not `Send`, so the address is carried into the spawned
// closure as a `usize` and cast back to a pointer there.
let ptr = self.native_ptr() as usize;
// The returned join handle is discarded, so `drop` returns without waiting
// for the destroy call to finish.
std::thread::spawn(move || {
// `ptr` is the address obtained from `self.native_ptr()` above.
// NOTE(review): this assumes nothing else uses or destroys the native
// handle once the owning `Client` has been dropped — confirm.
unsafe { rdsys::rd_kafka_destroy(ptr as *mut RDKafka) }
// Ensure `context` is only dropped after `rd_kafka_destroy`
// returns, as the process of destruction may invoke callbacks on
// `context`.
drop(context);
});
}
}

pub(crate) type NativeTopic = NativePtr<RDKafkaTopic>;

unsafe impl KafkaDrop for RDKafkaTopic {
Expand Down
6 changes: 3 additions & 3 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ unsafe extern "C" fn native_rebalance_cb<C: ConsumerContext>(
/// callbacks and to receive messages.
pub struct BaseConsumer<C = DefaultConsumerContext>
where
C: ConsumerContext,
C: ConsumerContext + 'static,
{
client: Client<C>,
main_queue_min_poll_interval: Timeout,
Expand Down Expand Up @@ -546,7 +546,7 @@ where
/// infinite timeout.
pub struct Iter<'a, C>(&'a BaseConsumer<C>)
where
C: ConsumerContext;
C: ConsumerContext + 'static;

impl<'a, C> Iterator for Iter<'a, C>
where
Expand Down Expand Up @@ -578,7 +578,7 @@ where
/// A message queue for a single partition.
pub struct PartitionQueue<C>
where
C: ConsumerContext,
C: ConsumerContext + 'static,
{
consumer: Arc<BaseConsumer<C>>,
queue: NativeQueue,
Expand Down
2 changes: 1 addition & 1 deletion src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ pub enum RebalanceProtocol {
/// objects, since they are generic.
pub trait Consumer<C = DefaultConsumerContext>
where
C: ConsumerContext,
C: ConsumerContext + 'static,
{
/// Returns the [`Client`] underlying this consumer.
fn client(&self) -> &Client<C>;
Expand Down
4 changes: 2 additions & 2 deletions src/consumer/stream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl<'a> Drop for MessageStream<'a> {
#[must_use = "Consumer polling thread will stop immediately if unused"]
pub struct StreamConsumer<C = DefaultConsumerContext, R = DefaultRuntime>
where
C: ConsumerContext,
C: ConsumerContext + 'static,
{
base: BaseConsumer<C>,
wakers: Arc<WakerSlab>,
Expand Down Expand Up @@ -525,7 +525,7 @@ where
/// details.
pub struct StreamPartitionQueue<C, R = DefaultRuntime>
where
C: ConsumerContext,
C: ConsumerContext + 'static,
{
queue: NativeQueue,
wakers: Arc<WakerSlab>,
Expand Down
2 changes: 1 addition & 1 deletion src/producer/base_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ where
/// [`examples`]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/
pub struct BaseProducer<C = DefaultProducerContext>
where
C: ProducerContext,
C: ProducerContext + 'static,
{
client_arc: Arc<Client<C>>,
}
Expand Down
2 changes: 1 addition & 1 deletion src/producer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl ProducerContext for DefaultProducerContext {
/// Common trait for all producers.
pub trait Producer<C = DefaultProducerContext>
where
C: ProducerContext,
C: ProducerContext + 'static,
{
/// Returns the [`Client`] underlying this producer.
fn client(&self) -> &Client<C>;
Expand Down

0 comments on commit ed2a45d

Please sign in to comment.