Skip to content

Commit

Permalink
fix: remote ser/de requirements for remote actors
Browse files Browse the repository at this point in the history
  • Loading branch information
tqwewe committed Aug 30, 2024
1 parent 938d3c1 commit 45b539c
Show file tree
Hide file tree
Showing 6 changed files with 4 additions and 139 deletions.
8 changes: 0 additions & 8 deletions kameo/src/actor/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ where
spawn_in_thread_inner::<A, SyncActor<A>>(actor)
}

// /// Spawns an actor which runs remotely on another node.
// pub async fn spawn_remote<A>(actor: &A) -> Result<RemoteActorRef<A>, RemoteSpawnError>
// where
// A: Actor + RemoteActor + Serialize,
// {
// // TODO
// }

/// Spawns an `!Sync` actor in a tokio task.
///
/// Unsync actors cannot handle queries, as this would require the actor be to `Sync` since queries are procesed
Expand Down
27 changes: 0 additions & 27 deletions kameo/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,33 +640,6 @@ impl fmt::Display for PanicError {
}
}

/// Errors that can occur when spawning a remote actor.
#[derive(Debug, Serialize, Deserialize)]
pub enum RemoteSpawnError {
/// Failed to serialize actor state.
SerializeActor(String),
/// Failed to deserialize actor state.
DeserializeActor(String),
/// Unknown actor.
UnknownActor(String),
}

impl fmt::Display for RemoteSpawnError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RemoteSpawnError::SerializeActor(err) => write!(f, "failed to serialize actor: {err}"),
RemoteSpawnError::DeserializeActor(err) => {
write!(f, "failed to deserialize actor: {err}")
}
RemoteSpawnError::UnknownActor(actor_remote_id) => {
write!(f, "unknown actor '{actor_remote_id}'")
}
}
}
}

impl error::Error for RemoteSpawnError {}

/// An infallible error type, similar to std::convert::Infallible.
///
/// Kameo provides its own Infallible type in order to implement Serialize/Deserialize for it.
Expand Down
28 changes: 2 additions & 26 deletions kameo/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,14 @@ use std::{
};

use _internal::{
AskRemoteMessageFn, RemoteMessageRegistrationID, RemoteSpawnFn, TellRemoteMessageFn,
REMOTE_ACTORS, REMOTE_MESSAGES,
AskRemoteMessageFn, RemoteMessageRegistrationID, TellRemoteMessageFn, REMOTE_MESSAGES,
};
pub use libp2p::PeerId;
pub use libp2p_identity::Keypair;
use once_cell::sync::Lazy;
use tokio::sync::Mutex;

use crate::{
actor::ActorID,
error::{RemoteSendError, RemoteSpawnError},
};
use crate::{actor::ActorID, error::RemoteSendError};

#[doc(hidden)]
pub mod _internal;
Expand All @@ -30,16 +26,6 @@ pub use swarm::*;
pub(crate) static REMOTE_REGISTRY: Lazy<Mutex<HashMap<ActorID, Box<dyn any::Any + Send + Sync>>>> =
Lazy::new(|| Mutex::new(HashMap::new()));

static REMOTE_ACTORS_MAP: Lazy<HashMap<&'static str, RemoteSpawnFn>> = Lazy::new(|| {
let mut existing_ids = HashSet::new();
for (id, _) in REMOTE_ACTORS {
if !existing_ids.insert(id) {
panic!("duplicate remote actor detected for actor '{id}'",);
}
}
REMOTE_ACTORS.iter().copied().collect()
});

static REMOTE_MESSAGES_MAP: Lazy<
HashMap<RemoteMessageRegistrationID<'static>, (AskRemoteMessageFn, TellRemoteMessageFn)>,
> = Lazy::new(|| {
Expand Down Expand Up @@ -73,16 +59,6 @@ pub trait RemoteMessage<M> {
const REMOTE_ID: &'static str;
}

pub(crate) async fn spawn(
actor_remote_id: String,
payload: Vec<u8>,
) -> Result<ActorID, RemoteSpawnError> {
let Some(spawn) = REMOTE_ACTORS_MAP.get(&actor_remote_id.as_str()) else {
return Err(RemoteSpawnError::UnknownActor(actor_remote_id));
};
spawn(payload).await
}

pub(crate) async fn ask(
actor_id: ActorID,
actor_remote_id: Cow<'static, str>,
Expand Down
24 changes: 1 addition & 23 deletions kameo/src/remote/_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,18 @@ use serde::de::DeserializeOwned;
use serde::Serialize;

use crate::actor::{ActorID, ActorRef};
use crate::error::{RemoteSendError, RemoteSpawnError};
use crate::error::RemoteSendError;
use crate::message::Message;
use crate::{Actor, Reply};

use super::REMOTE_REGISTRY;

#[distributed_slice]
pub static REMOTE_ACTORS: [(&'static str, RemoteSpawnFn)];

#[distributed_slice]
pub static REMOTE_MESSAGES: [(
RemoteMessageRegistrationID<'static>,
(AskRemoteMessageFn, TellRemoteMessageFn),
)];

pub type RemoteSpawnFn =
fn(actor: Vec<u8>) -> BoxFuture<'static, Result<ActorID, RemoteSpawnError>>;

pub type AskRemoteMessageFn = fn(
actor_id: ActorID,
msg: Vec<u8>,
Expand All @@ -45,22 +39,6 @@ pub struct RemoteMessageRegistrationID<'a> {
pub message_remote_id: &'a str,
}

pub async fn spawn_remote<A>(actor: Vec<u8>) -> Result<ActorID, RemoteSpawnError>
where
A: Actor + DeserializeOwned + Send + Sync + 'static,
{
let actor: A = rmp_serde::decode::from_slice(&actor)
.map_err(|err| RemoteSpawnError::DeserializeActor(err.to_string()))?;
let actor_ref = crate::spawn(actor);
let actor_id = actor_ref.id();
REMOTE_REGISTRY
.lock()
.await
.insert(actor_id, Box::new(actor_ref));

Ok(actor_id)
}

pub async fn ask_remote_message<A, M>(
actor_id: ActorID,
msg: Vec<u8>,
Expand Down
39 changes: 1 addition & 38 deletions kameo/src/remote/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tracing::trace;

use crate::{
actor::{ActorID, ActorRef, RemoteActorRef},
error::{BootstrapError, RegistrationError, RemoteSendError, RemoteSpawnError},
error::{BootstrapError, RegistrationError, RemoteSendError},
remote, Actor,
};

Expand Down Expand Up @@ -341,15 +341,6 @@ impl SwarmActor {
} => {
if self.swarm.network_info().num_peers() == 0 {
match req {
SwarmReq::Spawn {
actor_remote_id,
payload,
} => {
tokio::spawn(async move {
let result = remote::spawn(actor_remote_id, payload).await;
let _ = reply.send(SwarmResp::Spawn(result));
});
}
SwarmReq::Ask {
actor_id,
actor_remote_id,
Expand Down Expand Up @@ -404,13 +395,6 @@ impl SwarmActor {
self.requests.insert(req_id, reply);
}
}
SwarmCommand::SendSpawnResponse { result, channel } => {
let _ = self
.swarm
.behaviour_mut()
.request_response
.send_response(channel, SwarmResp::Spawn(result));
}
SwarmCommand::SendAskResponse { result, channel } => {
let _ = self
.swarm
Expand Down Expand Up @@ -483,18 +467,6 @@ impl SwarmActor {
},
},
)) => match request {
SwarmReq::Spawn {
actor_remote_id,
payload,
} => {
let tx = self.cmd_tx.clone();
tokio::spawn(async move {
let result = remote::spawn(actor_remote_id, payload).await;
let _ = tx
.send(SwarmCommand::SendSpawnResponse { result, channel })
.await;
});
}
SwarmReq::Ask {
actor_id,
actor_remote_id,
Expand Down Expand Up @@ -608,10 +580,6 @@ pub(crate) enum SwarmCommand {
req: SwarmReq,
reply: oneshot::Sender<SwarmResp>,
},
SendSpawnResponse {
result: Result<ActorID, RemoteSpawnError>,
channel: ResponseChannel<SwarmResp>,
},
SendAskResponse {
result: Result<Vec<u8>, RemoteSendError<Vec<u8>>>,
channel: ResponseChannel<SwarmResp>,
Expand Down Expand Up @@ -658,10 +626,6 @@ impl<'a> ActorRegistration<'a> {

#[derive(Debug, Serialize, Deserialize)]
pub(crate) enum SwarmReq {
Spawn {
actor_remote_id: String,
payload: Vec<u8>,
},
Ask {
actor_id: ActorID,
actor_remote_id: Cow<'static, str>,
Expand All @@ -683,7 +647,6 @@ pub(crate) enum SwarmReq {

#[derive(Debug, Serialize, Deserialize)]
pub(crate) enum SwarmResp {
Spawn(Result<ActorID, RemoteSpawnError>),
Ask(Result<Vec<u8>, RemoteSendError<Vec<u8>>>),
Tell(Result<(), RemoteSendError<Vec<u8>>>),
OutboundFailure(RemoteSendError<()>),
Expand Down
17 changes: 0 additions & 17 deletions kameo_macros/src/derive_remote_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,6 @@ impl ToTokens for DeriveRemoteActor {
const REMOTE_ID: &'static str =
::std::concat!(::std::module_path!(), "::", ::std::stringify!(#ident));
}

const _: () = {
#[::kameo::remote::_internal::distributed_slice(
::kameo::remote::_internal::REMOTE_ACTORS
)]
static REG: (
&'static str,
::kameo::remote::_internal::RemoteSpawnFn,
) = (
<#ident as ::kameo::remote::RemoteActor>::REMOTE_ID,
(|actor: ::std::vec::Vec<u8>| {
::std::boxed::Box::pin(::kameo::remote::_internal::spawn_remote::<
#ident,
>(actor))
}) as ::kameo::remote::_internal::RemoteSpawnFn,
);
};
});
}
}
Expand Down

0 comments on commit 45b539c

Please sign in to comment.