diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index f877cb46a..cf7f30cde 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -494,13 +494,14 @@ where /// queued events, such as delivery notifications. The thread will be /// automatically stopped when the producer is dropped. #[must_use = "The threaded producer will stop immediately if unused"] +#[derive(Clone)] pub struct ThreadedProducer where C: ProducerContext + 'static, { producer: BaseProducer, should_stop: Arc, - handle: Option>, + handle: Option>>, } impl FromClientConfig for ThreadedProducer { @@ -545,7 +546,7 @@ where Ok(ThreadedProducer { producer, should_stop, - handle: Some(thread), + handle: Some(Arc::new(thread)), }) } } @@ -628,7 +629,7 @@ where { fn drop(&mut self) { trace!("Destroy ThreadedProducer"); - if let Some(handle) = self.handle.take() { + if let Some(handle) = self.handle.take().and_then(Arc::into_inner) { trace!("Stopping polling"); self.should_stop.store(true, Ordering::Relaxed); trace!("Waiting for polling thread termination");