From fd28431dd62883c06911923fa76016c0f38aa3c2 Mon Sep 17 00:00:00 2001 From: LGUG2Z Date: Thu, 21 Jul 2022 07:31:05 -0700 Subject: [PATCH] feat(tcp): add keep-alive, split client/server messages This commit sets a 30-second keep-alive on opened TcpStreams (using net2::TcpStreamExt as keep-alive functionality is not currently in the standard library) and splits the previous EventNotification enum into a ServerMessage and a ClientMessage enum respectively. I have changed the semantics between ServerMessage and ClientMessage slightly so that ClientMessage variants are imperative requests that align closer with the names of the handler functions, ie. ChangeLayer calls fn change_layer(). Whenever a client's TcpStream cannot be written to, either because it has notified the server of a disconnect or because it has failed the keep-alive, it will be removed from the connections HashMap on the TcpServer struct. re #47 --- Cargo.lock | 12 ++++++++++++ Cargo.toml | 1 + src/kanata.rs | 14 +++++++------- src/tcp_server.rs | 39 +++++++++++++++++++++++++-------------- 4 files changed, 45 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 83e205abf..adf11d73f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -339,6 +339,7 @@ dependencies = [ "libc", "log", "native-windows-gui", + "net2", "once_cell", "parking_lot", "serde", @@ -445,6 +446,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "546c37ac5d9e56f55e73b677106873d9d9f5190605e41a856503623648488cae" +[[package]] +name = "net2" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae" +dependencies = [ + "cfg-if 0.1.10", + "libc", + "winapi", +] + [[package]] name = "num-integer" version = "0.1.42" diff --git a/Cargo.toml b/Cargo.toml index beee8840b..802436fc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ kanata-keyberon = "0.2.4" heapless = "0.7.15" serde = { version = "1", features = ["derive"] } serde_json = "1" +net2 = "0.2" [target.'cfg(target_os = "linux")'.dependencies] evdev-rs = "0.4.0" diff --git a/src/kanata.rs b/src/kanata.rs index d744dd9b7..51972b71d 100644 --- a/src/kanata.rs +++ b/src/kanata.rs @@ -18,7 +18,7 @@ use crate::cfg::LayerInfo; use crate::custom_action::*; use crate::keys::*; use crate::oskbd::*; -use crate::tcp_server::EventNotification; +use crate::tcp_server::ServerMessage; use crate::{cfg, ValidatedArgs}; use kanata_keyberon::key_code::*; @@ -98,7 +98,7 @@ impl Kanata { } /// Advance keyberon layout state and send events based on changes to its state. - fn handle_time_ticks(&mut self, tx: &Option>) -> Result<()> { + fn handle_time_ticks(&mut self, tx: &Option>) -> Result<()> { let now = time::Instant::now(); let ms_elapsed = now.duration_since(self.last_tick).as_millis(); @@ -266,7 +266,7 @@ impl Kanata { } } - fn check_handle_layer_change(&mut self, tx: &Option>) { + fn check_handle_layer_change(&mut self, tx: &Option>) { let cur_layer = self.layout.current_layer(); if cur_layer != self.prev_layer { let new = self.layer_info[cur_layer].name.clone(); @@ -274,7 +274,7 @@ impl Kanata { self.print_layer(cur_layer); if let Some(tx) = tx { - match tx.try_send(EventNotification::LayerChange { new }) { + match tx.try_send(ServerMessage::LayerChange { new }) { Ok(_) => {} Err(error) => { log::error!("could not sent event notification: {}", error); @@ -289,7 +289,7 @@ impl Kanata { } pub fn start_notification_loop( - rx: Receiver, + rx: Receiver, clients: Arc>>, ) { info!("Kanata: listening for event notifications to relay to connected clients"); @@ -321,12 +321,12 @@ impl Kanata { Err(_) => { // the client is no longer connected, let's remove them stale_clients.push(id.clone()); - log::debug!("removing disconnected notification client"); } } } for id in &stale_clients { + log::warn!("removing disconnected tcp client: {id}"); clients.remove(id); } } @@ -339,7 +339,7 @@ impl Kanata { pub fn start_processing_loop( kanata: Arc>, rx: Receiver, - tx: Option>, + tx: Option>, ) { info!("Kanata: entering the processing loop"); std::thread::spawn(move || { diff --git a/src/tcp_server.rs b/src/tcp_server.rs index 0b2b639fc..406e83e1b 100644 --- a/src/tcp_server.rs +++ b/src/tcp_server.rs @@ -1,25 +1,32 @@ use crate::Kanata; use anyhow::Result; +use net2::TcpStreamExt; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::io::{BufReader, Read}; +use std::io::Read; use std::net::{TcpListener, TcpStream}; use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; #[derive(Debug, Serialize, Deserialize)] -pub enum EventNotification { +pub enum ServerMessage { LayerChange { new: String }, } -impl EventNotification { +#[derive(Debug, Serialize, Deserialize)] +pub enum ClientMessage { + ChangeLayer { new: String }, +} + +impl ServerMessage { pub fn as_bytes(&self) -> Result> { Ok(serde_json::to_string(self)?.as_bytes().to_vec()) } } -impl FromStr for EventNotification { +impl FromStr for ClientMessage { type Err = serde_json::Error; fn from_str(s: &str) -> std::result::Result { @@ -44,22 +51,27 @@ impl TcpServer { let listener = TcpListener::bind(format!("0.0.0.0:{}", self.port)) .expect("could not start the tcp server"); - let cl = self.connections.clone(); + let connections = self.connections.clone(); + std::thread::spawn(move || { for stream in listener.incoming() { match stream { Ok(stream) => { + stream + .set_keepalive(Some(Duration::from_secs(30))) + .expect("could not set tcp connection keepalive"); + let addr = stream .peer_addr() .expect("could not find peer address") .to_string(); { - cl.lock().insert(addr.clone(), stream); + connections.lock().insert(addr.clone(), stream); } - if let Some(stream) = cl.lock().get(&addr) { - let stream = stream + if let Some(stream) = connections.lock().get(&addr) { + let mut stream = stream .try_clone() .expect("could not clone tcpstream to read incoming messages"); @@ -67,14 +79,13 @@ impl TcpServer { std::thread::spawn(move || { log::info!("listening for incoming messages {}", &addr); loop { - let mut buffer: [u8; 1024] = [0; 1024]; - let mut reader = BufReader::new(&stream); - if let Ok(size) = reader.read(&mut buffer) { - if let Ok(event) = EventNotification::from_str( - &String::from_utf8_lossy(&buffer[..size]), + let mut buf = vec![0; 1024]; + if let Ok(size) = stream.read(&mut buf) { + if let Ok(event) = ClientMessage::from_str( + &String::from_utf8_lossy(&buf[..size]), ) { match event { - EventNotification::LayerChange { new } => { + ClientMessage::ChangeLayer { new } => { k_cl.lock().change_layer(new); } }