-
Notifications
You must be signed in to change notification settings - Fork 29
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
1f3aa0c
commit ece1fd0
Showing
1 changed file
with
45 additions
and
28 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,8 @@ | |
# | ||
# Copyright (C) 2016 Marek Marczykowski-Górecki | ||
# <[email protected]> | ||
# Copyright (C) 2024 Piotr Bartman-Szwarc | ||
# <[email protected]> | ||
# | ||
# This program is free software; you can redistribute it and/or modify | ||
# it under the terms of the GNU General Public License as published by | ||
|
@@ -19,21 +21,18 @@ | |
# You should have received a copy of the GNU General Public License along | ||
# with this program; if not, write to the Free Software Foundation, Inc., | ||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
# | ||
|
||
import asyncio | ||
import base64 | ||
import collections | ||
import fcntl | ||
import grp | ||
import itertools | ||
import os | ||
import re | ||
import string | ||
import subprocess | ||
import sys | ||
|
||
import tempfile | ||
from enum import Enum | ||
from typing import List, Optional, Dict, Tuple | ||
|
||
import qubes.devices | ||
|
@@ -152,13 +151,6 @@ def parent_device(self) -> Optional[qubes.devices.DeviceInfo]: | |
""" | ||
return None | ||
|
||
# @property | ||
# def port_id(self) -> str: | ||
# """ | ||
# Which port the device is connected to. | ||
# """ | ||
# return self.ident.split("-")[1] | ||
|
||
def _load_interfaces_from_qubesdb(self) \ | ||
-> List[qubes.devices.DeviceInterface]: | ||
result = [qubes.devices.DeviceInterface.unknown()] | ||
|
@@ -256,8 +248,8 @@ def frontend_domain(self): | |
untrusted_connected_to] | ||
except KeyError: | ||
self.backend_domain.log.warning( | ||
'Device {} has invalid VM name in connected-to ' | ||
'property: '.format(self.ident, untrusted_connected_to)) | ||
f'Device {self.ident} has invalid VM name in connected-to ' | ||
f'property: {untrusted_connected_to}') | ||
return None | ||
return connected_to | ||
|
||
|
@@ -327,6 +319,10 @@ class QubesUSBException(qubes.exc.QubesException): | |
pass | ||
|
||
|
||
class UnrecognizedDevice(QubesUSBException): | ||
pass | ||
|
||
|
||
def modify_qrexec_policy(service, line, add): | ||
""" | ||
Add/remove *line* to qrexec policy of a *service*. | ||
|
@@ -407,11 +403,13 @@ def on_domain_init_load(self, vm, event): | |
|
||
async def _attach_and_notify(self, vm, device, options): | ||
# bypass DeviceCollection logic preventing double attach | ||
await self.on_device_attach_usb(vm, | ||
'device-pre-attach:usb', device, options) | ||
await vm.fire_event_async('device-attach:usb', | ||
device=device, | ||
options=options) | ||
try: | ||
await self.on_device_attach_usb( | ||
vm, 'device-pre-attach:usb', device, options) | ||
except UnrecognizedDevice: | ||
return | ||
await vm.fire_event_async( | ||
'device-attach:usb', device=device, options=options) | ||
|
||
@qubes.ext.handler('domain-qdb-change:/qubes-usb-devices') | ||
def on_qdb_change(self, vm, event, path): | ||
|
@@ -469,7 +467,9 @@ def on_qdb_change(self, vm, event, path): | |
for dev_ident, front_vm in attached.items(): | ||
dev = USBDevice(vm, dev_ident) | ||
asyncio.ensure_future(front_vm.fire_event_async( | ||
'device-attach:usb', device=dev, options={})) | ||
'device-attach:usb', device=dev, | ||
options={'identity': dev.full_identity}) | ||
) | ||
|
||
self.devices_cache[vm.name] = current_devices | ||
|
||
|
@@ -536,29 +536,46 @@ def on_device_list_attached(self, vm, event, **kwargs): | |
|
||
for dev in self.get_all_devices(vm.app): | ||
if dev.frontend_domain == vm: | ||
yield (dev, {}) | ||
yield (dev, {'identity': dev.full_identity}) | ||
|
||
@qubes.ext.handler('device-pre-attach:usb') | ||
async def on_device_attach_usb(self, vm, event, device, options): | ||
# pylint: disable=unused-argument | ||
|
||
if options: | ||
if list(options.keys()) != ['identity']: | ||
raise qubes.exc.QubesException( | ||
'USB device attach do not support user options') | ||
identity = options['identity'] | ||
if device.full_identity != identity: | ||
print(f"Unrecognized identity, skipping attachment of {device}", | ||
file=sys.stderr) | ||
raise UnrecognizedDevice( | ||
"Device presented identity " | ||
f"{device.full_identity} " | ||
f"does not match expected {identity}" | ||
) | ||
|
||
if not vm.is_running() or vm.qid == 0: | ||
# print(f"Qube is not running, skipping attachment of {device}", | ||
# file=sys.stderr) | ||
return | ||
|
||
if not isinstance(device, USBDevice): | ||
# print("The device is not recognized as usb device, " | ||
# f"skipping attachment of {device}", | ||
# file=sys.stderr) | ||
return | ||
|
||
if options: | ||
raise qubes.exc.QubesException( | ||
'USB device attach do not support options') | ||
|
||
if device.frontend_domain: | ||
raise qubes.devices.DeviceAlreadyAttached( | ||
'Device {!s} already attached to {!s}'.format(device, | ||
device.frontend_domain) | ||
'Device {!s} already attached to {!s}'.format( | ||
device, device.frontend_domain) | ||
) | ||
|
||
stubdom_qrexec = (vm.virt_mode == 'hvm' and \ | ||
vm.features.check_with_template('stubdom-qrexec', False)) | ||
stubdom_qrexec = ( | ||
vm.virt_mode == 'hvm' | ||
and vm.features.check_with_template('stubdom-qrexec', False)) | ||
|
||
name = vm.name + '-dm' if stubdom_qrexec else vm.name | ||
|
||
|