Skip to content

Commit

Permalink
feat!: return SendError from send methods allowing replies to be re…
Browse files Browse the repository at this point in the history
…ceived blocking (#27)
  • Loading branch information
tqwewe authored May 27, 2024
1 parent b059d59 commit bff40dc
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 31 deletions.
135 changes: 112 additions & 23 deletions crates/kameo/src/actor/actor_ref.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
use std::{
cell::Cell,
collections::HashMap,
fmt,
marker::PhantomData,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex, TryLockError,
},
task::{self, ready, Poll},
time::Duration,
};

use dyn_clone::DynClone;
use futures::{stream::AbortHandle, Stream, StreamExt};
use futures::{stream::AbortHandle, Future, Stream, StreamExt};
use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
task_local,
};

use crate::{
error::{ActorStopReason, BoxSendError, SendError},
error::{ActorStopReason, BoxSendError, SendError, SendResult},
message::{
BlockingMessage, BoxReply, DynBlockingMessage, DynMessage, DynQuery, Message, Query,
StreamMessage,
Expand All @@ -29,6 +33,9 @@ static ACTOR_COUNTER: AtomicU64 = AtomicU64::new(0);
task_local! {
pub(crate) static CURRENT_ACTOR_ID: u64;
}
thread_local! {
pub(crate) static CURRENT_THREAD_ACTOR_ID: Cell<Option<u64>> = Cell::new(None);
}

type Mailbox<A> = mpsc::UnboundedSender<Signal<A>>;
type WeakMailbox<A> = mpsc::WeakUnboundedSender<Signal<A>>;
Expand Down Expand Up @@ -140,10 +147,20 @@ impl<A> ActorRef<A> {
}

/// Sends a message to the actor, waiting for a reply.
pub async fn send<M>(
///
/// The message reply can be awaited asyncronously, or in a blocking context either by awaiting it directly,
/// or calling [`SendResult::blocking_recv()`].
///
/// # Example
///
/// ```
/// actor_ref.send(msg).await?; // Receive reply asyncronously
/// actor_ref.send(msg).blocking_recv()?; // Receive reply blocking
/// ```
pub fn send<M>(
&self,
msg: M,
) -> Result<<A::Reply as Reply>::Ok, SendError<M, <A::Reply as Reply>::Error>>
) -> SendResult<<A::Reply as Reply>::Ok, M, <A::Reply as Reply>::Error>
where
A: Message<M>,
M: Send + 'static,
Expand All @@ -154,16 +171,16 @@ impl<A> ActorRef<A> {
);

let (reply, rx) = oneshot::channel();
self.mailbox.send(Signal::Message {
let res = self.mailbox.send(Signal::Message {
message: Box::new(msg),
actor_ref: self.clone(),
reply: Some(reply),
sent_within_actor: self.is_current(),
})?;
match rx.await? {
Ok(val) => Ok(*val.downcast().unwrap()),
Err(err) => Err(err.downcast()),
});
if let Err(err) = res {
return SendResult::err(err.into());
}
SendResult::ok(rx)
}

/// Sends a message to the actor asyncronously without waiting for a reply.
Expand Down Expand Up @@ -200,10 +217,20 @@ impl<A> ActorRef<A> {
}

/// Sends a blocking message to the actor, waiting for a reply.
pub async fn send_blocking<M>(
///
/// The message reply can be awaited asyncronously, or in a blocking context either by awaiting it directly,
/// or calling [`SendResult::blocking_recv()`].
///
/// # Example
///
/// ```
/// actor_ref.send_blocking(msg).await?; // Receive reply asyncronously
/// actor_ref.send_blocking(msg).blocking_recv()?; // Receive reply blocking
/// ```
pub fn send_blocking<M>(
&self,
msg: M,
) -> Result<<A::Reply as Reply>::Ok, SendError<M, <A::Reply as Reply>::Error>>
) -> SendResult<<A::Reply as Reply>::Ok, M, <A::Reply as Reply>::Error>
where
A: BlockingMessage<M> + Sync,
M: Send + 'static,
Expand All @@ -214,16 +241,16 @@ impl<A> ActorRef<A> {
);

let (reply, rx) = oneshot::channel();
self.mailbox.send(Signal::BlockingMessage {
let res = self.mailbox.send(Signal::BlockingMessage {
message: Box::new(msg),
actor_ref: self.clone(),
reply: Some(reply),
sent_within_actor: self.is_current(),
})?;
match rx.await? {
Ok(val) => Ok(*val.downcast().unwrap()),
Err(err) => Err(err.downcast()),
});
if let Err(err) = res {
return SendResult::err(err.into());
}
SendResult::ok(rx)
}

/// Sends a blocking message to the actor asyncronously without waiting for a reply.
Expand Down Expand Up @@ -269,25 +296,35 @@ impl<A> ActorRef<A> {
///
/// If the actor was spawned as `!Sync` with [spawn_unsync](crate::actor::spawn_unsync),
/// then queries will not be supported and any query will return an error of [`SendError::QueriesNotSupported`].
pub async fn query<M>(
///
/// The query reply can be awaited asyncronously, or in a blocking context either by awaiting it directly,
/// or calling [`SendResult::blocking_recv()`].
///
/// # Example
///
/// ```
/// actor_ref.query(msg).await?; // Receive reply asyncronously
/// actor_ref.query(msg).blocking_recv()?; // Receive reply blocking
/// ```
pub fn query<M>(
&self,
msg: M,
) -> Result<<A::Reply as Reply>::Ok, SendError<M, <A::Reply as Reply>::Error>>
) -> SendResult<<A::Reply as Reply>::Ok, M, <A::Reply as Reply>::Error>
where
A: Query<M>,
M: Send + 'static,
{
let (reply, rx) = oneshot::channel();
self.mailbox.send(Signal::Query {
let res = self.mailbox.send(Signal::Query {
query: Box::new(msg),
actor_ref: self.clone(),
reply: Some(reply),
sent_within_actor: self.is_current(),
})?;
match rx.await? {
Ok(val) => Ok(*val.downcast().unwrap()),
Err(err) => Err(err.downcast()),
});
if let Err(err) = res {
return SendResult::err(err.into());
}
SendResult::ok(rx)
}

/// Attaches a stream of messages to the actor.
Expand Down Expand Up @@ -495,6 +532,58 @@ impl<A: ?Sized> fmt::Debug for WeakActorRef<A> {
}
}

/// A receiver for receiving a message response either asyncronously or blocking.
///
/// To receive the message asyncronously, simply `.await` the `MessageReceiver`.
/// Or to receive in a blocking context, use [`MessageReceiver::blocking_recv()`].
#[derive(Debug)]
pub struct MessageReceiver<T, M, E> {
rx: oneshot::Receiver<Result<BoxReply, BoxSendError>>,
phantom: PhantomData<(T, M, E)>,
}

impl<T, M, E> MessageReceiver<T, M, E>
where
T: 'static,
M: 'static,
E: 'static,
{
pub(crate) fn new(rx: oneshot::Receiver<Result<BoxReply, BoxSendError>>) -> Self {
MessageReceiver {
rx,
phantom: PhantomData,
}
}

/// Receives the message response outside an async context.
pub fn blocking_recv(self) -> Result<T, SendError<M, E>> {
let res = self.rx.blocking_recv()?;
match res {
Ok(val) => Ok(*val.downcast().unwrap()),
Err(err) => Err(err.downcast()),
}
}
}

impl<T, M, E> Future for MessageReceiver<T, M, E>
where
T: 'static,
M: 'static,
E: 'static,
{
type Output = Result<T, SendError<M, E>>;

fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
// Safety: `rx` is `Unpin` and we do not move `self`.
let rx = unsafe { self.map_unchecked_mut(|s| &mut s.rx) };
let res = ready!(rx.poll(cx));
match res? {
Ok(val) => Poll::Ready(Ok(*val.downcast().unwrap())),
Err(err) => Poll::Ready(Err(err.downcast())),
}
}
}

pub(crate) trait SignalMailbox: DynClone + Send {
fn signal_startup_finished(&self) -> Result<(), SendError>;
fn signal_link_died(&self, id: u64, reason: ActorStopReason) -> Result<(), SendError>;
Expand Down
16 changes: 11 additions & 5 deletions crates/kameo/src/actor/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,14 @@ impl<A> ActorPool<A> {
) -> Result<<A::Reply as Reply>::Ok, SendError<M, <A::Reply as Reply>::Error>>
where
A: Actor + Message<M>,
M: Send + 'static,
M: Unpin + Send + 'static,
<A::Reply as Reply>::Ok: Unpin,
<A::Reply as Reply>::Error: Unpin,
{
for _ in 0..self.workers.len() * 2 {
let (idx, worker) = self.next_worker();
match worker.send(msg).await {
let res = worker.send(msg).await;
match res {
Err(SendError::ActorNotRunning(v)) => {
msg = v;
warn!(id = %worker.id(), name = %A::name(), "restarting worker");
Expand Down Expand Up @@ -139,7 +142,9 @@ impl<A> ActorPool<A> {
) -> Vec<Result<<A::Reply as Reply>::Ok, SendError<M, <A::Reply as Reply>::Error>>>
where
A: Actor + Message<M>,
M: Clone + Send + 'static,
M: Clone + Unpin + Send + 'static,
<A::Reply as Reply>::Ok: Unpin,
<A::Reply as Reply>::Error: Unpin,
{
let results = join_all(self.workers.iter().map(|worker| worker.send(msg.clone()))).await;
for (i, res) in results.iter().enumerate() {
Expand Down Expand Up @@ -195,8 +200,9 @@ impl<A> Actor for ActorPool<A> {
impl<A, M> Message<M> for ActorPool<A>
where
A: Actor + Message<M>,
M: Send + Sync + 'static,
<A::Reply as Reply>::Error: fmt::Debug + Sync,
M: Unpin + Send + Sync + 'static,
<A::Reply as Reply>::Ok: Unpin,
<A::Reply as Reply>::Error: fmt::Debug + Unpin + Sync,
{
type Reply = Result<<A::Reply as Reply>::Ok, SendError<M, <A::Reply as Reply>::Error>>;

Expand Down
72 changes: 71 additions & 1 deletion crates/kameo/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,88 @@ use std::{
any::{self, Any},
convert::Infallible,
error, fmt,
pin::Pin,
sync::{Arc, Mutex, MutexGuard, PoisonError},
task::{self, Poll},
};

use futures::{Future, FutureExt};
use tokio::sync::{mpsc, oneshot};

use crate::{actor::Signal, message::BoxDebug};
use crate::{
actor::{MessageReceiver, Signal},
message::{BoxDebug, BoxReply},
};

/// A dyn boxed error.
pub type BoxError = Box<dyn error::Error + Send + Sync + 'static>;
/// A dyn boxed send error.
pub type BoxSendError = SendError<Box<dyn any::Any + Send>, Box<dyn any::Any + Send>>;

/// A result returned when sending a message to an actor, allowing it to be awaited asyncronously, or received
/// in a blocking context with `SendResult::blocking_recv()`.
#[derive(Debug)]
pub struct SendResult<T, M, E> {
inner: SendResultInner<T, M, E>,
}

#[derive(Debug)]
enum SendResultInner<T, M, E> {
Ok(MessageReceiver<T, M, E>),
Err(Option<SendError<M, E>>),
}

impl<T, M, E> SendResult<T, M, E>
where
T: 'static,
M: 'static,
E: 'static,
{
pub(crate) fn ok(rx: oneshot::Receiver<Result<BoxReply, BoxSendError>>) -> SendResult<T, M, E> {
SendResult {
inner: SendResultInner::Ok(MessageReceiver::new(rx)),
}
}

pub(crate) fn err(err: SendError<M, E>) -> SendResult<T, M, E> {
SendResult {
inner: SendResultInner::Err(Some(err)),
}
}

/// Receives the message response outside an async context.
pub fn blocking_recv(self) -> Result<T, SendError<M, E>> {
match self.inner {
SendResultInner::Ok(rx) => rx.blocking_recv(),
SendResultInner::Err(mut err) => Err(err.take().unwrap()),
}
}

/// Converts the `SendResult` into a std `Result`.
pub fn into_result(self) -> Result<MessageReceiver<T, M, E>, SendError<M, E>> {
match self.inner {
SendResultInner::Ok(rx) => Ok(rx),
SendResultInner::Err(mut err) => Err(err.take().unwrap()),
}
}
}

impl<T, M, E> Future for SendResult<T, M, E>
where
T: Unpin + 'static,
M: Unpin + 'static,
E: Unpin + 'static,
{
type Output = Result<T, SendError<M, E>>;

fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match &mut self.get_mut().inner {
SendResultInner::Ok(rx) => rx.poll_unpin(cx),
SendResultInner::Err(err) => Poll::Ready(Err(err.take().unwrap())),
}
}
}

/// Error that can occur when sending a message to an actor.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum SendError<M = (), E = Infallible> {
Expand Down
5 changes: 3 additions & 2 deletions crates/kameo/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,11 @@ where
) -> ForwardedReply<R::Ok, M, E>
where
B: Message<M, Reply = R2>,
M: Send + Sync + 'static,
M: Unpin + Send + Sync + 'static,
R: Reply<Error = SendError<M, E>, Value = Result<<R as Reply>::Ok, SendError<M, E>>>,
R2: Reply<Ok = R::Ok, Error = E, Value = Result<R::Ok, E>>,
E: fmt::Debug + Send + Sync + 'static,
E: fmt::Debug + Unpin + Send + Sync + 'static,
R::Ok: Unpin,
{
let (delegated_reply, reply_sender) = self.reply_sender();
tokio::spawn(async move {
Expand Down

0 comments on commit bff40dc

Please sign in to comment.