Skip to content

Commit

Permalink
Don't block thread when dropping Kafka client
Browse files Browse the repository at this point in the history
This can lead to deadlocks if a Kafka client is dropped in an async task
that is scheduled on the same thread as an async task that is handling a
librdkafka callback.
  • Loading branch information
benesch committed Jan 25, 2024
1 parent 8ea07c4 commit c729f89
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 17 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,18 @@ pub struct NativeClient {

unsafe impl KafkaDrop for RDKafka {
const TYPE: &'static str = "client";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_destroy;
const DROP: unsafe extern "C" fn(*mut Self) = drop_rdkafka;
}

unsafe extern "C" fn drop_rdkafka(ptr: *mut RDKafka) {
// We don't want the semantics of blocking the thread until the client shuts
// down (this involves waiting for offset commits, message production,
// rebalance callbacks), as this can cause deadlocks if the client is
// dropped from an async task that's scheduled on the same thread as an
// async task handling a librdkafka callback. So we destroy on a new thread
// that we know can't be handling any librdkafka callbacks.
let ptr = ptr as usize;
std::thread::spawn(move || rdsys::rd_kafka_destroy(ptr as *mut RDKafka));
}

// The library is completely thread safe, according to the documentation.
Expand Down
11 changes: 0 additions & 11 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,17 +541,6 @@ where
}
}

impl<C> Drop for BaseConsumer<C>
where
C: ConsumerContext,
{
fn drop(&mut self) {
trace!("Destroying consumer: {:?}", self.client.native_ptr()); // TODO: fix me (multiple executions ?)
unsafe { rdsys::rd_kafka_consumer_close(self.client.native_ptr()) };
trace!("Consumer destroyed: {:?}", self.client.native_ptr());
}
}

/// A convenience iterator over the messages in a [`BaseConsumer`].
///
/// Each call to [`Iter::next`] simply calls [`BaseConsumer::poll`] with an
Expand Down

0 comments on commit c729f89

Please sign in to comment.