Skip to content

Commit

Permalink
fix(CompositeDevice): use recv_many to batch process the queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ShadowApex committed May 10, 2024
1 parent ad4cf04 commit 04be9b7
Showing 1 changed file with 136 additions and 131 deletions.
267 changes: 136 additions & 131 deletions src/input/composite_device/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,161 +281,166 @@ impl CompositeDevice {

// Loop and listen for command events
log::debug!("CompositeDevice started");
loop {
let Some(cmd) = self.rx.recv().await else {
log::error!("Error while receiving command. Channel closed.");
let mut buffer = Vec::with_capacity(BUFFER_SIZE);
'main: loop {
let num = self.rx.recv_many(&mut buffer, BUFFER_SIZE).await;
if num == 0 {
log::warn!("Unable to receive more commands. Channel closed.");
break;
};
log::trace!("Received command: {:?}", cmd);
match cmd {
Command::ProcessEvent(device_id, event) => {
if let Err(e) = self.process_event(device_id, event).await {
log::error!("Failed to process event: {:?}", e);
// TODO: Use proper errors to check for 'SendError' and
// stop the composite device
break;
}
log::trace!("Received {num} command(s)");
for cmd in buffer.drain(..) {
log::trace!("Received command: {:?}", cmd);
match cmd {
Command::ProcessEvent(device_id, event) => {
if let Err(e) = self.process_event(device_id, event).await {
log::error!("Failed to process event: {:?}", e);
// TODO: Use proper errors to check for 'SendError' and
// stop the composite device
break 'main;
}
}
}
Command::ProcessOutputEvent(event) => {
if let Err(e) = self.process_output_event(event).await {
log::error!("Failed to process output event: {:?}", e);
Command::ProcessOutputEvent(event) => {
if let Err(e) = self.process_output_event(event).await {
log::error!("Failed to process output event: {:?}", e);
}
}
}
Command::GetCapabilities(sender) => {
if let Err(e) = sender.send(self.capabilities.clone()).await {
log::error!("Failed to send capabilities: {:?}", e);
Command::GetCapabilities(sender) => {
if let Err(e) = sender.send(self.capabilities.clone()).await {
log::error!("Failed to send capabilities: {:?}", e);
}
}
}
Command::GetTargetCapabilities(sender) => {
let target_caps = match self.get_target_capabilities().await {
Ok(caps) => caps,
Err(e) => {
log::error!("Failed to get target capabilities: {e:?}");
continue;
Command::GetTargetCapabilities(sender) => {
let target_caps = match self.get_target_capabilities().await {
Ok(caps) => caps,
Err(e) => {
log::error!("Failed to get target capabilities: {e:?}");
continue;
}
};
if let Err(e) = sender.send(target_caps).await {
log::error!("Failed to send target capabilities: {:?}", e);
}
};
if let Err(e) = sender.send(target_caps).await {
log::error!("Failed to send target capabilities: {:?}", e);
}
}
Command::SetInterceptMode(mode) => self.set_intercept_mode(mode),
Command::GetInterceptMode(sender) => {
if let Err(e) = sender.send(self.intercept_mode.clone()).await {
log::error!("Failed to send intercept mode: {:?}", e);
Command::SetInterceptMode(mode) => self.set_intercept_mode(mode),
Command::GetInterceptMode(sender) => {
if let Err(e) = sender.send(self.intercept_mode.clone()).await {
log::error!("Failed to send intercept mode: {:?}", e);
}
}
}
Command::GetSourceDevicePaths(sender) => {
if let Err(e) = sender.send(self.get_source_device_paths()).await {
log::error!("Failed to send source device paths: {:?}", e);
Command::GetSourceDevicePaths(sender) => {
if let Err(e) = sender.send(self.get_source_device_paths()).await {
log::error!("Failed to send source device paths: {:?}", e);
}
}
}
Command::GetTargetDevicePaths(sender) => {
let paths = self.target_devices.keys().cloned().collect();
if let Err(e) = sender.send(paths).await {
log::error!("Failed to send target device paths: {:?}", e);
Command::GetTargetDevicePaths(sender) => {
let paths = self.target_devices.keys().cloned().collect();
if let Err(e) = sender.send(paths).await {
log::error!("Failed to send target device paths: {:?}", e);
}
}
}
Command::GetDBusDevicePaths(sender) => {
let paths = self.target_dbus_devices.keys().cloned().collect();
if let Err(e) = sender.send(paths).await {
log::error!("Failed to send dbus device paths: {:?}", e);
Command::GetDBusDevicePaths(sender) => {
let paths = self.target_dbus_devices.keys().cloned().collect();
if let Err(e) = sender.send(paths).await {
log::error!("Failed to send dbus device paths: {:?}", e);
}
}
}
Command::SourceDeviceAdded(device_info) => {
if let Err(e) = self.on_source_device_added(device_info).await {
log::error!("Failed to add source device: {:?}", e);
Command::SourceDeviceAdded(device_info) => {
if let Err(e) = self.on_source_device_added(device_info).await {
log::error!("Failed to add source device: {:?}", e);
}
}
}
Command::SourceDeviceStopped(device_id) => {
log::debug!("Detected source device stopped: {}", device_id);
if let Err(e) = self.on_source_device_removed(device_id).await {
log::error!("Failed to remove source device: {:?}", e);
Command::SourceDeviceStopped(device_id) => {
log::debug!("Detected source device stopped: {}", device_id);
if let Err(e) = self.on_source_device_removed(device_id).await {
log::error!("Failed to remove source device: {:?}", e);
}
if self.source_devices_used.is_empty() {
log::debug!(
"No source devices remain. Stopping CompositeDevice {:?}",
self.dbus_path
);
break 'main;
}
}
if self.source_devices_used.is_empty() {
log::debug!(
"No source devices remain. Stopping CompositeDevice {:?}",
self.dbus_path
);
break;
Command::SourceDeviceRemoved(device_id) => {
log::debug!("Detected source device removed: {}", device_id);
if let Err(e) = self.on_source_device_removed(device_id).await {
log::error!("Failed to remove source device: {:?}", e);
}
if self.source_devices_used.is_empty() {
log::debug!(
"No source devices remain. Stopping CompositeDevice {:?}",
self.dbus_path
);
break 'main;
}
}
}
Command::SourceDeviceRemoved(device_id) => {
log::debug!("Detected source device removed: {}", device_id);
if let Err(e) = self.on_source_device_removed(device_id).await {
log::error!("Failed to remove source device: {:?}", e);
Command::SetTargetDevices(target_types) => {
if let Err(e) = self.set_target_devices(target_types).await {
log::error!("Failed to set target devices: {e:?}");
}
}
if self.source_devices_used.is_empty() {
log::debug!(
"No source devices remain. Stopping CompositeDevice {:?}",
self.dbus_path
);
break;
Command::AttachTargetDevices(targets) => {
if let Err(e) = self.attach_target_devices(targets).await {
log::error!("Failed to attach target devices: {e:?}");
}
}
}
Command::SetTargetDevices(target_types) => {
if let Err(e) = self.set_target_devices(target_types).await {
log::error!("Failed to set target devices: {e:?}");
Command::GetName(sender) => {
let name = self.name.clone();
if let Err(e) = sender.send(name).await {
log::error!("Failed to send device name: {:?}", e);
}
}
}
Command::AttachTargetDevices(targets) => {
if let Err(e) = self.attach_target_devices(targets).await {
log::error!("Failed to attach target devices: {e:?}");
Command::GetProfileName(sender) => {
let profile_name = self.device_profile.clone().unwrap_or_default();
if let Err(e) = sender.send(profile_name).await {
log::error!("Failed to send profile name: {:?}", e);
}
}
}
Command::GetName(sender) => {
let name = self.name.clone();
if let Err(e) = sender.send(name).await {
log::error!("Failed to send device name: {:?}", e);
Command::LoadProfilePath(path, sender) => {
log::info!("Loading profile from path: {path}");
let result = match self.load_device_profile_from_path(path.clone()) {
Ok(_) => Ok(()),
Err(e) => Err(e.to_string()),
};
if let Err(e) = sender.send(result).await {
log::error!("Failed to send load profile result: {:?}", e);
}
}
}
Command::GetProfileName(sender) => {
let profile_name = self.device_profile.clone().unwrap_or_default();
if let Err(e) = sender.send(profile_name).await {
log::error!("Failed to send profile name: {:?}", e);
Command::WriteEvent(event) => {
if let Err(e) = self.write_event(event).await {
log::error!("Failed to write event: {:?}", e);
}
}
}
Command::LoadProfilePath(path, sender) => {
log::info!("Loading profile from path: {path}");
let result = match self.load_device_profile_from_path(path.clone()) {
Ok(_) => Ok(()),
Err(e) => Err(e.to_string()),
};
if let Err(e) = sender.send(result).await {
log::error!("Failed to send load profile result: {:?}", e);
Command::WriteChordEvent(events) => {
if let Err(e) = self.write_chord_events(events).await {
log::error!("Failed to write event: {:?}", e);
}
}
}
Command::WriteEvent(event) => {
if let Err(e) = self.write_event(event).await {
log::error!("Failed to write event: {:?}", e);
Command::WriteSendEvent(event) => {
if let Err(e) = self.write_send_event(event).await {
log::error!("Failed to write event: {:?}", e);
}
}
}
Command::WriteChordEvent(events) => {
if let Err(e) = self.write_chord_events(events).await {
log::error!("Failed to write event: {:?}", e);
Command::HandleEvent(event) => {
if let Err(e) = self.handle_event(event).await {
log::error!("Failed to write event: {:?}", e);
}
}
}
Command::WriteSendEvent(event) => {
if let Err(e) = self.write_send_event(event).await {
log::error!("Failed to write event: {:?}", e);
Command::RemoveRecentEvent(cap) => {
self.translated_recent_events.remove(&cap);
}
}
Command::HandleEvent(event) => {
if let Err(e) = self.handle_event(event).await {
log::error!("Failed to write event: {:?}", e);
Command::SetInterceptActivation(activation_caps, target_cap) => {
self.set_intercept_activation(activation_caps, target_cap)
}
Command::Stop => {
log::debug!(
"Got STOP signal. Stopping CompositeDevice: {:?}",
self.dbus_path
);
break 'main;
}
}
Command::RemoveRecentEvent(cap) => {
self.translated_recent_events.remove(&cap);
}
Command::SetInterceptActivation(activation_caps, target_cap) => {
self.set_intercept_activation(activation_caps, target_cap)
}
Command::Stop => {
log::debug!(
"Got STOP signal. Stopping CompositeDevice: {:?}",
self.dbus_path
);
break;
}
}
}
Expand Down

0 comments on commit 04be9b7

Please sign in to comment.