From 6d28b3dec224c9d1dba3a283ed44bc6313ccd7b7 Mon Sep 17 00:00:00 2001 From: Ari Seyhun Date: Sat, 19 Oct 2024 17:15:11 +0800 Subject: [PATCH 1/5] feat: add support for preparing an actor and running it outside a spawned task --- src/actor/spawn.rs | 272 +++++++++++++++++++++++++++++++-------------- 1 file changed, 191 insertions(+), 81 deletions(-) diff --git a/src/actor/spawn.rs b/src/actor/spawn.rs index 3f5cd61..2ae1b4b 100644 --- a/src/actor/spawn.rs +++ b/src/actor/spawn.rs @@ -1,4 +1,4 @@ -use std::{convert, panic::AssertUnwindSafe, sync::Arc}; +use std::{convert, panic::AssertUnwindSafe, sync::Arc, thread}; use futures::{ stream::{AbortHandle, AbortRegistration, Abortable}, @@ -7,6 +7,7 @@ use futures::{ use tokio::{ runtime::{Handle, RuntimeFlavor}, sync::Semaphore, + task::JoinHandle, }; use tracing::{error, trace}; @@ -45,8 +46,9 @@ pub fn spawn(actor: A) -> ActorRef where A: Actor, { - let (actor_ref, mailbox_rx, abort_registration) = initialize_actor(); - spawn_inner::>(actor, actor_ref.clone(), mailbox_rx, abort_registration); + let prepared_actor = prepare(actor); + let actor_ref = prepared_actor.actor_ref().clone(); + prepared_actor.spawn(); actor_ref } @@ -78,9 +80,10 @@ where A: Actor, L: Actor, { - let (actor_ref, mailbox_rx, abort_registration) = initialize_actor(); + let prepared_actor = prepare(actor); + let actor_ref = prepared_actor.actor_ref().clone(); actor_ref.link(link_ref).await; - spawn_inner::>(actor, actor_ref.clone(), mailbox_rx, abort_registration); + prepared_actor.spawn(); actor_ref } @@ -117,9 +120,9 @@ where F: FnOnce(&ActorRef) -> Fu, Fu: Future, { - let (actor_ref, mailbox_rx, abort_registration) = initialize_actor(); - let actor = f(&actor_ref).await; - spawn_inner::>(actor, actor_ref.clone(), mailbox_rx, abort_registration); + let prepared_actor = prepare_with(f).await; + let actor_ref = prepared_actor.actor_ref().clone(); + prepared_actor.spawn(); actor_ref } @@ -164,99 +167,206 @@ pub fn spawn_in_thread(actor: A) -> ActorRef where A: Actor, { - let (actor_ref, mailbox_rx, abort_registration) = initialize_actor(); - spawn_in_thread_inner::>( - actor, - actor_ref.clone(), - mailbox_rx, - abort_registration, - ); + let prepared_actor = prepare(actor); + let actor_ref = prepared_actor.actor_ref().clone(); + prepared_actor.spawn_in_thread(); actor_ref } -#[inline] -fn initialize_actor() -> ( - ActorRef, - >::Receiver, - AbortRegistration, -) +/// Prepares an actor without spawning it, returning a [`PreparedActor`]. +/// +/// The actor is fully initialized, but will not start processing messages until explicitly +/// run or spawned. This function is useful when you need to interact with the actor through +/// its [`ActorRef`] before it begins execution. +/// +/// To start the actor, either run it synchronously in the current task with [`PreparedActor::run`] +/// or spawn it into a background task with [`PreparedActor::spawn`]. +/// +/// # Example +/// +/// ```rust +/// # use kameo::Actor; +/// # use kameo::message::{Context, Message}; +/// # use kameo::request::MessageSend; +/// # +/// # #[derive(Actor)] +/// # struct MyActor { } +/// # +/// # impl Message<&'static str> for MyActor { +/// # type Reply = (); +/// # async fn handle(&mut self, msg: &'static str, ctx: Context<'_, Self, Self::Reply>) -> Self::Reply { } +/// # } +/// # +/// # tokio_test::block_on(async { +/// let prepared_actor = kameo::actor::prepare(MyActor { }); +/// +/// // You can now interact with the actor using its ActorRef before running it +/// prepared_actor.actor_ref().tell("hello").send().await; +/// +/// // Finally, spawn the actor +/// prepared_actor.spawn(); +/// # Ok::<(), Box>(()) +/// # }); +/// ``` +pub fn prepare(actor: A) -> PreparedActor where A: Actor, { - let (mailbox, mailbox_rx) = A::new_mailbox(); - let (abort_handle, abort_registration) = AbortHandle::new_pair(); - let links = Links::default(); - let startup_semaphore = Arc::new(Semaphore::new(0)); - ( - ActorRef::new( - mailbox, - abort_handle, - links.clone(), - startup_semaphore.clone(), - ), - mailbox_rx, - abort_registration, - ) + PreparedActor::new(actor) } -#[inline] -fn spawn_inner( +async fn prepare_with(f: F) -> PreparedActor +where + A: Actor, + F: FnOnce(&ActorRef) -> Fu, + Fu: Future, +{ + PreparedActor::new_with(f).await +} + +/// A `PreparedActor` represents an actor that has been initialized and is ready to be either run +/// in the current task or spawned into a new task. +/// +/// The `PreparedActor` provides access to the actor's [`ActorRef`] for interacting with the actor +/// before it starts running. It allows for flexible execution, either by running the actor +/// synchronously in the current task or spawning it in a separate task or thread. +#[allow(missing_debug_implementations)] +pub struct PreparedActor { actor: A, actor_ref: ActorRef, mailbox_rx: >::Receiver, abort_registration: AbortRegistration, -) where - A: Actor, - S: ActorState + Send, -{ - #[cfg(not(tokio_unstable))] - { - tokio::spawn(CURRENT_ACTOR_ID.scope(actor_ref.id(), async move { - run_actor_lifecycle::(actor, actor_ref, mailbox_rx, abort_registration).await - })); +} + +impl PreparedActor { + fn new(actor: A) -> Self { + let (mailbox, mailbox_rx) = A::new_mailbox(); + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + let links = Links::default(); + let startup_semaphore = Arc::new(Semaphore::new(0)); + let actor_ref = ActorRef::new( + mailbox, + abort_handle, + links.clone(), + startup_semaphore.clone(), + ); + + PreparedActor { + actor, + actor_ref, + mailbox_rx, + abort_registration, + } } - #[cfg(tokio_unstable)] + async fn new_with(f: F) -> Self + where + F: FnOnce(&ActorRef) -> Fu, + Fu: Future, { - tokio::task::Builder::new() - .name(A::name()) - .spawn(CURRENT_ACTOR_ID.scope(actor_ref.id(), async move { - run_actor_lifecycle::(actor, actor_ref, mailbox_rx, abort_registration).await - })) - .unwrap(); + let (mailbox, mailbox_rx) = A::new_mailbox(); + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + let links = Links::default(); + let startup_semaphore = Arc::new(Semaphore::new(0)); + let actor_ref = ActorRef::new( + mailbox, + abort_handle, + links.clone(), + startup_semaphore.clone(), + ); + let actor = f(&actor_ref).await; + + PreparedActor { + actor, + actor_ref, + mailbox_rx, + abort_registration, + } } -} -#[inline] -fn spawn_in_thread_inner( - actor: A, - actor_ref: ActorRef, - mailbox_rx: >::Receiver, - abort_registration: AbortRegistration, -) -> ActorRef -where - A: Actor, - S: ActorState + Send, -{ - let handle = Handle::current(); - if matches!(handle.runtime_flavor(), RuntimeFlavor::CurrentThread) { - panic!("threaded actors are not supported in a single threaded tokio runtime"); + /// Returns a reference to the [`ActorRef`], which can be used to send messages to the actor. + /// + /// The `ActorRef` can be used for interaction before the actor starts processing its event loop. + pub fn actor_ref(&self) -> &ActorRef { + &self.actor_ref } - std::thread::Builder::new() - .name(A::name().to_string()) - .spawn({ - let actor_ref = actor_ref.clone(); - move || { - handle.block_on(CURRENT_ACTOR_ID.scope( - actor_ref.id(), - run_actor_lifecycle::(actor, actor_ref, mailbox_rx, abort_registration), - )) - } - }) - .unwrap(); + /// Runs the actor in the current context **without** spawning a separate task, until the actor is stopped. + /// + /// This is useful when you need to run an actor synchronously in the current context, + /// without background execution, and when the actor is expected to be short-lived. + /// + /// Note that the actor's mailbox may already contain messages before `run` is called. + /// In this case, the actor will process all pending messages in the mailbox before completing. + /// + /// # Example + /// + /// ```no_run + /// # use kameo::Actor; + /// # use kameo::message::{Context, Message}; + /// # use kameo::request::MessageSend; + /// # + /// # #[derive(Actor)] + /// # struct MyActor; + /// # + /// # impl Message<&'static str> for MyActor { + /// # type Reply = (); + /// # async fn handle(&mut self, msg: &'static str, ctx: Context<'_, Self, Self::Reply>) -> Self::Reply { } + /// # } + /// # + /// # tokio_test::block_on(async { + /// let prepared_actor = kameo::actor::prepare(MyActor); + /// // Send it a message before it runs + /// prepared_actor.actor_ref().tell("hello!").send().await?; + /// prepared_actor.run().await; + /// # Ok::<(), Box>(()) + /// # }); + /// ``` + pub async fn run(self) { + run_actor_lifecycle::>( + self.actor, + self.actor_ref, + self.mailbox_rx, + self.abort_registration, + ) + .await + } - actor_ref + /// Spawns the actor in a new background tokio task, returning the `JoinHandle`. + /// + /// See [`spawn`] for more information. + pub fn spawn(self) -> JoinHandle<()> { + #[cfg(not(tokio_unstable))] + { + tokio::spawn(CURRENT_ACTOR_ID.scope(self.actor_ref.id(), self.run())) + } + + #[cfg(tokio_unstable)] + { + tokio::task::Builder::new() + .name(A::name()) + .spawn(CURRENT_ACTOR_ID.scope(actor_ref.id(), self.run())) + .unwrap() + } + } + + /// Spawns the actor in a new background thread, returning the `JoinHandle`. + /// + /// See [`spawn_in_thread`] for more information. + pub fn spawn_in_thread(self) -> thread::JoinHandle<()> { + let handle = Handle::current(); + if matches!(handle.runtime_flavor(), RuntimeFlavor::CurrentThread) { + panic!("threaded actors are not supported in a single threaded tokio runtime"); + } + + std::thread::Builder::new() + .name(A::name().to_string()) + .spawn({ + let actor_ref = self.actor_ref.clone(); + move || handle.block_on(CURRENT_ACTOR_ID.scope(actor_ref.id(), self.run())) + }) + .unwrap() + } } #[inline] From 1cd3b54dc4690400e72bce75f669669f175762f3 Mon Sep 17 00:00:00 2001 From: Ari Seyhun Date: Mon, 21 Oct 2024 12:55:54 +0800 Subject: [PATCH 2/5] feat: return actor and reason after stopping --- src/actor.rs | 2 +- src/actor/spawn.rs | 17 ++++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/actor.rs b/src/actor.rs index 31bb675..82dc06e 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -204,7 +204,7 @@ pub trait Actor: Sized + Send + 'static { /// - `reason`: The reason why the actor is being stopped. #[allow(unused_variables)] fn on_stop( - self, + &mut self, actor_ref: WeakActorRef, reason: ActorStopReason, ) -> impl Future> + Send { diff --git a/src/actor/spawn.rs b/src/actor/spawn.rs index 2ae1b4b..7585c69 100644 --- a/src/actor/spawn.rs +++ b/src/actor/spawn.rs @@ -322,7 +322,7 @@ impl PreparedActor { /// # Ok::<(), Box>(()) /// # }); /// ``` - pub async fn run(self) { + pub async fn run(self) -> (A, ActorStopReason) { run_actor_lifecycle::>( self.actor, self.actor_ref, @@ -335,7 +335,7 @@ impl PreparedActor { /// Spawns the actor in a new background tokio task, returning the `JoinHandle`. /// /// See [`spawn`] for more information. - pub fn spawn(self) -> JoinHandle<()> { + pub fn spawn(self) -> JoinHandle<(A, ActorStopReason)> { #[cfg(not(tokio_unstable))] { tokio::spawn(CURRENT_ACTOR_ID.scope(self.actor_ref.id(), self.run())) @@ -353,7 +353,7 @@ impl PreparedActor { /// Spawns the actor in a new background thread, returning the `JoinHandle`. /// /// See [`spawn_in_thread`] for more information. - pub fn spawn_in_thread(self) -> thread::JoinHandle<()> { + pub fn spawn_in_thread(self) -> thread::JoinHandle<(A, ActorStopReason)> { let handle = Handle::current(); if matches!(handle.runtime_flavor(), RuntimeFlavor::CurrentThread) { panic!("threaded actors are not supported in a single threaded tokio runtime"); @@ -375,7 +375,8 @@ async fn run_actor_lifecycle( actor_ref: ActorRef, mailbox_rx: >::Receiver, abort_registration: AbortRegistration, -) where +) -> (A, ActorStopReason) +where A: Actor, S: ActorState, { @@ -404,13 +405,13 @@ async fn run_actor_lifecycle( let reason = ActorStopReason::Panicked(err); let mut state = S::new_from_actor(actor, actor_ref.clone()); let reason = state.on_shutdown(reason.clone()).await.unwrap_or(reason); - let actor = state.shutdown().await; + let mut actor = state.shutdown().await; actor .on_stop(actor_ref.clone(), reason.clone()) .await .unwrap(); log_actor_stop_reason(id, name, &reason); - return; + return (actor, reason); } let mut state = S::new_from_actor(actor, actor_ref.clone()); @@ -422,7 +423,7 @@ async fn run_actor_lifecycle( .await .unwrap_or(ActorStopReason::Killed); - let actor = state.shutdown().await; + let mut actor = state.shutdown().await; { let mut links = links.lock().await; @@ -434,6 +435,8 @@ async fn run_actor_lifecycle( let on_stop_res = actor.on_stop(actor_ref, reason.clone()).await; log_actor_stop_reason(id, name, &reason); on_stop_res.unwrap(); + + (actor, reason) } async fn abortable_actor_loop( From 87879367eea0a1543457f90e1baabd6fde8c5912 Mon Sep 17 00:00:00 2001 From: Ari Seyhun Date: Mon, 21 Oct 2024 13:00:51 +0800 Subject: [PATCH 3/5] refactor: use BoxReplySender type alias --- src/request/ask.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/request/ask.rs b/src/request/ask.rs index 6d656d7..9011ceb 100644 --- a/src/request/ask.rs +++ b/src/request/ask.rs @@ -10,7 +10,7 @@ use crate::{ error::{self, SendError}, mailbox::{bounded::BoundedMailbox, unbounded::UnboundedMailbox, Mailbox, Signal}, message::{BoxReply, Message}, - reply::ReplySender, + reply::{BoxReplySender, ReplySender}, Actor, Reply, }; @@ -36,7 +36,7 @@ where { mailbox: &'a Mb, signal: Signal, - rx: oneshot::Receiver>, + rx: BoxReplySender, } /// A request to a remote actor. From 01dbbaddabb6153417af843bfcaa8afeda847197 Mon Sep 17 00:00:00 2001 From: Ari Seyhun Date: Mon, 21 Oct 2024 13:43:06 +0800 Subject: [PATCH 4/5] fix: BoxReplySender used in place of receiver --- src/request/ask.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/request/ask.rs b/src/request/ask.rs index 9011ceb..6d656d7 100644 --- a/src/request/ask.rs +++ b/src/request/ask.rs @@ -10,7 +10,7 @@ use crate::{ error::{self, SendError}, mailbox::{bounded::BoundedMailbox, unbounded::UnboundedMailbox, Mailbox, Signal}, message::{BoxReply, Message}, - reply::{BoxReplySender, ReplySender}, + reply::ReplySender, Actor, Reply, }; @@ -36,7 +36,7 @@ where { mailbox: &'a Mb, signal: Signal, - rx: BoxReplySender, + rx: oneshot::Receiver>, } /// A request to a remote actor. From b74c98350cd99b34b78a49e96ff31e46d7a75585 Mon Sep 17 00:00:00 2001 From: Ari Seyhun Date: Mon, 21 Oct 2024 13:50:38 +0800 Subject: [PATCH 5/5] docs: fix doc test failing with on_stop actor method --- src/actor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/actor.rs b/src/actor.rs index 82dc06e..64379ba 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -80,7 +80,7 @@ pub use spawn::*; /// } /// /// async fn on_stop( -/// self, +/// &mut self, /// actor_ref: WeakActorRef, /// reason: ActorStopReason, /// ) -> Result<(), BoxError> {