Skip to content

Commit

Permalink
Merge pull request #26 from MaterializeInc/clonable-threaded-producer…
Browse files Browse the repository at this point in the history
…-backport

make ThreadedProducer clonable like BaseProducer
  • Loading branch information
benesch authored Oct 3, 2024
2 parents 4a217ec + c0eb5c4 commit 3bec034
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions src/producer/base_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,13 +494,14 @@ where
/// queued events, such as delivery notifications. The thread will be
/// automatically stopped when the producer is dropped.
#[must_use = "The threaded producer will stop immediately if unused"]
#[derive(Clone)]
pub struct ThreadedProducer<C>
where
C: ProducerContext + 'static,
{
producer: BaseProducer<C>,
should_stop: Arc<AtomicBool>,
handle: Option<JoinHandle<()>>,
handle: Option<Arc<JoinHandle<()>>>,
}

impl FromClientConfig for ThreadedProducer<DefaultProducerContext> {
Expand Down Expand Up @@ -545,7 +546,7 @@ where
Ok(ThreadedProducer {
producer,
should_stop,
handle: Some(thread),
handle: Some(Arc::new(thread)),
})
}
}
Expand Down Expand Up @@ -628,7 +629,7 @@ where
{
fn drop(&mut self) {
trace!("Destroy ThreadedProducer");
if let Some(handle) = self.handle.take() {
if let Some(handle) = self.handle.take().and_then(Arc::into_inner) {
trace!("Stopping polling");
self.should_stop.store(true, Ordering::Relaxed);
trace!("Waiting for polling thread termination");
Expand Down

0 comments on commit 3bec034

Please sign in to comment.