diff --git a/Cargo.lock b/Cargo.lock index 83e205abf..adf11d73f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -339,6 +339,7 @@ dependencies = [ "libc", "log", "native-windows-gui", + "net2", "once_cell", "parking_lot", "serde", @@ -445,6 +446,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "546c37ac5d9e56f55e73b677106873d9d9f5190605e41a856503623648488cae" +[[package]] +name = "net2" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae" +dependencies = [ + "cfg-if 0.1.10", + "libc", + "winapi", +] + [[package]] name = "num-integer" version = "0.1.42" diff --git a/Cargo.toml b/Cargo.toml index beee8840b..802436fc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ kanata-keyberon = "0.2.4" heapless = "0.7.15" serde = { version = "1", features = ["derive"] } serde_json = "1" +net2 = "0.2" [target.'cfg(target_os = "linux")'.dependencies] evdev-rs = "0.4.0" diff --git a/src/kanata.rs b/src/kanata.rs index d744dd9b7..51972b71d 100644 --- a/src/kanata.rs +++ b/src/kanata.rs @@ -18,7 +18,7 @@ use crate::cfg::LayerInfo; use crate::custom_action::*; use crate::keys::*; use crate::oskbd::*; -use crate::tcp_server::EventNotification; +use crate::tcp_server::ServerMessage; use crate::{cfg, ValidatedArgs}; use kanata_keyberon::key_code::*; @@ -98,7 +98,7 @@ impl Kanata { } /// Advance keyberon layout state and send events based on changes to its state. - fn handle_time_ticks(&mut self, tx: &Option>) -> Result<()> { + fn handle_time_ticks(&mut self, tx: &Option>) -> Result<()> { let now = time::Instant::now(); let ms_elapsed = now.duration_since(self.last_tick).as_millis(); @@ -266,7 +266,7 @@ impl Kanata { } } - fn check_handle_layer_change(&mut self, tx: &Option>) { + fn check_handle_layer_change(&mut self, tx: &Option>) { let cur_layer = self.layout.current_layer(); if cur_layer != self.prev_layer { let new = self.layer_info[cur_layer].name.clone(); @@ -274,7 +274,7 @@ impl Kanata { self.print_layer(cur_layer); if let Some(tx) = tx { - match tx.try_send(EventNotification::LayerChange { new }) { + match tx.try_send(ServerMessage::LayerChange { new }) { Ok(_) => {} Err(error) => { log::error!("could not sent event notification: {}", error); @@ -289,7 +289,7 @@ impl Kanata { } pub fn start_notification_loop( - rx: Receiver, + rx: Receiver, clients: Arc>>, ) { info!("Kanata: listening for event notifications to relay to connected clients"); @@ -321,12 +321,12 @@ impl Kanata { Err(_) => { // the client is no longer connected, let's remove them stale_clients.push(id.clone()); - log::debug!("removing disconnected notification client"); } } } for id in &stale_clients { + log::warn!("removing disconnected tcp client: {id}"); clients.remove(id); } } @@ -339,7 +339,7 @@ impl Kanata { pub fn start_processing_loop( kanata: Arc>, rx: Receiver, - tx: Option>, + tx: Option>, ) { info!("Kanata: entering the processing loop"); std::thread::spawn(move || { diff --git a/src/tcp_server.rs b/src/tcp_server.rs index 0b2b639fc..406e83e1b 100644 --- a/src/tcp_server.rs +++ b/src/tcp_server.rs @@ -1,25 +1,32 @@ use crate::Kanata; use anyhow::Result; +use net2::TcpStreamExt; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::io::{BufReader, Read}; +use std::io::Read; use std::net::{TcpListener, TcpStream}; use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; #[derive(Debug, Serialize, Deserialize)] -pub enum EventNotification { +pub enum ServerMessage { LayerChange { new: String }, } -impl EventNotification { +#[derive(Debug, Serialize, Deserialize)] +pub enum ClientMessage { + ChangeLayer { new: String }, +} + +impl ServerMessage { pub fn as_bytes(&self) -> Result> { Ok(serde_json::to_string(self)?.as_bytes().to_vec()) } } -impl FromStr for EventNotification { +impl FromStr for ClientMessage { type Err = serde_json::Error; fn from_str(s: &str) -> std::result::Result { @@ -44,22 +51,27 @@ impl TcpServer { let listener = TcpListener::bind(format!("0.0.0.0:{}", self.port)) .expect("could not start the tcp server"); - let cl = self.connections.clone(); + let connections = self.connections.clone(); + std::thread::spawn(move || { for stream in listener.incoming() { match stream { Ok(stream) => { + stream + .set_keepalive(Some(Duration::from_secs(30))) + .expect("could not set tcp connection keepalive"); + let addr = stream .peer_addr() .expect("could not find peer address") .to_string(); { - cl.lock().insert(addr.clone(), stream); + connections.lock().insert(addr.clone(), stream); } - if let Some(stream) = cl.lock().get(&addr) { - let stream = stream + if let Some(stream) = connections.lock().get(&addr) { + let mut stream = stream .try_clone() .expect("could not clone tcpstream to read incoming messages"); @@ -67,14 +79,13 @@ impl TcpServer { std::thread::spawn(move || { log::info!("listening for incoming messages {}", &addr); loop { - let mut buffer: [u8; 1024] = [0; 1024]; - let mut reader = BufReader::new(&stream); - if let Ok(size) = reader.read(&mut buffer) { - if let Ok(event) = EventNotification::from_str( - &String::from_utf8_lossy(&buffer[..size]), + let mut buf = vec![0; 1024]; + if let Ok(size) = stream.read(&mut buf) { + if let Ok(event) = ClientMessage::from_str( + &String::from_utf8_lossy(&buf[..size]), ) { match event { - EventNotification::LayerChange { new } => { + ClientMessage::ChangeLayer { new } => { k_cl.lock().change_layer(new); } }