diff --git a/Cargo.lock b/Cargo.lock index 8f986059e..5c4284779 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1339,21 +1339,21 @@ dependencies = [ [[package]] name = "tokio" -version = "1.0.1" +version = "1.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d258221f566b6c803c7b4714abadc080172b272090cdc5e244a6d4dd13c3a6bd" +checksum = "c51a52ed6686dd62c320f9b89299e9dfb46f730c7a48e635c19f21d116cb1439" dependencies = [ - "autocfg 1.0.0", "num_cpus", + "once_cell", "pin-project-lite", "tokio-macros", ] [[package]] name = "tokio-macros" -version = "1.0.0" +version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42517d2975ca3114b22a16192634e8241dc5cc1f130be194645970cc1c371494" +checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" dependencies = [ "proc-macro2", "quote", diff --git a/src/client.rs b/src/client.rs index d0c6d6fa2..cf6dc08f3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -175,7 +175,18 @@ pub struct NativeClient { unsafe impl KafkaDrop for RDKafka { const TYPE: &'static str = "client"; - const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_destroy; + const DROP: unsafe extern "C" fn(*mut Self) = drop_rdkafka; +} + +unsafe extern "C" fn drop_rdkafka(ptr: *mut RDKafka) { + // We don't want the semantics of blocking the thread until the client shuts + // down (this involves waiting for offset commits, message production, + // rebalance callbacks), as this can cause deadlocks if the client is + // dropped from an async task that's scheduled on the same thread as an + // async task handling a librdkafka callback. So we destroy on a new thread + // that we know can't be handling any librdkafka callbacks. + let ptr = ptr as usize; + std::thread::spawn(move || rdsys::rd_kafka_destroy(ptr as *mut RDKafka)); } // The library is completely thread safe, according to the documentation. diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index 623a7127f..af8e4b825 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -541,17 +541,6 @@ where } } -impl Drop for BaseConsumer -where - C: ConsumerContext, -{ - fn drop(&mut self) { - trace!("Destroying consumer: {:?}", self.client.native_ptr()); // TODO: fix me (multiple executions ?) - unsafe { rdsys::rd_kafka_consumer_close(self.client.native_ptr()) }; - trace!("Consumer destroyed: {:?}", self.client.native_ptr()); - } -} - /// A convenience iterator over the messages in a [`BaseConsumer`]. /// /// Each call to [`Iter::next`] simply calls [`BaseConsumer::poll`] with an