Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for preparing an actor and running it outside a spawned task #69

Merged
merged 7 commits into from
Oct 21, 2024
Merged
4 changes: 2 additions & 2 deletions src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub use spawn::*;
/// }
///
/// async fn on_stop(
/// self,
/// &mut self,
/// actor_ref: WeakActorRef<Self>,
/// reason: ActorStopReason,
/// ) -> Result<(), BoxError> {
Expand Down Expand Up @@ -204,7 +204,7 @@ pub trait Actor: Sized + Send + 'static {
/// - `reason`: The reason why the actor is being stopped.
#[allow(unused_variables)]
fn on_stop(
self,
&mut self,
actor_ref: WeakActorRef<Self>,
reason: ActorStopReason,
) -> impl Future<Output = Result<(), BoxError>> + Send {
Expand Down
283 changes: 198 additions & 85 deletions src/actor/spawn.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{convert, panic::AssertUnwindSafe, sync::Arc};
use std::{convert, panic::AssertUnwindSafe, sync::Arc, thread};

use futures::{
stream::{AbortHandle, AbortRegistration, Abortable},
Expand All @@ -7,6 +7,7 @@ use futures::{
use tokio::{
runtime::{Handle, RuntimeFlavor},
sync::Semaphore,
task::JoinHandle,
};
use tracing::{error, trace};

Expand Down Expand Up @@ -45,8 +46,9 @@ pub fn spawn<A>(actor: A) -> ActorRef<A>
where
A: Actor,
{
let (actor_ref, mailbox_rx, abort_registration) = initialize_actor();
spawn_inner::<A, ActorBehaviour<A>>(actor, actor_ref.clone(), mailbox_rx, abort_registration);
let prepared_actor = prepare(actor);
let actor_ref = prepared_actor.actor_ref().clone();
prepared_actor.spawn();
actor_ref
}

Expand Down Expand Up @@ -78,9 +80,10 @@ where
A: Actor,
L: Actor,
{
let (actor_ref, mailbox_rx, abort_registration) = initialize_actor();
let prepared_actor = prepare(actor);
let actor_ref = prepared_actor.actor_ref().clone();
actor_ref.link(link_ref).await;
spawn_inner::<A, ActorBehaviour<A>>(actor, actor_ref.clone(), mailbox_rx, abort_registration);
prepared_actor.spawn();
actor_ref
}

Expand Down Expand Up @@ -117,9 +120,9 @@ where
F: FnOnce(&ActorRef<A>) -> Fu,
Fu: Future<Output = A>,
{
let (actor_ref, mailbox_rx, abort_registration) = initialize_actor();
let actor = f(&actor_ref).await;
spawn_inner::<A, ActorBehaviour<A>>(actor, actor_ref.clone(), mailbox_rx, abort_registration);
let prepared_actor = prepare_with(f).await;
let actor_ref = prepared_actor.actor_ref().clone();
prepared_actor.spawn();
actor_ref
}

Expand Down Expand Up @@ -164,99 +167,206 @@ pub fn spawn_in_thread<A>(actor: A) -> ActorRef<A>
where
A: Actor,
{
let (actor_ref, mailbox_rx, abort_registration) = initialize_actor();
spawn_in_thread_inner::<A, ActorBehaviour<A>>(
actor,
actor_ref.clone(),
mailbox_rx,
abort_registration,
);
let prepared_actor = prepare(actor);
let actor_ref = prepared_actor.actor_ref().clone();
prepared_actor.spawn_in_thread();
actor_ref
}

#[inline]
fn initialize_actor<A>() -> (
ActorRef<A>,
<A::Mailbox as Mailbox<A>>::Receiver,
AbortRegistration,
)
/// Prepares an actor without spawning it, returning a [`PreparedActor`].
///
/// The actor is fully initialized, but will not start processing messages until explicitly
/// run or spawned. This function is useful when you need to interact with the actor through
/// its [`ActorRef`] before it begins execution.
///
/// To start the actor, either run it synchronously in the current task with [`PreparedActor::run`]
/// or spawn it into a background task with [`PreparedActor::spawn`].
///
/// # Example
///
/// ```rust
/// # use kameo::Actor;
/// # use kameo::message::{Context, Message};
/// # use kameo::request::MessageSend;
/// #
/// # #[derive(Actor)]
/// # struct MyActor { }
/// #
/// # impl Message<&'static str> for MyActor {
/// # type Reply = ();
/// # async fn handle(&mut self, msg: &'static str, ctx: Context<'_, Self, Self::Reply>) -> Self::Reply { }
/// # }
/// #
/// # tokio_test::block_on(async {
/// let prepared_actor = kameo::actor::prepare(MyActor { });
///
/// // You can now interact with the actor using its ActorRef before running it
/// prepared_actor.actor_ref().tell("hello").send().await;
///
/// // Finally, spawn the actor
/// prepared_actor.spawn();
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// ```
pub fn prepare<A>(actor: A) -> PreparedActor<A>
where
A: Actor,
{
let (mailbox, mailbox_rx) = A::new_mailbox();
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let links = Links::default();
let startup_semaphore = Arc::new(Semaphore::new(0));
(
ActorRef::new(
mailbox,
abort_handle,
links.clone(),
startup_semaphore.clone(),
),
mailbox_rx,
abort_registration,
)
PreparedActor::new(actor)
}

#[inline]
fn spawn_inner<A, S>(
async fn prepare_with<A, F, Fu>(f: F) -> PreparedActor<A>
where
A: Actor,
F: FnOnce(&ActorRef<A>) -> Fu,
Fu: Future<Output = A>,
{
PreparedActor::new_with(f).await
}

/// A `PreparedActor` represents an actor that has been initialized and is ready to be either run
/// in the current task or spawned into a new task.
///
/// The `PreparedActor` provides access to the actor's [`ActorRef`] for interacting with the actor
/// before it starts running. It allows for flexible execution, either by running the actor
/// synchronously in the current task or spawning it in a separate task or thread.
#[allow(missing_debug_implementations)]
pub struct PreparedActor<A: Actor> {
actor: A,
actor_ref: ActorRef<A>,
mailbox_rx: <A::Mailbox as Mailbox<A>>::Receiver,
abort_registration: AbortRegistration,
) where
A: Actor,
S: ActorState<A> + Send,
{
#[cfg(not(tokio_unstable))]
{
tokio::spawn(CURRENT_ACTOR_ID.scope(actor_ref.id(), async move {
run_actor_lifecycle::<A, S>(actor, actor_ref, mailbox_rx, abort_registration).await
}));
}

impl<A: Actor> PreparedActor<A> {
fn new(actor: A) -> Self {
let (mailbox, mailbox_rx) = A::new_mailbox();
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let links = Links::default();
let startup_semaphore = Arc::new(Semaphore::new(0));
let actor_ref = ActorRef::new(
mailbox,
abort_handle,
links.clone(),
startup_semaphore.clone(),
);

PreparedActor {
actor,
actor_ref,
mailbox_rx,
abort_registration,
}
}

#[cfg(tokio_unstable)]
async fn new_with<F, Fu>(f: F) -> Self
where
F: FnOnce(&ActorRef<A>) -> Fu,
Fu: Future<Output = A>,
{
tokio::task::Builder::new()
.name(A::name())
.spawn(CURRENT_ACTOR_ID.scope(actor_ref.id(), async move {
run_actor_lifecycle::<A, S>(actor, actor_ref, mailbox_rx, abort_registration).await
}))
.unwrap();
let (mailbox, mailbox_rx) = A::new_mailbox();
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let links = Links::default();
let startup_semaphore = Arc::new(Semaphore::new(0));
let actor_ref = ActorRef::new(
mailbox,
abort_handle,
links.clone(),
startup_semaphore.clone(),
);
let actor = f(&actor_ref).await;

PreparedActor {
actor,
actor_ref,
mailbox_rx,
abort_registration,
}
}
}

#[inline]
fn spawn_in_thread_inner<A, S>(
actor: A,
actor_ref: ActorRef<A>,
mailbox_rx: <A::Mailbox as Mailbox<A>>::Receiver,
abort_registration: AbortRegistration,
) -> ActorRef<A>
where
A: Actor,
S: ActorState<A> + Send,
{
let handle = Handle::current();
if matches!(handle.runtime_flavor(), RuntimeFlavor::CurrentThread) {
panic!("threaded actors are not supported in a single threaded tokio runtime");
/// Returns a reference to the [`ActorRef`], which can be used to send messages to the actor.
///
/// The `ActorRef` can be used for interaction before the actor starts processing its event loop.
pub fn actor_ref(&self) -> &ActorRef<A> {
&self.actor_ref
}

std::thread::Builder::new()
.name(A::name().to_string())
.spawn({
let actor_ref = actor_ref.clone();
move || {
handle.block_on(CURRENT_ACTOR_ID.scope(
actor_ref.id(),
run_actor_lifecycle::<A, S>(actor, actor_ref, mailbox_rx, abort_registration),
))
}
})
.unwrap();
/// Runs the actor in the current context **without** spawning a separate task, until the actor is stopped.
///
/// This is useful when you need to run an actor synchronously in the current context,
/// without background execution, and when the actor is expected to be short-lived.
///
/// Note that the actor's mailbox may already contain messages before `run` is called.
/// In this case, the actor will process all pending messages in the mailbox before completing.
///
/// # Example
///
/// ```no_run
/// # use kameo::Actor;
/// # use kameo::message::{Context, Message};
/// # use kameo::request::MessageSend;
/// #
/// # #[derive(Actor)]
/// # struct MyActor;
/// #
/// # impl Message<&'static str> for MyActor {
/// # type Reply = ();
/// # async fn handle(&mut self, msg: &'static str, ctx: Context<'_, Self, Self::Reply>) -> Self::Reply { }
/// # }
/// #
/// # tokio_test::block_on(async {
/// let prepared_actor = kameo::actor::prepare(MyActor);
/// // Send it a message before it runs
/// prepared_actor.actor_ref().tell("hello!").send().await?;
/// prepared_actor.run().await;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// ```
pub async fn run(self) -> (A, ActorStopReason) {
run_actor_lifecycle::<A, ActorBehaviour<A>>(
self.actor,
self.actor_ref,
self.mailbox_rx,
self.abort_registration,
)
.await
}

actor_ref
/// Spawns the actor in a new background tokio task, returning the `JoinHandle`.
///
/// See [`spawn`] for more information.
pub fn spawn(self) -> JoinHandle<(A, ActorStopReason)> {
#[cfg(not(tokio_unstable))]
{
tokio::spawn(CURRENT_ACTOR_ID.scope(self.actor_ref.id(), self.run()))
}

#[cfg(tokio_unstable)]
{
tokio::task::Builder::new()
.name(A::name())
.spawn(CURRENT_ACTOR_ID.scope(actor_ref.id(), self.run()))
.unwrap()
}
}

/// Spawns the actor in a new background thread, returning the `JoinHandle`.
///
/// See [`spawn_in_thread`] for more information.
pub fn spawn_in_thread(self) -> thread::JoinHandle<(A, ActorStopReason)> {
let handle = Handle::current();
if matches!(handle.runtime_flavor(), RuntimeFlavor::CurrentThread) {
panic!("threaded actors are not supported in a single threaded tokio runtime");
}

std::thread::Builder::new()
.name(A::name().to_string())
.spawn({
let actor_ref = self.actor_ref.clone();
move || handle.block_on(CURRENT_ACTOR_ID.scope(actor_ref.id(), self.run()))
})
.unwrap()
}
}

#[inline]
Expand All @@ -265,7 +375,8 @@ async fn run_actor_lifecycle<A, S>(
actor_ref: ActorRef<A>,
mailbox_rx: <A::Mailbox as Mailbox<A>>::Receiver,
abort_registration: AbortRegistration,
) where
) -> (A, ActorStopReason)
where
A: Actor,
S: ActorState<A>,
{
Expand Down Expand Up @@ -294,13 +405,13 @@ async fn run_actor_lifecycle<A, S>(
let reason = ActorStopReason::Panicked(err);
let mut state = S::new_from_actor(actor, actor_ref.clone());
let reason = state.on_shutdown(reason.clone()).await.unwrap_or(reason);
let actor = state.shutdown().await;
let mut actor = state.shutdown().await;
actor
.on_stop(actor_ref.clone(), reason.clone())
.await
.unwrap();
log_actor_stop_reason(id, name, &reason);
return;
return (actor, reason);
}

let mut state = S::new_from_actor(actor, actor_ref.clone());
Expand All @@ -312,7 +423,7 @@ async fn run_actor_lifecycle<A, S>(
.await
.unwrap_or(ActorStopReason::Killed);

let actor = state.shutdown().await;
let mut actor = state.shutdown().await;

{
let mut links = links.lock().await;
Expand All @@ -324,6 +435,8 @@ async fn run_actor_lifecycle<A, S>(
let on_stop_res = actor.on_stop(actor_ref, reason.clone()).await;
log_actor_stop_reason(id, name, &reason);
on_stop_res.unwrap();

(actor, reason)
}

async fn abortable_actor_loop<A, S>(
Expand Down