diff --git a/src/server.rs b/src/server.rs index ebaa24f8..61701ab5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,29 +1,22 @@ -use anyhow::anyhow; use log; use std::{ cell::{Cell, RefCell}, collections::HashSet, io::Result, - net::IpAddr, rc::Rc, time::Duration, }; -use tokio::{io::ReadHalf, net::UdpSocket, signal, sync::mpsc::Sender}; +use tokio::net::UdpSocket; +use tokio::signal; -#[cfg(unix)] -use tokio::net::UnixStream; - -#[cfg(windows)] -use tokio::net::TcpStream; - -use std::{io::ErrorKind, net::SocketAddr}; +use std::net::SocketAddr; use crate::{ - client::{ClientEvent, ClientHandle, ClientManager, Position}, + client::{ClientHandle, ClientManager}, config::Config, dns, event::Event, - frontend::{self, FrontendEvent, FrontendListener, FrontendNotify}, + frontend::{FrontendEvent, FrontendListener, FrontendNotify}, server::producer_task::ProducerEvent, }; use crate::{consumer, producer}; @@ -31,6 +24,7 @@ use crate::{consumer, producer}; use self::consumer_task::ConsumerEvent; mod consumer_task; +mod frontend_task; mod producer_task; const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500); @@ -79,7 +73,7 @@ impl Server { pub async fn run(&self) -> anyhow::Result<()> { // create frontend communication adapter - let mut frontend = match FrontendListener::new().await { + let frontend = match FrontendListener::new().await { Some(f) => f?, None => { // none means some other instance is already running @@ -89,9 +83,7 @@ impl Server { }; let (consumer, producer) = tokio::join!(consumer::create(), producer::create()); - let (frontend_tx, mut frontend_rx) = tokio::sync::mpsc::channel(32); let (resolve_tx, mut resolve_rx) = tokio::sync::mpsc::channel(32); - let (frontend_notify_tx, mut frontend_notify_rx) = tokio::sync::mpsc::channel(32); let (receiver_tx, receiver_rx) = tokio::sync::mpsc::channel(32); let (sender_tx, mut sender_rx) = tokio::sync::mpsc::channel(32); let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32); @@ -112,38 +104,14 @@ impl Server { ); // frontend listener - let server = self.clone(); - let producer_notify = producer_channel.clone(); - let consumer_notify = consumer_channel.clone(); - let frontend_ch = frontend_tx.clone(); - let resolve_ch = resolve_tx.clone(); - let mut frontend_task = tokio::task::spawn_local(async move { - loop { - tokio::select! { - stream = frontend.accept() => { - let stream = match stream { - Ok(s) => s, - Err(e) => { - log::warn!("error accepting frontend connection: {e}"); - continue; - } - }; - server.handle_frontend_stream(&frontend_ch, stream).await; - } - event = frontend_rx.recv() => { - let frontend_event = event.ok_or(anyhow!("frontend channel closed"))?; - if server.handle_frontend_event(&producer_notify, &consumer_notify, &resolve_ch, &mut frontend, &port_tx, frontend_event).await { - break; - } - } - notify = frontend_notify_rx.recv() => { - let notify = notify.ok_or(anyhow!("frontend notify closed"))?; - let _ = frontend.notify_all(notify).await; - } - } - } - anyhow::Ok(()) - }); + let (mut frontend_task, frontend_tx, frontend_notify_tx) = frontend_task::new( + frontend, + self.clone(), + producer_channel.clone(), + consumer_channel.clone(), + resolve_tx.clone(), + port_tx, + ); // dns resolver @@ -405,268 +373,6 @@ impl Server { Ok(()) } - - pub async fn add_client( - &self, - resolver_tx: &Sender<(String, ClientHandle)>, - hostname: Option, - addr: HashSet, - port: u16, - pos: Position, - ) -> ClientHandle { - log::info!( - "adding client [{}]{} @ {:?}", - pos, - hostname.as_deref().unwrap_or(""), - &addr - ); - let handle = - self.client_manager - .borrow_mut() - .add_client(hostname.clone(), addr, port, pos, false); - - log::debug!("add_client {handle}"); - - if let Some(hostname) = hostname { - let _ = resolver_tx.send((hostname, handle)).await; - } - - handle - } - - pub async fn activate_client( - &self, - producer_notify_tx: &Sender, - consumer_notify_tx: &Sender, - client: ClientHandle, - active: bool, - ) { - let (client, pos) = match self.client_manager.borrow_mut().get_mut(client) { - Some(state) => { - state.active = active; - (state.client.handle, state.client.pos) - } - None => return, - }; - if active { - let _ = producer_notify_tx - .send(ProducerEvent::ClientEvent(ClientEvent::Create(client, pos))) - .await; - let _ = consumer_notify_tx - .send(ConsumerEvent::ClientEvent(ClientEvent::Create(client, pos))) - .await; - } else { - let _ = producer_notify_tx - .send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client))) - .await; - let _ = consumer_notify_tx - .send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client))) - .await; - } - } - - pub async fn remove_client( - &self, - producer_notify_tx: &Sender, - consumer_notify_tx: &Sender, - frontend: &mut FrontendListener, - client: ClientHandle, - ) -> Option { - let _ = producer_notify_tx - .send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client))) - .await; - let _ = consumer_notify_tx - .send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client))) - .await; - - let Some(client) = self - .client_manager - .borrow_mut() - .remove_client(client) - .map(|s| s.client.handle) - else { - return None; - }; - - let notify = FrontendNotify::NotifyClientDelete(client); - log::debug!("{notify:?}"); - if let Err(e) = frontend.notify_all(notify).await { - log::error!("error notifying frontend: {e}"); - } - Some(client) - } - - async fn update_client( - &self, - producer_notify_tx: &Sender, - consumer_notify_tx: &Sender, - resolve_tx: &Sender<(String, ClientHandle)>, - client_update: (ClientHandle, Option, u16, Position), - ) { - let (handle, hostname, port, pos) = client_update; - let (hostname, handle, active) = { - // retrieve state - let mut client_manager = self.client_manager.borrow_mut(); - let Some(state) = client_manager.get_mut(handle) else { - return; - }; - - // update pos - state.client.pos = pos; - - // update port - if state.client.port != port { - state.client.port = port; - state.active_addr = state.active_addr.map(|a| SocketAddr::new(a.ip(), port)); - } - - // update hostname - if state.client.hostname != hostname { - state.client.ips = HashSet::new(); - state.active_addr = None; - state.client.hostname = hostname; - } - - log::debug!("client updated: {:?}", state); - ( - state.client.hostname.clone(), - state.client.handle, - state.active, - ) - }; - - // resolve dns - if let Some(hostname) = hostname { - let _ = resolve_tx.send((hostname, handle)).await; - } - - // update state in event consumer & producer - if active { - let _ = producer_notify_tx - .send(ProducerEvent::ClientEvent(ClientEvent::Destroy(handle))) - .await; - let _ = consumer_notify_tx - .send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(handle))) - .await; - let _ = producer_notify_tx - .send(ProducerEvent::ClientEvent(ClientEvent::Create(handle, pos))) - .await; - let _ = consumer_notify_tx - .send(ConsumerEvent::ClientEvent(ClientEvent::Create(handle, pos))) - .await; - } - } - - async fn handle_frontend_stream( - &self, - frontend_tx: &Sender, - #[cfg(unix)] mut stream: ReadHalf, - #[cfg(windows)] mut stream: ReadHalf, - ) { - use std::io; - - let tx = frontend_tx.clone(); - tokio::task::spawn_local(async move { - let _ = tx.send(FrontendEvent::Enumerate()).await; - loop { - let event = frontend::read_event(&mut stream).await; - match event { - Ok(event) => { - let _ = tx.send(event).await; - } - Err(e) => { - if let Some(e) = e.downcast_ref::() { - if e.kind() == ErrorKind::UnexpectedEof { - return; - } - } - log::error!("error reading frontend event: {e}"); - return; - } - } - } - }); - } - - async fn handle_frontend_event( - &self, - producer_tx: &Sender, - consumer_tx: &Sender, - resolve_tx: &Sender<(String, ClientHandle)>, - frontend: &mut FrontendListener, - port_tx: &Sender, - event: FrontendEvent, - ) -> bool { - log::debug!("frontend: {event:?}"); - let response = match event { - FrontendEvent::AddClient(hostname, port, pos) => { - let handle = self - .add_client(resolve_tx, hostname, HashSet::new(), port, pos) - .await; - - let client = self - .client_manager - .borrow() - .get(handle) - .unwrap() - .client - .clone(); - Some(FrontendNotify::NotifyClientCreate(client)) - } - FrontendEvent::ActivateClient(handle, active) => { - self.activate_client(producer_tx, consumer_tx, handle, active) - .await; - Some(FrontendNotify::NotifyClientActivate(handle, active)) - } - FrontendEvent::ChangePort(port) => { - let _ = port_tx.send(port).await; - None - } - FrontendEvent::DelClient(handle) => { - self.remove_client(producer_tx, consumer_tx, frontend, handle) - .await; - Some(FrontendNotify::NotifyClientDelete(handle)) - } - FrontendEvent::Enumerate() => { - let clients = self - .client_manager - .borrow() - .get_client_states() - .map(|s| (s.client.clone(), s.active)) - .collect(); - Some(FrontendNotify::Enumerate(clients)) - } - FrontendEvent::Shutdown() => { - log::info!("terminating gracefully..."); - return true; - } - FrontendEvent::UpdateClient(handle, hostname, port, pos) => { - self.update_client( - producer_tx, - consumer_tx, - resolve_tx, - (handle, hostname, port, pos), - ) - .await; - - let client = self - .client_manager - .borrow() - .get(handle) - .unwrap() - .client - .clone(); - Some(FrontendNotify::NotifyClientUpdate(client)) - } - }; - let Some(response) = response else { - return false; - }; - if let Err(e) = frontend.notify_all(response).await { - log::error!("error notifying frontend: {e}"); - } - false - } } async fn receive_event(socket: &UdpSocket) -> anyhow::Result<(Event, SocketAddr)> { diff --git a/src/server/frontend_task.rs b/src/server/frontend_task.rs new file mode 100644 index 00000000..611d72da --- /dev/null +++ b/src/server/frontend_task.rs @@ -0,0 +1,324 @@ +use std::{ + collections::HashSet, + io::ErrorKind, + net::{IpAddr, SocketAddr}, +}; +#[cfg(unix)] +use tokio::net::UnixStream; + +#[cfg(windows)] +use tokio::net::TcpStream; + +use anyhow::{anyhow, Result}; +use tokio::{io::ReadHalf, sync::mpsc::Sender, task::JoinHandle}; + +use crate::{ + client::{ClientEvent, ClientHandle, Position}, + frontend::{self, FrontendEvent, FrontendListener, FrontendNotify}, +}; + +use super::{consumer_task::ConsumerEvent, producer_task::ProducerEvent, Server}; + +pub(crate) fn new( + mut frontend: FrontendListener, + server: Server, + producer_notify: Sender, + consumer_notify: Sender, + resolve_ch: Sender<(String, u32)>, + port_tx: Sender, +) -> ( + JoinHandle>, + Sender, + Sender, +) { + let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(32); + let (notify_tx, mut notify_rx) = tokio::sync::mpsc::channel(32); + let event_tx_clone = event_tx.clone(); + let frontend_task = tokio::task::spawn_local(async move { + loop { + tokio::select! { + stream = frontend.accept() => { + let stream = match stream { + Ok(s) => s, + Err(e) => { + log::warn!("error accepting frontend connection: {e}"); + continue; + } + }; + handle_frontend_stream(&event_tx_clone, stream).await; + } + event = event_rx.recv() => { + let frontend_event = event.ok_or(anyhow!("frontend channel closed"))?; + if handle_frontend_event(&server, &producer_notify, &consumer_notify, &resolve_ch, &mut frontend, &port_tx, frontend_event).await { + break; + } + } + notify = notify_rx.recv() => { + let notify = notify.ok_or(anyhow!("frontend notify closed"))?; + let _ = frontend.notify_all(notify).await; + } + } + } + anyhow::Ok(()) + }); + (frontend_task, event_tx, notify_tx) +} + +async fn handle_frontend_stream( + frontend_tx: &Sender, + #[cfg(unix)] mut stream: ReadHalf, + #[cfg(windows)] mut stream: ReadHalf, +) { + use std::io; + + let tx = frontend_tx.clone(); + tokio::task::spawn_local(async move { + let _ = tx.send(FrontendEvent::Enumerate()).await; + loop { + let event = frontend::read_event(&mut stream).await; + match event { + Ok(event) => { + let _ = tx.send(event).await; + } + Err(e) => { + if let Some(e) = e.downcast_ref::() { + if e.kind() == ErrorKind::UnexpectedEof { + return; + } + } + log::error!("error reading frontend event: {e}"); + return; + } + } + } + }); +} + +async fn handle_frontend_event( + server: &Server, + producer_tx: &Sender, + consumer_tx: &Sender, + resolve_tx: &Sender<(String, ClientHandle)>, + frontend: &mut FrontendListener, + port_tx: &Sender, + event: FrontendEvent, +) -> bool { + log::debug!("frontend: {event:?}"); + let response = match event { + FrontendEvent::AddClient(hostname, port, pos) => { + let handle = add_client(server, resolve_tx, hostname, HashSet::new(), port, pos).await; + + let client = server + .client_manager + .borrow() + .get(handle) + .unwrap() + .client + .clone(); + Some(FrontendNotify::NotifyClientCreate(client)) + } + FrontendEvent::ActivateClient(handle, active) => { + activate_client(server, producer_tx, consumer_tx, handle, active).await; + Some(FrontendNotify::NotifyClientActivate(handle, active)) + } + FrontendEvent::ChangePort(port) => { + let _ = port_tx.send(port).await; + None + } + FrontendEvent::DelClient(handle) => { + remove_client(server, producer_tx, consumer_tx, frontend, handle).await; + Some(FrontendNotify::NotifyClientDelete(handle)) + } + FrontendEvent::Enumerate() => { + let clients = server + .client_manager + .borrow() + .get_client_states() + .map(|s| (s.client.clone(), s.active)) + .collect(); + Some(FrontendNotify::Enumerate(clients)) + } + FrontendEvent::Shutdown() => { + log::info!("terminating gracefully..."); + return true; + } + FrontendEvent::UpdateClient(handle, hostname, port, pos) => { + update_client( + server, + producer_tx, + consumer_tx, + resolve_tx, + (handle, hostname, port, pos), + ) + .await; + + let client = server + .client_manager + .borrow() + .get(handle) + .unwrap() + .client + .clone(); + Some(FrontendNotify::NotifyClientUpdate(client)) + } + }; + let Some(response) = response else { + return false; + }; + if let Err(e) = frontend.notify_all(response).await { + log::error!("error notifying frontend: {e}"); + } + false +} + +pub async fn add_client( + server: &Server, + resolver_tx: &Sender<(String, ClientHandle)>, + hostname: Option, + addr: HashSet, + port: u16, + pos: Position, +) -> ClientHandle { + log::info!( + "adding client [{}]{} @ {:?}", + pos, + hostname.as_deref().unwrap_or(""), + &addr + ); + let handle = + server + .client_manager + .borrow_mut() + .add_client(hostname.clone(), addr, port, pos, false); + + log::debug!("add_client {handle}"); + + if let Some(hostname) = hostname { + let _ = resolver_tx.send((hostname, handle)).await; + } + + handle +} + +pub async fn activate_client( + server: &Server, + producer_notify_tx: &Sender, + consumer_notify_tx: &Sender, + client: ClientHandle, + active: bool, +) { + let (client, pos) = match server.client_manager.borrow_mut().get_mut(client) { + Some(state) => { + state.active = active; + (state.client.handle, state.client.pos) + } + None => return, + }; + if active { + let _ = producer_notify_tx + .send(ProducerEvent::ClientEvent(ClientEvent::Create(client, pos))) + .await; + let _ = consumer_notify_tx + .send(ConsumerEvent::ClientEvent(ClientEvent::Create(client, pos))) + .await; + } else { + let _ = producer_notify_tx + .send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client))) + .await; + let _ = consumer_notify_tx + .send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client))) + .await; + } +} + +pub async fn remove_client( + server: &Server, + producer_notify_tx: &Sender, + consumer_notify_tx: &Sender, + frontend: &mut FrontendListener, + client: ClientHandle, +) -> Option { + let _ = producer_notify_tx + .send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client))) + .await; + let _ = consumer_notify_tx + .send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client))) + .await; + + let Some(client) = server + .client_manager + .borrow_mut() + .remove_client(client) + .map(|s| s.client.handle) + else { + return None; + }; + + let notify = FrontendNotify::NotifyClientDelete(client); + log::debug!("{notify:?}"); + if let Err(e) = frontend.notify_all(notify).await { + log::error!("error notifying frontend: {e}"); + } + Some(client) +} + +async fn update_client( + server: &Server, + producer_notify_tx: &Sender, + consumer_notify_tx: &Sender, + resolve_tx: &Sender<(String, ClientHandle)>, + client_update: (ClientHandle, Option, u16, Position), +) { + let (handle, hostname, port, pos) = client_update; + let (hostname, handle, active) = { + // retrieve state + let mut client_manager = server.client_manager.borrow_mut(); + let Some(state) = client_manager.get_mut(handle) else { + return; + }; + + // update pos + state.client.pos = pos; + + // update port + if state.client.port != port { + state.client.port = port; + state.active_addr = state.active_addr.map(|a| SocketAddr::new(a.ip(), port)); + } + + // update hostname + if state.client.hostname != hostname { + state.client.ips = HashSet::new(); + state.active_addr = None; + state.client.hostname = hostname; + } + + log::debug!("client updated: {:?}", state); + ( + state.client.hostname.clone(), + state.client.handle, + state.active, + ) + }; + + // resolve dns + if let Some(hostname) = hostname { + let _ = resolve_tx.send((hostname, handle)).await; + } + + // update state in event consumer & producer + if active { + let _ = producer_notify_tx + .send(ProducerEvent::ClientEvent(ClientEvent::Destroy(handle))) + .await; + let _ = consumer_notify_tx + .send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(handle))) + .await; + let _ = producer_notify_tx + .send(ProducerEvent::ClientEvent(ClientEvent::Create(handle, pos))) + .await; + let _ = consumer_notify_tx + .send(ConsumerEvent::ClientEvent(ClientEvent::Create(handle, pos))) + .await; + } +}