Skip to content

Commit

Permalink
feat!: add request traits (#39)
Browse files Browse the repository at this point in the history
* feat!: add request traits
  • Loading branch information
tqwewe authored Sep 8, 2024
1 parent 6f7b6f7 commit 51d864e
Show file tree
Hide file tree
Showing 32 changed files with 2,484 additions and 1,319 deletions.
3 changes: 2 additions & 1 deletion benches/fibonacci.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use criterion::BenchmarkId;
use criterion::Criterion;
use criterion::{criterion_group, criterion_main};
use kameo::actor::BoundedMailbox;
use kameo::mailbox::bounded::BoundedMailbox;
use kameo::request::MessageSend;
use kameo::{
message::{Context, Message},
Actor,
Expand Down
3 changes: 2 additions & 1 deletion benches/overhead.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use criterion::Criterion;
use criterion::{criterion_group, criterion_main};
use kameo::actor::UnboundedMailbox;
use kameo::mailbox::unbounded::UnboundedMailbox;
use kameo::request::MessageSend;
use kameo::{
message::{Context, Message},
Actor,
Expand Down
7 changes: 4 additions & 3 deletions examples/ask.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::time::Duration;

use kameo::{
actor::UnboundedMailbox,
mailbox::unbounded::UnboundedMailbox,
message::{Context, Message},
request::{MessageSend, MessageSendSync},
Actor,
};
use tracing::info;
Expand Down Expand Up @@ -35,7 +36,7 @@ impl Message<Inc> for MyActor {
}
}

#[tokio::main(flavor = "current_thread")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter("trace".parse::<EnvFilter>().unwrap())
Expand All @@ -45,7 +46,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let my_actor_ref = kameo::spawn(MyActor::default());

my_actor_ref.tell(Inc { amount: 3 }).send()?;
my_actor_ref.tell(Inc { amount: 3 }).send_sync()?;
tokio::time::sleep(Duration::from_millis(200)).await;

// Increment the count by 3
Expand Down
9 changes: 5 additions & 4 deletions examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use kameo::{
actor::UnboundedMailbox,
mailbox::unbounded::UnboundedMailbox,
message::{Context, Message},
request::{MessageSend, MessageSendSync},
Actor,
};
use tracing::info;
Expand Down Expand Up @@ -48,7 +49,7 @@ impl Message<ForceErr> for MyActor {
}
}

#[tokio::main(flavor = "current_thread")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter("trace".parse::<EnvFilter>().unwrap())
Expand All @@ -63,14 +64,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("Count is {count}");

// Increment the count by 50 in the background
my_actor_ref.tell(Inc { amount: 50 }).send()?;
my_actor_ref.tell(Inc { amount: 50 }).send_sync()?;

// Increment the count by 2
let count = my_actor_ref.ask(Inc { amount: 2 }).send().await?;
info!("Count is {count}");

// Async messages that return an Err will cause the actor to panic
my_actor_ref.tell(ForceErr).send()?;
my_actor_ref.tell(ForceErr).send_sync()?;

// Actor should be stopped, so we cannot send more messages to it
assert!(my_actor_ref.ask(Inc { amount: 2 }).send().await.is_err());
Expand Down
10 changes: 7 additions & 3 deletions examples/macro.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::fmt;

use kameo::{messages, Actor};
use kameo::{
messages,
request::{MessageSend, MessageSendSync},
Actor,
};
use tracing::info;
use tracing_subscriber::EnvFilter;

Expand Down Expand Up @@ -54,7 +58,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("Count is {count}");

// Increment the count by 50 in the background
my_actor_ref.tell(Inc { amount: 50 }).send()?;
my_actor_ref.tell(Inc { amount: 50 }).send_sync()?;

// Generic message
my_actor_ref
Expand All @@ -65,7 +69,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;

// Async messages that return an Err will cause the actor to panic
my_actor_ref.tell(ForceErr).send()?;
my_actor_ref.tell(ForceErr).send_sync()?;

// Actor should be stopped, so we cannot send more messages to it
assert!(my_actor_ref.ask(Inc { amount: 2 }).send().await.is_err());
Expand Down
3 changes: 2 additions & 1 deletion examples/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::time::Duration;
use kameo::{
actor::{ActorPool, BroadcastMsg, WorkerMsg},
message::{Context, Message},
request::MessageSend,
Actor,
};
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -36,7 +37,7 @@ impl Message<ForceStop> for MyActor {
}
}

#[tokio::main(flavor = "current_thread")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter("warn".parse::<EnvFilter>().unwrap())
Expand Down
3 changes: 2 additions & 1 deletion examples/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use kameo::{
actor::{PubSub, Publish, Subscribe},
message::{Context, Message},
request::MessageSend,
Actor,
};
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -38,7 +39,7 @@ impl Message<PrintActorID> for ActorB {
}
}

#[tokio::main(flavor = "current_thread")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter("warn".parse::<EnvFilter>().unwrap())
Expand Down
1 change: 1 addition & 0 deletions examples/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use kameo::{
actor::RemoteActorRef,
message::{Context, Message},
remote::ActorSwarm,
request::MessageSend,
Actor,
};
use kameo_macros::{remote_message, RemoteActor};
Expand Down
5 changes: 3 additions & 2 deletions examples/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use std::{future::pending, time};

use futures::stream;
use kameo::{
actor::{ActorRef, UnboundedMailbox},
actor::ActorRef,
error::BoxError,
mailbox::unbounded::UnboundedMailbox,
message::{Context, Message, StreamMessage},
Actor,
};
Expand Down Expand Up @@ -57,7 +58,7 @@ impl Message<StreamMessage<i64, &'static str, &'static str>> for MyActor {
}
}

#[tokio::main(flavor = "current_thread")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter("trace".parse::<EnvFilter>().unwrap())
Expand Down
10 changes: 5 additions & 5 deletions macros/src/derive_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@ impl ToTokens for DeriveActor {

let mailbox_expanded = match attrs.mailbox {
MailboxKind::Bounded(_) => quote! {
::kameo::actor::BoundedMailbox<Self>
::kameo::mailbox::bounded::BoundedMailbox<Self>
},
MailboxKind::Unbounded => quote! {
::kameo::actor::UnboundedMailbox<Self>
::kameo::mailbox::unbounded::UnboundedMailbox<Self>
},
};
let new_mailbox_expanded = match attrs.mailbox {
MailboxKind::Bounded(cap) => {
let cap = cap.unwrap_or(1000);
quote! {
::kameo::actor::BoundedMailbox::new(#cap)
::kameo::mailbox::bounded::BoundedMailbox::new(#cap)
}
}
MailboxKind::Unbounded => quote! {
::kameo::actor::UnboundedMailbox::new()
::kameo::mailbox::unbounded::UnboundedMailbox::new()
},
};

Expand All @@ -55,7 +55,7 @@ impl ToTokens for DeriveActor {
#name
}

fn new_mailbox() -> (Self::Mailbox, ::kameo::actor::MailboxReceiver<Self>) {
fn new_mailbox() -> (Self::Mailbox, <Self::Mailbox as ::kameo::mailbox::Mailbox<Self>>::Receiver) {
#new_mailbox_expanded
}
}
Expand Down
84 changes: 51 additions & 33 deletions macros/src/remote_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,44 +60,62 @@ impl RemoteMessage {
#[linkme(crate = ::kameo::remote::_internal::linkme)]
static REG: (
::kameo::remote::_internal::RemoteMessageRegistrationID<'static>,
(
::kameo::remote::_internal::AskRemoteMessageFn,
::kameo::remote::_internal::TellRemoteMessageFn,
),
::kameo::remote::_internal::RemoteMessageFns,
) = (
::kameo::remote::_internal::RemoteMessageRegistrationID {
actor_remote_id: <#actor_ty as ::kameo::remote::RemoteActor>::REMOTE_ID,
message_remote_id: <#actor_ty #ty_generics as ::kameo::remote::RemoteMessage<#message_generics>>::REMOTE_ID,
},
(
(|actor_id: ::kameo::actor::ActorID,
msg: ::std::vec::Vec<u8>,
mailbox_timeout: ::std::option::Option<::std::time::Duration>,
reply_timeout: ::std::option::Option<::std::time::Duration>,
immediate: bool| {
::std::boxed::Box::pin(::kameo::remote::_internal::ask_remote_message::<
#actor_ty,
#message_generics,
>(
actor_id,
msg,
mailbox_timeout,
reply_timeout,
immediate,
))
}) as ::kameo::remote::_internal::AskRemoteMessageFn,
(|actor_id: ::kameo::actor::ActorID,
msg: ::std::vec::Vec<u8>,
mailbox_timeout: ::std::option::Option<::std::time::Duration>,
immediate: bool| {
::std::boxed::Box::pin(::kameo::remote::_internal::tell_remote_message::<
#actor_ty,
#message_generics,
>(
actor_id, msg, mailbox_timeout, immediate
))
}) as ::kameo::remote::_internal::TellRemoteMessageFn,
),
::kameo::remote::_internal::RemoteMessageFns {
ask: (|actor_id: ::kameo::actor::ActorID,
msg: ::std::vec::Vec<u8>,
mailbox_timeout: ::std::option::Option<::std::time::Duration>,
reply_timeout: ::std::option::Option<::std::time::Duration>| {
::std::boxed::Box::pin(::kameo::remote::_internal::ask::<
#actor_ty,
#message_generics,
>(
actor_id,
msg,
mailbox_timeout,
reply_timeout,
))
}) as ::kameo::remote::_internal::RemoteAskFn,
try_ask: (|actor_id: ::kameo::actor::ActorID,
msg: ::std::vec::Vec<u8>,
reply_timeout: ::std::option::Option<::std::time::Duration>| {
::std::boxed::Box::pin(::kameo::remote::_internal::try_ask::<
#actor_ty,
#message_generics,
>(
actor_id,
msg,
reply_timeout,
))
}) as ::kameo::remote::_internal::RemoteTryAskFn,
tell: (|actor_id: ::kameo::actor::ActorID,
msg: ::std::vec::Vec<u8>,
mailbox_timeout: ::std::option::Option<::std::time::Duration>| {
::std::boxed::Box::pin(::kameo::remote::_internal::tell::<
#actor_ty,
#message_generics,
>(
actor_id,
msg,
mailbox_timeout,
))
}) as ::kameo::remote::_internal::RemoteTellFn,
try_tell: (|actor_id: ::kameo::actor::ActorID,
msg: ::std::vec::Vec<u8>| {
::std::boxed::Box::pin(::kameo::remote::_internal::try_tell::<
#actor_ty,
#message_generics,
>(
actor_id,
msg,
))
}) as ::kameo::remote::_internal::RemoteTryTellFn,
},
);
};
}
Expand Down
9 changes: 5 additions & 4 deletions src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
mod actor_ref;
mod id;
mod kind;
mod mailbox;
mod pool;
mod pubsub;
mod spawn;
Expand All @@ -33,11 +32,13 @@ use std::any;

use futures::Future;

use crate::error::{ActorStopReason, BoxError, PanicError};
use crate::{
error::{ActorStopReason, BoxError, PanicError},
mailbox::Mailbox,
};

pub use actor_ref::*;
pub use id::*;
pub use mailbox::*;
pub use pool::*;
pub use pubsub::*;
pub use spawn::*;
Expand Down Expand Up @@ -87,7 +88,7 @@ pub trait Actor: Sized + Send + 'static {
}

/// Creates a new mailbox.
fn new_mailbox() -> (Self::Mailbox, MailboxReceiver<Self>) {
fn new_mailbox() -> (Self::Mailbox, <Self::Mailbox as Mailbox<Self>>::Receiver) {
Self::Mailbox::default_mailbox()
}

Expand Down
9 changes: 5 additions & 4 deletions src/actor/actor_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tokio::{

use crate::{
error::{RegistrationError, SendError},
mailbox::{Mailbox, Signal, SignalMailbox, WeakMailbox},
message::{Message, StreamMessage},
remote::{ActorSwarm, RemoteActor, RemoteMessage, SwarmCommand},
reply::Reply,
Expand All @@ -20,7 +21,7 @@ use crate::{
Actor,
};

use super::{id::ActorID, Mailbox, Signal, SignalMailbox, WeakMailbox};
use super::id::ActorID;

task_local! {
pub(crate) static CURRENT_ACTOR_ID: ActorID;
Expand Down Expand Up @@ -320,8 +321,8 @@ where
reply: None,
sent_within_actor: false,
})
.await?;
Ok(())
.await
.map_err(|err| err.map_msg(|msg| msg.downcast_message::<M>().unwrap()))
}
}

Expand Down Expand Up @@ -362,7 +363,7 @@ impl<A: Actor> AsRef<Links> for ActorRef<A> {
pub struct RemoteActorRef<A: Actor> {
id: ActorID,
swarm_tx: mpsc::Sender<SwarmCommand>,
phantom: PhantomData<A>,
phantom: PhantomData<A::Mailbox>,
}

impl<A: Actor> RemoteActorRef<A> {
Expand Down
3 changes: 2 additions & 1 deletion src/actor/kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use tokio::sync::oneshot;
use crate::{
actor::{Actor, ActorRef, WeakActorRef},
error::{ActorStopReason, BoxSendError, PanicError},
mailbox::Signal,
message::{BoxReply, DynMessage},
};

use super::{ActorID, Signal};
use super::ActorID;

pub(crate) trait ActorState<A: Actor>: Sized {
fn new_from_actor(actor: A, actor_ref: WeakActorRef<A>) -> Self;
Expand Down
Loading

0 comments on commit 51d864e

Please sign in to comment.