Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to join the Arbiter #60

Merged
merged 1 commit into from
Nov 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions actix-rt/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

* Fix arbiter's thread panic message.

### Added

* Allow to join arbiter's thread. #60


## [0.2.5] - 2019-09-02

Expand Down
45 changes: 34 additions & 11 deletions actix-rt/src/arbiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,20 @@ impl fmt::Debug for ArbiterCommand {
}
}

#[derive(Debug, Clone)]
#[derive(Debug)]
/// Arbiters provide an asynchronous execution environment for actors, functions
/// and futures. When an Arbiter is created, they spawn a new OS thread, and
/// host an event loop. Some Arbiter functions execute on the current thread.
pub struct Arbiter(UnboundedSender<ArbiterCommand>);
pub struct Arbiter {
sender: UnboundedSender<ArbiterCommand>,
thread_handle: Option<thread::JoinHandle<()>>,
}

impl Clone for Arbiter {
fn clone(&self) -> Self {
Self::with_sender(self.sender.clone())
}
}

impl Default for Arbiter {
fn default() -> Self {
Expand All @@ -55,7 +64,7 @@ impl Arbiter {
pub(crate) fn new_system() -> Self {
let (tx, rx) = unbounded();

let arb = Arbiter(tx);
let arb = Arbiter::with_sender(tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
RUNNING.with(|cell| cell.set(false));
STORAGE.with(|cell| cell.borrow_mut().clear());
Expand All @@ -75,7 +84,7 @@ impl Arbiter {

/// Stop arbiter from continuing it's event loop.
pub fn stop(&self) {
let _ = self.0.unbounded_send(ArbiterCommand::Stop);
let _ = self.sender.unbounded_send(ArbiterCommand::Stop);
}

/// Spawn new thread and run event loop in spawned thread.
Expand All @@ -87,9 +96,9 @@ impl Arbiter {
let (arb_tx, arb_rx) = unbounded();
let arb_tx2 = arb_tx.clone();

let _ = thread::Builder::new().name(name.clone()).spawn(move || {
let handle = thread::Builder::new().name(name.clone()).spawn(move || {
let mut rt = Builder::new().build_rt().expect("Can not create Runtime");
let arb = Arbiter(arb_tx);
let arb = Arbiter::with_sender(arb_tx);

let (stop, stop_rx) = channel();
RUNNING.with(|cell| cell.set(true));
Expand Down Expand Up @@ -119,9 +128,9 @@ impl Arbiter {
let _ = System::current()
.sys()
.unbounded_send(SystemCommand::UnregisterArbiter(id));
});
}).unwrap_or_else(|err| panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err));

Arbiter(arb_tx2)
Arbiter{sender: arb_tx2, thread_handle: Some(handle)}
}

pub(crate) fn run_system() {
Expand Down Expand Up @@ -171,7 +180,7 @@ impl Arbiter {
F: Future<Item = (), Error = ()> + Send + 'static,
{
let _ = self
.0
.sender
.unbounded_send(ArbiterCommand::Execute(Box::new(future)));
}

Expand All @@ -182,7 +191,7 @@ impl Arbiter {
F: FnOnce() + Send + 'static,
{
let _ = self
.0
.sender
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
f();
})));
Expand All @@ -198,7 +207,7 @@ impl Arbiter {
{
let (tx, rx) = channel();
let _ = self
.0
.sender
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
if !tx.is_canceled() {
let _ = tx.send(f());
Expand Down Expand Up @@ -250,6 +259,20 @@ impl Arbiter {
f(item)
})
}

fn with_sender(sender: UnboundedSender<ArbiterCommand>) -> Self {
Self{sender, thread_handle: None}
}

/// Wait for the event loop to stop by joining the underlying thread (if have Some).
pub fn join(&mut self) -> thread::Result<()>{
if let Some(thread_handle) = self.thread_handle.take() {
thread_handle.join()
}
else {
Ok(())
}
}
}

struct ArbiterController {
Expand Down