From 2559620724d3eae9ee783165e6ad2957413ea04b Mon Sep 17 00:00:00 2001 From: Ivan Ladelshchikov Date: Tue, 12 Nov 2019 15:46:05 +0400 Subject: [PATCH] store the thread's handle with arbiter --- actix-rt/CHANGES.md | 4 ++++ actix-rt/src/arbiter.rs | 45 +++++++++++++++++++++++++++++++---------- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 98f6a60173..925aedcb8b 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -6,6 +6,10 @@ * Fix arbiter's thread panic message. +### Added + +* Allow to join arbiter's thread. #60 + ## [0.2.5] - 2019-09-02 diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 083105b34e..c3357a8ab4 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -39,11 +39,20 @@ impl fmt::Debug for ArbiterCommand { } } -#[derive(Debug, Clone)] +#[derive(Debug)] /// Arbiters provide an asynchronous execution environment for actors, functions /// and futures. When an Arbiter is created, they spawn a new OS thread, and /// host an event loop. Some Arbiter functions execute on the current thread. -pub struct Arbiter(UnboundedSender); +pub struct Arbiter { + sender: UnboundedSender, + thread_handle: Option>, +} + +impl Clone for Arbiter { + fn clone(&self) -> Self { + Self::with_sender(self.sender.clone()) + } +} impl Default for Arbiter { fn default() -> Self { @@ -55,7 +64,7 @@ impl Arbiter { pub(crate) fn new_system() -> Self { let (tx, rx) = unbounded(); - let arb = Arbiter(tx); + let arb = Arbiter::with_sender(tx); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); RUNNING.with(|cell| cell.set(false)); STORAGE.with(|cell| cell.borrow_mut().clear()); @@ -75,7 +84,7 @@ impl Arbiter { /// Stop arbiter from continuing it's event loop. pub fn stop(&self) { - let _ = self.0.unbounded_send(ArbiterCommand::Stop); + let _ = self.sender.unbounded_send(ArbiterCommand::Stop); } /// Spawn new thread and run event loop in spawned thread. @@ -87,9 +96,9 @@ impl Arbiter { let (arb_tx, arb_rx) = unbounded(); let arb_tx2 = arb_tx.clone(); - let _ = thread::Builder::new().name(name.clone()).spawn(move || { + let handle = thread::Builder::new().name(name.clone()).spawn(move || { let mut rt = Builder::new().build_rt().expect("Can not create Runtime"); - let arb = Arbiter(arb_tx); + let arb = Arbiter::with_sender(arb_tx); let (stop, stop_rx) = channel(); RUNNING.with(|cell| cell.set(true)); @@ -119,9 +128,9 @@ impl Arbiter { let _ = System::current() .sys() .unbounded_send(SystemCommand::UnregisterArbiter(id)); - }); + }).unwrap_or_else(|err| panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)); - Arbiter(arb_tx2) + Arbiter{sender: arb_tx2, thread_handle: Some(handle)} } pub(crate) fn run_system() { @@ -171,7 +180,7 @@ impl Arbiter { F: Future + Send + 'static, { let _ = self - .0 + .sender .unbounded_send(ArbiterCommand::Execute(Box::new(future))); } @@ -182,7 +191,7 @@ impl Arbiter { F: FnOnce() + Send + 'static, { let _ = self - .0 + .sender .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { f(); }))); @@ -198,7 +207,7 @@ impl Arbiter { { let (tx, rx) = channel(); let _ = self - .0 + .sender .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { if !tx.is_canceled() { let _ = tx.send(f()); @@ -250,6 +259,20 @@ impl Arbiter { f(item) }) } + + fn with_sender(sender: UnboundedSender) -> Self { + Self{sender, thread_handle: None} + } + + /// Wait for the event loop to stop by joining the underlying thread (if have Some). + pub fn join(&mut self) -> thread::Result<()>{ + if let Some(thread_handle) = self.thread_handle.take() { + thread_handle.join() + } + else { + Ok(()) + } + } } struct ArbiterController {