Skip to content

Commit

Permalink
fix(CompositeDevice): resolve deadlock issues with switching profiles
Browse files Browse the repository at this point in the history
  • Loading branch information
ShadowApex committed May 24, 2024
1 parent aee9d8f commit 8f569d2
Show file tree
Hide file tree
Showing 18 changed files with 502 additions and 318 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ serde_yaml = "0.9.27"
thiserror = "1.0.56"
tokio = { version = "*", features = ["full"] }
uhid-virt = "0.0.7"
zbus = { version = "4.1.2", default-features = false, features = ["tokio"] }
zbus_macros = "4.1.2"
zbus = { version = "4.2.2", default-features = false, features = ["tokio"] }
zbus_macros = "4.2.2"

[profile.release]
debug = false
Expand Down
14 changes: 1 addition & 13 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ impl CompositeDeviceConfig {
) -> bool {
//TODO: Check if the evdev has no proterties defined, that would always match.

if is_virtual(device) {
if device.is_virtual() {
log::debug!("{} is virtual, skipping.", device.name);
return false;
}
Expand Down Expand Up @@ -576,15 +576,3 @@ impl CompositeDeviceConfig {
Some(matches)
}
}

/// Determines if a procfs device is virtual or real.
fn is_virtual(device: &procfs::device::Device) -> bool {
if !device.phys_path.is_empty() {
return false;
}

if device.sysfs_path.contains("/devices/virtual") {
return true;
}
false
}
54 changes: 39 additions & 15 deletions src/dbus/interface/composite_device.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashSet, str::FromStr};
use std::{collections::HashSet, str::FromStr, time::Duration};

use tokio::sync::mpsc;
use zbus::{
Expand Down Expand Up @@ -33,7 +33,7 @@ impl CompositeDeviceInterface {
async fn name(&self) -> fdo::Result<String> {
let (sender, mut receiver) = mpsc::channel::<String>(1);
self.tx
.send(Command::GetName(sender))
.send_timeout(Command::GetName(sender), Duration::from_millis(500))
.await
.map_err(|e| fdo::Error::Failed(e.to_string()))?;
let Some(name) = receiver.recv().await else {
Expand All @@ -48,7 +48,7 @@ impl CompositeDeviceInterface {
async fn profile_name(&self) -> fdo::Result<String> {
let (sender, mut receiver) = mpsc::channel::<String>(1);
self.tx
.send(Command::GetProfileName(sender))
.send_timeout(Command::GetProfileName(sender), Duration::from_millis(500))
.await
.map_err(|e| fdo::Error::Failed(e.to_string()))?;
let Some(profile_name) = receiver.recv().await else {
Expand All @@ -61,7 +61,7 @@ impl CompositeDeviceInterface {
/// Stop the composite device and all target devices
async fn stop(&self) -> fdo::Result<()> {
self.tx
.send(Command::Stop)
.send_timeout(Command::Stop, Duration::from_millis(500))
.await
.map_err(|e| fdo::Error::Failed(e.to_string()))?;
Ok(())
Expand All @@ -71,7 +71,10 @@ impl CompositeDeviceInterface {
async fn load_profile_path(&self, path: String) -> fdo::Result<()> {
let (sender, mut receiver) = mpsc::channel::<Result<(), String>>(1);
self.tx
.send(Command::LoadProfilePath(path, sender))
.send_timeout(
Command::LoadProfilePath(path, sender),
Duration::from_millis(500),
)
.await
.map_err(|e| fdo::Error::Failed(e.to_string()))?;

Expand All @@ -97,7 +100,10 @@ impl CompositeDeviceInterface {
/// new target devices.
async fn set_target_devices(&self, target_device_types: Vec<String>) -> fdo::Result<()> {
self.tx
.send(Command::SetTargetDevices(target_device_types))
.send_timeout(
Command::SetTargetDevices(target_device_types),
Duration::from_millis(500),
)
.await
.map_err(|err| fdo::Error::Failed(err.to_string()))?;
Ok(())
Expand Down Expand Up @@ -200,7 +206,7 @@ impl CompositeDeviceInterface {
}

self.tx
.send(Command::WriteChordEvent(chord))
.send_timeout(Command::WriteChordEvent(chord), Duration::from_millis(500))
.await
.map_err(|e| fdo::Error::Failed(e.to_string()))?;

Expand Down Expand Up @@ -241,7 +247,10 @@ impl CompositeDeviceInterface {
}

self.tx
.send(Command::SetInterceptActivation(activation_caps, target_cap))
.send_timeout(
Command::SetInterceptActivation(activation_caps, target_cap),
Duration::from_millis(500),
)
.await
.map_err(|e| fdo::Error::Failed(e.to_string()))?;

Expand All @@ -253,7 +262,7 @@ impl CompositeDeviceInterface {
async fn capabilities(&self) -> fdo::Result<Vec<String>> {
let (sender, mut receiver) = mpsc::channel::<HashSet<Capability>>(1);
self.tx
.send(Command::GetCapabilities(sender))
.send_timeout(Command::GetCapabilities(sender), Duration::from_millis(500))
.await
.map_err(|e| fdo::Error::Failed(e.to_string()))?;
let Some(capabilities) = receiver.recv().await else {
Expand Down Expand Up @@ -288,7 +297,10 @@ impl CompositeDeviceInterface {
async fn target_capabilities(&self) -> fdo::Result<Vec<String>> {
let (sender, mut receiver) = mpsc::channel::<HashSet<Capability>>(1);
self.tx
.send(Command::GetTargetCapabilities(sender))
.send_timeout(
Command::GetTargetCapabilities(sender),
Duration::from_millis(500),
)
.await
.map_err(|e| fdo::Error::Failed(e.to_string()))?;
let Some(capabilities) = receiver.recv().await else {
Expand Down Expand Up @@ -323,7 +335,10 @@ impl CompositeDeviceInterface {
async fn source_device_paths(&self) -> fdo::Result<Vec<String>> {
let (sender, mut receiver) = mpsc::channel::<Vec<String>>(1);
self.tx
.send(Command::GetSourceDevicePaths(sender))
.send_timeout(
Command::GetSourceDevicePaths(sender),
Duration::from_millis(500),
)
.await
.map_err(|e| fdo::Error::Failed(e.to_string()))?;
let Some(paths) = receiver.recv().await else {
Expand All @@ -338,7 +353,10 @@ impl CompositeDeviceInterface {
async fn intercept_mode(&self) -> fdo::Result<u32> {
let (sender, mut receiver) = mpsc::channel::<InterceptMode>(1);
self.tx
.send(Command::GetInterceptMode(sender))
.send_timeout(
Command::GetInterceptMode(sender),
Duration::from_millis(500),
)
.await
.map_err(|e| fdo::Error::Failed(e.to_string()))?;
let Some(mode) = receiver.recv().await else {
Expand All @@ -361,7 +379,7 @@ impl CompositeDeviceInterface {
_ => InterceptMode::None,
};
self.tx
.send(Command::SetInterceptMode(mode))
.send_timeout(Command::SetInterceptMode(mode), Duration::from_millis(500))
.await
.map_err(|err| zbus::Error::Failure(err.to_string()))?;
Ok(())
Expand All @@ -372,7 +390,10 @@ impl CompositeDeviceInterface {
async fn target_devices(&self) -> fdo::Result<Vec<String>> {
let (sender, mut receiver) = mpsc::channel::<Vec<String>>(1);
self.tx
.send(Command::GetTargetDevicePaths(sender))
.send_timeout(
Command::GetTargetDevicePaths(sender),
Duration::from_millis(500),
)
.await
.map_err(|e| fdo::Error::Failed(e.to_string()))?;
let Some(paths) = receiver.recv().await else {
Expand All @@ -387,7 +408,10 @@ impl CompositeDeviceInterface {
async fn dbus_devices(&self) -> fdo::Result<Vec<String>> {
let (sender, mut receiver) = mpsc::channel::<Vec<String>>(1);
self.tx
.send(Command::GetDBusDevicePaths(sender))
.send_timeout(
Command::GetDBusDevicePaths(sender),
Duration::from_millis(500),
)
.await
.map_err(|e| fdo::Error::Failed(e.to_string()))?;
let Some(paths) = receiver.recv().await else {
Expand Down
40 changes: 29 additions & 11 deletions src/dbus/interface/manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use tokio::sync::{broadcast, mpsc};
use std::time::Duration;

use tokio::sync::mpsc;
use zbus::fdo;
use zbus_macros::interface;

Expand All @@ -8,11 +10,11 @@ use crate::{config::CompositeDeviceConfig, input::manager::ManagerCommand};
/// a [Manager]. It works by sending command messages to a channel that the
/// [Manager] is listening on.
pub struct ManagerInterface {
tx: broadcast::Sender<ManagerCommand>,
tx: mpsc::Sender<ManagerCommand>,
}

impl ManagerInterface {
pub fn new(tx: broadcast::Sender<ManagerCommand>) -> ManagerInterface {
pub fn new(tx: mpsc::Sender<ManagerCommand>) -> ManagerInterface {
ManagerInterface { tx }
}
}
Expand All @@ -30,7 +32,11 @@ impl ManagerInterface {
let device = CompositeDeviceConfig::from_yaml_file(config_path)
.map_err(|err| fdo::Error::Failed(err.to_string()))?;
self.tx
.send(ManagerCommand::CreateCompositeDevice { config: device })
.send_timeout(
ManagerCommand::CreateCompositeDevice { config: device },
Duration::from_millis(500),
)
.await
.map_err(|err| fdo::Error::Failed(err.to_string()))?;
Ok("".to_string())
}
Expand All @@ -40,7 +46,11 @@ impl ManagerInterface {
async fn create_target_device(&self, kind: String) -> fdo::Result<String> {
let (sender, mut receiver) = mpsc::channel(1);
self.tx
.send(ManagerCommand::CreateTargetDevice { kind, sender })
.send_timeout(
ManagerCommand::CreateTargetDevice { kind, sender },
Duration::from_millis(500),
)
.await
.map_err(|err| fdo::Error::Failed(err.to_string()))?;

// Read the response from the manager
Expand All @@ -61,7 +71,11 @@ impl ManagerInterface {
/// Stop the given target device
async fn stop_target_device(&self, path: String) -> fdo::Result<()> {
self.tx
.send(ManagerCommand::StopTargetDevice { path })
.send_timeout(
ManagerCommand::StopTargetDevice { path },
Duration::from_millis(500),
)
.await
.map_err(|err| fdo::Error::Failed(err.to_string()))?;
Ok(())
}
Expand All @@ -74,11 +88,15 @@ impl ManagerInterface {
) -> fdo::Result<()> {
let (sender, mut receiver) = mpsc::channel(1);
self.tx
.send(ManagerCommand::AttachTargetDevice {
target_path: target_path.clone(),
composite_path: composite_path.clone(),
sender,
})
.send_timeout(
ManagerCommand::AttachTargetDevice {
target_path: target_path.clone(),
composite_path: composite_path.clone(),
sender,
},
Duration::from_millis(500),
)
.await
.map_err(|err| fdo::Error::Failed(err.to_string()))?;

// Read the response from the manager
Expand Down
13 changes: 12 additions & 1 deletion src/dbus/interface/source/evdev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,20 @@ impl SourceEventDeviceInterface {
handler: String,
info: procfs::device::Device,
) -> Result<(), Box<dyn Error>> {
log::debug!("Starting to listen on dbus interface for {handler}");
let path = get_dbus_path(handler.clone());
log::debug!("Got dbus path {path}");
let iface = SourceEventDeviceInterface::new(handler.clone(), info);
conn.object_server().at(path, iface).await?;
log::debug!("Created interface for {handler}");
tokio::task::spawn(async move {
log::debug!("Starting dbus interface: {path}");
let result = conn.object_server().at(path.clone(), iface).await;
if let Err(e) = result {
log::debug!("Failed to start dbus interface {path}: {e:?}");
} else {
log::debug!("Started dbus interface: {path}");
}
});
Ok(())
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/dbus/interface/source/hidraw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,15 @@ impl SourceHIDRawInterface {
pub async fn listen_on_dbus(conn: Connection, info: DeviceInfo) -> Result<(), Box<dyn Error>> {
let path = get_dbus_path(info.path().to_string_lossy().to_string());
let iface = SourceHIDRawInterface::new(info);
conn.object_server().at(path, iface).await?;
tokio::task::spawn(async move {
log::debug!("Starting dbus interface: {path}");
let result = conn.object_server().at(path.clone(), iface).await;
if let Err(e) = result {
log::debug!("Failed to start dbus interface {path}: {e:?}");
} else {
log::debug!("Started dbus interface: {path}");
}
});
Ok(())
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/dbus/interface/target/keyboard.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use tokio::sync::mpsc;
use zbus::fdo;
use zbus_macros::interface;
Expand Down Expand Up @@ -41,7 +43,7 @@ impl TargetKeyboardInterface {

// Write the event to the virtual device
self.command_tx
.send(TargetCommand::WriteEvent(event))
.send_timeout(TargetCommand::WriteEvent(event), Duration::from_millis(500))
.await
.map_err(|err| fdo::Error::Failed(err.to_string()))?;

Expand Down
Loading

0 comments on commit 8f569d2

Please sign in to comment.