Skip to content

Commit

Permalink
refactor frontend task
Browse files Browse the repository at this point in the history
  • Loading branch information
feschber committed Jan 19, 2024
1 parent 61ff05c commit 622b04b
Show file tree
Hide file tree
Showing 2 changed files with 339 additions and 309 deletions.
324 changes: 15 additions & 309 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,30 @@
use anyhow::anyhow;
use log;
use std::{
cell::{Cell, RefCell},
collections::HashSet,
io::Result,
net::IpAddr,
rc::Rc,
time::Duration,
};
use tokio::{io::ReadHalf, net::UdpSocket, signal, sync::mpsc::Sender};
use tokio::net::UdpSocket;
use tokio::signal;

#[cfg(unix)]
use tokio::net::UnixStream;

#[cfg(windows)]
use tokio::net::TcpStream;

use std::{io::ErrorKind, net::SocketAddr};
use std::net::SocketAddr;

use crate::{
client::{ClientEvent, ClientHandle, ClientManager, Position},
client::{ClientHandle, ClientManager},
config::Config,
dns,
event::Event,
frontend::{self, FrontendEvent, FrontendListener, FrontendNotify},
frontend::{FrontendEvent, FrontendListener, FrontendNotify},
server::producer_task::ProducerEvent,
};
use crate::{consumer, producer};

use self::consumer_task::ConsumerEvent;

mod consumer_task;
mod frontend_task;
mod producer_task;

const MAX_RESPONSE_TIME: Duration = Duration::from_millis(500);
Expand Down Expand Up @@ -79,7 +73,7 @@ impl Server {

pub async fn run(&self) -> anyhow::Result<()> {
// create frontend communication adapter
let mut frontend = match FrontendListener::new().await {
let frontend = match FrontendListener::new().await {
Some(f) => f?,
None => {
// none means some other instance is already running
Expand All @@ -89,9 +83,7 @@ impl Server {
};
let (consumer, producer) = tokio::join!(consumer::create(), producer::create());

let (frontend_tx, mut frontend_rx) = tokio::sync::mpsc::channel(32);
let (resolve_tx, mut resolve_rx) = tokio::sync::mpsc::channel(32);
let (frontend_notify_tx, mut frontend_notify_rx) = tokio::sync::mpsc::channel(32);
let (receiver_tx, receiver_rx) = tokio::sync::mpsc::channel(32);
let (sender_tx, mut sender_rx) = tokio::sync::mpsc::channel(32);
let (port_tx, mut port_rx) = tokio::sync::mpsc::channel(32);
Expand All @@ -112,38 +104,14 @@ impl Server {
);

// frontend listener
let server = self.clone();
let producer_notify = producer_channel.clone();
let consumer_notify = consumer_channel.clone();
let frontend_ch = frontend_tx.clone();
let resolve_ch = resolve_tx.clone();
let mut frontend_task = tokio::task::spawn_local(async move {
loop {
tokio::select! {
stream = frontend.accept() => {
let stream = match stream {
Ok(s) => s,
Err(e) => {
log::warn!("error accepting frontend connection: {e}");
continue;
}
};
server.handle_frontend_stream(&frontend_ch, stream).await;
}
event = frontend_rx.recv() => {
let frontend_event = event.ok_or(anyhow!("frontend channel closed"))?;
if server.handle_frontend_event(&producer_notify, &consumer_notify, &resolve_ch, &mut frontend, &port_tx, frontend_event).await {
break;
}
}
notify = frontend_notify_rx.recv() => {
let notify = notify.ok_or(anyhow!("frontend notify closed"))?;
let _ = frontend.notify_all(notify).await;
}
}
}
anyhow::Ok(())
});
let (mut frontend_task, frontend_tx, frontend_notify_tx) = frontend_task::new(
frontend,
self.clone(),
producer_channel.clone(),
consumer_channel.clone(),
resolve_tx.clone(),
port_tx,
);

// dns resolver

Expand Down Expand Up @@ -405,268 +373,6 @@ impl Server {

Ok(())
}

pub async fn add_client(
&self,
resolver_tx: &Sender<(String, ClientHandle)>,
hostname: Option<String>,
addr: HashSet<IpAddr>,
port: u16,
pos: Position,
) -> ClientHandle {
log::info!(
"adding client [{}]{} @ {:?}",
pos,
hostname.as_deref().unwrap_or(""),
&addr
);
let handle =
self.client_manager
.borrow_mut()
.add_client(hostname.clone(), addr, port, pos, false);

log::debug!("add_client {handle}");

if let Some(hostname) = hostname {
let _ = resolver_tx.send((hostname, handle)).await;
}

handle
}

pub async fn activate_client(
&self,
producer_notify_tx: &Sender<ProducerEvent>,
consumer_notify_tx: &Sender<ConsumerEvent>,
client: ClientHandle,
active: bool,
) {
let (client, pos) = match self.client_manager.borrow_mut().get_mut(client) {
Some(state) => {
state.active = active;
(state.client.handle, state.client.pos)
}
None => return,
};
if active {
let _ = producer_notify_tx
.send(ProducerEvent::ClientEvent(ClientEvent::Create(client, pos)))
.await;
let _ = consumer_notify_tx
.send(ConsumerEvent::ClientEvent(ClientEvent::Create(client, pos)))
.await;
} else {
let _ = producer_notify_tx
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client)))
.await;
let _ = consumer_notify_tx
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client)))
.await;
}
}

pub async fn remove_client(
&self,
producer_notify_tx: &Sender<ProducerEvent>,
consumer_notify_tx: &Sender<ConsumerEvent>,
frontend: &mut FrontendListener,
client: ClientHandle,
) -> Option<ClientHandle> {
let _ = producer_notify_tx
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(client)))
.await;
let _ = consumer_notify_tx
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(client)))
.await;

let Some(client) = self
.client_manager
.borrow_mut()
.remove_client(client)
.map(|s| s.client.handle)
else {
return None;
};

let notify = FrontendNotify::NotifyClientDelete(client);
log::debug!("{notify:?}");
if let Err(e) = frontend.notify_all(notify).await {
log::error!("error notifying frontend: {e}");
}
Some(client)
}

async fn update_client(
&self,
producer_notify_tx: &Sender<ProducerEvent>,
consumer_notify_tx: &Sender<ConsumerEvent>,
resolve_tx: &Sender<(String, ClientHandle)>,
client_update: (ClientHandle, Option<String>, u16, Position),
) {
let (handle, hostname, port, pos) = client_update;
let (hostname, handle, active) = {
// retrieve state
let mut client_manager = self.client_manager.borrow_mut();
let Some(state) = client_manager.get_mut(handle) else {
return;
};

// update pos
state.client.pos = pos;

// update port
if state.client.port != port {
state.client.port = port;
state.active_addr = state.active_addr.map(|a| SocketAddr::new(a.ip(), port));
}

// update hostname
if state.client.hostname != hostname {
state.client.ips = HashSet::new();
state.active_addr = None;
state.client.hostname = hostname;
}

log::debug!("client updated: {:?}", state);
(
state.client.hostname.clone(),
state.client.handle,
state.active,
)
};

// resolve dns
if let Some(hostname) = hostname {
let _ = resolve_tx.send((hostname, handle)).await;
}

// update state in event consumer & producer
if active {
let _ = producer_notify_tx
.send(ProducerEvent::ClientEvent(ClientEvent::Destroy(handle)))
.await;
let _ = consumer_notify_tx
.send(ConsumerEvent::ClientEvent(ClientEvent::Destroy(handle)))
.await;
let _ = producer_notify_tx
.send(ProducerEvent::ClientEvent(ClientEvent::Create(handle, pos)))
.await;
let _ = consumer_notify_tx
.send(ConsumerEvent::ClientEvent(ClientEvent::Create(handle, pos)))
.await;
}
}

async fn handle_frontend_stream(
&self,
frontend_tx: &Sender<FrontendEvent>,
#[cfg(unix)] mut stream: ReadHalf<UnixStream>,
#[cfg(windows)] mut stream: ReadHalf<TcpStream>,
) {
use std::io;

let tx = frontend_tx.clone();
tokio::task::spawn_local(async move {
let _ = tx.send(FrontendEvent::Enumerate()).await;
loop {
let event = frontend::read_event(&mut stream).await;
match event {
Ok(event) => {
let _ = tx.send(event).await;
}
Err(e) => {
if let Some(e) = e.downcast_ref::<io::Error>() {
if e.kind() == ErrorKind::UnexpectedEof {
return;
}
}
log::error!("error reading frontend event: {e}");
return;
}
}
}
});
}

async fn handle_frontend_event(
&self,
producer_tx: &Sender<ProducerEvent>,
consumer_tx: &Sender<ConsumerEvent>,
resolve_tx: &Sender<(String, ClientHandle)>,
frontend: &mut FrontendListener,
port_tx: &Sender<u16>,
event: FrontendEvent,
) -> bool {
log::debug!("frontend: {event:?}");
let response = match event {
FrontendEvent::AddClient(hostname, port, pos) => {
let handle = self
.add_client(resolve_tx, hostname, HashSet::new(), port, pos)
.await;

let client = self
.client_manager
.borrow()
.get(handle)
.unwrap()
.client
.clone();
Some(FrontendNotify::NotifyClientCreate(client))
}
FrontendEvent::ActivateClient(handle, active) => {
self.activate_client(producer_tx, consumer_tx, handle, active)
.await;
Some(FrontendNotify::NotifyClientActivate(handle, active))
}
FrontendEvent::ChangePort(port) => {
let _ = port_tx.send(port).await;
None
}
FrontendEvent::DelClient(handle) => {
self.remove_client(producer_tx, consumer_tx, frontend, handle)
.await;
Some(FrontendNotify::NotifyClientDelete(handle))
}
FrontendEvent::Enumerate() => {
let clients = self
.client_manager
.borrow()
.get_client_states()
.map(|s| (s.client.clone(), s.active))
.collect();
Some(FrontendNotify::Enumerate(clients))
}
FrontendEvent::Shutdown() => {
log::info!("terminating gracefully...");
return true;
}
FrontendEvent::UpdateClient(handle, hostname, port, pos) => {
self.update_client(
producer_tx,
consumer_tx,
resolve_tx,
(handle, hostname, port, pos),
)
.await;

let client = self
.client_manager
.borrow()
.get(handle)
.unwrap()
.client
.clone();
Some(FrontendNotify::NotifyClientUpdate(client))
}
};
let Some(response) = response else {
return false;
};
if let Err(e) = frontend.notify_all(response).await {
log::error!("error notifying frontend: {e}");
}
false
}
}

async fn receive_event(socket: &UdpSocket) -> anyhow::Result<(Event, SocketAddr)> {
Expand Down
Loading

0 comments on commit 622b04b

Please sign in to comment.