Skip to content

Commit

Permalink
refactor: remove redundant ActorSwarmEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
tqwewe committed Jan 10, 2025
1 parent 7002036 commit 80b6679
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 41 deletions.
9 changes: 5 additions & 4 deletions examples/manual_swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use kameo::{
actor::ActorID,
error::RemoteSendError,
remote::{
ActorSwarm, ActorSwarmEvent, ActorSwarmHandler, SwarmBehaviour, SwarmRequest, SwarmResponse,
ActorSwarm, ActorSwarmBehaviourEvent, ActorSwarmHandler, SwarmBehaviour, SwarmRequest,
SwarmResponse,
},
};
use libp2p::{
Expand Down Expand Up @@ -84,13 +85,13 @@ fn handle_event(
}
SwarmEvent::Behaviour(event) => match event {
CustomBehaviourEvent::Kademlia(event) => {
swarm_handler.handle_event(swarm, ActorSwarmEvent::Kademlia(event))
swarm_handler.handle_event(swarm, ActorSwarmBehaviourEvent::Kademlia(event))
}
CustomBehaviourEvent::ActorRequestResponse(event) => {
swarm_handler.handle_event(swarm, ActorSwarmEvent::RequestResponse(event))
swarm_handler.handle_event(swarm, ActorSwarmBehaviourEvent::RequestResponse(event))
}
CustomBehaviourEvent::Mdns(event) => {
swarm_handler.handle_event(swarm, ActorSwarmEvent::Mdns(event))
swarm_handler.handle_event(swarm, ActorSwarmBehaviourEvent::Mdns(event))
}
CustomBehaviourEvent::CustomRequestResponse(request_response::Event::Message {
message,
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ pub enum BootstrapError {
/// Swarm already bootstrapped.
AlreadyBootstrapped(
&'static crate::remote::ActorSwarm,
Option<libp2p::Swarm<crate::remote::ActorBehaviour>>,
Option<libp2p::Swarm<crate::remote::ActorSwarmBehaviour>>,
),
/// Behaviour error.
BehaviourError(Box<dyn error::Error + Send + Sync + 'static>),
Expand Down
63 changes: 27 additions & 36 deletions src/remote/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl ActorSwarm {
/// ## Returns
/// A reference to the initialized `ActorSwarm` if successful, or an error if the bootstrap fails.
pub fn bootstrap_with_identity(keypair: Keypair) -> Result<&'static Self, BootstrapError> {
let behaviour = ActorBehaviour::new(&keypair)
let behaviour = ActorSwarmBehaviour::new(&keypair)
.map_err(|err| BootstrapError::BehaviourError(Box::new(err)))?;
ActorSwarm::bootstrap_with_behaviour(keypair, behaviour)
}
Expand All @@ -110,7 +110,7 @@ impl ActorSwarm {
/// A reference to the initialized `ActorSwarm` if successful, or an error if the bootstrap fails.
pub fn bootstrap_with_behaviour(
keypair: Keypair,
behaviour: ActorBehaviour,
behaviour: ActorSwarmBehaviour,
) -> Result<&'static Self, BootstrapError> {
if let Some(swarm) = ACTOR_SWARM.get() {
return Err(BootstrapError::AlreadyBootstrapped(swarm, None));
Expand All @@ -137,7 +137,7 @@ impl ActorSwarm {
/// ## Returns
/// A reference to the initialized `ActorSwarm` if successful, or an error if the bootstrap fails.
pub fn bootstrap_with_swarm(
mut swarm: Swarm<ActorBehaviour>,
mut swarm: Swarm<ActorSwarmBehaviour>,
) -> Result<&'static Self, BootstrapError> {
let local_peer_id = Intern::new(swarm.local_peer_id().clone());
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -418,20 +418,20 @@ impl ActorSwarmHandler {
}
}

async fn run(&mut self, swarm: &mut Swarm<ActorBehaviour>) {
async fn run(&mut self, swarm: &mut Swarm<ActorSwarmBehaviour>) {
loop {
tokio::select! {
Some(cmd) = self.cmd_rx.recv() => self.handle_command(swarm, cmd),
Some(event) = swarm.next() => {
match event {
SwarmEvent::Behaviour(ActorBehaviourEvent::Kademlia(event)) => {
self.handle_event(swarm, ActorSwarmEvent::Kademlia(event));
SwarmEvent::Behaviour(ActorSwarmBehaviourEvent::Kademlia(event)) => {
self.handle_event(swarm, ActorSwarmBehaviourEvent::Kademlia(event));
}
SwarmEvent::Behaviour(ActorBehaviourEvent::RequestResponse(event)) => {
self.handle_event(swarm, ActorSwarmEvent::RequestResponse(event));
SwarmEvent::Behaviour(ActorSwarmBehaviourEvent::RequestResponse(event)) => {
self.handle_event(swarm, ActorSwarmBehaviourEvent::RequestResponse(event));
}
SwarmEvent::Behaviour(ActorBehaviourEvent::Mdns(event)) => {
self.handle_event(swarm, ActorSwarmEvent::Mdns(event));
SwarmEvent::Behaviour(ActorSwarmBehaviourEvent::Mdns(event)) => {
self.handle_event(swarm, ActorSwarmBehaviourEvent::Mdns(event));
}
_ => {},
}
Expand Down Expand Up @@ -585,18 +585,20 @@ impl ActorSwarmHandler {
pub fn handle_event<B: SwarmBehaviour>(
&mut self,
swarm: &mut Swarm<B>,
event: ActorSwarmEvent,
event: ActorSwarmBehaviourEvent,
) {
match event {
ActorSwarmEvent::Mdns(mdns::Event::Discovered(list)) => {
ActorSwarmBehaviourEvent::Mdns(mdns::Event::Discovered(list)) => {
for (peer_id, multiaddr) in list {
swarm
.behaviour_mut()
.kademlia_add_address(&peer_id, multiaddr);
}
}
ActorSwarmEvent::Kademlia(kad::Event::OutboundQueryProgressed {
id, result, ..
ActorSwarmBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed {
id,
result,
..
}) => match result {
kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(
record @ kad::PeerRecord {
Expand Down Expand Up @@ -626,7 +628,7 @@ impl ActorSwarmHandler {
}
_ => {}
},
ActorSwarmEvent::RequestResponse(request_response::Event::Message {
ActorSwarmBehaviourEvent::RequestResponse(request_response::Event::Message {
peer: _,
message:
request_response::Message::Request {
Expand Down Expand Up @@ -682,7 +684,7 @@ impl ActorSwarmHandler {
});
}
},
ActorSwarmEvent::RequestResponse(request_response::Event::Message {
ActorSwarmBehaviourEvent::RequestResponse(request_response::Event::Message {
peer: _,
message:
request_response::Message::Response {
Expand All @@ -694,11 +696,11 @@ impl ActorSwarmHandler {
let _ = tx.send(response);
}
}
ActorSwarmEvent::RequestResponse(request_response::Event::OutboundFailure {
request_id,
error,
..
}) => {
ActorSwarmBehaviourEvent::RequestResponse(
request_response::Event::OutboundFailure {
request_id, error, ..
},
) => {
if let Some(tx) = self.requests.remove(&request_id) {
let err = match error {
OutboundFailure::DialFailure => RemoteSendError::DialFailure,
Expand All @@ -717,17 +719,6 @@ impl ActorSwarmHandler {
}
}

/// A swarm event which should be handled by the [`ActorSwarmBehaviour`].
#[derive(Debug)]
pub enum ActorSwarmEvent {
/// Kademlia event.
Kademlia(kad::Event),
/// Request response event.
RequestResponse(request_response::Event<SwarmRequest, SwarmResponse, SwarmResponse>),
/// Mdns event.
Mdns(mdns::Event),
}

#[derive(Clone, Debug)]
pub(crate) struct SwarmSender(mpsc::UnboundedSender<SwarmCommand>);

Expand Down Expand Up @@ -1057,7 +1048,7 @@ mod behaviour {
/// Uses kademlia for actor registration, request response for messaging, and mdns for discovery.
#[allow(missing_debug_implementations)]
#[derive(NetworkBehaviour)]
pub struct ActorBehaviour {
pub struct ActorSwarmBehaviour {
/// Kademlia network for actor registration.
pub kademlia: kad::Behaviour<MemoryStore>,
/// Request response for actor messaging.
Expand All @@ -1069,10 +1060,10 @@ mod behaviour {

pub use behaviour::*;

impl ActorBehaviour {
impl ActorSwarmBehaviour {
/// Creates a new default actor behaviour with a keypair.
pub fn new(keypair: &Keypair) -> io::Result<Self> {
Ok(ActorBehaviour {
Ok(ActorSwarmBehaviour {
kademlia: kad::Behaviour::new(
keypair.public().to_peer_id(),
MemoryStore::new(keypair.public().to_peer_id()),
Expand All @@ -1089,7 +1080,7 @@ impl ActorBehaviour {
}
}

impl SwarmBehaviour for ActorBehaviour {
impl SwarmBehaviour for ActorSwarmBehaviour {
fn ask(
&mut self,
peer: &PeerId,
Expand Down

0 comments on commit 80b6679

Please sign in to comment.