Skip to content

Commit

Permalink
q-dev: implement part of new API for DeviceInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrbartman committed Jun 1, 2024
1 parent 44d2d6d commit d985cf1
Showing 1 changed file with 242 additions and 29 deletions.
271 changes: 242 additions & 29 deletions qubesusbproxy/core3ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import asyncio
import base64
import collections
import fcntl
import grp
import itertools
import os
import re
import string
import subprocess

import errno
import tempfile
from enum import Enum
from typing import List, Optional, Dict, Tuple

import qubes.devices
import qubes.ext
Expand All @@ -41,33 +44,197 @@
usb_connected_to_re = re.compile(br"^[a-zA-Z][a-zA-Z0-9_.-]*$")
usb_device_hw_ident_re = re.compile(r'^[0-9a-f]{4}:[0-9a-f]{4} ')


class USBDevice(qubes.devices.DeviceInfo):
# pylint: disable=too-few-public-methods
def __init__(self, backend_domain, ident):
super(USBDevice, self).__init__(backend_domain, ident, None)
# super(USBDevice, self).__init__(backend_domain, ident, None)
super(USBDevice, self).__init__(
backend_domain=backend_domain, ident=ident, devclass="usb")

self._qdb_ident = ident.replace('.', '_')
self._qdb_path = '/qubes-usb-devices/' + self._qdb_ident
# lazy loading
self._description = None

@property
def vendor(self) -> str:
"""
Device vendor from local database `/usr/share/hwdata/usb.ids`
Could be empty string or "unknown".
Lazy loaded.
"""
if self._vendor is None:
result = self._load_desc_from_qubesdb()["vendor"]
else:
result = self._vendor
return result

@property
def product(self) -> str:
"""
Device name from local database `/usr/share/hwdata/usb.ids`
Could be empty string or "unknown".
Lazy loaded.
"""
if self._product is None:
result = self._load_desc_from_qubesdb()["product"]
else:
result = self._product
return result

@property
def manufacturer(self) -> str:
"""
The name of the manufacturer of the device introduced by device itself
Could be empty string or "unknown".
Lazy loaded.
"""
if self._manufacturer is None:
result = self._load_desc_from_qubesdb()["manufacturer"]
else:
result = self._manufacturer
return result

@property
def name(self) -> str:
"""
The name of the device it introduced itself with (could be empty string)
Could be empty string or "unknown".
Lazy loaded.
"""
if self._name is None:
result = self._load_desc_from_qubesdb()["name"]
else:
result = self._name
return result

@property
def serial(self) -> str:
"""
The serial number of the device it introduced itself with.
Could be empty string or "unknown".
Lazy loaded.
"""
if self._serial is None:
result = self._load_desc_from_qubesdb()["serial"]
else:
result = self._serial
return result

@property
def interfaces(self) -> List[qubes.devices.DeviceInterface]:
"""
List of device interfaces.
Every device should have at least one interface.
"""
if (len(self._interfaces) == 1
and self._interfaces[0] == qubes.devices.DeviceInterface.Other):
result = self._load_interfaces_from_qubesdb()
else:
result = self._interfaces
return result

@property
def description(self):
if self._description is None:
if not self.backend_domain.is_running():
# don't cache this value
return "Unknown - domain not running"
untrusted_device_desc = self.backend_domain.untrusted_qdb.read(
def parent_device(self) -> Optional[qubes.devices.DeviceInfo]:
"""
The parent device if any.
USB device has no parents.
"""
return None

# @property
# def port_id(self) -> str:
# """
# Which port the device is connected to.
# """
# return self.ident.split("-")[1]

def _load_interfaces_from_qubesdb(self) \
-> List[qubes.devices.DeviceInterface]:
result = []
if not self.backend_domain.is_running():
# don't cache this value
return result
untrusted_interfaces: bytes = (
self.backend_domain.untrusted_qdb.read(
self._qdb_path + '/interfaces')
)
if not untrusted_interfaces:
return result
self._interfaces = result = [
qubes.devices.DeviceInterface.from_str(
self._sanitize(ifc, safe_chars=string.hexdigits)
)
for ifc in untrusted_interfaces.split(b':')
if ifc
]
return result

def _load_desc_from_qubesdb(self) -> Dict[str, str]:
unknown = "unknown"
result = {"vendor": unknown,
"product": unknown,
"manufacturer": unknown,
"name": unknown,
"serial": unknown}
if not self.backend_domain.is_running():
# don't cache this value
return result
untrusted_device_desc: bytes = (
self.backend_domain.untrusted_qdb.read(
self._qdb_path + '/desc')
if not untrusted_device_desc:
return 'Unknown'
self._description = self._sanitize_desc(untrusted_device_desc)
hw_ident_match = usb_device_hw_ident_re.match(self._description)
if hw_ident_match:
self._description = self._description[
len(hw_ident_match.group(0)):]
return self._description
)
if not untrusted_device_desc:
return result
try:
(untrusted_vendor_product, untrusted_manufacturer,
untrusted_name, untrusted_serial
) = untrusted_device_desc.split(b' ')
untrusted_vendor, untrusted_product = (
untrusted_vendor_product.split(b':'))
except ValueError:
# desc doesn't contain correctly formatted data,
# but it is not empty. We cannot parse it,
# but we can still put it to the `serial` just to provide
# some information to the user.
untrusted_vendor, untrusted_product, untrusted_manufacturer = (
unknown.encode(), unknown.encode(), unknown.encode())
untrusted_name = untrusted_device_desc.replace(b' ', b'_')
vendor, product = self._get_vendor_and_product_names(
self._sanitize(untrusted_vendor),
self._sanitize(untrusted_product),
)
self._desc_vendor = result["vendor"] = vendor
self._desc_product = result["product"] = product
self._desc_manufacturer = result["manufacturer"] = (
self._sanitize(untrusted_manufacturer))
self._desc_name = result["name"] = (
self._sanitize(untrusted_name))
return result

@staticmethod
def _sanitize(
untrusted_device_desc: bytes,
safe_chars: str =
string.ascii_letters + string.digits + string.punctuation + ' '
) -> str:
# b'USB\\x202.0\\x20Camera' -> 'USB 2.0 Camera'
untrusted_device_desc = untrusted_device_desc.decode(
'unicode_escape', errors='ignore')
return ''.join(
c if c in set(safe_chars) else '_' for c in untrusted_device_desc
)

@property
def frontend_domain(self):
Expand Down Expand Up @@ -96,15 +263,61 @@ def frontend_domain(self):
return connected_to

@staticmethod
def _sanitize_desc(untrusted_device_desc):
untrusted_device_desc = untrusted_device_desc.decode('ascii',
errors='ignore')
safe_set = set(string.ascii_letters + string.digits +
string.punctuation + ' ')
return ''.join(
c if c in safe_set else '_' for c in untrusted_device_desc
)
def _get_vendor_and_product_names(
vendor_id: str, product_id: str
) -> Tuple[str, str]:
"""
Return tuple of vendor's and product's names for the ids.
If the id is not known return ("unknown", "unknown").
"""
return (USBDevice._load_usb_known_devices()
.get(vendor_id, dict())
.get(product_id, ("unknown", "unknown"))
)

@staticmethod
def _load_usb_known_devices() -> Dict[str, Dict[str, Tuple[str, str]]]:
"""
List of known device vendors, devices and interfaces.
result[vendor_id][device_id] = (vendor_name, product_name)
"""
# Syntax:
# vendor vendor_name <-- 2 spaces between
# device device_name <-- single tab
# interface interface_name <-- two tabs
# ...
# C class class_name
# subclass subclass_name <-- single tab
# prog-if prog-if_name <-- two tabs
result = {}
with open('/usr/share/hwdata/usb.ids',
encoding='utf-8', errors='ignore') as usb_ids:
for line in usb_ids.readlines():
line = line.rstrip()
if line.startswith('#'):
# skip comments
continue
elif not line:
# skip empty lines
continue
elif line.startswith('\t\t'):
# skip interfaces
continue
elif line.startswith('C '):
# description of classes starts here, we can finish
break
elif line.startswith('\t'):
# save vendor, device pair
device_id, _, device_name = line[1:].split(' ', 2)
result[vendor_id][device_id] = vendor_name, device_name
else:
# new vendor
vendor_id, _, vendor_name = line[:].split(' ', 2)
result[vendor_id] = {}

return result

class USBProxyNotInstalled(qubes.exc.QubesException):
pass
Expand Down Expand Up @@ -170,15 +383,15 @@ class USBDeviceExtension(qubes.ext.Extension):

def __init__(self):
super(USBDeviceExtension, self).__init__()
#include dom0 devices in listing only when usb-proxy is really
# include dom0 devices in listing only when usb-proxy is really
# installed there
self.usb_proxy_installed_in_dom0 = os.path.exists(
'/etc/qubes-rpc/qubes.USB')
self.devices_cache = collections.defaultdict(dict)

@qubes.ext.handler('domain-init', 'domain-load')
def on_domain_init_load(self, vm, event):
'''Initialize watching for changes'''
"""Initialize watching for changes"""
# pylint: disable=unused-argument,no-self-use
vm.watch_qdb_path('/qubes-usb-devices')
if event == 'domain-load':
Expand All @@ -200,7 +413,7 @@ async def _attach_and_notify(self, vm, device, options):

@qubes.ext.handler('domain-qdb-change:/qubes-usb-devices')
def on_qdb_change(self, vm, event, path):
'''A change in QubesDB means a change in device list'''
"""A change in QubesDB means a change in device list"""
# pylint: disable=unused-argument,no-self-use
vm.fire_event('device-list-change:usb')
current_devices = dict((dev.ident, dev.frontend_domain)
Expand Down

0 comments on commit d985cf1

Please sign in to comment.