diff --git a/benches/fibonacci.rs b/benches/fibonacci.rs index 25ecd2f..4c92ead 100644 --- a/benches/fibonacci.rs +++ b/benches/fibonacci.rs @@ -1,7 +1,8 @@ use criterion::BenchmarkId; use criterion::Criterion; use criterion::{criterion_group, criterion_main}; -use kameo::actor::BoundedMailbox; +use kameo::mailbox::bounded::BoundedMailbox; +use kameo::request::MessageSend; use kameo::{ message::{Context, Message}, Actor, diff --git a/benches/overhead.rs b/benches/overhead.rs index 4543ead..1f08116 100644 --- a/benches/overhead.rs +++ b/benches/overhead.rs @@ -1,6 +1,7 @@ use criterion::Criterion; use criterion::{criterion_group, criterion_main}; -use kameo::actor::UnboundedMailbox; +use kameo::mailbox::unbounded::UnboundedMailbox; +use kameo::request::MessageSend; use kameo::{ message::{Context, Message}, Actor, diff --git a/examples/ask.rs b/examples/ask.rs index 1350c09..72461d4 100644 --- a/examples/ask.rs +++ b/examples/ask.rs @@ -1,8 +1,9 @@ use std::time::Duration; use kameo::{ - actor::UnboundedMailbox, + mailbox::unbounded::UnboundedMailbox, message::{Context, Message}, + request::{MessageSend, MessageSendSync}, Actor, }; use tracing::info; @@ -35,7 +36,7 @@ impl Message for MyActor { } } -#[tokio::main(flavor = "current_thread")] +#[tokio::main] async fn main() -> Result<(), Box> { tracing_subscriber::fmt() .with_env_filter("trace".parse::().unwrap()) @@ -45,7 +46,7 @@ async fn main() -> Result<(), Box> { let my_actor_ref = kameo::spawn(MyActor::default()); - my_actor_ref.tell(Inc { amount: 3 }).send()?; + my_actor_ref.tell(Inc { amount: 3 }).send_sync()?; tokio::time::sleep(Duration::from_millis(200)).await; // Increment the count by 3 diff --git a/examples/basic.rs b/examples/basic.rs index b63249d..6d807d2 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -1,6 +1,7 @@ use kameo::{ - actor::UnboundedMailbox, + mailbox::unbounded::UnboundedMailbox, message::{Context, Message}, + request::{MessageSend, MessageSendSync}, Actor, }; use tracing::info; @@ -48,7 +49,7 @@ impl Message for MyActor { } } -#[tokio::main(flavor = "current_thread")] +#[tokio::main] async fn main() -> Result<(), Box> { tracing_subscriber::fmt() .with_env_filter("trace".parse::().unwrap()) @@ -63,14 +64,14 @@ async fn main() -> Result<(), Box> { info!("Count is {count}"); // Increment the count by 50 in the background - my_actor_ref.tell(Inc { amount: 50 }).send()?; + my_actor_ref.tell(Inc { amount: 50 }).send_sync()?; // Increment the count by 2 let count = my_actor_ref.ask(Inc { amount: 2 }).send().await?; info!("Count is {count}"); // Async messages that return an Err will cause the actor to panic - my_actor_ref.tell(ForceErr).send()?; + my_actor_ref.tell(ForceErr).send_sync()?; // Actor should be stopped, so we cannot send more messages to it assert!(my_actor_ref.ask(Inc { amount: 2 }).send().await.is_err()); diff --git a/examples/macro.rs b/examples/macro.rs index 9dc677a..bba1633 100644 --- a/examples/macro.rs +++ b/examples/macro.rs @@ -1,6 +1,10 @@ use std::fmt; -use kameo::{messages, Actor}; +use kameo::{ + messages, + request::{MessageSend, MessageSendSync}, + Actor, +}; use tracing::info; use tracing_subscriber::EnvFilter; @@ -54,7 +58,7 @@ async fn main() -> Result<(), Box> { info!("Count is {count}"); // Increment the count by 50 in the background - my_actor_ref.tell(Inc { amount: 50 }).send()?; + my_actor_ref.tell(Inc { amount: 50 }).send_sync()?; // Generic message my_actor_ref @@ -65,7 +69,7 @@ async fn main() -> Result<(), Box> { .await?; // Async messages that return an Err will cause the actor to panic - my_actor_ref.tell(ForceErr).send()?; + my_actor_ref.tell(ForceErr).send_sync()?; // Actor should be stopped, so we cannot send more messages to it assert!(my_actor_ref.ask(Inc { amount: 2 }).send().await.is_err()); diff --git a/examples/pool.rs b/examples/pool.rs index 4f3f41b..9246258 100644 --- a/examples/pool.rs +++ b/examples/pool.rs @@ -3,6 +3,7 @@ use std::time::Duration; use kameo::{ actor::{ActorPool, BroadcastMsg, WorkerMsg}, message::{Context, Message}, + request::MessageSend, Actor, }; use tracing_subscriber::EnvFilter; @@ -36,7 +37,7 @@ impl Message for MyActor { } } -#[tokio::main(flavor = "current_thread")] +#[tokio::main] async fn main() -> Result<(), Box> { tracing_subscriber::fmt() .with_env_filter("warn".parse::().unwrap()) diff --git a/examples/pubsub.rs b/examples/pubsub.rs index b04c249..14c2665 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -1,6 +1,7 @@ use kameo::{ actor::{PubSub, Publish, Subscribe}, message::{Context, Message}, + request::MessageSend, Actor, }; use tracing_subscriber::EnvFilter; @@ -38,7 +39,7 @@ impl Message for ActorB { } } -#[tokio::main(flavor = "current_thread")] +#[tokio::main] async fn main() -> Result<(), Box> { tracing_subscriber::fmt() .with_env_filter("warn".parse::().unwrap()) diff --git a/examples/remote.rs b/examples/remote.rs index eb19c46..765751e 100644 --- a/examples/remote.rs +++ b/examples/remote.rs @@ -4,6 +4,7 @@ use kameo::{ actor::RemoteActorRef, message::{Context, Message}, remote::ActorSwarm, + request::MessageSend, Actor, }; use kameo_macros::{remote_message, RemoteActor}; diff --git a/examples/stream.rs b/examples/stream.rs index c3ba7cf..dd35cdf 100644 --- a/examples/stream.rs +++ b/examples/stream.rs @@ -2,8 +2,9 @@ use std::{future::pending, time}; use futures::stream; use kameo::{ - actor::{ActorRef, UnboundedMailbox}, + actor::ActorRef, error::BoxError, + mailbox::unbounded::UnboundedMailbox, message::{Context, Message, StreamMessage}, Actor, }; @@ -57,7 +58,7 @@ impl Message> for MyActor { } } -#[tokio::main(flavor = "current_thread")] +#[tokio::main] async fn main() -> Result<(), Box> { tracing_subscriber::fmt() .with_env_filter("trace".parse::().unwrap()) diff --git a/macros/src/derive_actor.rs b/macros/src/derive_actor.rs index 0f02efe..f94bc72 100644 --- a/macros/src/derive_actor.rs +++ b/macros/src/derive_actor.rs @@ -28,21 +28,21 @@ impl ToTokens for DeriveActor { let mailbox_expanded = match attrs.mailbox { MailboxKind::Bounded(_) => quote! { - ::kameo::actor::BoundedMailbox + ::kameo::mailbox::bounded::BoundedMailbox }, MailboxKind::Unbounded => quote! { - ::kameo::actor::UnboundedMailbox + ::kameo::mailbox::unbounded::UnboundedMailbox }, }; let new_mailbox_expanded = match attrs.mailbox { MailboxKind::Bounded(cap) => { let cap = cap.unwrap_or(1000); quote! { - ::kameo::actor::BoundedMailbox::new(#cap) + ::kameo::mailbox::bounded::BoundedMailbox::new(#cap) } } MailboxKind::Unbounded => quote! { - ::kameo::actor::UnboundedMailbox::new() + ::kameo::mailbox::unbounded::UnboundedMailbox::new() }, }; @@ -55,7 +55,7 @@ impl ToTokens for DeriveActor { #name } - fn new_mailbox() -> (Self::Mailbox, ::kameo::actor::MailboxReceiver) { + fn new_mailbox() -> (Self::Mailbox, >::Receiver) { #new_mailbox_expanded } } diff --git a/macros/src/remote_message.rs b/macros/src/remote_message.rs index ef481c7..913be68 100644 --- a/macros/src/remote_message.rs +++ b/macros/src/remote_message.rs @@ -60,44 +60,62 @@ impl RemoteMessage { #[linkme(crate = ::kameo::remote::_internal::linkme)] static REG: ( ::kameo::remote::_internal::RemoteMessageRegistrationID<'static>, - ( - ::kameo::remote::_internal::AskRemoteMessageFn, - ::kameo::remote::_internal::TellRemoteMessageFn, - ), + ::kameo::remote::_internal::RemoteMessageFns, ) = ( ::kameo::remote::_internal::RemoteMessageRegistrationID { actor_remote_id: <#actor_ty as ::kameo::remote::RemoteActor>::REMOTE_ID, message_remote_id: <#actor_ty #ty_generics as ::kameo::remote::RemoteMessage<#message_generics>>::REMOTE_ID, }, - ( - (|actor_id: ::kameo::actor::ActorID, - msg: ::std::vec::Vec, - mailbox_timeout: ::std::option::Option<::std::time::Duration>, - reply_timeout: ::std::option::Option<::std::time::Duration>, - immediate: bool| { - ::std::boxed::Box::pin(::kameo::remote::_internal::ask_remote_message::< - #actor_ty, - #message_generics, - >( - actor_id, - msg, - mailbox_timeout, - reply_timeout, - immediate, - )) - }) as ::kameo::remote::_internal::AskRemoteMessageFn, - (|actor_id: ::kameo::actor::ActorID, - msg: ::std::vec::Vec, - mailbox_timeout: ::std::option::Option<::std::time::Duration>, - immediate: bool| { - ::std::boxed::Box::pin(::kameo::remote::_internal::tell_remote_message::< - #actor_ty, - #message_generics, - >( - actor_id, msg, mailbox_timeout, immediate - )) - }) as ::kameo::remote::_internal::TellRemoteMessageFn, - ), + ::kameo::remote::_internal::RemoteMessageFns { + ask: (|actor_id: ::kameo::actor::ActorID, + msg: ::std::vec::Vec, + mailbox_timeout: ::std::option::Option<::std::time::Duration>, + reply_timeout: ::std::option::Option<::std::time::Duration>| { + ::std::boxed::Box::pin(::kameo::remote::_internal::ask::< + #actor_ty, + #message_generics, + >( + actor_id, + msg, + mailbox_timeout, + reply_timeout, + )) + }) as ::kameo::remote::_internal::RemoteAskFn, + try_ask: (|actor_id: ::kameo::actor::ActorID, + msg: ::std::vec::Vec, + reply_timeout: ::std::option::Option<::std::time::Duration>| { + ::std::boxed::Box::pin(::kameo::remote::_internal::try_ask::< + #actor_ty, + #message_generics, + >( + actor_id, + msg, + reply_timeout, + )) + }) as ::kameo::remote::_internal::RemoteTryAskFn, + tell: (|actor_id: ::kameo::actor::ActorID, + msg: ::std::vec::Vec, + mailbox_timeout: ::std::option::Option<::std::time::Duration>| { + ::std::boxed::Box::pin(::kameo::remote::_internal::tell::< + #actor_ty, + #message_generics, + >( + actor_id, + msg, + mailbox_timeout, + )) + }) as ::kameo::remote::_internal::RemoteTellFn, + try_tell: (|actor_id: ::kameo::actor::ActorID, + msg: ::std::vec::Vec| { + ::std::boxed::Box::pin(::kameo::remote::_internal::try_tell::< + #actor_ty, + #message_generics, + >( + actor_id, + msg, + )) + }) as ::kameo::remote::_internal::RemoteTryTellFn, + }, ); }; } diff --git a/src/actor.rs b/src/actor.rs index a63bc60..2250702 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -24,7 +24,6 @@ mod actor_ref; mod id; mod kind; -mod mailbox; mod pool; mod pubsub; mod spawn; @@ -33,11 +32,13 @@ use std::any; use futures::Future; -use crate::error::{ActorStopReason, BoxError, PanicError}; +use crate::{ + error::{ActorStopReason, BoxError, PanicError}, + mailbox::Mailbox, +}; pub use actor_ref::*; pub use id::*; -pub use mailbox::*; pub use pool::*; pub use pubsub::*; pub use spawn::*; @@ -87,7 +88,7 @@ pub trait Actor: Sized + Send + 'static { } /// Creates a new mailbox. - fn new_mailbox() -> (Self::Mailbox, MailboxReceiver) { + fn new_mailbox() -> (Self::Mailbox, >::Receiver) { Self::Mailbox::default_mailbox() } diff --git a/src/actor/actor_ref.rs b/src/actor/actor_ref.rs index a4f13a2..6ede030 100644 --- a/src/actor/actor_ref.rs +++ b/src/actor/actor_ref.rs @@ -10,6 +10,7 @@ use tokio::{ use crate::{ error::{RegistrationError, SendError}, + mailbox::{Mailbox, Signal, SignalMailbox, WeakMailbox}, message::{Message, StreamMessage}, remote::{ActorSwarm, RemoteActor, RemoteMessage, SwarmCommand}, reply::Reply, @@ -20,7 +21,7 @@ use crate::{ Actor, }; -use super::{id::ActorID, Mailbox, Signal, SignalMailbox, WeakMailbox}; +use super::id::ActorID; task_local! { pub(crate) static CURRENT_ACTOR_ID: ActorID; @@ -320,8 +321,8 @@ where reply: None, sent_within_actor: false, }) - .await?; - Ok(()) + .await + .map_err(|err| err.map_msg(|msg| msg.downcast_message::().unwrap())) } } @@ -362,7 +363,7 @@ impl AsRef for ActorRef { pub struct RemoteActorRef { id: ActorID, swarm_tx: mpsc::Sender, - phantom: PhantomData, + phantom: PhantomData, } impl RemoteActorRef { diff --git a/src/actor/kind.rs b/src/actor/kind.rs index c688bfd..4c53ea9 100644 --- a/src/actor/kind.rs +++ b/src/actor/kind.rs @@ -6,10 +6,11 @@ use tokio::sync::oneshot; use crate::{ actor::{Actor, ActorRef, WeakActorRef}, error::{ActorStopReason, BoxSendError, PanicError}, + mailbox::Signal, message::{BoxReply, DynMessage}, }; -use super::{ActorID, Signal}; +use super::ActorID; pub(crate) trait ActorState: Sized { fn new_from_actor(actor: A, actor_ref: WeakActorRef) -> Self; diff --git a/src/actor/mailbox.rs b/src/actor/mailbox.rs index 044252e..f9b3df3 100644 --- a/src/actor/mailbox.rs +++ b/src/actor/mailbox.rs @@ -1,6 +1,9 @@ +mod bounded; +mod unbounded; + use dyn_clone::DynClone; -use futures::{future::BoxFuture, Future, FutureExt}; -use tokio::sync::{mpsc, oneshot}; +use futures::{future::BoxFuture, Future}; +use tokio::sync::oneshot; use crate::{ error::{ActorStopReason, BoxSendError, SendError}, @@ -10,15 +13,16 @@ use crate::{ use super::{ActorID, ActorRef}; -#[doc(hidden)] +/// A trait defining the behaviour and functionality of a mailbox. pub trait Mailbox: SignalMailbox + Clone + Send + Sync { + type Receiver: MailboxReceiver; type WeakMailbox: WeakMailbox; - fn default_mailbox() -> (Self, MailboxReceiver); + fn default_mailbox() -> (Self, Self::Receiver); fn send( &self, signal: Signal, - ) -> impl Future>>> + Send + '_; + ) -> impl Future>>> + Send + '_; fn closed(&self) -> impl Future + '_; fn is_closed(&self) -> bool; fn downgrade(&self) -> Self::WeakMailbox; @@ -26,7 +30,12 @@ pub trait Mailbox: SignalMailbox + Clone + Send + Sync { fn weak_count(&self) -> usize; } -#[doc(hidden)] +/// A mailbox receiver. +pub trait MailboxReceiver: Send + 'static { + fn recv(&mut self) -> impl Future>> + Send + '_; +} + +/// A weak mailbox which can be upraded. pub trait WeakMailbox: SignalMailbox + Clone + Send + Sync { type StrongMailbox; @@ -35,199 +44,6 @@ pub trait WeakMailbox: SignalMailbox + Clone + Send + Sync { fn weak_count(&self) -> usize; } -/// An unbounded mailbox, where the sending messages to a full mailbox causes backpressure. -#[allow(missing_debug_implementations)] -pub struct BoundedMailbox(pub(crate) mpsc::Sender>); - -impl BoundedMailbox { - /// Creates a new bounded mailbox with a given capacity. - #[inline] - pub fn new(capacity: usize) -> (Self, MailboxReceiver) { - let (tx, rx) = mpsc::channel(capacity); - (BoundedMailbox(tx), MailboxReceiver::Bounded(rx)) - } -} - -impl Mailbox for BoundedMailbox { - type WeakMailbox = WeakBoundedMailbox; - - #[inline] - fn default_mailbox() -> (Self, MailboxReceiver) { - BoundedMailbox::new(1000) - } - - #[inline] - async fn send(&self, signal: Signal) -> Result<(), mpsc::error::SendError>> { - self.0.send(signal).await - } - - #[inline] - async fn closed(&self) { - self.0.closed().await - } - - #[inline] - fn is_closed(&self) -> bool { - self.0.is_closed() - } - - #[inline] - fn downgrade(&self) -> Self::WeakMailbox { - WeakBoundedMailbox(self.0.downgrade()) - } - - #[inline] - fn strong_count(&self) -> usize { - self.0.strong_count() - } - - #[inline] - fn weak_count(&self) -> usize { - self.0.weak_count() - } -} - -impl Clone for BoundedMailbox { - fn clone(&self) -> Self { - BoundedMailbox(self.0.clone()) - } -} - -/// A weak bounded mailbox that does not prevent the actor from being stopped. -#[allow(missing_debug_implementations)] -pub struct WeakBoundedMailbox(pub(crate) mpsc::WeakSender>); - -impl WeakMailbox for WeakBoundedMailbox { - type StrongMailbox = BoundedMailbox; - - #[inline] - fn upgrade(&self) -> Option { - self.0.upgrade().map(BoundedMailbox) - } - - #[inline] - fn strong_count(&self) -> usize { - self.0.strong_count() - } - - #[inline] - fn weak_count(&self) -> usize { - self.0.weak_count() - } -} - -impl Clone for WeakBoundedMailbox { - fn clone(&self) -> Self { - WeakBoundedMailbox(self.0.clone()) - } -} - -/// An unbounded mailbox, where the number of messages queued can grow infinitely. -#[allow(missing_debug_implementations)] -pub struct UnboundedMailbox(pub(crate) mpsc::UnboundedSender>); - -impl UnboundedMailbox { - /// Creates a new unbounded mailbox. - #[inline] - pub fn new() -> (Self, MailboxReceiver) { - let (tx, rx) = mpsc::unbounded_channel(); - (UnboundedMailbox(tx), MailboxReceiver::Unbounded(rx)) - } -} - -impl Mailbox for UnboundedMailbox { - type WeakMailbox = WeakUnboundedMailbox; - - #[inline] - fn default_mailbox() -> (Self, MailboxReceiver) { - let (tx, rx) = mpsc::unbounded_channel(); - (UnboundedMailbox(tx), MailboxReceiver::Unbounded(rx)) - } - - #[inline] - async fn send(&self, signal: Signal) -> Result<(), mpsc::error::SendError>> { - self.0.send(signal) - } - - #[inline] - async fn closed(&self) { - self.0.closed().await - } - - #[inline] - fn is_closed(&self) -> bool { - self.0.is_closed() - } - - #[inline] - fn downgrade(&self) -> Self::WeakMailbox { - WeakUnboundedMailbox(self.0.downgrade()) - } - - #[inline] - fn strong_count(&self) -> usize { - self.0.strong_count() - } - - #[inline] - fn weak_count(&self) -> usize { - self.0.weak_count() - } -} - -impl Clone for UnboundedMailbox { - fn clone(&self) -> Self { - UnboundedMailbox(self.0.clone()) - } -} - -/// A weak unbounded mailbox that does not prevent the actor from being stopped. -#[allow(missing_debug_implementations)] -pub struct WeakUnboundedMailbox(pub(crate) mpsc::WeakUnboundedSender>); - -impl WeakMailbox for WeakUnboundedMailbox { - type StrongMailbox = UnboundedMailbox; - - #[inline] - fn upgrade(&self) -> Option { - self.0.upgrade().map(UnboundedMailbox) - } - - #[inline] - fn strong_count(&self) -> usize { - self.0.strong_count() - } - - #[inline] - fn weak_count(&self) -> usize { - self.0.weak_count() - } -} - -impl Clone for WeakUnboundedMailbox { - fn clone(&self) -> Self { - WeakUnboundedMailbox(self.0.clone()) - } -} - -/// A mailbox receiver, either bounded or unbounded. -#[allow(missing_debug_implementations)] -pub enum MailboxReceiver { - /// A bounded mailbox receiver. - Bounded(mpsc::Receiver>), - /// An unbounded mailbox receiver. - Unbounded(mpsc::UnboundedReceiver>), -} - -impl MailboxReceiver { - pub(crate) async fn recv(&mut self) -> Option> { - match self { - MailboxReceiver::Bounded(rx) => rx.recv().await, - MailboxReceiver::Unbounded(rx) => rx.recv().await, - } - } -} - #[allow(missing_debug_implementations)] #[doc(hidden)] pub enum Signal { @@ -269,156 +85,3 @@ pub trait SignalMailbox: DynClone + Send { } dyn_clone::clone_trait_object!(SignalMailbox); - -impl SignalMailbox for BoundedMailbox -where - A: Actor, -{ - fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>> { - async move { - self.0 - .send(Signal::StartupFinished) - .await - .map_err(|_| SendError::ActorNotRunning(())) - } - .boxed() - } - - fn signal_link_died( - &self, - id: ActorID, - reason: ActorStopReason, - ) -> BoxFuture<'_, Result<(), SendError>> { - async move { - self.0 - .send(Signal::LinkDied { id, reason }) - .await - .map_err(|_| SendError::ActorNotRunning(())) - } - .boxed() - } - - fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>> { - async move { - self.0 - .send(Signal::Stop) - .await - .map_err(|_| SendError::ActorNotRunning(())) - } - .boxed() - } -} - -impl SignalMailbox for WeakBoundedMailbox -where - A: Actor, -{ - fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>> { - async move { - match self.upgrade() { - Some(mb) => mb.signal_startup_finished().await, - None => Err(SendError::ActorNotRunning(())), - } - } - .boxed() - } - - fn signal_link_died( - &self, - id: ActorID, - reason: ActorStopReason, - ) -> BoxFuture<'_, Result<(), SendError>> { - async move { - match self.upgrade() { - Some(mb) => mb.signal_link_died(id, reason).await, - None => Err(SendError::ActorNotRunning(())), - } - } - .boxed() - } - - fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>> { - async move { - match self.upgrade() { - Some(mb) => mb.signal_stop().await, - None => Err(SendError::ActorNotRunning(())), - } - } - .boxed() - } -} - -impl SignalMailbox for UnboundedMailbox -where - A: Actor, -{ - fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>> { - async move { - self.0 - .send(Signal::StartupFinished) - .map_err(|_| SendError::ActorNotRunning(())) - } - .boxed() - } - - fn signal_link_died( - &self, - id: ActorID, - reason: ActorStopReason, - ) -> BoxFuture<'_, Result<(), SendError>> { - async move { - self.0 - .send(Signal::LinkDied { id, reason }) - .map_err(|_| SendError::ActorNotRunning(())) - } - .boxed() - } - - fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>> { - async move { - self.0 - .send(Signal::Stop) - .map_err(|_| SendError::ActorNotRunning(())) - } - .boxed() - } -} - -impl SignalMailbox for WeakUnboundedMailbox -where - A: Actor, -{ - fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>> { - async move { - match self.upgrade() { - Some(mb) => mb.signal_startup_finished().await, - None => Err(SendError::ActorNotRunning(())), - } - } - .boxed() - } - - fn signal_link_died( - &self, - id: ActorID, - reason: ActorStopReason, - ) -> BoxFuture<'_, Result<(), SendError>> { - async move { - match self.upgrade() { - Some(mb) => mb.signal_link_died(id, reason).await, - None => Err(SendError::ActorNotRunning(())), - } - } - .boxed() - } - - fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>> { - async move { - match self.upgrade() { - Some(mb) => mb.signal_stop().await, - None => Err(SendError::ActorNotRunning(())), - } - } - .boxed() - } -} diff --git a/src/actor/mailbox/bounded.rs b/src/actor/mailbox/bounded.rs new file mode 100644 index 0000000..699b98a --- /dev/null +++ b/src/actor/mailbox/bounded.rs @@ -0,0 +1,209 @@ +use std::fmt; + +use futures::{future::BoxFuture, FutureExt}; +use tokio::sync::mpsc; + +use crate::{ + actor::ActorID, + error::{ActorStopReason, SendError}, + Actor, +}; + +use super::{Mailbox, MailboxReceiver, Signal, SignalMailbox, WeakMailbox}; + +/// An unbounded mailbox, where the sending messages to a full mailbox causes backpressure. +pub struct BoundedMailbox(mpsc::Sender>); + +impl BoundedMailbox { + /// Creates a new bounded mailbox with a given capacity. + #[inline] + pub fn new(capacity: usize) -> (Self, BoundedMailboxReceiver) { + let (tx, rx) = mpsc::channel(capacity); + (BoundedMailbox(tx), BoundedMailboxReceiver(rx)) + } +} + +impl Mailbox for BoundedMailbox { + type Receiver = BoundedMailboxReceiver; + type WeakMailbox = WeakBoundedMailbox; + + #[inline] + fn default_mailbox() -> (Self, Self::Receiver) { + BoundedMailbox::new(1000) + } + + #[inline] + async fn send(&self, signal: Signal) -> Result<(), SendError>> { + Ok(self.0.send(signal).await?) + } + + #[inline] + async fn closed(&self) { + self.0.closed().await + } + + #[inline] + fn is_closed(&self) -> bool { + self.0.is_closed() + } + + #[inline] + fn downgrade(&self) -> Self::WeakMailbox { + WeakBoundedMailbox(self.0.downgrade()) + } + + #[inline] + fn strong_count(&self) -> usize { + self.0.strong_count() + } + + #[inline] + fn weak_count(&self) -> usize { + self.0.weak_count() + } +} + +impl Clone for BoundedMailbox { + fn clone(&self) -> Self { + BoundedMailbox(self.0.clone()) + } +} + +impl fmt::Debug for BoundedMailbox { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BoundedMailbox") + .field("tx", &self.0) + .finish() + } +} + +/// A bounded mailbox receiver. +pub struct BoundedMailboxReceiver(mpsc::Receiver>); + +impl fmt::Debug for BoundedMailboxReceiver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BoundedMailboxReceiver") + .field("rx", &self.0) + .finish() + } +} + +impl MailboxReceiver for BoundedMailboxReceiver { + async fn recv(&mut self) -> Option> { + self.0.recv().await + } +} + +/// A weak bounded mailbox that does not prevent the actor from being stopped. +pub struct WeakBoundedMailbox(mpsc::WeakSender>); + +impl WeakMailbox for WeakBoundedMailbox { + type StrongMailbox = BoundedMailbox; + + #[inline] + fn upgrade(&self) -> Option { + self.0.upgrade().map(BoundedMailbox) + } + + #[inline] + fn strong_count(&self) -> usize { + self.0.strong_count() + } + + #[inline] + fn weak_count(&self) -> usize { + self.0.weak_count() + } +} + +impl Clone for WeakBoundedMailbox { + fn clone(&self) -> Self { + WeakBoundedMailbox(self.0.clone()) + } +} + +impl fmt::Debug for WeakBoundedMailbox { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WeakBoundedMailbox") + .field("tx", &self.0) + .finish() + } +} + +impl SignalMailbox for BoundedMailbox +where + A: Actor, +{ + fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>> { + async move { + self.0 + .send(Signal::StartupFinished) + .await + .map_err(|_| SendError::ActorNotRunning(())) + } + .boxed() + } + + fn signal_link_died( + &self, + id: ActorID, + reason: ActorStopReason, + ) -> BoxFuture<'_, Result<(), SendError>> { + async move { + self.0 + .send(Signal::LinkDied { id, reason }) + .await + .map_err(|_| SendError::ActorNotRunning(())) + } + .boxed() + } + + fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>> { + async move { + self.0 + .send(Signal::Stop) + .await + .map_err(|_| SendError::ActorNotRunning(())) + } + .boxed() + } +} + +impl SignalMailbox for WeakBoundedMailbox +where + A: Actor, +{ + fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>> { + async move { + match self.upgrade() { + Some(mb) => mb.signal_startup_finished().await, + None => Err(SendError::ActorNotRunning(())), + } + } + .boxed() + } + + fn signal_link_died( + &self, + id: ActorID, + reason: ActorStopReason, + ) -> BoxFuture<'_, Result<(), SendError>> { + async move { + match self.upgrade() { + Some(mb) => mb.signal_link_died(id, reason).await, + None => Err(SendError::ActorNotRunning(())), + } + } + .boxed() + } + + fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>> { + async move { + match self.upgrade() { + Some(mb) => mb.signal_stop().await, + None => Err(SendError::ActorNotRunning(())), + } + } + .boxed() + } +} diff --git a/src/actor/mailbox/unbounded.rs b/src/actor/mailbox/unbounded.rs new file mode 100644 index 0000000..63ac899 --- /dev/null +++ b/src/actor/mailbox/unbounded.rs @@ -0,0 +1,198 @@ +use std::fmt; + +use futures::{future::BoxFuture, FutureExt}; +use tokio::sync::mpsc; + +use crate::{ + actor::ActorID, + error::{ActorStopReason, SendError}, + Actor, +}; + +use super::{Mailbox, MailboxReceiver, Signal, SignalMailbox, WeakMailbox}; + +/// An unbounded mailbox, where the number of messages queued can grow infinitely. +pub struct UnboundedMailbox(mpsc::UnboundedSender>); + +impl UnboundedMailbox { + /// Creates a new unbounded mailbox. + #[inline] + pub fn new() -> (Self, UnboundedMailboxReceiver) { + let (tx, rx) = mpsc::unbounded_channel(); + (UnboundedMailbox(tx), UnboundedMailboxReceiver(rx)) + } +} + +impl Mailbox for UnboundedMailbox { + type Receiver = UnboundedMailboxReceiver; + type WeakMailbox = WeakUnboundedMailbox; + + #[inline] + fn default_mailbox() -> (Self, Self::Receiver) { + UnboundedMailbox::new() + } + + #[inline] + async fn send(&self, signal: Signal) -> Result<(), SendError>> { + Ok(self.0.send(signal)?) + } + + #[inline] + async fn closed(&self) { + self.0.closed().await + } + + #[inline] + fn is_closed(&self) -> bool { + self.0.is_closed() + } + + #[inline] + fn downgrade(&self) -> Self::WeakMailbox { + WeakUnboundedMailbox(self.0.downgrade()) + } + + #[inline] + fn strong_count(&self) -> usize { + self.0.strong_count() + } + + #[inline] + fn weak_count(&self) -> usize { + self.0.weak_count() + } +} + +impl Clone for UnboundedMailbox { + fn clone(&self) -> Self { + UnboundedMailbox(self.0.clone()) + } +} + +impl fmt::Debug for UnboundedMailbox { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("UnboundedMailbox") + .field("tx", &self.0) + .finish() + } +} + +/// An unbounded mailbox receiver. +pub struct UnboundedMailboxReceiver(mpsc::UnboundedReceiver>); + +impl MailboxReceiver for UnboundedMailboxReceiver { + async fn recv(&mut self) -> Option> { + self.0.recv().await + } +} + +/// A weak unbounded mailbox that does not prevent the actor from being stopped. +pub struct WeakUnboundedMailbox(mpsc::WeakUnboundedSender>); + +impl WeakMailbox for WeakUnboundedMailbox { + type StrongMailbox = UnboundedMailbox; + + #[inline] + fn upgrade(&self) -> Option { + self.0.upgrade().map(UnboundedMailbox) + } + + #[inline] + fn strong_count(&self) -> usize { + self.0.strong_count() + } + + #[inline] + fn weak_count(&self) -> usize { + self.0.weak_count() + } +} + +impl Clone for WeakUnboundedMailbox { + fn clone(&self) -> Self { + WeakUnboundedMailbox(self.0.clone()) + } +} + +impl fmt::Debug for WeakUnboundedMailbox { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WeakUnboundedMailbox") + .field("tx", &self.0) + .finish() + } +} + +impl SignalMailbox for UnboundedMailbox +where + A: Actor, +{ + fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>> { + async move { + self.0 + .send(Signal::StartupFinished) + .map_err(|_| SendError::ActorNotRunning(())) + } + .boxed() + } + + fn signal_link_died( + &self, + id: ActorID, + reason: ActorStopReason, + ) -> BoxFuture<'_, Result<(), SendError>> { + async move { + self.0 + .send(Signal::LinkDied { id, reason }) + .map_err(|_| SendError::ActorNotRunning(())) + } + .boxed() + } + + fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>> { + async move { + self.0 + .send(Signal::Stop) + .map_err(|_| SendError::ActorNotRunning(())) + } + .boxed() + } +} + +impl SignalMailbox for WeakUnboundedMailbox +where + A: Actor, +{ + fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>> { + async move { + match self.upgrade() { + Some(mb) => mb.signal_startup_finished().await, + None => Err(SendError::ActorNotRunning(())), + } + } + .boxed() + } + + fn signal_link_died( + &self, + id: ActorID, + reason: ActorStopReason, + ) -> BoxFuture<'_, Result<(), SendError>> { + async move { + match self.upgrade() { + Some(mb) => mb.signal_link_died(id, reason).await, + None => Err(SendError::ActorNotRunning(())), + } + } + .boxed() + } + + fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>> { + async move { + match self.upgrade() { + Some(mb) => mb.signal_stop().await, + None => Err(SendError::ActorNotRunning(())), + } + } + .boxed() + } +} diff --git a/src/actor/pool.rs b/src/actor/pool.rs index d8a6acf..621590e 100644 --- a/src/actor/pool.rs +++ b/src/actor/pool.rs @@ -9,12 +9,16 @@ use itertools::repeat_n; use crate::{ actor::{Actor, ActorRef}, error::{ActorStopReason, BoxError, SendError}, + mailbox::bounded::BoundedMailbox, message::{BoxDebug, Context, Message}, reply::Reply, - request::Request, + request::{ + AskRequest, ForwardMessageSend, LocalAskRequest, LocalTellRequest, MessageSend, + TellRequest, WithoutRequestTimeout, + }, }; -use super::{ActorID, BoundedMailbox, WeakActorRef}; +use super::{ActorID, WeakActorRef}; enum Factory { Sync(Box ActorRef + Send + Sync + 'static>), @@ -235,7 +239,10 @@ where Mb: Send + 'static, R: Reply, ::Error: fmt::Debug, - ActorRef: Request, + AskRequest, Mb, M, WithoutRequestTimeout, WithoutRequestTimeout>: + ForwardMessageSend, + TellRequest, Mb, M, WithoutRequestTimeout>: + MessageSend::Error>>, { type Reply = WorkerReply; @@ -249,7 +256,7 @@ where let worker = self.next_worker().1.clone(); match reply_sender { Some(tx) => { - if let Err(err) = Request::ask_forwarded(&worker, msg, tx).await { + if let Err(err) = worker.ask(msg).forward(tx).await { match err { SendError::ActorNotRunning((m, tx)) => { msg = m; @@ -263,7 +270,7 @@ where return WorkerReply::Forwarded; } None => { - if let Err(err) = Request::tell(&worker, msg, None, false).await { + if let Err(err) = worker.tell(msg).send().await { match err { SendError::ActorNotRunning(m) => { msg = m; @@ -292,7 +299,8 @@ where A: Actor + Message, M: Clone + Send + 'static, ::Error: fmt::Debug, - ActorRef: Request, + TellRequest, A::Mailbox, M, WithoutRequestTimeout>: + MessageSend::Error>>, { type Reply = Vec::Error>>>; @@ -305,7 +313,7 @@ where self.workers .iter() .zip(repeat_n(msg, self.workers.len())) // Avoids unnecessary clone of msg on last iteration - .map(|(worker, msg)| Request::tell(worker, msg, None, false)), + .map(|(worker, msg)| worker.tell(msg).send()), ) .await } diff --git a/src/actor/pubsub.rs b/src/actor/pubsub.rs index b582cdc..48ea1ed 100644 --- a/src/actor/pubsub.rs +++ b/src/actor/pubsub.rs @@ -4,12 +4,13 @@ use futures::future::{join_all, BoxFuture}; use crate::{ error::SendError, + mailbox::bounded::BoundedMailbox, message::{Context, Message}, - request::Request, - Actor, + request::{LocalTellRequest, MessageSend, TellRequest, WithoutRequestTimeout}, + Actor, Reply, }; -use super::{ActorID, ActorRef, BoundedMailbox}; +use super::{ActorID, ActorRef}; /// A mpsc-like pubsub actor. #[allow(missing_debug_implementations)] @@ -54,7 +55,8 @@ impl PubSub { where A: Actor + Message, M: Send + 'static, - ActorRef: Request, + TellRequest, A::Mailbox, M, WithoutRequestTimeout>: + MessageSend::Error>>, { self.subscribers.insert(actor_ref.id(), Box::new(actor_ref)); } @@ -97,7 +99,8 @@ impl Message> for PubSub where A: Actor + Message, M: Send + 'static, - ActorRef: Request, + TellRequest, A::Mailbox, M, WithoutRequestTimeout>: + MessageSend::Error>>, { type Reply = (); @@ -119,11 +122,13 @@ where A: Actor + Message, M: Send + 'static, Mb: Sync, - ActorRef: Request, + TellRequest, Mb, M, WithoutRequestTimeout>: + MessageSend::Error>>, { fn tell(&self, msg: M) -> BoxFuture<'_, Result<(), SendError>> { Box::pin(async move { - Request::tell(self, msg, None, false) + self.tell(msg) + .send() .await .map_err(|err| err.map_err(|_| ())) }) diff --git a/src/actor/spawn.rs b/src/actor/spawn.rs index a819424..1bf9bb9 100644 --- a/src/actor/spawn.rs +++ b/src/actor/spawn.rs @@ -10,12 +10,13 @@ use tracing::{error, trace}; use crate::{ actor::{ kind::{ActorBehaviour, ActorState}, - Actor, ActorRef, Links, Signal, CURRENT_ACTOR_ID, + Actor, ActorRef, Links, CURRENT_ACTOR_ID, }, error::{ActorStopReason, PanicError}, + mailbox::{Mailbox, MailboxReceiver, Signal}, }; -use super::{ActorID, MailboxReceiver}; +use super::ActorID; /// Spawns an actor in a tokio task. /// @@ -185,7 +186,7 @@ where async fn run_actor_lifecycle( actor_ref: ActorRef, mut actor: A, - mailbox_rx: MailboxReceiver, + mailbox_rx: >::Receiver, abort_registration: AbortRegistration, links: Links, ) where @@ -252,7 +253,7 @@ async fn run_actor_lifecycle( async fn abortable_actor_loop( state: &mut S, - mut mailbox_rx: MailboxReceiver, + mut mailbox_rx: >::Receiver, ) -> ActorStopReason where A: Actor, @@ -268,7 +269,7 @@ where async fn recv_mailbox_loop( state: &mut S, - mailbox_rx: &mut MailboxReceiver, + mailbox_rx: &mut >::Receiver, ) -> ActorStopReason where A: Actor, diff --git a/src/error.rs b/src/error.rs index ea805f4..9a7c0e5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -21,11 +21,7 @@ use tokio::{ time::error::Elapsed, }; -use crate::{ - actor::{ActorID, Signal}, - message::BoxDebug, - Actor, -}; +use crate::{actor::ActorID, mailbox::Signal, message::BoxDebug, Actor}; /// A dyn boxed error. pub type BoxError = Box; diff --git a/src/lib.rs b/src/lib.rs index 1939a53..048e74e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ pub mod actor; pub mod error; +pub mod mailbox; pub mod message; pub mod remote; pub mod reply; diff --git a/src/mailbox.rs b/src/mailbox.rs new file mode 100644 index 0000000..edf599f --- /dev/null +++ b/src/mailbox.rs @@ -0,0 +1,104 @@ +//! Behaviour for actor mailboxes. +//! +//! An actor mailbox is a channel which stores pending messages and signals for an actor to process sequentially. + +pub mod bounded; +pub mod unbounded; + +use dyn_clone::DynClone; +use futures::{future::BoxFuture, Future}; +use tokio::sync::oneshot; + +use crate::{ + actor::{ActorID, ActorRef}, + error::{ActorStopReason, BoxSendError, SendError}, + message::{BoxReply, DynMessage}, + Actor, +}; + +/// A trait defining the behaviour and functionality of a mailbox. +pub trait Mailbox: SignalMailbox + Clone + Send + Sync { + /// Mailbox receiver type. + type Receiver: MailboxReceiver; + /// Mailbox weak type. + type WeakMailbox: WeakMailbox; + + /// Creates a default mailbox and receiver. + fn default_mailbox() -> (Self, Self::Receiver); + /// Sends a signal to the mailbox. + fn send( + &self, + signal: Signal, + ) -> impl Future, E>>> + Send + '_; + /// Waits for the mailbox to be closed. + fn closed(&self) -> impl Future + '_; + /// Checks if the mailbox is closed. + fn is_closed(&self) -> bool; + /// Downgrades the mailbox to a weak mailbox. + fn downgrade(&self) -> Self::WeakMailbox; + /// Returns the number of strong mailboxes. + fn strong_count(&self) -> usize; + /// Returns the number of weak mailboxes. + fn weak_count(&self) -> usize; +} + +/// A mailbox receiver. +pub trait MailboxReceiver: Send + 'static { + /// Receives a value from the mailbox. + fn recv(&mut self) -> impl Future>> + Send + '_; +} + +/// A weak mailbox which can be upraded. +pub trait WeakMailbox: SignalMailbox + Clone + Send + Sync { + /// Strong mailbox type. + type StrongMailbox; + + /// Upgrades the mailbox to a strong mailbox, or returns None if all strong mailboxes have been dropped. + fn upgrade(&self) -> Option; + /// Returns the number of strong mailboxes. + fn strong_count(&self) -> usize; + /// Returns the number of weak mailboxes. + fn weak_count(&self) -> usize; +} + +#[allow(missing_debug_implementations)] +#[doc(hidden)] +pub enum Signal { + StartupFinished, + Message { + message: Box>, + actor_ref: ActorRef, + reply: Option>>, + sent_within_actor: bool, + }, + LinkDied { + id: ActorID, + reason: ActorStopReason, + }, + Stop, +} + +impl Signal { + pub(crate) fn downcast_message(self) -> Option + where + M: 'static, + { + match self { + Signal::Message { message, .. } => message.as_any().downcast().ok().map(|v| *v), + _ => None, + } + } +} + +#[doc(hidden)] +pub trait SignalMailbox: DynClone + Send { + fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>>; + fn signal_link_died( + &self, + id: ActorID, + reason: ActorStopReason, + ) -> BoxFuture<'_, Result<(), SendError>>; + fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>>; +} + +dyn_clone::clone_trait_object!(SignalMailbox); diff --git a/src/mailbox/bounded.rs b/src/mailbox/bounded.rs new file mode 100644 index 0000000..bf8ce20 --- /dev/null +++ b/src/mailbox/bounded.rs @@ -0,0 +1,211 @@ +//! Bounded mailbox types based on tokio mpsc bounded channels. + +use std::fmt; + +use futures::{future::BoxFuture, FutureExt}; +use tokio::sync::mpsc; + +use crate::{ + actor::ActorID, + error::{ActorStopReason, SendError}, + Actor, +}; + +use super::{Mailbox, MailboxReceiver, Signal, SignalMailbox, WeakMailbox}; + +/// An unbounded mailbox, where the sending messages to a full mailbox causes backpressure. +pub struct BoundedMailbox(pub(crate) mpsc::Sender>); + +impl BoundedMailbox { + /// Creates a new bounded mailbox with a given capacity. + #[inline] + pub fn new(capacity: usize) -> (Self, BoundedMailboxReceiver) { + let (tx, rx) = mpsc::channel(capacity); + (BoundedMailbox(tx), BoundedMailboxReceiver(rx)) + } +} + +impl Mailbox for BoundedMailbox { + type Receiver = BoundedMailboxReceiver; + type WeakMailbox = WeakBoundedMailbox; + + #[inline] + fn default_mailbox() -> (Self, Self::Receiver) { + BoundedMailbox::new(1000) + } + + #[inline] + async fn send(&self, signal: Signal) -> Result<(), SendError, E>> { + Ok(self.0.send(signal).await?) + } + + #[inline] + async fn closed(&self) { + self.0.closed().await + } + + #[inline] + fn is_closed(&self) -> bool { + self.0.is_closed() + } + + #[inline] + fn downgrade(&self) -> Self::WeakMailbox { + WeakBoundedMailbox(self.0.downgrade()) + } + + #[inline] + fn strong_count(&self) -> usize { + self.0.strong_count() + } + + #[inline] + fn weak_count(&self) -> usize { + self.0.weak_count() + } +} + +impl Clone for BoundedMailbox { + fn clone(&self) -> Self { + BoundedMailbox(self.0.clone()) + } +} + +impl fmt::Debug for BoundedMailbox { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BoundedMailbox") + .field("tx", &self.0) + .finish() + } +} + +/// A bounded mailbox receiver. +pub struct BoundedMailboxReceiver(mpsc::Receiver>); + +impl MailboxReceiver for BoundedMailboxReceiver { + async fn recv(&mut self) -> Option> { + self.0.recv().await + } +} + +impl fmt::Debug for BoundedMailboxReceiver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BoundedMailboxReceiver") + .field("rx", &self.0) + .finish() + } +} + +/// A weak bounded mailbox that does not prevent the actor from being stopped. +pub struct WeakBoundedMailbox(mpsc::WeakSender>); + +impl WeakMailbox for WeakBoundedMailbox { + type StrongMailbox = BoundedMailbox; + + #[inline] + fn upgrade(&self) -> Option { + self.0.upgrade().map(BoundedMailbox) + } + + #[inline] + fn strong_count(&self) -> usize { + self.0.strong_count() + } + + #[inline] + fn weak_count(&self) -> usize { + self.0.weak_count() + } +} + +impl Clone for WeakBoundedMailbox { + fn clone(&self) -> Self { + WeakBoundedMailbox(self.0.clone()) + } +} + +impl fmt::Debug for WeakBoundedMailbox { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WeakBoundedMailbox") + .field("tx", &self.0) + .finish() + } +} + +impl SignalMailbox for BoundedMailbox +where + A: Actor, +{ + fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>> { + async move { + self.0 + .send(Signal::StartupFinished) + .await + .map_err(|_| SendError::ActorNotRunning(())) + } + .boxed() + } + + fn signal_link_died( + &self, + id: ActorID, + reason: ActorStopReason, + ) -> BoxFuture<'_, Result<(), SendError>> { + async move { + self.0 + .send(Signal::LinkDied { id, reason }) + .await + .map_err(|_| SendError::ActorNotRunning(())) + } + .boxed() + } + + fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>> { + async move { + self.0 + .send(Signal::Stop) + .await + .map_err(|_| SendError::ActorNotRunning(())) + } + .boxed() + } +} + +impl SignalMailbox for WeakBoundedMailbox +where + A: Actor, +{ + fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>> { + async move { + match self.upgrade() { + Some(mb) => mb.signal_startup_finished().await, + None => Err(SendError::ActorNotRunning(())), + } + } + .boxed() + } + + fn signal_link_died( + &self, + id: ActorID, + reason: ActorStopReason, + ) -> BoxFuture<'_, Result<(), SendError>> { + async move { + match self.upgrade() { + Some(mb) => mb.signal_link_died(id, reason).await, + None => Err(SendError::ActorNotRunning(())), + } + } + .boxed() + } + + fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>> { + async move { + match self.upgrade() { + Some(mb) => mb.signal_stop().await, + None => Err(SendError::ActorNotRunning(())), + } + } + .boxed() + } +} diff --git a/src/mailbox/unbounded.rs b/src/mailbox/unbounded.rs new file mode 100644 index 0000000..b8e463a --- /dev/null +++ b/src/mailbox/unbounded.rs @@ -0,0 +1,208 @@ +//! Unbounded mailbox types based on tokio mpsc unbounded channels. + +use std::fmt; + +use futures::{future::BoxFuture, FutureExt}; +use tokio::sync::mpsc; + +use crate::{ + actor::ActorID, + error::{ActorStopReason, SendError}, + Actor, +}; + +use super::{Mailbox, MailboxReceiver, Signal, SignalMailbox, WeakMailbox}; + +/// An unbounded mailbox, where the number of messages queued can grow infinitely. +pub struct UnboundedMailbox(pub(crate) mpsc::UnboundedSender>); + +impl UnboundedMailbox { + /// Creates a new unbounded mailbox. + #[inline] + pub fn new() -> (Self, UnboundedMailboxReceiver) { + let (tx, rx) = mpsc::unbounded_channel(); + (UnboundedMailbox(tx), UnboundedMailboxReceiver(rx)) + } +} + +impl Mailbox for UnboundedMailbox { + type Receiver = UnboundedMailboxReceiver; + type WeakMailbox = WeakUnboundedMailbox; + + #[inline] + fn default_mailbox() -> (Self, Self::Receiver) { + UnboundedMailbox::new() + } + + #[inline] + async fn send(&self, signal: Signal) -> Result<(), SendError, E>> { + Ok(self.0.send(signal)?) + } + + #[inline] + async fn closed(&self) { + self.0.closed().await + } + + #[inline] + fn is_closed(&self) -> bool { + self.0.is_closed() + } + + #[inline] + fn downgrade(&self) -> Self::WeakMailbox { + WeakUnboundedMailbox(self.0.downgrade()) + } + + #[inline] + fn strong_count(&self) -> usize { + self.0.strong_count() + } + + #[inline] + fn weak_count(&self) -> usize { + self.0.weak_count() + } +} + +impl Clone for UnboundedMailbox { + fn clone(&self) -> Self { + UnboundedMailbox(self.0.clone()) + } +} + +impl fmt::Debug for UnboundedMailbox { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("UnboundedMailbox") + .field("tx", &self.0) + .finish() + } +} + +/// An unbounded mailbox receiver. +pub struct UnboundedMailboxReceiver(mpsc::UnboundedReceiver>); + +impl MailboxReceiver for UnboundedMailboxReceiver { + async fn recv(&mut self) -> Option> { + self.0.recv().await + } +} + +impl fmt::Debug for UnboundedMailboxReceiver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("UnboundedMailboxReceiver") + .field("rx", &self.0) + .finish() + } +} + +/// A weak unbounded mailbox that does not prevent the actor from being stopped. +pub struct WeakUnboundedMailbox(mpsc::WeakUnboundedSender>); + +impl WeakMailbox for WeakUnboundedMailbox { + type StrongMailbox = UnboundedMailbox; + + #[inline] + fn upgrade(&self) -> Option { + self.0.upgrade().map(UnboundedMailbox) + } + + #[inline] + fn strong_count(&self) -> usize { + self.0.strong_count() + } + + #[inline] + fn weak_count(&self) -> usize { + self.0.weak_count() + } +} + +impl Clone for WeakUnboundedMailbox { + fn clone(&self) -> Self { + WeakUnboundedMailbox(self.0.clone()) + } +} + +impl fmt::Debug for WeakUnboundedMailbox { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WeakUnboundedMailbox") + .field("tx", &self.0) + .finish() + } +} + +impl SignalMailbox for UnboundedMailbox +where + A: Actor, +{ + fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>> { + async move { + self.0 + .send(Signal::StartupFinished) + .map_err(|_| SendError::ActorNotRunning(())) + } + .boxed() + } + + fn signal_link_died( + &self, + id: ActorID, + reason: ActorStopReason, + ) -> BoxFuture<'_, Result<(), SendError>> { + async move { + self.0 + .send(Signal::LinkDied { id, reason }) + .map_err(|_| SendError::ActorNotRunning(())) + } + .boxed() + } + + fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>> { + async move { + self.0 + .send(Signal::Stop) + .map_err(|_| SendError::ActorNotRunning(())) + } + .boxed() + } +} + +impl SignalMailbox for WeakUnboundedMailbox +where + A: Actor, +{ + fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>> { + async move { + match self.upgrade() { + Some(mb) => mb.signal_startup_finished().await, + None => Err(SendError::ActorNotRunning(())), + } + } + .boxed() + } + + fn signal_link_died( + &self, + id: ActorID, + reason: ActorStopReason, + ) -> BoxFuture<'_, Result<(), SendError>> { + async move { + match self.upgrade() { + Some(mb) => mb.signal_link_died(id, reason).await, + None => Err(SendError::ActorNotRunning(())), + } + } + .boxed() + } + + fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>> { + async move { + match self.upgrade() { + Some(mb) => mb.signal_stop().await, + None => Err(SendError::ActorNotRunning(())), + } + } + .boxed() + } +} diff --git a/src/message.rs b/src/message.rs index 1eda153..3238d4d 100644 --- a/src/message.rs +++ b/src/message.rs @@ -23,7 +23,7 @@ use crate::{ actor::ActorRef, error::{BoxSendError, SendError}, reply::{DelegatedReply, ForwardedReply, Reply, ReplySender}, - request::Request, + request::{AskRequest, LocalAskRequest, MessageSend, WithoutRequestTimeout}, Actor, }; @@ -151,11 +151,17 @@ where R2: Reply>, E: fmt::Debug + Unpin + Send + Sync + 'static, R::Ok: Unpin, - ActorRef: Request, + AskRequest< + LocalAskRequest, + B::Mailbox, + M, + WithoutRequestTimeout, + WithoutRequestTimeout, + >: MessageSend>, { let (delegated_reply, reply_sender) = self.reply_sender(); tokio::spawn(async move { - let reply = Request::ask(&actor_ref, message, None, None, false).await; + let reply = MessageSend::send(actor_ref.ask(message)).await; if let Some(reply_sender) = reply_sender { reply_sender.send(reply); } diff --git a/src/remote.rs b/src/remote.rs index 6c0fa71..05e51a4 100644 --- a/src/remote.rs +++ b/src/remote.rs @@ -7,9 +7,7 @@ use std::{ time::Duration, }; -use _internal::{ - AskRemoteMessageFn, RemoteMessageRegistrationID, TellRemoteMessageFn, REMOTE_MESSAGES, -}; +use _internal::{RemoteMessageFns, RemoteMessageRegistrationID, REMOTE_MESSAGES}; pub use libp2p::PeerId; pub use libp2p_identity::Keypair; use once_cell::sync::Lazy; @@ -26,20 +24,19 @@ pub use swarm::*; pub(crate) static REMOTE_REGISTRY: Lazy>>> = Lazy::new(|| Mutex::new(HashMap::new())); -static REMOTE_MESSAGES_MAP: Lazy< - HashMap, (AskRemoteMessageFn, TellRemoteMessageFn)>, -> = Lazy::new(|| { - let mut existing_ids = HashSet::new(); - for (id, _) in REMOTE_MESSAGES { - if !existing_ids.insert(id) { - panic!( - "duplicate remote message detected for actor '{}' and message '{}'", - id.actor_remote_id, id.message_remote_id - ); +static REMOTE_MESSAGES_MAP: Lazy, RemoteMessageFns>> = + Lazy::new(|| { + let mut existing_ids = HashSet::new(); + for (id, _) in REMOTE_MESSAGES { + if !existing_ids.insert(id) { + panic!( + "duplicate remote message detected for actor '{}' and message '{}'", + id.actor_remote_id, id.message_remote_id + ); + } } - } - REMOTE_MESSAGES.iter().copied().collect() -}); + REMOTE_MESSAGES.iter().copied().collect() + }); /// A trait for identifying actors remotely. /// @@ -68,7 +65,7 @@ pub(crate) async fn ask( reply_timeout: Option, immediate: bool, ) -> Result, RemoteSendError>> { - let Some((handler, _)) = REMOTE_MESSAGES_MAP.get(&RemoteMessageRegistrationID { + let Some(fns) = REMOTE_MESSAGES_MAP.get(&RemoteMessageRegistrationID { actor_remote_id: &actor_remote_id, message_remote_id: &message_remote_id, }) else { @@ -77,7 +74,11 @@ pub(crate) async fn ask( message_remote_id, }); }; - handler(actor_id, payload, mailbox_timeout, reply_timeout, immediate).await + if immediate { + (fns.try_ask)(actor_id, payload, reply_timeout).await + } else { + (fns.ask)(actor_id, payload, mailbox_timeout, reply_timeout).await + } } pub(crate) async fn tell( @@ -88,7 +89,7 @@ pub(crate) async fn tell( mailbox_timeout: Option, immediate: bool, ) -> Result<(), RemoteSendError>> { - let Some((_, handler)) = REMOTE_MESSAGES_MAP.get(&RemoteMessageRegistrationID { + let Some(fns) = REMOTE_MESSAGES_MAP.get(&RemoteMessageRegistrationID { actor_remote_id: &actor_remote_id, message_remote_id: &message_remote_id, }) else { @@ -97,5 +98,9 @@ pub(crate) async fn tell( message_remote_id, }); }; - handler(actor_id, payload, mailbox_timeout, immediate).await + if immediate { + (fns.try_tell)(actor_id, payload).await + } else { + (fns.tell)(actor_id, payload, mailbox_timeout).await + } } diff --git a/src/remote/_internal.rs b/src/remote/_internal.rs index b46a868..892e3c6 100644 --- a/src/remote/_internal.rs +++ b/src/remote/_internal.rs @@ -6,60 +6,95 @@ use serde::de::DeserializeOwned; use serde::Serialize; use crate::actor::{ActorID, ActorRef}; -use crate::error::RemoteSendError; +use crate::error::{RemoteSendError, SendError}; use crate::message::Message; +use crate::request::{ + AskRequest, LocalAskRequest, LocalTellRequest, MaybeRequestTimeout, MessageSend, TellRequest, + TryMessageSend, +}; use crate::{Actor, Reply}; use super::REMOTE_REGISTRY; #[linkme::distributed_slice] -pub static REMOTE_MESSAGES: [( - RemoteMessageRegistrationID<'static>, - (AskRemoteMessageFn, TellRemoteMessageFn), -)]; +pub static REMOTE_MESSAGES: [(RemoteMessageRegistrationID<'static>, RemoteMessageFns)]; -pub type AskRemoteMessageFn = fn( +#[derive(Clone, Copy, Debug)] +pub struct RemoteMessageFns { + pub ask: RemoteAskFn, + pub try_ask: RemoteTryAskFn, + pub tell: RemoteTellFn, + pub try_tell: RemoteTryTellFn, +} + +pub type RemoteAskFn = fn( actor_id: ActorID, msg: Vec, mailbox_timeout: Option, reply_timeout: Option, - immediate: bool, ) -> BoxFuture<'static, Result, RemoteSendError>>>; -pub type TellRemoteMessageFn = fn( +pub type RemoteTryAskFn = fn( + actor_id: ActorID, + msg: Vec, + reply_timeout: Option, +) -> BoxFuture<'static, Result, RemoteSendError>>>; + +pub type RemoteTellFn = fn( actor_id: ActorID, msg: Vec, mailbox_timeout: Option, - immediate: bool, ) -> BoxFuture<'static, Result<(), RemoteSendError>>>; +pub type RemoteTryTellFn = + fn(actor_id: ActorID, msg: Vec) -> BoxFuture<'static, Result<(), RemoteSendError>>>; + #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] pub struct RemoteMessageRegistrationID<'a> { pub actor_remote_id: &'a str, pub message_remote_id: &'a str, } -pub async fn ask_remote_message( +pub async fn ask( actor_id: ActorID, msg: Vec, mailbox_timeout: Option, reply_timeout: Option, - immediate: bool, ) -> Result, RemoteSendError>> where A: Actor + Message, - M: DeserializeOwned, - ActorRef: crate::request::Request, + M: DeserializeOwned + Send + 'static, ::Ok: Serialize, ::Error: Serialize, + AskRequest< + LocalAskRequest, + A::Mailbox, + M, + MaybeRequestTimeout, + MaybeRequestTimeout, + >: MessageSend::Ok, Error = SendError::Error>>, { - let res = - ask_remote_message_inner::(actor_id, msg, mailbox_timeout, reply_timeout, immediate) - .await; + let actor_ref = { + let remote_actors = REMOTE_REGISTRY.lock().await; + remote_actors + .get(&actor_id) + .ok_or(RemoteSendError::ActorNotRunning)? + .downcast_ref::>() + .ok_or(RemoteSendError::BadActorType)? + .clone() + }; + let msg: M = rmp_serde::decode::from_slice(&msg) + .map_err(|err| RemoteSendError::DeserializeMessage(err.to_string()))?; + + let res = actor_ref + .ask(msg) + .into_maybe_timeouts(mailbox_timeout.into(), reply_timeout.into()) + .send() + .await; match res { Ok(reply) => Ok(rmp_serde::to_vec_named(&reply) .map_err(|err| RemoteSendError::SerializeReply(err.to_string()))?), - Err(err) => Err(err + Err(err) => Err(RemoteSendError::from(err) .map_err(|err| match rmp_serde::to_vec_named(&err) { Ok(payload) => RemoteSendError::HandlerError(payload), Err(err) => RemoteSendError::SerializeHandlerError(err.to_string()), @@ -68,17 +103,26 @@ where } } -async fn ask_remote_message_inner( +pub async fn try_ask( actor_id: ActorID, msg: Vec, - mailbox_timeout: Option, reply_timeout: Option, - immediate: bool, -) -> Result<::Ok, RemoteSendError<::Error>> +) -> Result, RemoteSendError>> where A: Actor + Message, - M: DeserializeOwned, - ActorRef: crate::request::Request, + M: DeserializeOwned + Send + 'static, + ::Ok: Serialize, + ::Error: Serialize, + AskRequest< + LocalAskRequest, + A::Mailbox, + M, + MaybeRequestTimeout, + MaybeRequestTimeout, + >: TryMessageSend< + Ok = ::Ok, + Error = SendError::Error>, + >, { let actor_ref = { let remote_actors = REMOTE_REGISTRY.lock().await; @@ -92,29 +136,55 @@ where let msg: M = rmp_serde::decode::from_slice(&msg) .map_err(|err| RemoteSendError::DeserializeMessage(err.to_string()))?; - let reply = - crate::request::Request::ask(&actor_ref, msg, mailbox_timeout, reply_timeout, immediate) - .await?; - - Ok(reply) + let res = actor_ref + .ask(msg) + .into_maybe_timeouts(None.into(), reply_timeout.into()) + .try_send() + .await; + match res { + Ok(reply) => Ok(rmp_serde::to_vec_named(&reply) + .map_err(|err| RemoteSendError::SerializeReply(err.to_string()))?), + Err(err) => Err(RemoteSendError::from(err) + .map_err(|err| match rmp_serde::to_vec_named(&err) { + Ok(payload) => RemoteSendError::HandlerError(payload), + Err(err) => RemoteSendError::SerializeHandlerError(err.to_string()), + }) + .flatten()), + } } -pub async fn tell_remote_message( +pub async fn tell( actor_id: ActorID, msg: Vec, mailbox_timeout: Option, - immediate: bool, ) -> Result<(), RemoteSendError>> where A: Actor + Message, - M: DeserializeOwned, - ActorRef: crate::request::Request, + M: DeserializeOwned + Send + 'static, ::Error: Serialize, + TellRequest, A::Mailbox, M, MaybeRequestTimeout>: + MessageSend::Error>>, { - let res = tell_remote_message_inner::(actor_id, msg, mailbox_timeout, immediate).await; + let actor_ref = { + let remote_actors = REMOTE_REGISTRY.lock().await; + remote_actors + .get(&actor_id) + .ok_or(RemoteSendError::ActorNotRunning)? + .downcast_ref::>() + .ok_or(RemoteSendError::BadActorType)? + .clone() + }; + let msg: M = rmp_serde::decode::from_slice(&msg) + .map_err(|err| RemoteSendError::DeserializeMessage(err.to_string()))?; + + let res = actor_ref + .tell(msg) + .into_maybe_timeouts(mailbox_timeout.into()) + .send() + .await; match res { Ok(()) => Ok(()), - Err(err) => Err(err + Err(err) => Err(RemoteSendError::from(err) .map_err(|err| match rmp_serde::to_vec_named(&err) { Ok(payload) => RemoteSendError::HandlerError(payload), Err(err) => RemoteSendError::SerializeHandlerError(err.to_string()), @@ -123,16 +193,13 @@ where } } -async fn tell_remote_message_inner( - actor_id: ActorID, - msg: Vec, - mailbox_timeout: Option, - immediate: bool, -) -> Result<(), RemoteSendError<::Error>> +pub async fn try_tell(actor_id: ActorID, msg: Vec) -> Result<(), RemoteSendError>> where A: Actor + Message, - M: DeserializeOwned, - ActorRef: crate::request::Request, + M: DeserializeOwned + Send + 'static, + ::Error: Serialize, + TellRequest, A::Mailbox, M, MaybeRequestTimeout>: + TryMessageSend::Error>>, { let actor_ref = { let remote_actors = REMOTE_REGISTRY.lock().await; @@ -146,7 +213,18 @@ where let msg: M = rmp_serde::decode::from_slice(&msg) .map_err(|err| RemoteSendError::DeserializeMessage(err.to_string()))?; - crate::request::Request::tell(&actor_ref, msg, mailbox_timeout, immediate).await?; - - Ok(()) + let res = actor_ref + .tell(msg) + .into_maybe_timeouts(None.into()) + .try_send() + .await; + match res { + Ok(()) => Ok(()), + Err(err) => Err(RemoteSendError::from(err) + .map_err(|err| match rmp_serde::to_vec_named(&err) { + Ok(payload) => RemoteSendError::HandlerError(payload), + Err(err) => RemoteSendError::SerializeHandlerError(err.to_string()), + }) + .flatten()), + } } diff --git a/src/reply.rs b/src/reply.rs index c621834..5225d82 100644 --- a/src/reply.rs +++ b/src/reply.rs @@ -98,7 +98,7 @@ pub trait Reply: Send + 'static { #[must_use = "the deligated reply should be returned by the handler"] #[derive(Clone, Copy, Debug)] pub struct DelegatedReply { - phantom: PhantomData, + phantom: PhantomData R>, } impl DelegatedReply { diff --git a/src/request.rs b/src/request.rs index 8c8b2d1..6f75b37 100644 --- a/src/request.rs +++ b/src/request.rs @@ -1,175 +1,177 @@ //! Types for sending requests including messages and queries to actors. +//! +//! # Supported Send Methods Overview +//! +//! Below is a table showing which features are supported for different requests. +//! +//! - **Ask (bounded)**: refers to sending a message using [`ask`] on an actor with a [`BoundedMailbox`]. +//! - **Ask (unbounded)**: refers to sending a message using [`ask`] on an actor with an [`UnboundedMailbox`]. +//! - **Tell (bounded)**: refers to sending a message using [`tell`] on an actor with a [`BoundedMailbox`]. +//! - **Tell (unbounded)**: refers to sending a message using [`tell`] on an actor with an [`UnboundedMailbox`]. +//! +//! [`tell`]: method@crate::actor::ActorRef::tell +//! [`ask`]: method@crate::actor::ActorRef::ask +//! [`BoundedMailbox`]: crate::mailbox::bounded::BoundedMailbox +//! [`UnboundedMailbox`]: crate::mailbox::unbounded::UnboundedMailbox +//! +//! **Legend** +//! +//! - Supported: ✅ +//! - With mailbox timeout: 📬 +//! - With reply timeout: ⏳ +//! - Remote: 🌐 +//! - Unsupported: ❌ +//! +//! | Method | Ask (bounded) | Ask (unbounded) | Tell (bounded) | Tell (unbounded) | +//! |-----------------------|---------------|-----------------|----------------|------------------| +//! | [`send`] | ✅ 📬 ⏳ 🌐 | ✅ ⏳ 🌐 | ✅ 📬 🌐 | ✅ 🌐 | +//! | [`send_sync`] | ❌ | ❌ | ❌ | ✅ | +//! | [`try_send`] | ✅ ⏳ 🌐 | ✅ ⏳ 🌐 | ✅ 🌐 | ✅ 🌐 | +//! | [`try_send_sync`] | ❌ | ❌ | ✅ | ✅ | +//! | [`blocking_send`] | ✅ | ✅ | ✅ | ✅ | +//! | [`try_blocking_send`] | ✅ | ✅ | ✅ | ✅ | +//! | [`forward`] | ✅ 📬 | ✅ | ❌ | ❌ | +//! +//! [`send`]: method@MessageSend::send +//! [`send_sync`]: method@MessageSendSync::send_sync +//! [`try_send`]: method@TryMessageSend::try_send +//! [`try_send_sync`]: method@TryMessageSendSync::try_send_sync +//! [`blocking_send`]: method@BlockingMessageSend::blocking_send +//! [`try_blocking_send`]: method@TryBlockingMessageSend::try_blocking_send +//! [`forward`]: method@ForwardMessageSend::forward use std::time::Duration; use futures::Future; -use crate::{ - actor::{ActorRef, BoundedMailbox, UnboundedMailbox}, - error::SendError, - message::Message, - reply::ReplySender, - Actor, Reply, -}; - mod ask; mod tell; pub use ask::{AskRequest, LocalAskRequest, RemoteAskRequest}; pub use tell::{LocalTellRequest, RemoteTellRequest, TellRequest}; -/// A trait for sending messages to an actor. -/// -/// Typically `ActorRef::tell` and `ActorRef::ask` would be used directly, as they provide more functionality such as setting timeouts. -/// Methods on this trait are helpful shorthand methods, which can be used for ActorRef's with any mailbox type (bounded or unbounded). -#[doc(hidden)] -pub trait Request -where - A: Actor + Message, -{ - /// Sends a message to an actor without waiting for any reply. - fn tell( - &self, - msg: M, - mailbox_timeout: Option, - immediate: bool, - ) -> impl Future::Error>>> + Send; - - /// Sends a message to an actor, waiting for a reply. - fn ask( - &self, - msg: M, - mailbox_timeout: Option, - reply_timeout: Option, - immediate: bool, - ) -> impl Future< - Output = Result<::Ok, SendError::Error>>, - > + Send; - - /// Forwards a message to an actor, waiting for a reply to be sent back to `tx`. - fn ask_forwarded( - &self, - msg: M, - tx: ReplySender<::Value>, - ) -> impl Future< - Output = Result< - (), - SendError<(M, ReplySender<::Value>), ::Error>, - >, - > + Send; +use crate::{error::SendError, reply::ReplySender, Reply}; + +/// Trait representing the ability to send a message. +pub trait MessageSend { + /// Success value. + type Ok; + /// Error value. + type Error; + + /// Sends a message. + fn send(self) -> impl Future> + Send; } -impl Request> for ActorRef -where - A: Actor> + Message, - M: Send + 'static, -{ - async fn tell( - &self, - msg: M, - mailbox_timeout: Option, - immediate: bool, - ) -> Result<(), SendError::Error>> { - let req = ActorRef::tell(self, msg); - match (mailbox_timeout, immediate) { - (None, true) => req.try_send(), - (None, false) => req.send().await, - (Some(_), true) => panic!("immediate is not available when a mailbox timeout is set"), - (Some(t), false) => req.timeout(t).send().await, - } - } +/// Trait representing the ability to send a message synchronously. +pub trait MessageSendSync { + /// Success value. + type Ok; + /// Error value. + type Error; - async fn ask( - &self, - msg: M, - mailbox_timeout: Option, - reply_timeout: Option, - immediate: bool, - ) -> Result<::Ok, SendError::Error>> { - let req = ActorRef::ask(self, msg); - match (mailbox_timeout, reply_timeout, immediate) { - (None, None, true) => req.try_send().await, - (None, None, false) => req.send().await, - (None, Some(t), true) => req.reply_timeout(t).try_send().await, - (None, Some(t), false) => req.reply_timeout(t).send().await, - (Some(_), None, true) => { - panic!("immediate is not available when a mailbox timeout is set") - } - (Some(t), None, false) => req.mailbox_timeout(t).send().await, - (Some(_), Some(_), true) => { - panic!("immediate is not available when a mailbox timeout is set") - } - (Some(mt), Some(rt), false) => req.mailbox_timeout(mt).reply_timeout(rt).send().await, - } - } + /// Sends a message synchronously. + fn send_sync(self) -> Result; +} - async fn ask_forwarded( - &self, - msg: M, - tx: ReplySender<::Value>, - ) -> Result< - (), - SendError<(M, ReplySender<::Value>), ::Error>, - > { - ActorRef::ask(self, msg).forward(tx).await - } +/// Trait representing the ability to attempt to send a message without waiting for mailbox capacity. +pub trait TryMessageSend { + /// Success value. + type Ok; + /// Error value. + type Error; + + /// Attempts to sends a message without waiting for mailbox capacity. + /// + /// If the actors mailbox is full, [`SendError::MailboxFull`] error will be returned. + /// + /// [`SendError::MailboxFull`]: crate::error::SendError::MailboxFull + fn try_send(self) -> impl Future> + Send; } -impl Request> for ActorRef -where - A: Actor> + Message, - M: Send + 'static, -{ - async fn tell( - &self, - msg: M, - mailbox_timeout: Option, - immediate: bool, - ) -> Result<(), SendError::Error>> { - let req = ActorRef::tell(self, msg); - match (mailbox_timeout, immediate) { - (None, false) => req.send(), - (_, true) => panic!("immediate is not available with unbounded mailboxes"), - (Some(_), _) => { - panic!("mailbox timeout is not available with unbounded mailboxes") - } - } - } +/// Trait representing the ability to attempt to send a message without waiting for mailbox capacity synchronously. +pub trait TryMessageSendSync { + /// Success value. + type Ok; + /// Error value. + type Error; - async fn ask( - &self, - msg: M, - mailbox_timeout: Option, - reply_timeout: Option, - immediate: bool, - ) -> Result<::Ok, SendError::Error>> { - let req = ActorRef::ask(self, msg); - match (mailbox_timeout, reply_timeout, immediate) { - (None, None, false) => req.send().await, - (None, Some(t), false) => req.reply_timeout(t).send().await, - (_, _, true) => panic!("immediate is not available with unbounded mailboxes"), - (Some(_), _, _) => { - panic!("mailbox timeout is not available with unbounded mailboxes") - } - } - } + /// Attemps to send a message synchronously without waiting for mailbox capacity. + fn try_send_sync(self) -> Result; +} - async fn ask_forwarded( - &self, - msg: M, - tx: ReplySender<::Value>, - ) -> Result< - (), - SendError<(M, ReplySender<::Value>), ::Error>, - > { - ActorRef::ask(self, msg).forward(tx) - } +/// Trait representing the ability to send a message in a blocking context, useful outside an async runtime. +pub trait BlockingMessageSend { + /// Success value. + type Ok; + /// Error value. + type Error; + + /// Sends a message, blocking the current thread. + fn blocking_send(self) -> Result; +} + +/// Trait representing the ability to send a message in a blocking context without waiting for mailbox capacity, +/// useful outside an async runtime. +pub trait TryBlockingMessageSend { + /// Success value. + type Ok; + /// Error value. + type Error; + + /// Attempts to sends a message in a blocking context without waiting for mailbox capacity. + /// + /// If the actors mailbox is full, [`SendError::MailboxFull`] error will be returned. + /// + /// [`SendError::MailboxFull`]: crate::error::SendError::MailboxFull + fn try_blocking_send(self) -> Result; +} + +/// Trait representing the ability to send a message with the reply being sent back to a channel. +pub trait ForwardMessageSend { + /// Sends a message with the reply being sent back to a channel. + fn forward( + self, + tx: ReplySender, + ) -> impl Future), R::Error>>> + Send; } /// A type for requests without any timeout set. -#[allow(missing_debug_implementations)] +#[derive(Clone, Copy, Debug)] pub struct WithoutRequestTimeout; /// A type for timeouts in actor requests. -#[allow(missing_debug_implementations)] +#[derive(Clone, Copy, Debug)] pub struct WithRequestTimeout(Duration); + +/// A type which might contain a request timeout. +/// +/// This type is used internally for remote messaging and will panic if used incorrectly with any MessageSend trait. +#[derive(Clone, Copy, Debug)] +pub enum MaybeRequestTimeout { + /// No timeout set. + NoTimeout, + /// A timeout with a duration. + Timeout(Duration), +} + +impl From> for MaybeRequestTimeout { + fn from(timeout: Option) -> Self { + match timeout { + Some(timeout) => MaybeRequestTimeout::Timeout(timeout), + None => MaybeRequestTimeout::NoTimeout, + } + } +} + +impl From for MaybeRequestTimeout { + fn from(_: WithoutRequestTimeout) -> Self { + MaybeRequestTimeout::NoTimeout + } +} + +impl From for MaybeRequestTimeout { + fn from(WithRequestTimeout(timeout): WithRequestTimeout) -> Self { + MaybeRequestTimeout::Timeout(timeout) + } +} diff --git a/src/request/ask.rs b/src/request/ask.rs index 03882ff..cc84cc5 100644 --- a/src/request/ask.rs +++ b/src/request/ask.rs @@ -4,15 +4,19 @@ use serde::{de::DeserializeOwned, Serialize}; use tokio::{sync::oneshot, time::timeout}; use crate::{ - actor::{ActorRef, BoundedMailbox, RemoteActorRef, Signal, UnboundedMailbox}, + actor::{ActorRef, RemoteActorRef}, error::{BoxSendError, RemoteSendError, SendError}, + mailbox::{bounded::BoundedMailbox, unbounded::UnboundedMailbox, Signal}, message::{BoxReply, Message}, remote::{ActorSwarm, RemoteActor, RemoteMessage, SwarmCommand, SwarmReq, SwarmResp}, reply::ReplySender, Actor, Reply, }; -use super::{WithRequestTimeout, WithoutRequestTimeout}; +use super::{ + BlockingMessageSend, ForwardMessageSend, MaybeRequestTimeout, MessageSend, + TryBlockingMessageSend, TryMessageSend, WithRequestTimeout, WithoutRequestTimeout, +}; /// A request to send a message to an actor, waiting for a reply. #[allow(missing_debug_implementations)] @@ -23,6 +27,27 @@ pub struct AskRequest { phantom: PhantomData<(Mb, M)>, } +/// A request to a local actor. +#[allow(missing_debug_implementations)] +pub struct LocalAskRequest +where + A: Actor, +{ + mailbox: Mb, + signal: Signal, + rx: oneshot::Receiver>, +} + +/// A request to a remote actor. +#[allow(missing_debug_implementations)] +pub struct RemoteAskRequest<'a, A, M> +where + A: Actor, +{ + actor_ref: &'a RemoteActorRef, + msg: &'a M, +} + impl AskRequest< LocalAskRequest, @@ -80,31 +105,7 @@ where } } -impl AskRequest>, BoundedMailbox, M, Tm, Tr> -where - A: Actor>, -{ - /// Sets the timeout for waiting for the actors mailbox to have capacity. - pub fn mailbox_timeout( - self, - duration: Duration, - ) -> AskRequest< - LocalAskRequest>, - BoundedMailbox, - M, - WithRequestTimeout, - Tr, - > { - AskRequest { - location: self.location, - mailbox_timeout: WithRequestTimeout(duration), - reply_timeout: self.reply_timeout, - phantom: PhantomData, - } - } -} - -impl<'a, A, M, Tm, Tr> AskRequest, BoundedMailbox, M, Tm, Tr> +impl AskRequest, M, Tm, Tr> where A: Actor>, { @@ -112,7 +113,7 @@ where pub fn mailbox_timeout( self, duration: Duration, - ) -> AskRequest, BoundedMailbox, M, WithRequestTimeout, Tr> { + ) -> AskRequest, M, WithRequestTimeout, Tr> { AskRequest { location: self.location, mailbox_timeout: WithRequestTimeout(duration), @@ -122,15 +123,9 @@ where } } -impl AskRequest, Mb, M, Tm, Tr> -where - A: Actor, -{ +impl AskRequest { /// Sets the timeout for waiting for a reply from the actor. - pub fn reply_timeout( - self, - duration: Duration, - ) -> AskRequest, Mb, M, Tm, WithRequestTimeout> { + pub fn reply_timeout(self, duration: Duration) -> AskRequest { AskRequest { location: self.location, mailbox_timeout: self.mailbox_timeout, @@ -140,313 +135,520 @@ where } } -impl<'a, A, Mb, M, Tm, Tr> AskRequest, Mb, M, Tm, Tr> -where - A: Actor, -{ - /// Sets the timeout for waiting for a reply from the actor. - pub fn reply_timeout( +impl AskRequest { + pub(crate) fn into_maybe_timeouts( self, - duration: Duration, - ) -> AskRequest, Mb, M, Tm, WithRequestTimeout> { + mailbox_timeout: MaybeRequestTimeout, + reply_timeout: MaybeRequestTimeout, + ) -> AskRequest { AskRequest { location: self.location, - mailbox_timeout: self.mailbox_timeout, - reply_timeout: WithRequestTimeout(duration), + mailbox_timeout, + reply_timeout, phantom: PhantomData, } } } -// Bounded +///////////////////////// +// === MessageSend === // +///////////////////////// +macro_rules! impl_message_send { + (local, $mailbox:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident| $($body:tt)*) => { + impl MessageSend + for AskRequest< + LocalAskRequest>, + $mailbox, + M, + $mailbox_timeout, + $reply_timeout, + > + where + A: Actor> + Message, + M: Send + 'static, + { + type Ok = ::Ok; + type Error = SendError::Error>; + + async fn send(self) -> Result { + let $req = self; + $($body)* + } + } + }; + (remote, $mailbox:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident| ($mailbox_timeout_body:expr, $reply_timeout_body:expr)) => { + impl<'a, A, M> MessageSend + for AskRequest< + RemoteAskRequest<'a, A, M>, + $mailbox, + M, + $mailbox_timeout, + $reply_timeout, + > + where + AskRequest< + LocalAskRequest>, + $mailbox, + M, + $mailbox_timeout, + $reply_timeout, + >: MessageSend, + A: Actor> + Message + RemoteActor + RemoteMessage, + M: Serialize + Send + Sync, + ::Ok: DeserializeOwned, + ::Error: DeserializeOwned, + { + type Ok = ::Ok; + type Error = RemoteSendError<::Error>; + + async fn send(self) -> Result { + let $req = self; + remote_ask( + $req.location.actor_ref, + &$req.location.msg, + $mailbox_timeout_body, + $reply_timeout_body, + false + ).await + } + } + }; +} -impl - AskRequest< - LocalAskRequest>, - BoundedMailbox, - M, - WithRequestTimeout, - WithRequestTimeout, - > -where - A: Actor> + Message, - M: 'static, -{ - /// Sends the message with the mailbox and reply timeouts set, waiting for a reply. - pub async fn send( - self, - ) -> Result<::Ok, SendError::Error>> { - self.location - .mailbox - .0 - .send_timeout(self.location.signal, self.mailbox_timeout.0) - .await?; - match timeout(self.reply_timeout.0, self.location.rx).await?? { +impl_message_send!( + local, + BoundedMailbox, + WithoutRequestTimeout, + WithoutRequestTimeout, + |req| { + req.location.mailbox.0.send(req.location.signal).await?; + match req.location.rx.await? { Ok(val) => Ok(*val.downcast().unwrap()), Err(err) => Err(err.downcast()), } } -} - -impl<'a, A, M> - AskRequest< - RemoteAskRequest<'a, A, M>, - BoundedMailbox, - M, - WithRequestTimeout, - WithRequestTimeout, - > -where - A: Actor> + Message + RemoteActor + RemoteMessage, - M: Serialize, - ::Ok: DeserializeOwned, - ::Error: DeserializeOwned, -{ - /// Sends the message with the mailbox and reply timeouts set, waiting for a reply. - pub async fn send( - self, - ) -> Result<::Ok, RemoteSendError<::Error>> { - remote_ask( - self.location.actor_ref, - &self.location.msg, - Some(self.mailbox_timeout.0), - Some(self.reply_timeout.0), - false, - ) - .await +); +impl_message_send!( + local, + BoundedMailbox, + WithoutRequestTimeout, + WithRequestTimeout, + |req| { + req.location.mailbox.0.send(req.location.signal).await?; + match timeout(req.reply_timeout.0, req.location.rx).await?? { + Ok(val) => Ok(*val.downcast().unwrap()), + Err(err) => Err(err.downcast()), + } } -} - -impl - AskRequest< - LocalAskRequest>, - BoundedMailbox, - M, - WithRequestTimeout, - WithoutRequestTimeout, - > -where - A: Actor> + Message, - M: 'static, -{ - /// Sends the message with the mailbox timeout set, waiting for a reply. - pub async fn send( - self, - ) -> Result<::Ok, SendError::Error>> { - self.location +); +impl_message_send!( + local, + BoundedMailbox, + WithRequestTimeout, + WithoutRequestTimeout, + |req| { + req.location .mailbox .0 - .send_timeout(self.location.signal, self.mailbox_timeout.0) + .send_timeout(req.location.signal, req.mailbox_timeout.0) .await?; - match self.location.rx.await? { + match req.location.rx.await? { Ok(val) => Ok(*val.downcast().unwrap()), Err(err) => Err(err.downcast()), } } - - /// Sends the message with the reply being sent back to `tx`. - pub async fn forward( - mut self, - tx: oneshot::Sender>, - ) -> Result<(), SendError::Error>> { - match &mut self.location.signal { - Signal::Message { reply, .. } => *reply = Some(tx), - _ => unreachable!("ask requests only support messages"), - } - - self.location +); +impl_message_send!( + local, + BoundedMailbox, + WithRequestTimeout, + WithRequestTimeout, + |req| { + req.location .mailbox .0 - .send_timeout(self.location.signal, self.mailbox_timeout.0) + .send_timeout(req.location.signal, req.mailbox_timeout.0) .await?; - - Ok(()) + match timeout(req.reply_timeout.0, req.location.rx).await?? { + Ok(val) => Ok(*val.downcast().unwrap()), + Err(err) => Err(err.downcast()), + } } -} - -impl<'a, A, M> - AskRequest< - RemoteAskRequest<'a, A, M>, - BoundedMailbox, - M, - WithRequestTimeout, - WithoutRequestTimeout, - > -where - A: Actor> + Message + RemoteActor + RemoteMessage, - M: Serialize, - ::Ok: DeserializeOwned, - ::Error: DeserializeOwned, -{ - /// Sends the message with the mailbox timeout set, waiting for a reply. - pub async fn send( - self, - ) -> Result<::Ok, RemoteSendError<::Error>> { - remote_ask( - self.location.actor_ref, - &self.location.msg, - Some(self.mailbox_timeout.0), - None, - false, - ) - .await +); + +impl_message_send!( + local, + UnboundedMailbox, + WithoutRequestTimeout, + WithoutRequestTimeout, + |req| { + req.location.mailbox.0.send(req.location.signal)?; + match req.location.rx.await? { + Ok(val) => Ok(*val.downcast().unwrap()), + Err(err) => Err(err.downcast()), + } } +); +impl_message_send!( + local, + UnboundedMailbox, + WithoutRequestTimeout, + WithRequestTimeout, + |req| { + req.location.mailbox.0.send(req.location.signal)?; + match timeout(req.reply_timeout.0, req.location.rx).await?? { + Ok(val) => Ok(*val.downcast().unwrap()), + Err(err) => Err(err.downcast()), + } + } +); + +impl_message_send!( + remote, + BoundedMailbox, + WithoutRequestTimeout, + WithoutRequestTimeout, + |req| (None, None) +); +impl_message_send!( + remote, + BoundedMailbox, + WithoutRequestTimeout, + WithRequestTimeout, + |req| (None, Some(req.reply_timeout.0)) +); +impl_message_send!( + remote, + BoundedMailbox, + WithRequestTimeout, + WithoutRequestTimeout, + |req| (Some(req.mailbox_timeout.0), None) +); +impl_message_send!( + remote, + BoundedMailbox, + WithRequestTimeout, + WithRequestTimeout, + |req| (Some(req.mailbox_timeout.0), Some(req.reply_timeout.0)) +); + +impl_message_send!( + remote, + UnboundedMailbox, + WithoutRequestTimeout, + WithoutRequestTimeout, + |req| (None, None) +); +impl_message_send!( + remote, + UnboundedMailbox, + WithoutRequestTimeout, + WithRequestTimeout, + |req| (None, Some(req.reply_timeout.0)) +); + +///////////////////////// +// === MessageSend === // +///////////////////////// +macro_rules! impl_try_message_send { + (local, $mailbox:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident| $($body:tt)*) => { + impl TryMessageSend + for AskRequest< + LocalAskRequest>, + $mailbox, + M, + $mailbox_timeout, + $reply_timeout, + > + where + A: Actor> + Message, + M: Send + 'static, + { + type Ok = ::Ok; + type Error = SendError::Error>; + + async fn try_send(self) -> Result { + let $req = self; + $($body)* + } + } + }; + (remote, $mailbox:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident| ($mailbox_timeout_body:expr, $reply_timeout_body:expr)) => { + impl<'a, A, M> TryMessageSend + for AskRequest< + RemoteAskRequest<'a, A, M>, + $mailbox, + M, + $mailbox_timeout, + $reply_timeout, + > + where + AskRequest< + LocalAskRequest>, + $mailbox, + M, + $mailbox_timeout, + $reply_timeout, + >: TryMessageSend, + A: Actor> + Message + RemoteActor + RemoteMessage, + M: Serialize + Send + Sync, + ::Ok: DeserializeOwned, + ::Error: DeserializeOwned, + { + type Ok = ::Ok; + type Error = RemoteSendError<::Error>; + + async fn try_send(self) -> Result { + let $req = self; + remote_ask( + $req.location.actor_ref, + &$req.location.msg, + $mailbox_timeout_body, + $reply_timeout_body, + true + ).await + } + } + }; } -impl - AskRequest< - LocalAskRequest>, - BoundedMailbox, - M, - WithoutRequestTimeout, - WithRequestTimeout, - > -where - A: Actor> + Message, - M: 'static, -{ - /// Sends the message with the reply timeout set, waiting for a reply. - pub async fn send( - self, - ) -> Result<::Ok, SendError::Error>> { - self.location.mailbox.0.send(self.location.signal).await?; - match timeout(self.reply_timeout.0, self.location.rx).await?? { +impl_try_message_send!( + local, + BoundedMailbox, + WithoutRequestTimeout, + WithoutRequestTimeout, + |req| { + req.location.mailbox.0.try_send(req.location.signal)?; + match req.location.rx.await? { Ok(val) => Ok(*val.downcast().unwrap()), Err(err) => Err(err.downcast()), } } - - /// Tries to send the message if the mailbox is not full, waiting for a reply up to the timeout set. - pub async fn try_send( - self, - ) -> Result<::Ok, SendError::Error>> { - self.location.mailbox.0.try_send(self.location.signal)?; - match timeout(self.reply_timeout.0, self.location.rx).await?? { +); +impl_try_message_send!( + local, + BoundedMailbox, + WithoutRequestTimeout, + WithRequestTimeout, + |req| { + req.location.mailbox.0.try_send(req.location.signal)?; + match timeout(req.reply_timeout.0, req.location.rx).await?? { Ok(val) => Ok(*val.downcast().unwrap()), Err(err) => Err(err.downcast()), } } -} - -impl<'a, A, M> - AskRequest< - RemoteAskRequest<'a, A, M>, - BoundedMailbox, - M, - WithoutRequestTimeout, - WithRequestTimeout, - > -where - A: Actor> + Message + RemoteActor + RemoteMessage, - M: Serialize, - ::Ok: DeserializeOwned, - ::Error: DeserializeOwned, -{ - /// Sends the message with the reply timeout set, waiting for a reply. - pub async fn send( - self, - ) -> Result<::Ok, RemoteSendError<::Error>> { - remote_ask( - self.location.actor_ref, - &self.location.msg, - None, - Some(self.reply_timeout.0), - false, - ) - .await +); + +impl_try_message_send!( + local, + UnboundedMailbox, + WithoutRequestTimeout, + WithoutRequestTimeout, + |req| { + req.location.mailbox.0.send(req.location.signal)?; + match req.location.rx.await? { + Ok(val) => Ok(*val.downcast().unwrap()), + Err(err) => Err(err.downcast()), + } } - - /// Tries to send the message if the mailbox is not full, waiting for a reply up to the timeout set. - pub async fn try_send( - self, - ) -> Result<::Ok, RemoteSendError<::Error>> { - remote_ask( - self.location.actor_ref, - &self.location.msg, - None, - Some(self.reply_timeout.0), - true, - ) - .await +); +impl_try_message_send!( + local, + UnboundedMailbox, + WithoutRequestTimeout, + WithRequestTimeout, + |req| { + req.location.mailbox.0.send(req.location.signal)?; + match timeout(req.reply_timeout.0, req.location.rx).await?? { + Ok(val) => Ok(*val.downcast().unwrap()), + Err(err) => Err(err.downcast()), + } } +); + +impl_try_message_send!( + remote, + BoundedMailbox, + WithoutRequestTimeout, + WithoutRequestTimeout, + |req| (None, None) +); +impl_try_message_send!( + remote, + BoundedMailbox, + WithoutRequestTimeout, + WithRequestTimeout, + |req| (None, Some(req.reply_timeout.0)) +); + +impl_try_message_send!( + remote, + UnboundedMailbox, + WithoutRequestTimeout, + WithoutRequestTimeout, + |req| (None, None) +); +impl_try_message_send!( + remote, + UnboundedMailbox, + WithoutRequestTimeout, + WithRequestTimeout, + |req| (None, Some(req.reply_timeout.0)) +); + +///////////////////////////////// +// === BlockingMessageSend === // +///////////////////////////////// +macro_rules! impl_blocking_message_send { + (local, $mailbox:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident| $($body:tt)*) => { + impl BlockingMessageSend + for AskRequest< + LocalAskRequest>, + $mailbox, + M, + $mailbox_timeout, + $reply_timeout, + > + where + A: Actor> + Message, + M: 'static, + { + type Ok = ::Ok; + type Error = SendError::Error>; + + fn blocking_send(self) -> Result { + let $req = self; + $($body)* + } + } + }; } -impl - AskRequest< - LocalAskRequest>, - BoundedMailbox, - M, - WithoutRequestTimeout, - WithoutRequestTimeout, - > -where - A: Actor> + Message, - M: 'static, -{ - /// Sends the message, waiting for a reply. - pub async fn send( - self, - ) -> Result<::Ok, SendError::Error>> { - self.location.mailbox.0.send(self.location.signal).await?; - match self.location.rx.await? { +impl_blocking_message_send!( + local, + BoundedMailbox, + WithoutRequestTimeout, + WithoutRequestTimeout, + |req| { + req.location.mailbox.0.blocking_send(req.location.signal)?; + match req.location.rx.blocking_recv()? { Ok(val) => Ok(*val.downcast().unwrap()), Err(err) => Err(err.downcast()), } } - - /// Sends the message from outside the async runtime. - pub fn blocking_send( - self, - ) -> Result<::Ok, SendError::Error>> { - self.location - .mailbox - .0 - .blocking_send(self.location.signal)?; - match self.location.rx.blocking_recv()? { +); + +impl_blocking_message_send!( + local, + UnboundedMailbox, + WithoutRequestTimeout, + WithoutRequestTimeout, + |req| { + req.location.mailbox.0.send(req.location.signal)?; + match req.location.rx.blocking_recv()? { Ok(val) => Ok(*val.downcast().unwrap()), Err(err) => Err(err.downcast()), } } +); + +///////////////////////////////// +// === BlockingMessageSend === // +///////////////////////////////// +macro_rules! impl_try_blocking_message_send { + (local, $mailbox:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident| $($body:tt)*) => { + impl TryBlockingMessageSend + for AskRequest< + LocalAskRequest>, + $mailbox, + M, + $mailbox_timeout, + $reply_timeout, + > + where + A: Actor> + Message, + M: 'static, + { + type Ok = ::Ok; + type Error = SendError::Error>; + + fn try_blocking_send(self) -> Result { + let $req = self; + $($body)* + } + } + }; +} - /// Tries to send the message if the mailbox is not full, waiting for a reply. - pub async fn try_send( - self, - ) -> Result<::Ok, SendError::Error>> { - self.location.mailbox.0.try_send(self.location.signal)?; - match self.location.rx.await? { +impl_try_blocking_message_send!( + local, + BoundedMailbox, + WithoutRequestTimeout, + WithoutRequestTimeout, + |req| { + req.location.mailbox.0.try_send(req.location.signal)?; + match req.location.rx.blocking_recv()? { Ok(val) => Ok(*val.downcast().unwrap()), Err(err) => Err(err.downcast()), } } - - /// Tries to send the message if the mailbox is not full from outside the async runtime, waiting for a reply. - pub fn try_blocking_send( - self, - ) -> Result<::Ok, SendError::Error>> { - self.location.mailbox.0.try_send(self.location.signal)?; - match self.location.rx.blocking_recv()? { +); + +impl_try_blocking_message_send!( + local, + UnboundedMailbox, + WithoutRequestTimeout, + WithoutRequestTimeout, + |req| { + req.location.mailbox.0.send(req.location.signal)?; + match req.location.rx.blocking_recv()? { Ok(val) => Ok(*val.downcast().unwrap()), Err(err) => Err(err.downcast()), } } - - /// Sends the message with the reply being sent back to `tx`. - pub async fn forward( - mut self, - tx: ReplySender<::Value>, - ) -> Result< - (), - SendError<(M, ReplySender<::Value>), ::Error>, - > { - match &mut self.location.signal { +); + +//////////////////////////////// +// === ForwardMessageSend === // +//////////////////////////////// +macro_rules! impl_forward_message { + (local, $mailbox:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident, $tx:ident| $($body:tt)*) => { + impl ForwardMessageSend + for AskRequest< + LocalAskRequest>, + $mailbox, + M, + $mailbox_timeout, + $reply_timeout, + > + where + A: Actor> + Message, + M: Send + 'static, + { + async fn forward(self, $tx: ReplySender<::Value>) + -> Result<(), SendError<(M, ReplySender<::Value>), ::Error>> + { + let mut $req = self; + $($body)* + } + } + }; +} +impl_forward_message!( + local, + BoundedMailbox, + WithoutRequestTimeout, + WithoutRequestTimeout, + |req, tx| { + match &mut req.location.signal { Signal::Message { reply, .. } => *reply = Some(tx.box_sender()), _ => unreachable!("ask requests only support messages"), } - self.location + req.location .mailbox .0 - .send(self.location.signal) + .send(req.location.signal) .await .map_err(|err| match err.0 { Signal::Message { @@ -458,222 +660,264 @@ where _ => unreachable!("ask requests only support messages"), }) } -} +); + +impl_forward_message!( + local, + BoundedMailbox, + WithRequestTimeout, + WithoutRequestTimeout, + |req, tx| { + match &mut req.location.signal { + Signal::Message { reply, .. } => *reply = Some(tx.box_sender()), + _ => unreachable!("ask requests only support messages"), + } -impl<'a, A, M> - AskRequest< - RemoteAskRequest<'a, A, M>, + req.location + .mailbox + .0 + .send_timeout(req.location.signal, req.mailbox_timeout.0) + .await?; + + Ok(()) + } +); + +impl_forward_message!( + local, + UnboundedMailbox, + WithoutRequestTimeout, + WithoutRequestTimeout, + |req, tx| { + match &mut req.location.signal { + Signal::Message { reply, .. } => *reply = Some(tx.box_sender()), + _ => unreachable!("ask requests only support messages"), + } + + req.location + .mailbox + .0 + .send(req.location.signal) + .map_err(|err| match err.0 { + Signal::Message { + message, mut reply, .. + } => SendError::ActorNotRunning(( + message.as_any().downcast::().ok().map(|v| *v).unwrap(), + ReplySender::new(reply.take().unwrap()), + )), + _ => unreachable!("ask requests only support messages"), + }) + } +); + +impl MessageSend + for AskRequest< + LocalAskRequest>, BoundedMailbox, M, - WithoutRequestTimeout, - WithoutRequestTimeout, + MaybeRequestTimeout, + MaybeRequestTimeout, > where - A: Actor> + Message + RemoteActor + RemoteMessage, - M: Serialize, - ::Ok: DeserializeOwned, - ::Error: DeserializeOwned, + A: Actor> + Message, + M: Send + 'static, { - /// Sends the message, waiting for a reply. - pub async fn send( - self, - ) -> Result<::Ok, RemoteSendError<::Error>> { - remote_ask( - self.location.actor_ref, - &self.location.msg, - None, - None, - false, - ) - .await - } - - /// Tries to send the message if the mailbox is not full, waiting for a reply. - pub async fn try_send( - self, - ) -> Result<::Ok, RemoteSendError<::Error>> { - remote_ask( - self.location.actor_ref, - &self.location.msg, - None, - None, - true, - ) - .await + type Ok = ::Ok; + type Error = SendError::Error>; + + async fn send(self) -> Result { + match (self.mailbox_timeout, self.reply_timeout) { + (MaybeRequestTimeout::NoTimeout, MaybeRequestTimeout::NoTimeout) => { + AskRequest { + location: self.location, + mailbox_timeout: WithoutRequestTimeout, + reply_timeout: WithoutRequestTimeout, + phantom: PhantomData, + } + .send() + .await + } + (MaybeRequestTimeout::NoTimeout, MaybeRequestTimeout::Timeout(reply_timeout)) => { + AskRequest { + location: self.location, + mailbox_timeout: WithoutRequestTimeout, + reply_timeout: WithRequestTimeout(reply_timeout), + phantom: PhantomData, + } + .send() + .await + } + (MaybeRequestTimeout::Timeout(mailbox_timeout), MaybeRequestTimeout::NoTimeout) => { + AskRequest { + location: self.location, + mailbox_timeout: WithRequestTimeout(mailbox_timeout), + reply_timeout: WithoutRequestTimeout, + phantom: PhantomData, + } + .send() + .await + } + ( + MaybeRequestTimeout::Timeout(mailbox_timeout), + MaybeRequestTimeout::Timeout(reply_timeout), + ) => { + AskRequest { + location: self.location, + mailbox_timeout: WithRequestTimeout(mailbox_timeout), + reply_timeout: WithRequestTimeout(reply_timeout), + phantom: PhantomData, + } + .send() + .await + } + } } } -// Unbounded - -impl - AskRequest< +impl MessageSend + for AskRequest< LocalAskRequest>, UnboundedMailbox, M, - WithoutRequestTimeout, - WithRequestTimeout, + MaybeRequestTimeout, + MaybeRequestTimeout, > where A: Actor> + Message, - M: 'static, + M: Send + 'static, { - /// Sends the message with the reply timeout set, waiting for a reply. - pub async fn send( - self, - ) -> Result<::Ok, SendError::Error>> { - self.location.mailbox.0.send(self.location.signal)?; - match timeout(self.reply_timeout.0, self.location.rx).await?? { - Ok(val) => Ok(*val.downcast().unwrap()), - Err(err) => Err(err.downcast()), + type Ok = ::Ok; + type Error = SendError::Error>; + + async fn send(self) -> Result { + match (self.mailbox_timeout, self.reply_timeout) { + (MaybeRequestTimeout::NoTimeout, MaybeRequestTimeout::NoTimeout) => { + AskRequest { + location: self.location, + mailbox_timeout: WithoutRequestTimeout, + reply_timeout: WithoutRequestTimeout, + phantom: PhantomData, + } + .send() + .await + } + (MaybeRequestTimeout::NoTimeout, MaybeRequestTimeout::Timeout(reply_timeout)) => { + AskRequest { + location: self.location, + mailbox_timeout: WithoutRequestTimeout, + reply_timeout: WithRequestTimeout(reply_timeout), + phantom: PhantomData, + } + .send() + .await + } + (MaybeRequestTimeout::Timeout(_), MaybeRequestTimeout::NoTimeout) => { + panic!("send is not available with a mailbox timeout on unbounded mailboxes") + } + (MaybeRequestTimeout::Timeout(_), MaybeRequestTimeout::Timeout(_)) => { + panic!("send is not available with a mailbox timeout on unbounded mailboxes") + } } } } -impl<'a, A, M> - AskRequest< - RemoteAskRequest<'a, A, M>, - UnboundedMailbox, +impl TryMessageSend + for AskRequest< + LocalAskRequest>, + BoundedMailbox, M, - WithoutRequestTimeout, - WithRequestTimeout, + MaybeRequestTimeout, + MaybeRequestTimeout, > where - A: Actor> + Message + RemoteActor + RemoteMessage, - M: Serialize, - ::Ok: DeserializeOwned, - ::Error: DeserializeOwned, + A: Actor> + Message, + M: Send + 'static, { - /// Sends the message with the reply timeout set, waiting for a reply. - pub async fn send( - self, - ) -> Result<::Ok, RemoteSendError<::Error>> { - remote_ask( - self.location.actor_ref, - &self.location.msg, - None, - Some(self.reply_timeout.0), - false, - ) - .await + type Ok = ::Ok; + type Error = SendError::Error>; + + async fn try_send(self) -> Result { + match (self.mailbox_timeout, self.reply_timeout) { + (MaybeRequestTimeout::NoTimeout, MaybeRequestTimeout::NoTimeout) => { + AskRequest { + location: self.location, + mailbox_timeout: WithoutRequestTimeout, + reply_timeout: WithoutRequestTimeout, + phantom: PhantomData, + } + .try_send() + .await + } + (MaybeRequestTimeout::NoTimeout, MaybeRequestTimeout::Timeout(reply_timeout)) => { + AskRequest { + location: self.location, + mailbox_timeout: WithoutRequestTimeout, + reply_timeout: WithRequestTimeout(reply_timeout), + phantom: PhantomData, + } + .try_send() + .await + } + (MaybeRequestTimeout::Timeout(_), MaybeRequestTimeout::NoTimeout) => { + panic!("try_send is not available when a mailbox timeout is set") + } + (MaybeRequestTimeout::Timeout(_), MaybeRequestTimeout::Timeout(_)) => { + panic!("try_send is not available when a mailbox timeout is set") + } + } } } -impl - AskRequest< +impl TryMessageSend + for AskRequest< LocalAskRequest>, UnboundedMailbox, M, - WithoutRequestTimeout, - WithoutRequestTimeout, + MaybeRequestTimeout, + MaybeRequestTimeout, > where A: Actor> + Message, - M: 'static, + M: Send + 'static, { - /// Sends the message, waiting for a reply. - pub async fn send( - self, - ) -> Result<::Ok, SendError::Error>> { - self.location.mailbox.0.send(self.location.signal)?; - match self.location.rx.await? { - Ok(val) => Ok(*val.downcast().unwrap()), - Err(err) => Err(err.downcast()), - } - } - - /// Sends the message from outside the async runtime. - pub fn blocking_send( - self, - ) -> Result<::Ok, SendError::Error>> { - self.location.mailbox.0.send(self.location.signal)?; - match self.location.rx.blocking_recv()? { - Ok(val) => Ok(*val.downcast().unwrap()), - Err(err) => Err(err.downcast()), - } - } - - /// Sends the message with the reply being sent back to `tx`. - pub fn forward( - mut self, - tx: ReplySender<::Value>, - ) -> Result< - (), - SendError<(M, ReplySender<::Value>), ::Error>, - > { - match &mut self.location.signal { - Signal::Message { reply, .. } => *reply = Some(tx.box_sender()), - _ => unreachable!("ask requests only support messages"), + type Ok = ::Ok; + type Error = SendError::Error>; + + async fn try_send(self) -> Result { + match (self.mailbox_timeout, self.reply_timeout) { + (MaybeRequestTimeout::NoTimeout, MaybeRequestTimeout::NoTimeout) => { + AskRequest { + location: self.location, + mailbox_timeout: WithoutRequestTimeout, + reply_timeout: WithoutRequestTimeout, + phantom: PhantomData, + } + .try_send() + .await + } + (MaybeRequestTimeout::NoTimeout, MaybeRequestTimeout::Timeout(reply_timeout)) => { + AskRequest { + location: self.location, + mailbox_timeout: WithoutRequestTimeout, + reply_timeout: WithRequestTimeout(reply_timeout), + phantom: PhantomData, + } + .try_send() + .await + } + (MaybeRequestTimeout::Timeout(_), MaybeRequestTimeout::NoTimeout) => { + panic!("try_send is not available when a mailbox timeout is set") + } + (MaybeRequestTimeout::Timeout(_), MaybeRequestTimeout::Timeout(_)) => { + panic!("try_send is not available when a mailbox timeout is set") + } } - - self.location - .mailbox - .0 - .send(self.location.signal) - .map_err(|err| match err.0 { - Signal::Message { - message, mut reply, .. - } => SendError::ActorNotRunning(( - message.as_any().downcast::().ok().map(|v| *v).unwrap(), - ReplySender::new(reply.take().unwrap()), - )), - _ => unreachable!("ask requests only support messages"), - }) - } -} - -impl<'a, A, M> - AskRequest< - RemoteAskRequest<'a, A, M>, - UnboundedMailbox, - M, - WithoutRequestTimeout, - WithoutRequestTimeout, - > -where - A: Actor> + Message + RemoteActor + RemoteMessage, - M: Serialize, - ::Ok: DeserializeOwned, - ::Error: DeserializeOwned, -{ - /// Sends the message, waiting for a reply. - pub async fn send( - self, - ) -> Result<::Ok, RemoteSendError<::Error>> { - remote_ask( - self.location.actor_ref, - &self.location.msg, - None, - None, - false, - ) - .await } } -/// A request to a local actor. -#[allow(missing_debug_implementations)] -pub struct LocalAskRequest -where - A: Actor, -{ - mailbox: Mb, - signal: Signal, - rx: oneshot::Receiver>, -} - -/// A request to a remote actor. -#[allow(missing_debug_implementations)] -pub struct RemoteAskRequest<'a, A, M> -where - A: Actor, -{ +async fn remote_ask<'a, A, M>( actor_ref: &'a RemoteActorRef, msg: &'a M, -} - -async fn remote_ask( - actor_ref: &RemoteActorRef, - msg: &M, mailbox_timeout: Option, reply_timeout: Option, immediate: bool, diff --git a/src/request/tell.rs b/src/request/tell.rs index 3bd0244..f7263cf 100644 --- a/src/request/tell.rs +++ b/src/request/tell.rs @@ -1,19 +1,22 @@ use core::panic; -use std::{borrow::Cow, marker::PhantomData, mem, time::Duration}; +use std::{borrow::Cow, marker::PhantomData, time::Duration}; -use futures::TryFutureExt; use serde::{de::DeserializeOwned, Serialize}; -use tokio::{sync::oneshot, task::JoinHandle, time::timeout}; +use tokio::sync::oneshot; use crate::{ - actor::{ActorRef, BoundedMailbox, RemoteActorRef, Signal, UnboundedMailbox}, + actor::{ActorRef, RemoteActorRef}, error::{RemoteSendError, SendError}, + mailbox::{bounded::BoundedMailbox, unbounded::UnboundedMailbox, Signal}, message::Message, remote::{ActorSwarm, RemoteActor, RemoteMessage, SwarmCommand, SwarmReq, SwarmResp}, Actor, Reply, }; -use super::{WithRequestTimeout, WithoutRequestTimeout}; +use super::{ + BlockingMessageSend, MaybeRequestTimeout, MessageSend, MessageSendSync, TryBlockingMessageSend, + TryMessageSend, TryMessageSendSync, WithRequestTimeout, WithoutRequestTimeout, +}; /// A request to send a message to an actor without any reply. /// @@ -25,6 +28,26 @@ pub struct TellRequest { phantom: PhantomData<(Mb, M)>, } +/// A request to a local actor. +#[allow(missing_debug_implementations)] +pub struct LocalTellRequest +where + A: Actor, +{ + mailbox: Mb, + signal: Signal, +} + +/// A request to a remote actor. +#[allow(missing_debug_implementations)] +pub struct RemoteTellRequest<'a, A, M> +where + A: Actor, +{ + actor_ref: &'a RemoteActorRef, + msg: &'a M, +} + impl TellRequest, A::Mailbox, M, WithoutRequestTimeout> where A: Actor, @@ -63,16 +86,15 @@ where } } -impl TellRequest>, BoundedMailbox, M, T> +impl TellRequest, M, T> where A: Actor>, { - /// Sets the timeout for waiting for a reply from the actor. - pub fn timeout( + /// Sets the timeout for waiting for the actors mailbox to have capacity. + pub fn mailbox_timeout( self, duration: Duration, - ) -> TellRequest>, BoundedMailbox, M, WithRequestTimeout> - { + ) -> TellRequest, M, WithRequestTimeout> { TellRequest { location: self.location, timeout: WithRequestTimeout(duration), @@ -81,250 +103,421 @@ where } } -impl<'a, A, M, T> TellRequest, BoundedMailbox, M, T> -where - A: Actor>, -{ - /// Sets the timeout for waiting for a reply from the actor. - pub fn timeout( +impl TellRequest { + pub(crate) fn into_maybe_timeouts( self, - duration: Duration, - ) -> TellRequest, BoundedMailbox, M, WithRequestTimeout> { + mailbox_timeout: MaybeRequestTimeout, + ) -> TellRequest { TellRequest { location: self.location, - timeout: WithRequestTimeout(duration), + timeout: mailbox_timeout, phantom: PhantomData, } } } -impl - TellRequest>, BoundedMailbox, M, WithoutRequestTimeout> -where - A: Actor> + Message, - M: 'static, -{ - /// Sends the message. - pub async fn send(self) -> Result<(), SendError::Error>> { - self.location.mailbox.0.send(self.location.signal).await?; - Ok(()) - } +///////////////////////// +// === MessageSend === // +///////////////////////// +macro_rules! impl_message_send { + (local, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { + impl MessageSend + for TellRequest< + LocalTellRequest>, + $mailbox, + M, + $timeout, + > + where + A: Actor> + Message, + M: Send + 'static, + { + type Ok = (); + type Error = SendError::Error>; - /// Sends the message from outside the async runtime. - pub fn blocking_send(self) -> Result<(), SendError::Error>> { - self.location - .mailbox - .0 - .blocking_send(self.location.signal)?; - Ok(()) - } + async fn send(self) -> Result { + let $req = self; + $($body)* + } + } + }; + (remote, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { + impl<'a, A, M> MessageSend + for TellRequest, $mailbox, M, $timeout> + where + TellRequest< + LocalTellRequest>, + $mailbox, + M, + $timeout, + >: MessageSend, + A: Actor> + Message + RemoteActor + RemoteMessage, + M: Serialize + Send + Sync, + ::Error: DeserializeOwned, + { + type Ok = (); + type Error = RemoteSendError<::Error>; - /// Tries to send the message if the mailbox is not full. - pub fn try_send(self) -> Result<(), SendError::Error>> { - self.location.mailbox.0.try_send(self.location.signal)?; - Ok(()) - } + async fn send(self) -> Result { + let $req = self; + remote_tell($req.location.actor_ref, &$req.location.msg, $($body)*, false).await + } + } + }; +} - /// Sends the message after the given delay in the background. - /// - /// If reserve is true, then a permit will be reserved in - /// the actors mailbox before waiting for the delay. - pub fn delayed_send( - mut self, - delay: Duration, - reserve: bool, - ) -> JoinHandle::Error>>> - where - M: Send, - { - tokio::spawn(async move { - let permit = match reserve { - true => Some(self.location.mailbox.0.reserve().await.map_err(|_| { - SendError::ActorNotRunning( - mem::replace(&mut self.location.signal, Signal::Stop) // Replace signal with a dummy value - .downcast_message::() - .unwrap(), - ) - })?), - false => None, - }; - - tokio::time::sleep(delay).await; - - match permit { - Some(permit) => permit.send(self.location.signal), - None => self.location.mailbox.0.send(self.location.signal).await?, +impl_message_send!(local, BoundedMailbox, WithoutRequestTimeout, |req| { + req.location.mailbox.0.send(req.location.signal).await?; + Ok(()) +}); +impl_message_send!(local, BoundedMailbox, WithRequestTimeout, |req| { + req.location + .mailbox + .0 + .send_timeout(req.location.signal, req.timeout.0) + .await?; + Ok(()) +}); +impl_message_send!(remote, BoundedMailbox, WithoutRequestTimeout, |req| None); +impl_message_send!(remote, BoundedMailbox, WithRequestTimeout, |req| Some( + req.timeout.0 +)); + +impl_message_send!(local, UnboundedMailbox, WithoutRequestTimeout, |req| { + req.location.mailbox.0.send(req.location.signal)?; + Ok(()) +}); +impl_message_send!(remote, UnboundedMailbox, WithoutRequestTimeout, |req| None); + +///////////////////////////// +// === MessageSendSync === // +///////////////////////////// +macro_rules! impl_message_send_sync { + (local, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { + impl MessageSendSync + for TellRequest< + LocalTellRequest>, + $mailbox, + M, + $timeout, + > + where + A: Actor> + Message, + M: 'static, + { + type Ok = (); + type Error = SendError::Error>; + + fn send_sync(self) -> Result { + let $req = self; + $($body)* } + } + }; +} - Ok(()) - }) - } +impl_message_send_sync!(local, UnboundedMailbox, WithoutRequestTimeout, |req| { + req.location.mailbox.0.send(req.location.signal)?; + Ok(()) +}); + +//////////////////////////// +// === TryMessageSend === // +//////////////////////////// +macro_rules! impl_try_message_send { + (local, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { + impl TryMessageSend + for TellRequest< + LocalTellRequest>, + $mailbox, + M, + $timeout, + > + where + A: Actor> + Message, + M: Send + 'static, + { + type Ok = (); + type Error = SendError::Error>; + + async fn try_send(self) -> Result { + let $req = self; + $($body)* + } + } + }; + (remote, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { + impl<'a, A, M> TryMessageSend + for TellRequest, $mailbox, M, $timeout> + where + TellRequest< + LocalTellRequest>, + $mailbox, + M, + $timeout, + >: TryMessageSend, + A: Actor> + Message + RemoteActor + RemoteMessage, + M: Serialize + Send + Sync, + ::Error: DeserializeOwned, + { + type Ok = (); + type Error = RemoteSendError<::Error>; + + async fn try_send(self) -> Result { + let $req = self; + remote_tell($req.location.actor_ref, &$req.location.msg, $($body)*, true).await + } + } + }; } -impl<'a, A, M> TellRequest, BoundedMailbox, M, WithoutRequestTimeout> -where - A: Actor> + Message + RemoteActor + RemoteMessage, - M: Serialize, - ::Error: DeserializeOwned, -{ - /// Sends the message. - pub async fn send(self) -> Result<(), RemoteSendError<::Error>> { - remote_tell(self.location.actor_ref, &self.location.msg, None, false).await - } +impl_try_message_send!(local, BoundedMailbox, WithoutRequestTimeout, |req| { + req.location.mailbox.0.try_send(req.location.signal)?; + Ok(()) +}); +impl_try_message_send!(remote, BoundedMailbox, WithoutRequestTimeout, |req| None); +impl_try_message_send!(local, UnboundedMailbox, WithoutRequestTimeout, |req| { + req.location.mailbox.0.send(req.location.signal)?; + Ok(()) +}); +impl_try_message_send!(remote, UnboundedMailbox, WithoutRequestTimeout, |req| None); - /// Tries to send the message if the mailbox is not full. - pub async fn try_send(self) -> Result<(), RemoteSendError<::Error>> { - remote_tell(self.location.actor_ref, &self.location.msg, None, true).await - } +//////////////////////////////// +// === TryMessageSendSync === // +//////////////////////////////// +macro_rules! impl_try_message_send_sync { + (local, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { + impl TryMessageSendSync + for TellRequest< + LocalTellRequest>, + $mailbox, + M, + $timeout, + > + where + A: Actor> + Message, + M: 'static, + { + type Ok = (); + type Error = SendError::Error>; + + fn try_send_sync(self) -> Result { + let $req = self; + $($body)* + } + } + }; } -impl - TellRequest>, BoundedMailbox, M, WithRequestTimeout> -where - A: Actor> + Message, - M: 'static, -{ - /// Sends the message with the timeout set. - pub async fn send(self) -> Result<(), SendError::Error>> { - self.location - .mailbox - .0 - .send_timeout(self.location.signal, self.timeout.0) - .await?; - Ok(()) - } +impl_try_message_send_sync!(local, BoundedMailbox, WithoutRequestTimeout, |req| { + req.location.mailbox.0.try_send(req.location.signal)?; + Ok(()) +}); - /// Sends the message after the given delay in the background with the timeout set. - /// - /// If reserve is true, then a permit will be reserved in - /// the actors mailbox before waiting for the delay. - pub fn delayed_send( - mut self, - delay: Duration, - reserve: bool, - ) -> JoinHandle::Error>>> - where - M: Send, - { - tokio::spawn(async move { - let permit = match reserve { - true => { - let permit = timeout( - self.timeout.0, - self.location.mailbox.0.reserve().map_err(|_| { - SendError::ActorNotRunning( - mem::replace(&mut self.location.signal, Signal::Stop) // Replace signal with a dummy value - .downcast_message::() - .unwrap(), - ) - }), - ) - .await??; - Some(permit) - } - false => None, - }; - - tokio::time::sleep(delay).await; - - match permit { - Some(permit) => permit.send(self.location.signal), - None => { - self.location - .mailbox - .0 - .send_timeout(self.location.signal, self.timeout.0) - .await? - } +impl_try_message_send_sync!(local, UnboundedMailbox, WithoutRequestTimeout, |req| { + req.location.mailbox.0.send(req.location.signal)?; + Ok(()) +}); + +//////////////////////////////// +// === BlockingMessageSend === // +//////////////////////////////// +macro_rules! impl_blocking_message_send { + (local, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { + impl BlockingMessageSend + for TellRequest< + LocalTellRequest>, + $mailbox, + M, + $timeout, + > + where + A: Actor> + Message, + M: 'static, + { + type Ok = (); + type Error = SendError::Error>; + + fn blocking_send(self) -> Result { + let $req = self; + $($body)* } + } + }; +} - Ok(()) - }) - } +impl_blocking_message_send!(local, BoundedMailbox, WithoutRequestTimeout, |req| { + req.location.mailbox.0.blocking_send(req.location.signal)?; + Ok(()) +}); + +impl_blocking_message_send!(local, UnboundedMailbox, WithoutRequestTimeout, |req| { + req.location.mailbox.0.send(req.location.signal)?; + Ok(()) +}); + +//////////////////////////////////// +// === TryBlockingMessageSend === // +//////////////////////////////////// +macro_rules! impl_try_blocking_message_send { + (local, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { + impl TryBlockingMessageSend + for TellRequest< + LocalTellRequest>, + $mailbox, + M, + $timeout, + > + where + A: Actor> + Message, + M: 'static, + { + type Ok = (); + type Error = SendError::Error>; + + fn try_blocking_send(self) -> Result { + let $req = self; + $($body)* + } + } + }; } -impl<'a, A, M> TellRequest, BoundedMailbox, M, WithRequestTimeout> +impl_try_blocking_message_send!(local, BoundedMailbox, WithoutRequestTimeout, |req| { + req.location.mailbox.0.try_send(req.location.signal)?; + Ok(()) +}); + +impl_try_blocking_message_send!(local, UnboundedMailbox, WithoutRequestTimeout, |req| { + req.location.mailbox.0.send(req.location.signal)?; + Ok(()) +}); + +impl MessageSend + for TellRequest< + LocalTellRequest>, + BoundedMailbox, + M, + MaybeRequestTimeout, + > where - A: Actor> + Message + RemoteActor + RemoteMessage, - M: Serialize, - ::Error: DeserializeOwned, + A: Actor> + Message, + M: Send + 'static, { - /// Sends the message with the timeout set. - pub async fn send(self) -> Result<(), RemoteSendError<::Error>> { - remote_tell( - self.location.actor_ref, - &self.location.msg, - Some(self.timeout.0), - false, - ) - .await + type Ok = (); + type Error = SendError::Error>; + + async fn send(self) -> Result { + match self.timeout { + MaybeRequestTimeout::NoTimeout => { + self.location.mailbox.0.send(self.location.signal).await?; + } + MaybeRequestTimeout::Timeout(timeout) => { + self.location + .mailbox + .0 + .send_timeout(self.location.signal, timeout) + .await?; + } + } + Ok(()) } } -impl - TellRequest< +impl MessageSend + for TellRequest< LocalTellRequest>, UnboundedMailbox, M, - WithoutRequestTimeout, + MaybeRequestTimeout, > where A: Actor> + Message, - M: 'static, + M: Send + 'static, { - /// Sends the message. - pub fn send(self) -> Result<(), SendError::Error>> { - self.location.mailbox.0.send(self.location.signal)?; - Ok(()) - } + type Ok = (); + type Error = SendError::Error>; - /// Sends the message after the given delay in the background. - pub fn delayed_send( - self, - delay: Duration, - ) -> JoinHandle::Error>>> - where - M: Send, - { - tokio::spawn(async move { - tokio::time::sleep(delay).await; - self.location.mailbox.0.send(self.location.signal)?; - Ok(()) - }) + async fn send(self) -> Result { + match self.timeout { + MaybeRequestTimeout::NoTimeout => { + TellRequest { + location: self.location, + timeout: WithoutRequestTimeout, + phantom: PhantomData, + } + .send() + .await + } + MaybeRequestTimeout::Timeout(_) => { + panic!("mailbox timeout is not available with unbounded mailboxes") + } + } } } -impl<'a, A, M> - TellRequest, UnboundedMailbox, M, WithoutRequestTimeout> +impl TryMessageSend + for TellRequest< + LocalTellRequest>, + BoundedMailbox, + M, + MaybeRequestTimeout, + > where - A: Actor> + Message + RemoteActor + RemoteMessage, - M: Serialize, - ::Error: DeserializeOwned, + A: Actor> + Message, + M: Send + 'static, { - /// Sends the message. - pub async fn send(self) -> Result<(), RemoteSendError<::Error>> { - remote_tell(self.location.actor_ref, &self.location.msg, None, false).await + type Ok = (); + type Error = SendError::Error>; + + async fn try_send(self) -> Result { + match self.timeout { + MaybeRequestTimeout::NoTimeout => { + TellRequest { + location: self.location, + timeout: WithoutRequestTimeout, + phantom: PhantomData, + } + .try_send() + .await + } + MaybeRequestTimeout::Timeout(_) => { + panic!("try_send is not available when a mailbox timeout is set") + } + } } } -/// A request to a local actor. -#[allow(missing_debug_implementations)] -pub struct LocalTellRequest +impl TryMessageSend + for TellRequest< + LocalTellRequest>, + UnboundedMailbox, + M, + MaybeRequestTimeout, + > where - A: Actor, + A: Actor> + Message, + M: Send + 'static, { - mailbox: Mb, - signal: Signal, -} + type Ok = (); + type Error = SendError::Error>; -/// A request to a remote actor. -#[allow(missing_debug_implementations)] -pub struct RemoteTellRequest<'a, A, M> -where - A: Actor, -{ - actor_ref: &'a RemoteActorRef, - msg: &'a M, + async fn try_send(self) -> Result { + match self.timeout { + MaybeRequestTimeout::NoTimeout => { + TellRequest { + location: self.location, + timeout: WithoutRequestTimeout, + phantom: PhantomData, + } + .try_send() + .await + } + MaybeRequestTimeout::Timeout(_) => { + panic!("try_send is not available when a mailbox timeout is set") + } + } + } } async fn remote_tell(