From 56c6844e083700fcea36b182e7e441d921f5eb86 Mon Sep 17 00:00:00 2001 From: Ari Seyhun Date: Sun, 29 Sep 2024 12:04:13 +0800 Subject: [PATCH] chore: remove unused mailbox modules --- src/actor/mailbox.rs | 87 -------------- src/actor/mailbox/bounded.rs | 209 --------------------------------- src/actor/mailbox/unbounded.rs | 198 ------------------------------- 3 files changed, 494 deletions(-) delete mode 100644 src/actor/mailbox.rs delete mode 100644 src/actor/mailbox/bounded.rs delete mode 100644 src/actor/mailbox/unbounded.rs diff --git a/src/actor/mailbox.rs b/src/actor/mailbox.rs deleted file mode 100644 index f9b3df3..0000000 --- a/src/actor/mailbox.rs +++ /dev/null @@ -1,87 +0,0 @@ -mod bounded; -mod unbounded; - -use dyn_clone::DynClone; -use futures::{future::BoxFuture, Future}; -use tokio::sync::oneshot; - -use crate::{ - error::{ActorStopReason, BoxSendError, SendError}, - message::{BoxReply, DynMessage}, - Actor, -}; - -use super::{ActorID, ActorRef}; - -/// A trait defining the behaviour and functionality of a mailbox. -pub trait Mailbox: SignalMailbox + Clone + Send + Sync { - type Receiver: MailboxReceiver; - type WeakMailbox: WeakMailbox; - - fn default_mailbox() -> (Self, Self::Receiver); - fn send( - &self, - signal: Signal, - ) -> impl Future>>> + Send + '_; - fn closed(&self) -> impl Future + '_; - fn is_closed(&self) -> bool; - fn downgrade(&self) -> Self::WeakMailbox; - fn strong_count(&self) -> usize; - fn weak_count(&self) -> usize; -} - -/// A mailbox receiver. -pub trait MailboxReceiver: Send + 'static { - fn recv(&mut self) -> impl Future>> + Send + '_; -} - -/// A weak mailbox which can be upraded. -pub trait WeakMailbox: SignalMailbox + Clone + Send + Sync { - type StrongMailbox; - - fn upgrade(&self) -> Option; - fn strong_count(&self) -> usize; - fn weak_count(&self) -> usize; -} - -#[allow(missing_debug_implementations)] -#[doc(hidden)] -pub enum Signal { - StartupFinished, - Message { - message: Box>, - actor_ref: ActorRef, - reply: Option>>, - sent_within_actor: bool, - }, - LinkDied { - id: ActorID, - reason: ActorStopReason, - }, - Stop, -} - -impl Signal { - pub(crate) fn downcast_message(self) -> Option - where - M: 'static, - { - match self { - Signal::Message { message, .. } => message.as_any().downcast().ok().map(|v| *v), - _ => None, - } - } -} - -#[doc(hidden)] -pub trait SignalMailbox: DynClone + Send { - fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>>; - fn signal_link_died( - &self, - id: ActorID, - reason: ActorStopReason, - ) -> BoxFuture<'_, Result<(), SendError>>; - fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>>; -} - -dyn_clone::clone_trait_object!(SignalMailbox); diff --git a/src/actor/mailbox/bounded.rs b/src/actor/mailbox/bounded.rs deleted file mode 100644 index 699b98a..0000000 --- a/src/actor/mailbox/bounded.rs +++ /dev/null @@ -1,209 +0,0 @@ -use std::fmt; - -use futures::{future::BoxFuture, FutureExt}; -use tokio::sync::mpsc; - -use crate::{ - actor::ActorID, - error::{ActorStopReason, SendError}, - Actor, -}; - -use super::{Mailbox, MailboxReceiver, Signal, SignalMailbox, WeakMailbox}; - -/// An unbounded mailbox, where the sending messages to a full mailbox causes backpressure. -pub struct BoundedMailbox(mpsc::Sender>); - -impl BoundedMailbox { - /// Creates a new bounded mailbox with a given capacity. - #[inline] - pub fn new(capacity: usize) -> (Self, BoundedMailboxReceiver) { - let (tx, rx) = mpsc::channel(capacity); - (BoundedMailbox(tx), BoundedMailboxReceiver(rx)) - } -} - -impl Mailbox for BoundedMailbox { - type Receiver = BoundedMailboxReceiver; - type WeakMailbox = WeakBoundedMailbox; - - #[inline] - fn default_mailbox() -> (Self, Self::Receiver) { - BoundedMailbox::new(1000) - } - - #[inline] - async fn send(&self, signal: Signal) -> Result<(), SendError>> { - Ok(self.0.send(signal).await?) - } - - #[inline] - async fn closed(&self) { - self.0.closed().await - } - - #[inline] - fn is_closed(&self) -> bool { - self.0.is_closed() - } - - #[inline] - fn downgrade(&self) -> Self::WeakMailbox { - WeakBoundedMailbox(self.0.downgrade()) - } - - #[inline] - fn strong_count(&self) -> usize { - self.0.strong_count() - } - - #[inline] - fn weak_count(&self) -> usize { - self.0.weak_count() - } -} - -impl Clone for BoundedMailbox { - fn clone(&self) -> Self { - BoundedMailbox(self.0.clone()) - } -} - -impl fmt::Debug for BoundedMailbox { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BoundedMailbox") - .field("tx", &self.0) - .finish() - } -} - -/// A bounded mailbox receiver. -pub struct BoundedMailboxReceiver(mpsc::Receiver>); - -impl fmt::Debug for BoundedMailboxReceiver { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BoundedMailboxReceiver") - .field("rx", &self.0) - .finish() - } -} - -impl MailboxReceiver for BoundedMailboxReceiver { - async fn recv(&mut self) -> Option> { - self.0.recv().await - } -} - -/// A weak bounded mailbox that does not prevent the actor from being stopped. -pub struct WeakBoundedMailbox(mpsc::WeakSender>); - -impl WeakMailbox for WeakBoundedMailbox { - type StrongMailbox = BoundedMailbox; - - #[inline] - fn upgrade(&self) -> Option { - self.0.upgrade().map(BoundedMailbox) - } - - #[inline] - fn strong_count(&self) -> usize { - self.0.strong_count() - } - - #[inline] - fn weak_count(&self) -> usize { - self.0.weak_count() - } -} - -impl Clone for WeakBoundedMailbox { - fn clone(&self) -> Self { - WeakBoundedMailbox(self.0.clone()) - } -} - -impl fmt::Debug for WeakBoundedMailbox { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("WeakBoundedMailbox") - .field("tx", &self.0) - .finish() - } -} - -impl SignalMailbox for BoundedMailbox -where - A: Actor, -{ - fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>> { - async move { - self.0 - .send(Signal::StartupFinished) - .await - .map_err(|_| SendError::ActorNotRunning(())) - } - .boxed() - } - - fn signal_link_died( - &self, - id: ActorID, - reason: ActorStopReason, - ) -> BoxFuture<'_, Result<(), SendError>> { - async move { - self.0 - .send(Signal::LinkDied { id, reason }) - .await - .map_err(|_| SendError::ActorNotRunning(())) - } - .boxed() - } - - fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>> { - async move { - self.0 - .send(Signal::Stop) - .await - .map_err(|_| SendError::ActorNotRunning(())) - } - .boxed() - } -} - -impl SignalMailbox for WeakBoundedMailbox -where - A: Actor, -{ - fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>> { - async move { - match self.upgrade() { - Some(mb) => mb.signal_startup_finished().await, - None => Err(SendError::ActorNotRunning(())), - } - } - .boxed() - } - - fn signal_link_died( - &self, - id: ActorID, - reason: ActorStopReason, - ) -> BoxFuture<'_, Result<(), SendError>> { - async move { - match self.upgrade() { - Some(mb) => mb.signal_link_died(id, reason).await, - None => Err(SendError::ActorNotRunning(())), - } - } - .boxed() - } - - fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>> { - async move { - match self.upgrade() { - Some(mb) => mb.signal_stop().await, - None => Err(SendError::ActorNotRunning(())), - } - } - .boxed() - } -} diff --git a/src/actor/mailbox/unbounded.rs b/src/actor/mailbox/unbounded.rs deleted file mode 100644 index 63ac899..0000000 --- a/src/actor/mailbox/unbounded.rs +++ /dev/null @@ -1,198 +0,0 @@ -use std::fmt; - -use futures::{future::BoxFuture, FutureExt}; -use tokio::sync::mpsc; - -use crate::{ - actor::ActorID, - error::{ActorStopReason, SendError}, - Actor, -}; - -use super::{Mailbox, MailboxReceiver, Signal, SignalMailbox, WeakMailbox}; - -/// An unbounded mailbox, where the number of messages queued can grow infinitely. -pub struct UnboundedMailbox(mpsc::UnboundedSender>); - -impl UnboundedMailbox { - /// Creates a new unbounded mailbox. - #[inline] - pub fn new() -> (Self, UnboundedMailboxReceiver) { - let (tx, rx) = mpsc::unbounded_channel(); - (UnboundedMailbox(tx), UnboundedMailboxReceiver(rx)) - } -} - -impl Mailbox for UnboundedMailbox { - type Receiver = UnboundedMailboxReceiver; - type WeakMailbox = WeakUnboundedMailbox; - - #[inline] - fn default_mailbox() -> (Self, Self::Receiver) { - UnboundedMailbox::new() - } - - #[inline] - async fn send(&self, signal: Signal) -> Result<(), SendError>> { - Ok(self.0.send(signal)?) - } - - #[inline] - async fn closed(&self) { - self.0.closed().await - } - - #[inline] - fn is_closed(&self) -> bool { - self.0.is_closed() - } - - #[inline] - fn downgrade(&self) -> Self::WeakMailbox { - WeakUnboundedMailbox(self.0.downgrade()) - } - - #[inline] - fn strong_count(&self) -> usize { - self.0.strong_count() - } - - #[inline] - fn weak_count(&self) -> usize { - self.0.weak_count() - } -} - -impl Clone for UnboundedMailbox { - fn clone(&self) -> Self { - UnboundedMailbox(self.0.clone()) - } -} - -impl fmt::Debug for UnboundedMailbox { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("UnboundedMailbox") - .field("tx", &self.0) - .finish() - } -} - -/// An unbounded mailbox receiver. -pub struct UnboundedMailboxReceiver(mpsc::UnboundedReceiver>); - -impl MailboxReceiver for UnboundedMailboxReceiver { - async fn recv(&mut self) -> Option> { - self.0.recv().await - } -} - -/// A weak unbounded mailbox that does not prevent the actor from being stopped. -pub struct WeakUnboundedMailbox(mpsc::WeakUnboundedSender>); - -impl WeakMailbox for WeakUnboundedMailbox { - type StrongMailbox = UnboundedMailbox; - - #[inline] - fn upgrade(&self) -> Option { - self.0.upgrade().map(UnboundedMailbox) - } - - #[inline] - fn strong_count(&self) -> usize { - self.0.strong_count() - } - - #[inline] - fn weak_count(&self) -> usize { - self.0.weak_count() - } -} - -impl Clone for WeakUnboundedMailbox { - fn clone(&self) -> Self { - WeakUnboundedMailbox(self.0.clone()) - } -} - -impl fmt::Debug for WeakUnboundedMailbox { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("WeakUnboundedMailbox") - .field("tx", &self.0) - .finish() - } -} - -impl SignalMailbox for UnboundedMailbox -where - A: Actor, -{ - fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>> { - async move { - self.0 - .send(Signal::StartupFinished) - .map_err(|_| SendError::ActorNotRunning(())) - } - .boxed() - } - - fn signal_link_died( - &self, - id: ActorID, - reason: ActorStopReason, - ) -> BoxFuture<'_, Result<(), SendError>> { - async move { - self.0 - .send(Signal::LinkDied { id, reason }) - .map_err(|_| SendError::ActorNotRunning(())) - } - .boxed() - } - - fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>> { - async move { - self.0 - .send(Signal::Stop) - .map_err(|_| SendError::ActorNotRunning(())) - } - .boxed() - } -} - -impl SignalMailbox for WeakUnboundedMailbox -where - A: Actor, -{ - fn signal_startup_finished(&self) -> BoxFuture<'_, Result<(), SendError>> { - async move { - match self.upgrade() { - Some(mb) => mb.signal_startup_finished().await, - None => Err(SendError::ActorNotRunning(())), - } - } - .boxed() - } - - fn signal_link_died( - &self, - id: ActorID, - reason: ActorStopReason, - ) -> BoxFuture<'_, Result<(), SendError>> { - async move { - match self.upgrade() { - Some(mb) => mb.signal_link_died(id, reason).await, - None => Err(SendError::ActorNotRunning(())), - } - } - .boxed() - } - - fn signal_stop(&self) -> BoxFuture<'_, Result<(), SendError>> { - async move { - match self.upgrade() { - Some(mb) => mb.signal_stop().await, - None => Err(SendError::ActorNotRunning(())), - } - } - .boxed() - } -}